In [1]:
# List the data directory that the CSV below is loaded from (IPython `!` shell escape).
!ls /opt/spark/data

Fire_Department_Calls_For_Service__2016__20240816.csv


In [2]:
from pyspark.sql import SparkSession

# Connect to the standalone cluster master and set the Spark SQL warehouse location.
# Hive support is not enabled for this session.
# If a session already exists in this kernel, getOrCreate() returns it instead of
# building a new one.
spark = (
    SparkSession
    .builder
    .appName("fire-department-calls-partitioning")  # readable name in the cluster/driver UI
    .master("spark://spark-master:7077")
    .config("spark.sql.warehouse.dir", "/opt/spark/spark-warehouse")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/30 15:30:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the fire-department calls CSV.
# Column names come from the header row.
# Schema inference is off, so every column is read as a string.
raw_df = spark.read.csv(
    "/opt/spark/data/Fire_Department_Calls_For_Service__2016__20240816.csv",
    header=True,
    inferSchema=False,
)

### Reading a CSV without an explicit schema triggers a job: Spark reads the first record of the file to determine the columns (names come from the header row because `header=True`).

In [4]:
# raw_df is a plain file read with no shuffle, so this is the number of partitions produced by the CSV scan.
raw_df.rdd.getNumPartitions()

10

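### To avoid re-reading the file for every action, cache the DataFrame
`cache()` is lazy: nothing is stored until the first action runs (here, `count()`).
### `count()` will trigger 3 jobs:
1. File scan
2. File scan and caching of the data
3. Read from the cache and compute the count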

In [5]:
# cache() only marks the DataFrame for caching and hands back the same DataFrame.
# The count() action is what actually populates the cache.
raw_df.cache().count()

25/04/30 15:31:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

2748979

In [8]:
print(f"Original columns names:\n{raw_df.columns}")

# Normalize headers to snake_case (lower-case, spaces -> underscores) in ONE projection.
# toDF(*names) renames all columns positionally in a single step.
# By contrast, calling withColumnRenamed in a loop adds one projection per column,
# which bloats the logical plan for wide DataFrames.
renamed_df = raw_df.toDF(
    *[column_name.lower().replace(" ", "_") for column_name in raw_df.columns]
)

print(f"Formatted columns names:\n{renamed_df.columns}")

Original columns names:
['Call Number', 'Unit ID', 'Incident Number', 'Call Date', 'Watch Date', 'Received DtTm', 'Entry DtTm', 'Dispatch DtTm', 'Response DtTm', 'On Scene DtTm', 'Transport DtTm', 'Hospital DtTm', 'Call Final Disposition', 'Available DtTm', 'Address', 'City', 'Zipcode of Incident', 'Battalion', 'Station Area', 'Box', 'Original Priority', 'Priority', 'Final Priority', 'ALS Unit', 'Call Type Group', 'Number of Alarms', 'Unit Type', 'Unit sequence in call dispatch', 'Fire Prevention District', 'Supervisor District', 'Neighborhooods - Analysis Boundaries', 'RowID']
Formatted columns names:
['call_number', 'unit_id', 'incident_number', 'call_date', 'watch_date', 'received_dttm', 'entry_dttm', 'dispatch_dttm', 'response_dttm', 'on_scene_dttm', 'transport_dttm', 'hospital_dttm', 'call_final_disposition', 'available_dttm', 'address', 'city', 'zipcode_of_incident', 'battalion', 'station_area', 'box', 'original_priority', 'priority', 'final_priority', 'als_unit', 'call_type_grou

In [9]:
# Distinct unit types. De-duplication is a wide transformation, so this plan contains a shuffle.
unit_types_df = (
    renamed_df
    .select("unit_type")
    .drop_duplicates()
)

In [10]:
# This launches a Spark job. AQE is enabled in this session (see the
# spark.sql.adaptive.enabled check below), so Spark has to run the shuffle stage
# before it knows how many post-shuffle partitions to use.
unit_types_df.rdd.getNumPartitions()

[Stage 5:>                                                        (0 + 10) / 10]

1

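### Why `.rdd.getNumPartitions()` triggers a Spark job here:
- `.drop_duplicates()` requires a shuffle.
  - To remove duplicates, Spark must bring identical records together.
  - This grouping causes a shuffle, which redistributes data across partitions.
- A shuffle creates new partitions.
  - After the shuffle, Spark reorganizes the data into a new set of partitions (`spark.sql.shuffle.partitions`, 200 in this session).
- Converting a DataFrame to an RDD does not, by itself, compute any rows.
  - Calling `.rdd` finalizes the logical plan, optimizes it and builds a physical execution plan, exposed as an RDD lineage.
- With Adaptive Query Execution (AQE) enabled, however, the number of post-shuffle partitions is only decided at runtime. (`spark.sql.adaptive.enabled` is `true` in this session; see below.)
  - AQE looks at the actual shuffle output sizes and coalesces small partitions together.
  - To get those statistics, Spark must execute the shuffle (map) stage, so obtaining the partition count triggers a Spark job.
  - This is also why the result is `1` rather than `200`: the small set of distinct unit types was coalesced into a single partition.
- With AQE disabled, the partition count is simply `spark.sql.shuffle.partitions`, so Spark can answer from the plan without executing the shuffle (see the cells at the end of this notebook).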

In [11]:
# renamed_df only renames columns (no shuffle), so it keeps the same partition count as the file scan of raw_df.
renamed_df.rdd.getNumPartitions()

10

In [12]:
from pyspark.sql.functions import count

# Number of rows per unit type. groupBy is a wide transformation, so this plan also contains a shuffle.
agg_df = (
    renamed_df
    .groupBy("unit_type")
    .agg(count("*").alias("cnt_of_unit_type"))
)

In [13]:
# groupBy shuffles too. With AQE enabled, the post-shuffle partitions are coalesced at runtime,
# so the result is far below spark.sql.shuffle.partitions.
agg_df.rdd.getNumPartitions()



1

In [12]:
# Configured number of partitions a Spark SQL shuffle produces (before any AQE coalescing).
spark.conf.get("spark.sql.shuffle.partitions")

'200'

In [13]:
# Is Adaptive Query Execution on? When it is, Spark may coalesce post-shuffle partitions at runtime.
spark.conf.get("spark.sql.adaptive.enabled")

'true'

In [14]:
# Turn AQE off for this session, so shuffles keep exactly spark.sql.shuffle.partitions partitions.
spark.conf.set("spark.sql.adaptive.enabled", "false")

In [20]:
# AQE is now off, so the post-shuffle partition count comes straight from
# spark.sql.shuffle.partitions (200 at this point; see the config check above).
# Rebuild the aggregation rather than reusing agg_df. PySpark memoizes DataFrame.rdd
# on first access, so agg_df.rdd would still be the RDD planned earlier with AQE enabled.
# NOTE(review): the saved output below came from an out-of-order run (In [20] executed
# after the In [18] cell that sets shuffle partitions to 20). Re-run top-to-bottom to refresh it.
(
    renamed_df
    .groupBy("unit_type")
    .agg(count("*").alias("cnt_of_unit_type"))
    .rdd
    .getNumPartitions()
)

20

In [18]:
# Lower the shuffle partition count for this session. It only affects queries planned after this point.
spark.conf.set("spark.sql.shuffle.partitions", 20)

In [21]:
# Rebuild the aggregation so it is planned with the new setting. PySpark memoizes
# DataFrame.rdd, so an agg_df whose .rdd was already accessed would not reflect the change.
# With AQE off and spark.sql.shuffle.partitions = 20, expect 20 post-shuffle partitions.
(
    renamed_df
    .groupBy("unit_type")
    .agg(count("*").alias("cnt_of_unit_type"))
    .rdd
    .getNumPartitions()
)

20