<a href="https://colab.research.google.com/github/RISHOBGHOSH/Codes-of-mySQL/blob/main/spark_UI_Z2H.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Joins and Data Partitions")
    .master("local[*]")
    .getOrCreate()
)

spark

In [None]:
# Emp Data & Schema

emp_data = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

dept_data = [
    ["101", "Sales", "NYC", "US", "1000000"],
    ["102", "Marketing", "LA", "US", "900000"],
    ["103", "Finance", "London", "UK", "1200000"],
    ["104", "Engineering", "Beijing", "China", "1500000"],
    ["105", "Human Resources", "Tokyo", "Japan", "800000"],
    ["106", "Research and Development", "Perth", "Australia", "1100000"],
    ["107", "Customer Service", "Sydney", "Australia", "950000"]
]

dept_schema = "department_id string, department_name string, city string, country string, budget string"

In [None]:
# Create emp & dept DataFrame

emp = spark.createDataFrame(data=emp_data, schema=emp_schema)
dept = spark.createDataFrame(data=dept_data, schema=dept_schema)

In [None]:
# Show emp dataframe (ACTION)

emp.show()
dept.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [None]:
# Print Schema

emp.printSchema()
dept.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- hire_date: string (nullable = true)

root
 |-- department_id: string (nullable = true)
 |-- department_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- budget: string (nullable = true)



In [None]:
# Get number of partitions for emp
emp.rdd.getNumPartitions()

2

In [None]:
# Get number of partitions for dept
dept.rdd.getNumPartitions()

2

In [None]:
# Repartition of data using repartition & coalesce
emp_partitioned = emp.repartition(4)

In [None]:
emp_partitioned.rdd.getNumPartitions()

4

In [None]:
# we can increase the partition
emp_partitioned = emp.repartition(10)
emp_partitioned.rdd.getNumPartitions()

10

⭐ 1. coalesce()
✔ Used to reduce the number of partitions

(e.g., from 20 → 5)

✔ NO shuffle (narrow transformation)

Faster because data doesn’t move across nodes.

✔ Best when:

Reducing partitions

After filtering data (less data left)

Before writing to fewer output files

Improving performance by reducing overhead

❌ Not good for increasing partitions

Spark won’t redistribute data evenly.

⭐ 2. repartition()
✔ Used to increase OR decrease partitions

(e.g., 10 → 50 or 50 → 10)

✔ FULL SHUFFLE (wide transformation)

Data is evenly redistributed across partitions.

✔ Best when:

Increasing partitions

Preparing for heavy parallel processing

Fixing skew

Preparing for joins / aggregations

Ensuring balanced partitions

❌ Slower than coalesce (due to shuffle)

In [None]:
emp_partitioned = emp.coalesce(200)
emp_partitioned.rdd.getNumPartitions()
# coalesce cant increase the num of partition

2

In [None]:
emp_partitioned = emp.coalesce(1)
emp_partitioned.rdd.getNumPartitions()
# coalesce cant increase the num of partition

1

In [None]:
# Repartition of data using repartition & coalesce for one column
emp_partitioned = emp.repartition(4, "department_id")

In [None]:
# Find the partition info for partitions and reparition
from pyspark.sql.functions import spark_partition_id

emp_1 = emp.withColumn("partition_num", spark_partition_id())
emp_1.show()

+-----------+-------------+-------------+---+------+------+----------+-------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|partition_num|
+-----------+-------------+-------------+---+------+------+----------+-------------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|            0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|            0|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|            0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|            0|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|            0|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|            0|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|            0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|            0|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-0

In [None]:
# Find the partition info for partitions and reparition
from pyspark.sql.functions import spark_partition_id

emp_1 = emp.repartition(4, "department_id").withColumn("partition_num", spark_partition_id())
emp_1.show()


+-----------+-------------+-------------+---+------+------+----------+-------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|partition_num|
+-----------+-------------+-------------+---+------+------+----------+-------------+
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|            0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|            0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|            0|
|        014|          107|    Emily Lee| 26|Female| 46000|2019-01-01|            0|
|        016|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01|            0|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|            0|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|            1|
|        017|          105|  George Wang| 34|  Male| 57000|2016-03-15|            1|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-0

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Reading from CSV Files")
    .master("local[*]")
    .getOrCreate()
)

spark


In [None]:
# Read a csv file into dataframe

# df = spark.read.format("csv").load("data/input/emp.csv")
df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("/content/emp.csv")

# here we have spark jobs and it will ran to process the metadata
#option (header , true) - it will read the 7 coulmns
#option(inferSchema, True) - will try to look at the files and understand the data type
# spark - jobs , then stage and then task

In [None]:
df.printSchema()


root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- hire_date: date (nullable = true)



In [None]:
df.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|          7|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|         11|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [None]:
# Reading with Schema
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"

df_schema = spark.read.format("csv").schema(_schema).load("/content/emp.csv")

# here the spark did not created any Jobs to read the schema
# becoz we already gave the schema
# so in production we have schema - and to optimize the code
# becoz there ight me prb while spark reading the file which has data from different sources


In [None]:
df_schema.show() # NUll in first line as we have not specified the header

df_schema1 = spark.read.format("csv").option("header",True).schema(_schema).load("/content/emp.csv")
df_schema1.show()


+-----------+-------------+-------------+----+------+-------+----------+
|employee_id|department_id|         name| age|gender| salary| hire_date|
+-----------+-------------+-------------+----+------+-------+----------+
|       NULL|         NULL|         name|NULL|gender|   NULL|      NULL|
|          1|          101|     John Doe|  30|  Male|50000.0|2015-01-01|
|          2|          101|   Jane Smith|  25|Female|45000.0|2016-02-15|
|          3|          102|    Bob Brown|  35|  Male|55000.0|2014-05-01|
|          4|          102|    Alice Lee|  28|Female|48000.0|2017-09-30|
|          5|          103|    Jack Chan|  40|  Male|60000.0|2013-04-01|
|          6|          103|    Jill Wong|  32|Female|52000.0|2018-07-01|
|          7|          101|James Johnson|  42|  Male|70000.0|2012-03-15|
|          8|          102|     Kate Kim|  29|Female|51000.0|2019-10-01|
|          9|          103|      Tom Tan|  33|  Male|58000.0|2016-06-01|
|         10|          104|     Lisa Lee|  27|Femal

