PySpark provides csv("path") on DataFrameReader to read a CSV file into a PySpark DataFrame, and dataframeObj.write.csv("path") to save or write a DataFrame to a CSV file. In this tutorial, you will learn how to read a single file, multiple files, and all files from a local directory into a DataFrame, apply some transformations, and finally write the DataFrame back to a CSV file, using a PySpark example.

In [19]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

In [32]:
def create_spark_session():
    """Build (or reuse) a SparkSession with the hadoop-aws package requested.

    The ``org.apache.hadoop:hadoop-aws:2.7.0`` package is added through the
    ``spark.jars.packages`` setting (presumably to enable S3 access; confirm
    against the deployment). If a session already exists, ``getOrCreate``
    returns it.

    Returns:
        spark (SparkSession) - the active Spark session.
    """
    hadoop_aws_package = "org.apache.hadoop:hadoop-aws:2.7.0"
    session_builder = SparkSession.builder.config(
        "spark.jars.packages", hadoop_aws_package
    )
    return session_builder.getOrCreate()
    
def process_text_data(spark, input_path, output_path,
                      write_mode="errorifexists"):
    """Read a CSV file in several ways, print each schema, and write it back.

    The same CSV file is read four times to show how different reader
    options affect the resulting DataFrame. The schema of each DataFrame is
    printed. Finally, the header-aware DataFrame is written to
    ``output_path`` as CSV, with a header row.

    Arguments:
        spark (SparkSession) - active Spark session.
        input_path (str) - path of the source CSV file.
        output_path (str) - directory to write the CSV output to.
        write_mode (str) - Spark save mode for the output, e.g.
            "errorifexists", "overwrite", "append" or "ignore". Defaults to
            "errorifexists" (Spark's own default), so a re-run fails if
            ``output_path`` already exists unless another mode is passed.
    """
    # 1. No options: the header line is read as an ordinary data record and
    #    columns get generic names (_c0, _c1, ...).
    df = spark.read.csv(input_path)
    df.printSchema()

    # 2. header=True: the first line supplies the column names instead of
    #    being treated as a data record.
    df2 = spark.read.option("header", True).csv(input_path)
    df2.printSchema()

    # 3. Several reader options set at once via options().
    df3 = spark.read.options(header='True', delimiter=',').csv(input_path)
    df3.printSchema()

    # 4. Explicit schema: column names and types come from `schema` rather
    #    than from the file; the header line is still skipped.
    schema = (
        StructType()
        .add("text", StringType(), True)
        .add("text_id", IntegerType(), True)
    )
    df_with_schema = (
        spark.read.format("csv")
        .option("header", True)
        .schema(schema)
        .load(input_path)
    )
    df_with_schema.printSchema()

    # Write the header-aware DataFrame back out, keeping the header row.
    # An explicit save mode lets callers re-run the job onto an existing path.
    df2.write.mode(write_mode).option("header", True).csv(output_path)
    
def main():
    """Create a Spark session and run the CSV read/write demo on local paths."""
    spark = create_spark_session()
    input_path = './data/stt.csv'
    output_path = '../output/data'
    # Call the processing function actually defined in this file
    # (process_text_data); there is no process_audio_data.
    process_text_data(spark, input_path, output_path)


if __name__ == '__main__':
    main()
    
    

root
 |-- _c0: string (nullable = true)

root
 |--                                              sentence  id: string (nullable = true)

root
 |--                                              sentence  id: string (nullable = true)

root
 |-- text: string (nullable = true)
 |-- text_id: integer (nullable = true)



                                                                                