In [1]:
from pyspark.sql import SparkSession

# Create (or reuse, via getOrCreate) a Spark session that runs locally on all
# available cores; evaluating `spark` on the last line displays its summary.
spark = (
    SparkSession.builder
    .appName("PySpark")
    .master("local[*]")
    .getOrCreate()
)

spark


# Data Reading & Writing

## 1. Reading `.csv` Files in PySpark

In [2]:
# Option 1: the `csv` shortcut on the DataFrameReader.
# With no options, every column is read as a string and named _c0, _c1, ...,
# and the file's header line shows up as an ordinary data row.
df = spark.read.csv('data/emp1.csv')

In [3]:
# Display the rows first, then the schema Spark assigned to them.
df.show(n=20, truncate=True)
df.printSchema()

+---+-----+---+----------+------+
|_c0|  _c1|_c2|       _c3|   _c4|
+---+-----+---+----------+------+
| ID| Name|Age|Department|Salary|
|101| John| 32|        IT| 45000|
|102|David| 21|        HR| 56000|
|103|Julie| 23|        HR| 50000|
|104|Chris| 25|        IT| 40000|
|105| Mick| 27|        HR| 60000|
+---+-----+---+----------+------+

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



### Using the first row as the header

In [4]:
# header=True: use the first line of the file as column names instead of data.
df = spark.read.csv('data/emp1.csv', header=True)
df.show()

+---+-----+---+----------+------+
| ID| Name|Age|Department|Salary|
+---+-----+---+----------+------+
|101| John| 32|        IT| 45000|
|102|David| 21|        HR| 56000|
|103|Julie| 23|        HR| 50000|
|104|Chris| 25|        IT| 40000|
|105| Mick| 27|        HR| 60000|
+---+-----+---+----------+------+



In [5]:
# Option 2: the generic reader API -- choose the format, set options, then load.
df = (
    spark.read
    .format('csv')
    .option('header', True)
    .load('data/emp1.csv')
)
df.show()

+---+-----+---+----------+------+
| ID| Name|Age|Department|Salary|
+---+-----+---+----------+------+
|101| John| 32|        IT| 45000|
|102|David| 21|        HR| 56000|
|103|Julie| 23|        HR| 50000|
|104|Chris| 25|        IT| 40000|
|105| Mick| 27|        HR| 60000|
+---+-----+---+----------+------+



In [6]:
# Reading several `.csv` files at once: pass a list of paths and the rows of
# all files end up in a single DataFrame.
df = spark.read.csv(
    ['data/emp1.csv', 'data/emp2.csv'],
    header=True,
)
df.show()

+---+------+---+----------+------+
| ID|  Name|Age|Department|Salary|
+---+------+---+----------+------+
|201| Harsh| 32|   Account| 55000|
|202| Harry| 21|     Sales| 36000|
|203|  Rick| 23|        IT| 60000|
|204|Miller| 25|     Sales| 20000|
|205|  Yash| 27|        HR| 50000|
|101|  John| 32|        IT| 45000|
|102| David| 21|        HR| 56000|
|103| Julie| 23|        HR| 50000|
|104| Chris| 25|        IT| 40000|
|105|  Mick| 27|        HR| 60000|
+---+------+---+----------+------+



#### Reading all CSV files in a directory

In [12]:
# Passing a directory reads every file inside it. pathGlobFilter restricts the
# read to *.csv files, so other files stored there (e.g. data/csvs/emp_ML.json,
# read in the JSON section) are not parsed as CSV rows on a fresh Run All.
# The pathGlobFilter option is available since Spark 3.0.
df = spark.read.csv(path='data/csvs', header=True, pathGlobFilter='*.csv')
df.show()
df.printSchema()

+---+------+---+----------+------+
| ID|  Name|Age|Department|Salary|
+---+------+---+----------+------+
|201| Harsh| 32|   Account| 55000|
|202| Harry| 21|     Sales| 36000|
|203|  Rick| 23|        IT| 60000|
|204|Miller| 25|     Sales| 20000|
|205|  Yash| 27|        HR| 50000|
|101|  John| 32|        IT| 45000|
|102| David| 21|        HR| 56000|
|103| Julie| 23|        HR| 50000|
|104| Chris| 25|        IT| 40000|
|105|  Mick| 27|        HR| 60000|
+---+------+---+----------+------+

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: string (nullable = true)



### Defining a Custom Schema

In [14]:
from pyspark.sql.types import *