In [None]:
# Handle BAD records - PERMISSIVE (Default mode)
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date ,_corrupt_record string"
#add _corrupt_record string is see the Data which are BAD
df_p = spark.read.format("csv").schema(_schema).load("/content/emp_new.csv")
df_p.printSchema()
df_p.show()

# shows the columns with BAD data
df_p.where("_corrupt_record is not null").show()



root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- _corrupt_record: string (nullable = true)

+-----------+-------------+-------------+----+------+-------+----------+--------------------+
|employee_id|department_id|         name| age|gender| salary| hire_date|     _corrupt_record|
+-----------+-------------+-------------+----+------+-------+----------+--------------------+
|       NULL|         NULL|         name|NULL|gender|   NULL|      NULL|employee_id,depar...|
|          1|          101|     John Doe|  30|  Male|50000.0|2015-01-01|                NULL|
|          2|          101|   Jane Smith|  25|Female|45000.0|2016-02-15|                NULL|
|          3|          102|    Bob Brown|  35|  Male|55000.0|2014-05-01|                NULL|
|          4

In [None]:
#columnNameOfCorruptRecord
_schema1 = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date, bad_record string"

df_p1 = spark.read.format("csv").schema(_schema1).option("columnNameOfCorruptRecord", "bad_record").option("header", True).load("/content/emp_new.csv")
df_p1.printSchema()
df_p1.show()


root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- bad_record: string (nullable = true)

+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|employee_id|department_id|         name|age|gender| salary| hire_date|          bad_record|
+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|          1|          101|     John Doe| 30|  Male|50000.0|2015-01-01|                NULL|
|          2|          101|   Jane Smith| 25|Female|45000.0|2016-02-15|                NULL|
|          3|          102|    Bob Brown| 35|  Male|55000.0|2014-05-01|                NULL|
|          4|          102|    Alice Lee| 28|Female|48000.0|2017-09-30|                NULL|
|          5|          1

In [None]:
# Handle BAD records - DROPMALFORMED
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"

df_m = spark.read.format("csv").option("header", True).option("mode", "DROPMALFORMED").schema(_schema).load("/content/emp_new.csv")
df_m.printSchema()

df_m.show()

root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: date (nullable = true)

+-----------+-------------+-----------+---+------+-------+----------+
|employee_id|department_id|       name|age|gender| salary| hire_date|
+-----------+-------------+-----------+---+------+-------+----------+
|          1|          101|   John Doe| 30|  Male|50000.0|2015-01-01|
|          2|          101| Jane Smith| 25|Female|45000.0|2016-02-15|
|          3|          102|  Bob Brown| 35|  Male|55000.0|2014-05-01|
|          4|          102|  Alice Lee| 28|Female|48000.0|2017-09-30|
|          5|          103|  Jack Chan| 40|  Male|60000.0|2013-04-01|
|          6|          103|  Jill Wong| 32|Female|52000.0|2018-07-01|
|          8|          102|   Kate Kim| 29|Female|51000.0|2019-10-01|
|          9|    

In [None]:
# Handle BAD records - FAILFAST

_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"

df_m2 = spark.read.format("csv").option("header", True).option("mode", "FAILFAST").schema(_schema).load("/content/emp_new.csv")

df_m2.printSchema()

df_m2.show()


root
 |-- employee_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- hire_date: date (nullable = true)



Py4JJavaError: An error occurred while calling o584.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23) (503b4c7f3ca9 executor driver): org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [7,101,James Johnson,42,Male,null,15414].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. 
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1610)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:79)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:456)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.NumberFormatException: For input string: "Low"
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:365)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:307)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:452)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 26 more
Caused by: java.lang.NumberFormatException: For input string: "Low"
	at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2054)
	at java.base/jdk.internal.math.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
	at java.base/java.lang.Double.parseDouble(Double.java:651)
	at scala.collection.immutable.StringLike.toDouble(StringLike.scala:327)
	at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:327)
	at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$12(UnivocityParser.scala:207)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:291)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$11(UnivocityParser.scala:203)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:346)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at jdk.internal.reflect.GeneratedMethodAccessor65.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [7,101,James Johnson,42,Male,null,15414].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. 
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1610)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:79)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:456)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.NumberFormatException: For input string: "Low"
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:365)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:307)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:452)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 26 more
Caused by: java.lang.NumberFormatException: For input string: "Low"
	at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2054)
	at java.base/jdk.internal.math.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
	at java.base/java.lang.Double.parseDouble(Double.java:651)
	at scala.collection.immutable.StringLike.toDouble(StringLike.scala:327)
	at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:327)
	at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$12(UnivocityParser.scala:207)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:291)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$11(UnivocityParser.scala:203)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:346)
	... 29 more


PERMISSIVE

Keeps bad rows with _corrupt_record, best for data exploration and debugging.

DROPMALFORMED

Drops bad rows; best when only clean data is required.

FAILFAST

Stops on first error; best for strong validation in production.

In [None]:
# BONUS TIP
# Multiple options

_options = {
    "header" : "true",
    "inferSchema" : "true",
    "mode" : "PERMISSIVE"
}

df = (spark.read.format("csv").options(**_options).load("/content/emp_new.csv"))
df.show()


+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|          1|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|          2|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|          3|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|          4|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|          5|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|          6|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|          7|          101|James Johnson| 42|  Male|   Low|2012-03-15|
|          8|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|          9|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|         10|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|         11|          104|   David Park| 38|  Male| 65000|   no date|
|     

13 Read Complex File Formats | Parquet | ORC | Performance benefit of Parquet |Recursive File Lookup

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Reading Complex Data Formats")
    .master("local[*]")
    .getOrCreate()
)

spark

In [None]:
# Read Parquet Sales data

df_parquet = spark.read.format("parquet").load("/content/sales_data.parquet")

