# PySpark Read CSV file into DataFrame

## &copy;  [Omkar Mehta](omehta2@illinois.edu) ##
### Industrial and Enterprise Systems Engineering, The Grainger College of Engineering,  UIUC ###

<hr style="border:2px solid blue"> </hr>

PySpark supports reading a CSV file with a pipe, comma, tab, space, or any other delimiter/separator files.

PySpark provides csv("path") on DataFrameReader to read a CSV file into PySpark DataFrame and dataframeObj.write.csv("path") to save or write to the CSV file.

In [0]:
# PySpark Read CSV File into DataFrame
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate() #.builder().master("local[1]")
df = spark.read.csv("/FileStore/tables/zipcodes.csv")
df.printSchema()

In [0]:
# Using .format().load() way to read csv
df = spark.read.format("csv").load("/FileStore/tables/zipcodes.csv")

#df = spark.read.format("org.apache.spark.sql.csv").load("/FileStore/tables/zipcodes.csv")
df.printSchema()

In [0]:
# Using Header Record For Column Names

df2 = spark.read.option("header",True).csv("/FileStore/tables/zipcodes.csv")

In [0]:
# Read Multiple CSV Files

# df = spark.read.csv("path1,path2,path3")

In [0]:
# Read all CSV Files in a Directory
df = spark.read.csv("/FileStore/tables/")


## Options While Reading CSV File

You can either use chaining option(self, key, value) to use multiple options or use alternate options(self, **options) method.

In [0]:
# delimiter
df3 = spark.read.options(delimiter=',').csv("/FileStore/tables/zipcodes.csv")

In [0]:
# infer schema
df4 = spark.read.options(inferSchema='True', delimiter=',').csv("/FileStore/tables/zipcodes.csv")

In [0]:
# Alternatively, infer schema by
df4 = spark.read.option("inferSchema",True) \
                .option("delimiter",",") \
  .csv("/FileStore/tables/zipcodes.csv")

In [0]:
# header
df5 = spark.read.options(header='True', inferSchema='True', delimiter=',').csv("/FileStore/tables/zipcodes.csv")

### quotes

When you have a column with a delimiter that used to split the columns, use `quotes` option to specify the quote character, by default it is ” and delimiters inside quotes are ignored. but using this option you can set any character.

### nullValues

Using `nullValues` option you can specify the string in a CSV to consider as null. For example, if you want to consider a date column with a value `"1900-01-01"` set null on DataFrame.

### dateFormat

`dateFormat` option to used to set the format of the input DateType and TimestampType columns. Supports all java.text.SimpleDateFormat formats

In [0]:
# Reading CSV files with a user-specified custom schema
schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("LocationType",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("Xaxis",IntegerType(),True) \
      .add("Yaxis",DoubleType(),True) \
      .add("Zaxis",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("Decommisioned",BooleanType(),True) \
      .add("TaxReturnsFiled",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("TotalWages",IntegerType(),True) \
      .add("Notes",StringType(),True)
      
df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("/FileStore/tables/zipcodes.csv")

In [0]:
# Applying DataFrame transformations
## Once you have created DataFrame from the CSV file, you can apply all transformation and actions DataFrame support.

# Write PySpark DataFrame to CSV file

# df.write.option("header",True).csv("/tmp/spark_output/zipcodes")

# Options
## Other options available quote,escape,nullValue,dateFormat,quoteMode .


In [0]:
# Saving modes - overwrite, append, ignore, error

# df2.write.mode('overwrite').csv("/tmp/spark_output/zipcodes")
#//you can also use this
# df2.write.format("csv").mode('overwrite').save("/tmp/spark_output/zipcodes")