# Working with CSV Files in Spark DataFrames

1. **Read a CSV file into a Spark DataFrame**  
2. **Read a CSV file with a user-provided schema**  
3. **Read multiple CSV files**  
4. **Read all CSV files from a directory**  
5. **Options while reading a CSV file**  
6. **Write a DataFrame to CSV files**  


In [1]:
# initial setup 
import time
from pyspark.sql import SparkSession

spark = (SparkSession
        .builder
        .master("local[3]")
        .appName("my_spark_app")
        .getOrCreate()
        )
spark

**Read a CSV file into a Spark DataFrame** 

In [2]:
df = spark.read.csv("./source/csv/employee_1.csv")
df.show()

+-----+-----------+---------+---------+---------+--------------------+------------------+--------------------+------------+--------------+------------+-------+--------------------+-----------------+--------------------+----------+-----+--------------------+----------+------------+--------+-----------+-----------------+--------------------+
|  _c0|        _c1|      _c2|      _c3|      _c4|                 _c5|               _c6|                 _c7|         _c8|           _c9|        _c10|   _c11|                _c12|             _c13|                _c14|      _c15| _c16|                _c17|      _c18|        _c19|    _c20|       _c21|             _c22|                _c23|
+-----+-----------+---------+---------+---------+--------------------+------------------+--------------------+------------+--------------+------------+-------+--------------------+-----------------+--------------------+----------+-----+--------------------+----------+------------+--------+-----------+--------------

In [3]:
df = spark.read.format("csv").load("./source/csv/employee_1.csv")
df.show()

+-----+-----------+---------+---------+---------+--------------------+------------------+--------------------+------------+--------------+------------+-------+--------------------+-----------------+--------------------+----------+-----+--------------------+----------+------------+--------+-----------+-----------------+--------------------+
|  _c0|        _c1|      _c2|      _c3|      _c4|                 _c5|               _c6|                 _c7|         _c8|           _c9|        _c10|   _c11|                _c12|             _c13|                _c14|      _c15| _c16|                _c17|      _c18|        _c19|    _c20|       _c21|             _c22|                _c23|
+-----+-----------+---------+---------+---------+--------------------+------------------+--------------------+------------+--------------+------------+-------+--------------------+-----------------+--------------------+----------+-----+--------------------+----------+------------+--------+-----------+--------------

Here, `spark` is the SparkSession created above, `spark.read` returns a `DataFrameReader`, and `csv()` is a method of `DataFrameReader`. `spark.read.format("csv").load(path)` is the generic form of the same call and produces the same DataFrame.

Because the `header` option is not set, Spark treats the first line of the file as data and names the columns `_c0`, `_c1`, and so on. Unless a schema is supplied or `inferSchema` is enabled, every column is read as a string.



You can inspect the schema (column names and data types) with the DataFrame's `printSchema()` method. Because no schema was specified, all columns are strings:

In [4]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)



### Setting the `header` option to `True`

In [5]:
df = (spark.read
      .format("csv")
      .option("header", True)
      .load("./source/csv/employee_1.csv"))
df.show()

+-----+-----------+---------+---------+---------+--------------------+------------------+--------------------+------------+--------------+------------+-------+--------------------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|  FirstName| LastName|StartDate| ExitDate|               Title|        Supervisor|             ADEmail|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+-----------+---------+---------+---------+--------------------+------------------+--------------------+------------+--------------+------------+-------+--------------------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+-----

In [7]:
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, unescapedQuoteHandling=None) method of pyspark.sql.readwriter.DataFrameReader instance
    Loads a CSV file and returns the result as a  :class:`DataFrame`.
    
    This function will go through the input once to determine the input schema if
    ``inferSchema`` is enabled. To avoid going through the entire data once, di

#### Read multiple CSV files

To read several CSV files into one DataFrame, pass a list of file paths to `load()` (or to `spark.read.csv()`, which accepts the same list).

In [8]:
df = (spark.read
      .format("csv")
      .option("header", True)
      .load(["./source/csv/employee_1.csv", "./source/csv/employee_2.csv"]))
df.count()

1999

#### Read all CSV files from a directory

To read every file in a directory, pass the directory path to `load()` (or to `csv()`).

In [9]:
df = (spark.read
      .format("csv")
      .option("header", True)
      .load("./source/csv/"))
df.count()

3000

#### Use a wildcard pattern to select files

In [10]:
df = (spark.read
      .format("csv")
      .option("header", True)
      .load("./source/csv/employee_*.csv"))
df.count()

3000

### **Reading CSV files with different options**

In [11]:
df = (spark.read
      .format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .load("./source/csv/"))
print(df.count())
df.printSchema()

3000
root
 |-- EmpID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- ExitDate: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Supervisor: string (nullable = true)
 |-- ADEmail: string (nullable = true)
 |-- BusinessUnit: string (nullable = true)
 |-- EmployeeStatus: string (nullable = true)
 |-- EmployeeType: string (nullable = true)
 |-- PayZone: string (nullable = true)
 |-- EmployeeClassificationType: string (nullable = true)
 |-- DepartmentType: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- State: string (nullable = true)
 |-- JobFunctionDescription: string (nullable = true)
 |-- GenderCode: string (nullable = true)
 |-- LocationCode: integer (nullable = true)
 |-- RaceDesc: string (nullable = true)
 |-- MaritalDesc: string (nullable = true)
 |-- Performance Score: string (nullable = true)
 |-- Curre