In [None]:
df_parquet.printSchema()


root
 |-- transacted_at: timestamp (nullable = true)
 |-- trx_id: integer (nullable = true)
 |-- retailer_id: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- city_id: integer (nullable = true)



In [None]:
df_parquet.show()

+-------------------+----------+-----------+--------------------+-------+----------+
|      transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+-------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24 19:00:00|1995601912| 2077350195|Walgreen       11-25| 197.23| 216510442|
|2017-11-24 19:00:00|1734117021|  644879053|unkn    ppd id: 7...|   8.58| 930259917|
|2017-11-24 19:00:00|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24 19:00:00|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24 19:00:00|1734117089| 1898522855| Target        11-25|  66.33|1855530529|
|2017-11-24 19:00:00|1734117117|  997626433|Sears  ppd id: 85...| 298.87| 957346984|
|2017-11-24 19:00:00|1734117123| 1953761884|unkn   ppd id: 15...|  19.55|  45522086|
|2017-11-24 19:00:00|1734117152| 1429095612|Ikea     arc id: ...|   9.39|1268541279|
|2017-11-24 19:00:00|1734117153|  847200066|unkn        Kings...|

In [None]:
# Read the whole folder with parquet files
df_parquet = spark.read.format("parquet").load("data/input/sales_total_parquet/*.parquet")


In [None]:
# Read ORC Sales data

df_orc = spark.read.format("orc").load("/content/sales_data.orc")

In [None]:
df_orc.printSchema()

root
 |-- transacted_at: timestamp (nullable = true)
 |-- trx_id: integer (nullable = true)
 |-- retailer_id: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- city_id: integer (nullable = true)



In [None]:
df_orc.show()

+-------------------+----------+-----------+--------------------+-------+----------+
|      transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+-------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24 19:00:00|1995601912| 2077350195|Walgreen       11-25| 197.23| 216510442|
|2017-11-24 19:00:00|1734117021|  644879053|unkn    ppd id: 7...|   8.58| 930259917|
|2017-11-24 19:00:00|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24 19:00:00|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24 19:00:00|1734117089| 1898522855| Target        11-25|  66.33|1855530529|
|2017-11-24 19:00:00|1734117117|  997626433|Sears  ppd id: 85...| 298.87| 957346984|
|2017-11-24 19:00:00|1734117123| 1953761884|unkn   ppd id: 15...|  19.55|  45522086|
|2017-11-24 19:00:00|1734117152| 1429095612|Ikea     arc id: ...|   9.39|1268541279|
|2017-11-24 19:00:00|1734117153|  847200066|unkn        Kings...|

In [None]:
# Read the whole folder with orc files
df_parquet = spark.read.format("orc").load("data/input/sales_total_orc/*.orc")


In [None]:
# Benefits of Columnar Storage

# Lets create a simple Python decorator - {get_time} to get the execution timings
# If you dont know about Python decorators - check out : https://www.geeksforgeeks.org/decorators-in-python/
import time

def get_time(func):
    def inner_get_time() -> str:
        start_time = time.time()
        func()
        end_time = time.time()
        return (f"Execution time: {(end_time - start_time)*1000} ms")
    print(inner_get_time())

In [None]:
@get_time
def x():
    df = spark.read.format("parquet").load("/content/sales_data.parquet")
    df.count()

Execution time: 594.5262908935547 ms


In [None]:
@get_time
def x():
    df = spark.read.format("parquet").load("/content/sales_data.parquet")
    df.select("trx_id").count()

Execution time: 359.3740463256836 ms


In [None]:
# BONUS TIP
# RECURSIVE READ

sales_recursive
|__ sales_1\1.parquet
|__ sales_1\sales_2\2.parquet

In [None]:
df_1 = spark.read.format("parquet").load("/content/sample_data/sales1/1.parquet")
df_1.show()

+-------------------+----------+-----------+--------------------+------+---------+
|      transacted_at|    trx_id|retailer_id|         description|amount|  city_id|
+-------------------+----------+-----------+--------------------+------+---------+
|2017-11-24 19:00:00|1734117021|  644879053|unkn    ppd id: 7...|  8.58|930259917|
+-------------------+----------+-----------+--------------------+------+---------+



In [None]:
df_1 = spark.read.format("parquet").load("/content/sample_data/sales1/sales2/2.parquet")
df_1.show()

+-------------------+----------+-----------+--------------------+------+--------+
|      transacted_at|    trx_id|retailer_id|         description|amount| city_id|
+-------------------+----------+-----------+--------------------+------+--------+
|2017-11-24 19:00:00|1734117123| 1953761884|unkn   ppd id: 15...| 19.55|45522086|
+-------------------+----------+-----------+--------------------+------+--------+



In [None]:
# Both can be read together

df_1 = spark.read.format("parquet").option("recursiveFileLookup", True).load("/content/sample_data/sales1")
df_1.show()

+-------------------+----------+-----------+--------------------+------+---------+
|      transacted_at|    trx_id|retailer_id|         description|amount|  city_id|
+-------------------+----------+-----------+--------------------+------+---------+
|2017-11-24 19:00:00|1734117123| 1953761884|unkn   ppd id: 15...| 19.55| 45522086|
|2017-11-24 19:00:00|1734117021|  644879053|unkn    ppd id: 7...|  8.58|930259917|
+-------------------+----------+-----------+--------------------+------+---------+



How to read JSON files? How to parse JSON data? How to flatten JSON data? What is explode function? What is from_json function ? What is to_json function ? How to write complex schema for JSON ?

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Reading and Parsing JSON Files/Data")
    .master("local[*]")
    .getOrCreate()
)

spark

In [None]:
# Read Single line JSON file

df_single = spark.read.format("json").load("/content/order_singleline.json")

In [None]:
df_single.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [None]:
df_single.show()

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [None]:
# Read Multiline JSON file

df_multi = spark.read.format("json").option("multiLine", True).load("/content/order_multiline.json")

In [None]:
df_multi.printSchema()


root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [None]:
df_multi.show()