# Declare column names and types explicitly, so ID and Age become integers
# instead of every column being read as a string.
my_sch = (
    StructType()
    .add(field='ID', data_type=IntegerType())
    .add(field='Name', data_type=StringType())
    .add(field='Age', data_type=IntegerType())
    .add(field='Department', data_type=StringType())
    # NOTE(review): Salary is kept as a string here; switch to IntegerType if
    # numeric operations on it are needed.
    .add(field='Salary', data_type=StringType())
)

# header=True skips each file's header line instead of reading it as data.
# pathGlobFilter keeps non-CSV files in data/csvs (e.g. emp_ML.json) out of the
# read, so it behaves the same on a fresh Restart & Run All (Spark 3.0+).
df = spark.read.csv(
    path='data/csvs',
    schema=my_sch,
    header=True,
    pathGlobFilter='*.csv',
)
df.show()
df.printSchema()

+---+------+---+----------+------+
| ID|  Name|Age|Department|Salary|
+---+------+---+----------+------+
|201| Harsh| 32|   Account| 55000|
|202| Harry| 21|     Sales| 36000|
|203|  Rick| 23|        IT| 60000|
|204|Miller| 25|     Sales| 20000|
|205|  Yash| 27|        HR| 50000|
|101|  John| 32|        IT| 45000|
|102| David| 21|        HR| 56000|
|103| Julie| 23|        HR| 50000|
|104| Chris| 25|        IT| 40000|
|105|  Mick| 27|        HR| 60000|
+---+------+---+----------+------+

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: string (nullable = true)



## 2. Reading Data from `.json` Files

In [18]:
# Single-line JSON: by default Spark's JSON reader expects one record per line.
js_df = spark.read.json('data/d11.json')
js_df.printSchema()
js_df.show()


root
 |-- active: boolean (nullable = true)
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- roles: array (nullable = true)
 |    |-- element: string (containsNull = true)

+------+--------------------+---+-------------+---------------+
|active|               email| id|         name|          roles|
+------+--------------------+---+-------------+---------------+
|  true|alice.johnson@exa...|  1|Alice Johnson|[admin, editor]|
+------+--------------------+---+-------------+---------------+



In [19]:
# Multi-line JSON: multiLine=True lets Spark parse records that span several
# lines instead of expecting one record per line.
js_df = spark.read.json('data/csvs/emp_ML.json', multiLine=True)
js_df.printSchema()
js_df.show()

root
 |-- active: boolean (nullable = true)
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- roles: array (nullable = true)
 |    |-- element: string (containsNull = true)

+------+--------------------+---+-------------+----------------+
|active|               email| id|         name|           roles|
+------+--------------------+---+-------------+----------------+
|  true|alice.johnson@exa...|  1|Alice Johnson| [admin, editor]|
| false|bob.smith@example...|  2|    Bob Smith|        [viewer]|
|  true|charlie.lee@examp...|  3|  Charlie Lee|[editor, viewer]|
+------+--------------------+---+-------------+----------------+



In [20]:
# Several JSON files: pass a list of paths to get one combined DataFrame.
js_df = spark.read.json(
    ['data/d11.json', 'data/d22.json'],
)
js_df.printSchema()
js_df.show()

root
 |-- active: boolean (nullable = true)
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- roles: array (nullable = true)
 |    |-- element: string (containsNull = true)

+------+--------------------+---+-------------+---------------+
|active|               email| id|         name|          roles|
+------+--------------------+---+-------------+---------------+
|  true|alice.johnson@exa...|  1|Alice Johnson|[admin, editor]|
| false|bob.smith@example...|  2|    Bob Smith|       [viewer]|
+------+--------------------+---+-------------+---------------+



In [21]:
# Read all JSON files in data/. The directory also holds the emp*.csv files
# read earlier; without pathGlobFilter those would be fed to the JSON reader as
# malformed records. pathGlobFilter is available since Spark 3.0.
js_df = spark.read.json(path='data/', pathGlobFilter='*.json')
js_df.printSchema()
js_df.show()

root
 |-- active: boolean (nullable = true)
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- roles: array (nullable = true)
 |    |-- element: string (containsNull = true)

+------+--------------------+---+-------------+---------------+
|active|               email| id|         name|          roles|
+------+--------------------+---+-------------+---------------+
|  true|alice.johnson@exa...|  1|Alice Johnson|[admin, editor]|
| false|bob.smith@example...|  2|    Bob Smith|       [viewer]|
+------+--------------------+---+-------------+---------------+