You can chain `option()` calls as shown above, or set several options at once with the `options()` method.

In [12]:
df = (spark.read
      .format("csv")
      .options(header=True, inferSchema=True)
      .load("./source/csv/"))
print(df.count())
df.printSchema()

3000
root
 |-- EmpID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- ExitDate: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Supervisor: string (nullable = true)
 |-- ADEmail: string (nullable = true)
 |-- BusinessUnit: string (nullable = true)
 |-- EmployeeStatus: string (nullable = true)
 |-- EmployeeType: string (nullable = true)
 |-- PayZone: string (nullable = true)
 |-- EmployeeClassificationType: string (nullable = true)
 |-- DepartmentType: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- State: string (nullable = true)
 |-- JobFunctionDescription: string (nullable = true)
 |-- GenderCode: string (nullable = true)
 |-- LocationCode: integer (nullable = true)
 |-- RaceDesc: string (nullable = true)
 |-- MaritalDesc: string (nullable = true)
 |-- Performance Score: string (nullable = true)
 |-- Curre

**Commonly used options**

| Option       | Description                                                                                   |
|--------------|-----------------------------------------------------------------------------------------------|
| delimiter    | Character used to separate fields (alias: `sep`). Default: `,`.                              |
| inferSchema  | Infers column data types from the data; requires an extra pass over the input. Default: `false`. |
| header       | Treats the first line of each file as column names. Default: `false`.                       |
| quote        | Character used to quote fields that contain the delimiter or other special characters. Default: `"`. |
| nullValue    | String that should be read as a null value. Default: empty string.                          |
| dateFormat   | Pattern used to parse date columns. Default: `yyyy-MM-dd`.                                    |


#### **Read a DataFrame with a custom schema**

In [13]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

schema = StructType([
    StructField("EmpID", IntegerType(), True),
    StructField("FirstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("StartDate", StringType(), True),
    StructField("ExitDate", StringType(), True),
    StructField("Title", StringType(), True),
    StructField("Supervisor", StringType(), True),
    StructField("ADEmail", StringType(), True),
    StructField("BusinessUnit", StringType(), True),
    StructField("EmployeeStatus", StringType(), True),
    StructField("EmployeeType", StringType(), True),
    StructField("PayZone", StringType(), True),
    StructField("EmployeeClassificationType", StringType(), True),
    StructField("DepartmentType", StringType(), True),
    StructField("Division", StringType(), True),
    StructField("DOB", StringType(), True),
    StructField("State", StringType(), True),
    StructField("JobFunctionDescription", StringType(), True),
    StructField("GenderCode", StringType(), True),
    StructField("LocationCode", IntegerType(), True),
    StructField("RaceDesc", StringType(), True),
    StructField("MaritalDesc", StringType(), True),
    StructField("Performance_Score", StringType(), True),
    StructField("Current_Employee_Rating", IntegerType(), True)
])


In [14]:
df = (spark.read
      .format("csv")
      .option("header", True)
      .schema(schema)
      .load("./source/csv/"))
df.printSchema()

root
 |-- EmpID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- ExitDate: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Supervisor: string (nullable = true)
 |-- ADEmail: string (nullable = true)
 |-- BusinessUnit: string (nullable = true)
 |-- EmployeeStatus: string (nullable = true)
 |-- EmployeeType: string (nullable = true)
 |-- PayZone: string (nullable = true)
 |-- EmployeeClassificationType: string (nullable = true)
 |-- DepartmentType: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- State: string (nullable = true)
 |-- JobFunctionDescription: string (nullable = true)
 |-- GenderCode: string (nullable = true)
 |-- LocationCode: integer (nullable = true)
 |-- RaceDesc: string (nullable = true)
 |-- MaritalDesc: string (nullable = true)
 |-- Performance_Score: string (nullable = true)
 |-- Current_Em

### **Writing a PySpark DataFrame to CSV**

To write a PySpark DataFrame to CSV, use `df.write`, which returns a `DataFrameWriter`, and then call `.csv(path)` or `.format("csv").save(path)`. The path is treated as a directory: Spark writes the data as one or more `part-*` files inside it (typically one per partition), not as a single CSV file. You can also set writer options such as `header` and `delimiter`, plus a save mode that decides what happens if the path already exists:

In [16]:
(df.write
        .format("csv")
        .mode("append")
        .option("header",True)
        .option("delimiter",'|')
        .save("./Sinck/csv/employee"))
        


The save mode controls what happens when data already exists at the output path:

- `overwrite` – Replaces the existing data.
- `append` – Adds the new rows as additional part files alongside the existing data.
- `ignore` – Silently skips the write if the path already exists.
- `error` (alias `errorifexists`) – Raises an error if the path already exists. This is the default.