+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [None]:
df = spark.read.format("text").load("/content/order_singleline.json")

In [None]:
df.printSchema()

root
 |-- value: string (nullable = true)



In [None]:
df.show()

+--------------------+
|               value|
+--------------------+
|{"order_id":"O101...|
+--------------------+



In [None]:
df.show(truncate = False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"order_id":"O101","customer_id":"C001","order_line_items":[{"item_id":"I001","qty":6,"amount":102.45},{"item_id":"I003","qty":2,"amount":2.01}],"contact":[9000010000,9000010001]}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [None]:
# With Schema

_schema = "customer_id string, order_id string, contact array<long>"

df_schema = spark.read.format("json").schema(_schema).load("/content/order_singleline.json")
df_schema.show()

+-----------+--------+--------------------+
|customer_id|order_id|             contact|
+-----------+--------+--------------------+
|       C001|    O101|[9000010000, 9000...|
+-----------+--------+--------------------+



In [None]:
_schema1 = "contact array<string>, customer_id string, order_id string, order_line_items array<struct<amount double, item_id string, qty long>>"
df_schema_new = spark.read.format("json").schema(_schema1).load("/content/order_singleline.json")
df_schema_new.printSchema()
df_schema_new.show()

root
 |-- contact: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [None]:
# Function from_json to read from a column

_schema = "contact array<string>, customer_id string, order_id string, order_line_items array<struct<amount double, item_id string, qty long>>"

from pyspark.sql.functions import from_json

df_expanded = df.withColumn("parsed", from_json(df.value, _schema))

df_expanded.printSchema()
df_expanded.show()

root
 |-- value: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- contact: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- order_id: string (nullable = true)
 |    |-- order_line_items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- amount: double (nullable = true)
 |    |    |    |-- item_id: string (nullable = true)
 |    |    |    |-- qty: long (nullable = true)

+--------------------+--------------------+
|               value|              parsed|
+--------------------+--------------------+
|{"order_id":"O101...|{[9000010000, 900...|
+--------------------+--------------------+



In [None]:
# Function to_json to parse a JSON string
from pyspark.sql.functions import to_json

df_unparsed = df_expanded.withColumn("unparsed", to_json(df_expanded.parsed))
df_unparsed.printSchema()
df_unparsed.show()

root
 |-- value: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- contact: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- order_id: string (nullable = true)
 |    |-- order_line_items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- amount: double (nullable = true)
 |    |    |    |-- item_id: string (nullable = true)
 |    |    |    |-- qty: long (nullable = true)
 |-- unparsed: string (nullable = true)

+--------------------+--------------------+--------------------+
|               value|              parsed|            unparsed|
+--------------------+--------------------+--------------------+
|{"order_id":"O101...|{[9000010000, 900...|{"contact":["9000...|
+--------------------+--------------------+--------------------+



In [None]:
df_unparsed.select("unparsed").show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|unparsed                                                                                                                                                                               |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"contact":["9000010000","9000010001"],"customer_id":"C001","order_id":"O101","order_line_items":[{"amount":102.45,"item_id":"I001","qty":6},{"amount":2.01,"item_id":"I003","qty":2}]}|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [None]:
# Get values from Parsed JSON
df_expanded.show()
df_1 = df_expanded.select("parsed.*")

+--------------------+--------------------+
|               value|              parsed|
+--------------------+--------------------+
|{"order_id":"O101...|{[9000010000, 900...|
+--------------------+--------------------+



In [None]:
df_1.show()

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [None]:
from pyspark.sql.functions import explode

df_2 = df_1.withColumn("expanded_line_items", explode("order_line_items"))

df_2.show()

+--------------------+-----------+--------+--------------------+-------------------+
|             contact|customer_id|order_id|    order_line_items|expanded_line_items|
+--------------------+-----------+--------+--------------------+-------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|  {102.45, I001, 6}|
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|    {2.01, I003, 2}|
+--------------------+-----------+--------+--------------------+-------------------+



In [None]:
df_3 = df_2.select("contact", "customer_id", "order_id", "expanded_line_items.*")
df_3.show()

+--------------------+-----------+--------+------+-------+---+
|             contact|customer_id|order_id|amount|item_id|qty|
+--------------------+-----------+--------+------+-------+---+
|[9000010000, 9000...|       C001|    O101|102.45|   I001|  6|
|[9000010000, 9000...|       C001|    O101|  2.01|   I003|  2|
+--------------------+-----------+--------+------+-------+---+



In [None]:
df_4 = df_2.select("contact", "customer_id", "order_id", "expanded_line_items.qty")
df_4.show()


+--------------------+-----------+--------+---+
|             contact|customer_id|order_id|qty|
+--------------------+-----------+--------+---+
|[9000010000, 9000...|       C001|    O101|  6|
|[9000010000, 9000...|       C001|    O101|  2|
+--------------------+-----------+--------+---+



In [None]:
# Explode Array fields
df_final = df_3.withColumn("contact_expanded", explode("contact"))
df_final.printSchema()

df_final.show()


root
 |-- contact: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- item_id: string (nullable = true)
 |-- qty: long (nullable = true)
 |-- contact_expanded: string (nullable = true)

+--------------------+-----------+--------+------+-------+---+----------------+
|             contact|customer_id|order_id|amount|item_id|qty|contact_expanded|
+--------------------+-----------+--------+------+-------+---+----------------+
|[9000010000, 9000...|       C001|    O101|102.45|   I001|  6|      9000010000|
|[9000010000, 9000...|       C001|    O101|102.45|   I001|  6|      9000010001|
|[9000010000, 9000...|       C001|    O101|  2.01|   I003|  2|      9000010000|
|[9000010000, 9000...|       C001|    O101|  2.01|   I003|  2|      9000010001|
+--------------------+-----------+--------+------+-------+---+----------------+



In [None]:
df_final.drop("contact").show()


+-----------+--------+------+-------+---+----------------+
|customer_id|order_id|amount|item_id|qty|contact_expanded|
+-----------+--------+------+-------+---+----------------+
|       C001|    O101|102.45|   I001|  6|      9000010000|
|       C001|    O101|102.45|   I001|  6|      9000010001|
|       C001|    O101|  2.01|   I003|  2|      9000010000|
|       C001|    O101|  2.01|   I003|  2|      9000010001|
+-----------+--------+------+-------+---+----------------+



15 How Spark Writes data | Write modes in Spark | Write data with Partition | Default Parallelism

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Writing data")
    .master("local[*]")
    .getOrCreate()
)

spark

In [None]:
# Spark available cores with defaultParallism in Spark UI

spark.sparkContext.defaultParallelism

2

In [None]:
# Emp Data & Schema

emp_data = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"],
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","Female","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

In [None]:
# Create emp DataFrame

emp = spark.createDataFrame(data=emp_data, schema=emp_schema)

In [None]:
# Get number of partitions and show data

emp.rdd.getNumPartitions()


2

In [None]:
emp.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [None]:
# Write the data in parquet format

emp.write.format("parquet").save("/content/sample_data/emp.parquet")

In [None]:
# View data partition information
from pyspark.sql.functions import spark_partition_id

emp.withColumn("partition_id", spark_partition_id()).show()

+-----------+-------------+-------------+---+------+------+----------+------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|partition_id|
+-----------+-------------+-------------+---+------+------+----------+------------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|           0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|           0|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|           0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|           0|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|           0|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|           0|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|           0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|           0|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|      

In [None]:
emp.write.format("csv").option("header", True).save("/content/sample_data/11/3/emp.csv")


In [None]:
# Write the data with Partition to output location

emp.write.format("csv").partitionBy("department_id").option("header", True).save("/content/sample_data/11/4/emp.csv")

|employee\_id|name|age|gender|salary|hire\_date|
|---|---|---|---|---|---|
|012|Susan Chen|31|Female|54000|2017-02-15|
|017|George Wang|34|Male|57000|2016-03-15|

In [None]:
# Write Modes - append, overwrite, ignore and error

emp.write.format("csv").mode("append").option("header", True).save("data/output/11/3/emp.csv")

created 2 more csv's

In [None]:
emp.write.format("csv").mode("overwrite").option("header", True).save("data/output/11/3/emp.csv")

it will clean and create the

In [None]:
emp.write.format("csv").mode("ignore").option("header", True).save("data/output/11/3/emp.csv")

it will ignore if the data is available

In [None]:
emp.write.format("csv").mode("error").option("header", True).save("data/output/11/3/emp.csv")

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/content/data/output/11/3/emp.csv already exists. Set mode as "overwrite" to overwrite the existing path.

it will fail whne data is there

In [None]:
# Bonus TIP
# What if we need to write only 1 output file to share with DownStream?

emp.repartition(1).write.format("csv").option("header", True).save("data/output/11/5/emp.csv")

# here all the 2 partition is reduced to 1 and only 1 file was create , shuffling of data happened

16 Understand Spark Execution on Cluster | Cluster Manager | Cluster Deployment Modes | Spark Submit

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Cluster Execution")
    .master("spark://17e348267994:7077")
    .getOrCreate()
)

spark

changing the excutor to 4 from 8

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Cluster Execution")
    .master("spark://17e348267994:7077")
    .config("spark.executor.instances", 4)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [None]:
# Create a sample data frame

df = spark.range(10)

In [None]:
# Write the data of the data frame

df.write.format("csv").option("header", True).save("/data/output/15/3/range.csv")


In [None]:
# Stop Spark Settion

spark.stop()

'''
##### To setup PySpark Cluster with 2 worker, 1 master and 1 history server
Follow the below instructions carefully 👇🏻
- Clone or Download the docker images repository from `https://github.com/subhamkharwal/docker-images`
- Open CMD prompt (on Windows) or Terminal (on Mac) and move the cloned/downloaded folder
- Change to folder `pyspark-cluster-with-jupyter` and run command `docker compose up`
- The above command would setup a group of containers with 1 Jupyter Lab, 1 Master node, 2 worker nodes and 1 history server
- Run all the containers, go into the logs of Jupyter Lab container in Docker Desktop and copy the token from the URL which looks like `http://127.0.0.1:8888/lab?token=c23436751add815d6fce10071c3958aac7b4f8ebbcf05255`
- Open Jupyter Lab on url `https://localhost:8888`, paste the token and setup a new password.
- [IMPORTANT] Make sure to place your file to read or write your files to location `/data/<your path>` in order to work with cluster. (/data is important, this path is mounted across the cluster to access files or data)
- To see all your data files after execution, run the below command
```shell
%%sh
ls -ltr /data/
```
'''

# Spark Session

# file name - 12_understand_cluster.py
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Cluster Execution")
    .getOrCreate()
)

df = spark.range(10)

df.write.format("csv").option("header", True).save("/data/output/15/6/range.csv")

In [None]:
#spark submit

root@4c1bcb0312:/spark# ./bin/spark-submit --master spark://17e345857838373:7077 --num-executors 3 --executor-cores 2 --executor-memory 512M /home/jupyter/pyspark-zero-to-hero/12_understand_cluster.py

17 User Defined Function (UDF) | How Spark works with UDF | How to register Python UDF

UDF (User-Defined Function)
→ A UDF is a custom function that you define in Python/Scala/SQL and register with Spark so that you can use it on Spark DataFrame columns.

In simple words:

⭐ A UDF lets you apply your own custom logic that Spark does NOT provide natively.

⭐ Interview One-Liner

A UDF in Spark is a User-Defined Function that lets you apply custom logic on DataFrame columns when built-in Spark functions are not enough. However, UDFs are slower and should be used only when necessary.

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("User Defined Functions")
    .master("local[*]")
    .config("spark.executor.cores", 2)
    .config("spark.cores.max", 6)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [None]:
# Read employee data

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

emp = spark.read.format("csv").option("header", True).schema(emp_schema).load("/content/emp.txt")

emp.rdd.getNumPartitions()

1

In [None]:
emp.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [None]:
# Create a function to generate 10% of Salary as Bonus

def bonus(salary):
    return int(salary) * 0.1

In [None]:
from pyspark.sql.functions import udf

bonus_udf = udf(bonus) # it can we used fro pyspark API

# Here spark will create a python process to generate the Data
# then selarialization and de-serialization works

In [None]:
emp.withColumn("bonus", bonus_udf("salary")).show()

+-----------+-------------+-------------+---+------+------+----------+------+
|employee_id|department_id|         name|age|gender|salary| hire_date| bonus|
+-----------+-------------+-------------+---+------+------+----------+------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|5000.0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|4500.0|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|5500.0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|4800.0|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|6000.0|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|5200.0|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|7000.0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|5100.0|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|5800.0|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-

In [None]:
# Register as UDF
from pyspark.sql.functions import udf

bonus_udf = udf(bonus)

spark.udf.register("bonus_sql_udf", bonus, "double")# so that we can use it for Spark SQL

In [None]:
# Create new column as bonus using UDF
from pyspark.sql.functions import expr

emp.withColumn("bonus", expr("bonus_sql_udf(salary)")).show()

+-----------+-------------+-------------+---+------+------+----------+------+
|employee_id|department_id|         name|age|gender|salary| hire_date| bonus|
+-----------+-------------+-------------+---+------+------+----------+------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|5000.0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|4500.0|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|5500.0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|4800.0|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|6000.0|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|5200.0|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|7000.0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|5100.0|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|5800.0|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-

In [None]:
# Create new column as bonus without UDF
# this is happening IN JVM

emp.withColumn("bonus", expr("salary * 0.1")).show()

+-----------+-------------+-------------+---+------+------+----------+------+
|employee_id|department_id|         name|age|gender|salary| hire_date| bonus|
+-----------+-------------+-------------+---+------+------+----------+------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|5000.0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|4500.0|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|5500.0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|4800.0|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|6000.0|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|5200.0|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|7000.0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|5100.0|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|5800.0|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-

18 Understand DAG, Explain Plans & Spark Shuffle with Tasks |Skipped Stage |Benefit of Shuffle Write

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Understand Plans and DAG")
    .master("local[*]")
    .getOrCreate()
)

spark

In [None]:
# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [None]:
# Check default Parallism

spark.sparkContext.defaultParallelism

2

In [None]:
# Create dataframes

df_1 = spark.range(4, 200, 2)
df_2 = spark.range(2, 200, 4)

In [None]:
df_2.rdd.getNumPartitions()


2

In [None]:
# Re-partition data

df_3 = df_1.repartition(5)
df_4 = df_2.repartition(7)

In [None]:
df_4.rdd.getNumPartitions()


7

In [None]:
# Join the dataframes
# default shuffle partition is 200

df_joined = df_3.join(df_4, on="id")

In [None]:
# Get the sum of ids

df_sum = df_joined.selectExpr("sum(id) as total_sum")

In [None]:

# View data
df_sum.show()

+---------+
|total_sum|
+---------+
|     4998|
+---------+



In [None]:
df_sum.explain()


== Physical Plan ==
*(6) HashAggregate(keys=[], functions=[sum(id#217L)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=261]
   +- *(5) HashAggregate(keys=[], functions=[partial_sum(id#217L)])
      +- *(5) Project [id#217L]
         +- *(5) SortMergeJoin [id#217L], [id#219L], Inner
            :- *(2) Sort [id#217L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(id#217L, 200), ENSURE_REQUIREMENTS, [plan_id=245]
            :     +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=244]
            :        +- *(1) Range (4, 200, step=2, splits=2)
            +- *(4) Sort [id#219L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#219L, 200), ENSURE_REQUIREMENTS, [plan_id=252]
                  +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=251]
                     +- *(3) Range (2, 200, step=4, splits=2)




In [None]:
# Union the data again to see the skipped stages

df_union = df_sum.union(df_4)

In [None]:
df_union.show()


+---------+
|total_sum|
+---------+
|     4998|
|       14|
|       86|
|       42|
|      146|
|      134|
|      142|
|      162|
|       74|
|       94|
|       34|
|      198|
|      182|
|      126|
|      174|
|       98|
|       10|
|       82|
|      122|
|      186|
+---------+
only showing top 20 rows



In [None]:
# Explain plan

df_union.explain()

== Physical Plan ==
Union
:- *(6) HashAggregate(keys=[], functions=[sum(id#217L)])
:  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=499]
:     +- *(5) HashAggregate(keys=[], functions=[partial_sum(id#217L)])
:        +- *(5) Project [id#217L]
:           +- *(5) SortMergeJoin [id#217L], [id#219L], Inner
:              :- *(2) Sort [id#217L ASC NULLS FIRST], false, 0
:              :  +- Exchange hashpartitioning(id#217L, 200), ENSURE_REQUIREMENTS, [plan_id=483]
:              :     +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=482]
:              :        +- *(1) Range (4, 200, step=2, splits=2)
:              +- *(4) Sort [id#219L ASC NULLS FIRST], false, 0
:                 +- Exchange hashpartitioning(id#219L, 200), ENSURE_REQUIREMENTS, [plan_id=490]
:                    +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=489]
:                       +- *(3) Range (2, 200, step=4, splits=2)
+- ReusedExchange [id#238L], Exchange Roun

As we knowspark optimizes its Dag by skiping , In shuffling or excahnge data - spark will write the data and next step it will read the data

The benefit is - if it fails in subsequent or next stage , it can read the data from last shuffle and does not have to trigger the stage

In [None]:
# DataFrame to RDD

df_1.rdd

# any dataframe that you see is an abstraction of RDD

# RDD can be used only when we have to distribute the
# data physically with the help of code or you have to
# work extensively with spark core APIs


MapPartitionsRDD[112] at javaToPython at NativeMethodAccessorImpl.java:0

19 Understand and Optimize Shuffle in Spark


In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Optimizing Shuffles")
    .master("local[*]")
    .getOrCreate()
)

spark

# Spark Session
# from pyspark.sql import SparkSession

# spark = (
#     SparkSession
#     .builder
#     .appName("Optimizing Shuffles")
#     .master("spark://17e348267994:7077")
#     .config("spark.cores.max", 16)
#     .config("spark.executor.cores", 4)
#     .config("spark.executor.memory", "512M")
#     .getOrCreate()
# )

# spark

In [None]:
# Check Spark defaultParallelism

spark.sparkContext.defaultParallelism

2

In [None]:

# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [None]:
# Read EMP CSV file with 10M records

_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("/content/employee_records.csv")

In [None]:
# Find out avg salary as per dept
from pyspark.sql.functions import avg

emp_avg = emp.groupBy("department_id").agg(avg("salary").alias("avg_sal"))

In [None]:
# Write data for performance Benchmarking

emp_avg.write.format("noop").mode("overwrite").save()

#NOOP - it will be used for benchmarking , it will not write the data and will only traverse the benchmarking - only reads the data not writing

In [None]:
# Check Spark Shuffle Partition setting

spark.conf.get("spark.sql.shuffle.partitions")

'16'

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", 16)


In [None]:
from pyspark.sql.functions import spark_partition_id

emp.withColumn("partition_id", spark_partition_id()).where("partition_id = 0").show()

+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|partition_id|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8|           0|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7|           0|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|           0|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1|           0|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|tiffany

In [None]:
emp.write \
   .mode("overwrite") \
   .partitionBy("department_id") \
   .parquet("/content/output/emp_partitioned")


In [None]:
# Read the partitioned data

emp_part = spark.read.format("parquet").schema(_schema).option("header", True).load("/content/output/emp_partitioned")

In [None]:
emp_part.show()

+-----------+---------+--------------------+----------+--------------------+--------------------+--------+-------------+
| first_name|last_name|           job_title|       dob|               email|               phone|  salary|department_id|
+-----------+---------+--------------------+----------+--------------------+--------------------+--------+-------------+
|       Tara|      Liu|   Financial adviser|1998-10-12|alexandraobrien@e...|        216.696.6061|399503.0|            3|
|    Matthew|  Mueller|    Catering manager|1986-08-24|smithscott@exampl...|        322.415.4939| 74506.0|            3|
|    Anthony|    Moses|Geophysicist/fiel...|1999-10-01|selenawilliams@ex...|       (936)736-1889|348389.0|            3|
|    Stanley|   Berger|Clinical molecula...|1980-05-13|robert60@example.org|        725.845.0840|495918.0|            3|
|Christopher|    White|Tourist informati...|1967-11-02|clarence47@exampl...|        979-757-4546|986858.0|            3|
|       Mary| Marshall|Learning 

In [None]:
emp_avg = emp_part.groupBy("department_id").agg(avg("salary").alias("avg_sal"))


In [None]:
emp_avg.write.format("noop").mode("overwrite").save()


Here we are reading the parttition data - which will be faster as the shuffle write was small

20 Data Caching in Spark | Cache vs Persist | Spark Storage Level with Persist |Partial Data Caching


In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Understand Caching")
    .master("local[*]")
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [None]:
# Read Sales CSV Data - 752MB Size ~ 7.2M Records

_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

df = spark.read.format("csv").schema(_schema).option("header", True).load("/content/new_sales.csv")

In [None]:
df.where("amount > 300").show()


+--------------------+----------+-----------+--------------------+-------+----------+
|       transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+--------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24T19:00:...|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24T19:00:...|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24T19:00:...|1734117153|  847200066|unkn        Kings...|2907.57|1483931123|
|2017-11-24T19:00:...|1734117241|  486576507|              iTunes|2912.67|1663872965|
|2017-11-24T19:00:...|2076947146|  511877722|unkn     ccd id: ...|1915.35|1698762556|
|2017-11-24T19:00:...|2076947113| 1996661856|AutoZone  arc id:...| 1523.6|1759612211|
|2017-11-24T19:00:...|2076946994| 1898522855|Target    ppd id:...|2589.93|2074005445|
|2017-11-24T19:00:...|2076946121|  562903918|unkn    ccd id: 5...| 315.86|1773943669|
|2017-11-24T19:00:...|2076946063| 1070485878|Amazon.co

In [None]:
# Cache DataFrame (cache or persist)
#df_cache().count() # count and write method will always preferred as they will scan the whole dataset resulting proper caching

# storage is Disk & memory and deserialized for Cache
# since cache will be created and it will be running faster
df_cache = df.where("amount > 100").cache().show()

+--------------------+----------+-----------+--------------------+-------+----------+
|       transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+--------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24T19:00:...|1995601912| 2077350195|Walgreen       11-25| 197.23| 216510442|
|2017-11-24T19:00:...|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24T19:00:...|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24T19:00:...|1734117117|  997626433|Sears  ppd id: 85...| 298.87| 957346984|
|2017-11-24T19:00:...|1734117153|  847200066|unkn        Kings...|2907.57|1483931123|
|2017-11-24T19:00:...|1734117212| 1996661856|unkn    ppd id: 4...| 140.38| 336763936|
|2017-11-24T19:00:...|1734117241|  486576507|              iTunes|2912.67|1663872965|
|2017-11-24T19:00:...|2076947146|  511877722|unkn     ccd id: ...|1915.35|1698762556|
|2017-11-24T19:00:...|2076947113| 1996661856|AutoZone 

In [None]:
# Remove Cache

df.unpersist()

DataFrame[transacted_at: string, trx_id: string, retailer_id: string, description: string, amount: double, city_id: string]

In [None]:
df_cache = df.cache()

In [None]:
df_cache.count()

374079

In [None]:
df.where("amount > 300").show() # here also Spark is reading the date from Cache

+--------------------+----------+-----------+--------------------+-------+----------+
|       transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+--------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24T19:00:...|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24T19:00:...|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24T19:00:...|1734117153|  847200066|unkn        Kings...|2907.57|1483931123|
|2017-11-24T19:00:...|1734117241|  486576507|              iTunes|2912.67|1663872965|
|2017-11-24T19:00:...|2076947146|  511877722|unkn     ccd id: ...|1915.35|1698762556|
|2017-11-24T19:00:...|2076947113| 1996661856|AutoZone  arc id:...| 1523.6|1759612211|
|2017-11-24T19:00:...|2076946994| 1898522855|Target    ppd id:...|2589.93|2074005445|
|2017-11-24T19:00:...|2076946121|  562903918|unkn    ccd id: 5...| 315.86|1773943669|
|2017-11-24T19:00:...|2076946063| 1070485878|Amazon.co

In [None]:
df.unpersist()


DataFrame[transacted_at: string, trx_id: string, retailer_id: string, description: string, amount: double, city_id: string]

In [None]:
df_cache = df.where("amount > 100").cache()

In [None]:
df_cache.count()

132517

In [None]:
df.where("amount > 50").show() # here it will doing dataset scan

+--------------------+----------+-----------+--------------------+-------+----------+
|       transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+--------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24T19:00:...|1995601912| 2077350195|Walgreen       11-25| 197.23| 216510442|
|2017-11-24T19:00:...|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24T19:00:...|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24T19:00:...|1734117089| 1898522855| Target        11-25|  66.33|1855530529|
|2017-11-24T19:00:...|1734117117|  997626433|Sears  ppd id: 85...| 298.87| 957346984|
|2017-11-24T19:00:...|1734117153|  847200066|unkn        Kings...|2907.57|1483931123|
|2017-11-24T19:00:...|1734117212| 1996661856|unkn    ppd id: 4...| 140.38| 336763936|
|2017-11-24T19:00:...|1734117241|  486576507|              iTunes|2912.67|1663872965|
|2017-11-24T19:00:...|2076947148|  847200066|Wal-Mart 

In [None]:
# MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2
import pyspark

df_persist = df.persist(pyspark.StorageLevel.MEMORY_ONLY)

In [None]:
df_persist.write.format("noop").mode("overwrite").save() # Memory is serialized meaning nothing in Disc
# if spark storage setting is Memory_ONLY and data is not able to fit in memory then we will See OUT of memory error


In [None]:
# Remove All the Cache

spark.catalog.clearCache()

In Cache - default storage methed is Memory and Disc and data is deserialized
In persist - default storage methed we choose to use

**21 Broadcast Variable and Accumulators in Spark | How to use Spark Broadcast Variables**

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Distributed Shared Variables")
    .master("spark://17e348267994:7077")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [None]:
# Read EMP CSV data

_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("/content/employee_records.csv")

emp.show()


+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|tiffanyjohnston@e...|       (705)900-5337|604738.0|            8|
|    Ashley|   Montoya|        C

In [None]:
# Variable (Lookup)
dept_names = {1 : 'Department 1',
              2 : 'Department 2',
              3 : 'Department 3',
              4 : 'Department 4',
              5 : 'Department 5',
              6 : 'Department 6',
              7 : 'Department 7',
              8 : 'Department 8',
              9 : 'Department 9',
              10 : 'Department 10'}

Problems
1. If we use a dept_df and Join with Emp_df - if will have shuffle
2. UDF can we created with a loopup variable but here variable will be serized with the task , everytime this deserialization and serialization will be happening row by row

🔶 Serialization (SER) — “Convert Object → Bytes”

Meaning:
Serialization is the process of converting a data object into a sequence of bytes (a transferable/storable format).

Why:
So it can be:

sent over a network

saved to a file

stored in memory efficiently

🔶 Deserialization (DESER) — “Convert Bytes → Object”

Meaning:
Deserialization is the process of reconstructing the original object from the byte sequence.

Why:
So a program can use the object again.

🔶 What is a Broadcast Variable in Spark?

A broadcast variable is a read-only shared variable that Spark sends only once to all executors, instead of sending it again and again with every task.

It is used to efficiently share small lookup data across all worker nodes.

🧠 Why Is It Needed?

Without broadcast variables:

Spark sends the same data with every task → waste of network + memory

For large jobs, this becomes slow and expensive

With broadcast variables:

Spark sends the data only once to each executor

All tasks on that executor use the same copy

Much faster and more memory-efficient

In [None]:
# Broadcast the variable

broadcast_dept_names = spark.sparkContext.broadcast(dept_names)

In [None]:
type(broadcast_dept_names)

In [None]:
# Check the value of the variable
broadcast_dept_names.value

{1: 'Department 1',
 2: 'Department 2',
 3: 'Department 3',
 4: 'Department 4',
 5: 'Department 5',
 6: 'Department 6',
 7: 'Department 7',
 8: 'Department 8',
 9: 'Department 9',
 10: 'Department 10'}

In [None]:
# Create UDF to return Department name

from pyspark.sql.functions import udf, col

@udf
def get_dept_names(dept_id):
    return broadcast_dept_names.value.get(dept_id)

In [None]:
emp_final = emp.withColumn("dept_name", get_dept_names(col("department_id")))


In [None]:
emp_final.show()


+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+-------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|    dept_name|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+-------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8| Department 8|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7| Department 7|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|Department 10|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1| Department 1|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|

In [None]:
# Calculate total salary of Department 6

from pyspark.sql.functions import sum

emp.where("department_id = 6").groupBy("department_id").agg(sum("salary").cast("long")).show()

# there will be shuffle here,

+-------------+---------------------------+
|department_id|CAST(sum(salary) AS BIGINT)|
+-------------+---------------------------+
|            6|                50294510721|
+-------------+---------------------------+



What Are Accumulators in Spark?

Accumulators are variables that workers (executors) can only add to, and only the driver can read the final value.

They are mostly used for:

Counters (count how many rows matched a condition)

Sums (add up values across workers)

Debugging & monitoring

In [None]:
# Accumulators # distributed variables

dept_sal = spark.sparkContext.accumulator(0)

In [None]:
# Use foreach

def calculate_salary(department_id, salary):
    if department_id == 6:
        dept_sal.add(salary)

emp.foreach(lambda row : calculate_salary(row.department_id, row.salary))

In [None]:
# View total value

dept_sal.value

50294510721.0

In [None]:
# Stop Spark Session

spark.stop()