In [1]:
from pyspark.sql import SparkSession

In [2]:
spark=SparkSession.builder.appName("Perf_Opt").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/01 12:16:26 WARN Utils: Your hostname, bhuvaneshwaran-Latitude-5420, resolves to a loopback address: 127.0.1.1; using 192.168.1.17 instead (on interface wlp0s20f3)
26/01/01 12:16:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/01 12:16:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

In [4]:
df=spark.read.parquet(r"/home/bhuvaneshwaran/Desktop/Medium/parquetFiles/Employees.parquet")
df.show(3)

+---+------------+---------------+--------------------+--------------------+-----------------+-----+----------+------+--------------------+-------+--------------------+------------------+--------------------+----------+-------------+
| id|        name|      full_name|             address|         json_string|dob_string_format|score|department|salary|        hobbies_list|user_id|            txn_date|         txn_value|              street|      city|        state|
+---+------------+---------------+--------------------+--------------------+-----------------+-----+----------+------+--------------------+-------+--------------------+------------------+--------------------+----------+-------------+
|  1|Advik Halder|     Janya Bala|H.No. 916, Choksh...|{"city": "Bengalu...|       1974-10-29|   74|     Sales| 95186|       [Photography]|      2|2025-11-29 13:20:...|132.71083639903748|   20/662\nDua Nagar|Coimbatore|    Rajasthan|
|  2| Omaja Baria|   Manya Bhakta|492, Dass Road, B...|{"city": 

## 🔥 1. Partitions: The Most Important Performance Concept
❓ What is a partition?

A partition is a chunk of the data; each partition is processed by one task.
    
    * More partitions → more parallelism (up to the number of available cores)
    
    * Fewer partitions → less parallelism
    
Think of it like this:
    
    One partition = one task, handled by one core
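A simplified model of how the partition count maps onto parallelism: each partition becomes one task, and each core runs one task at a time, so tasks run in "waves". It assumes equal-sized tasks and ignores scheduling overhead; the 8-core machine is hypothetical.

```python
import math


def task_waves(num_partitions: int, num_cores: int) -> int:
    """Simplified model: one task per partition, one task per core at a time."""
    return math.ceil(num_partitions / num_cores)


def idle_cores_in_first_wave(num_partitions: int, num_cores: int) -> int:
    """Cores that get no task at all when there are fewer partitions than cores."""
    return max(num_cores - num_partitions, 0)


# Hypothetical 8-core laptop
print(task_waves(1, 8), idle_cores_in_first_wave(1, 8))      # 1 7
print(task_waves(200, 8), idle_cores_in_first_wave(200, 8))  # 25 0
```

With a single partition, seven of the eight cores do nothing; with 200 partitions every core is busy, but the job pays the per-task startup cost 200 times.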

## 🔍 Check number of partitions


In [5]:
df.rdd.getNumPartitions()

1

Explanation:

df.rdd → exposes the RDD underlying the DataFrame

getNumPartitions() → returns its number of partitions (1 here, most likely because the input file is small)

❓ Why do partitions matter?

If:

    * Partitions are too few → some CPU cores sit idle

    * Partitions are too many → scheduling overhead grows (every task has a fixed startup cost)

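## 🔥 Default behavior (Important)

    * Reading files (CSV, Parquet, etc.) → input is split by size, up to spark.sql.files.maxPartitionBytes (default 128 MB) per partition, so a small file like Employees.parquet is read as 1 partition
    
    * Larger or multiple input files → more partitions automatically
    
    * spark.sql.shuffle.partitions default = 200 (the number of partitions each shuffle produces)

The 200-partition shuffle default is often too high for small data on a local machine: each shuffle then creates many tiny tasks.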

## 🔥 2. Repartition vs Coalesce (VERY COMMON CONFUSION)
🔹 repartition()

In [6]:
df2 = df.repartition(10)

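What it does:

    * Increases or decreases the number of partitions
    
    * Always causes a full shuffle
    
    * Redistributes rows evenly (round-robin); with columns, e.g. repartition(10, "department"), rows are hash-partitioned by those columns instead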

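When to use:

    * After heavy filtering has left partitions unevenly sized
    
    * Before several joins or aggregations on the same key, e.g. repartition("id")
    
    * When partition sizes are skewed (a round-robin repartition evens out partition sizes, but it does not fix skew on a single hot key)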
    
🔹 coalesce()

In [7]:
df2 = df.coalesce(2)

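What it does:

    * Only reduces partitions (asking for more than the current count leaves it unchanged)
    
    * Avoids a full shuffle by merging existing partitions
    
    * Usually faster than repartition, but the merged partitions can be uneven in size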

When to use:

    * Before writing output
    
    * When reducing file count

| Method      | Shuffle | Can Increase Partitions | Speed          |
| ----------- | ------- | ----------------------- | -------------- |
| repartition | ✅ Yes  | ✅ Yes                  | Usually slower |
| coalesce    | ❌ No   | ❌ No                   | Usually faster |
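A pure-Python model of the difference: repartition deals every row out again (so rows move between partitions), while coalesce glues whole existing partitions together. Both functions are simplified sketches, not Spark's actual algorithms (Spark's round-robin starts at a random offset per input partition, and its coalescer also considers data locality).

```python
from typing import List

Partitions = List[List[int]]


def repartition_round_robin(parts: Partitions, n: int) -> Partitions:
    """Simplified repartition(n): every row is dealt out again, so any row may move (a shuffle)."""
    out: Partitions = [[] for _ in range(n)]
    i = 0
    for part in parts:
        for row in part:
            out[i % n].append(row)
            i += 1
    return out


def coalesce(parts: Partitions, n: int) -> Partitions:
    """Simplified coalesce(n): whole existing partitions are merged; none is split up."""
    if n >= len(parts):
        # coalesce never increases the partition count
        return [list(p) for p in parts]
    out: Partitions = [[] for _ in range(n)]
    for idx, part in enumerate(parts):
        out[idx * n // len(parts)].extend(part)
    return out


skewed = [[1, 2, 3, 4, 5, 6], [7], [8], [9]]
print([len(p) for p in repartition_round_robin(skewed, 3)])  # [3, 3, 3]
print([len(p) for p in coalesce(skewed, 2)])                 # [7, 2]
print(len(coalesce(skewed, 10)))                             # 4
```

The model shows the trade-off in the table: repartition pays for moving rows and gets even partitions, while coalesce moves nothing between partitions and keeps whatever imbalance it inherits.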

## 🔥 3. Shuffles: The Biggest Performance Killer
❓ What is a shuffle?

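A shuffle happens when Spark needs to redistribute data across partitions, usually across executors over the network, so that rows with the same key end up in the same partition.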

This happens during:

    * groupBy
    
    * join (unless one side is broadcast)
    
    * distinct
    
    * orderBy
    
    * repartition
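The core of a shuffle for groupBy or join is hash partitioning: each row goes to partition `hash(key) % numPartitions`, so equal keys always meet in the same partition. A pure-Python sketch with hypothetical department rows; `zlib.crc32` stands in for the Murmur3 hash Spark uses.

```python
import zlib
from collections import defaultdict
from typing import Dict, List


def shuffle_by_key(rows: List[dict], key: str, num_partitions: int) -> Dict[int, List[dict]]:
    """Send each row to partition hash(key) % num_partitions.

    Python's built-in hash() is avoided because it is randomized per process for strings.
    """
    out: Dict[int, List[dict]] = defaultdict(list)
    for row in rows:
        pid = zlib.crc32(row[key].encode("utf-8")) % num_partitions
        out[pid].append(row)
    return dict(out)


# Hypothetical rows
rows = [
    {"id": 1, "department": "Sales"},
    {"id": 2, "department": "Finance"},
    {"id": 3, "department": "Sales"},
    {"id": 4, "department": "HR"},
    {"id": 5, "department": "Finance"},
]
parts = shuffle_by_key(rows, "department", 3)
for pid, part_rows in sorted(parts.items()):
    print(pid, [r["department"] for r in part_rows])
```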

## 🔍 Example that causes shuffle

In [8]:
df.columns

['id',
 'name',
 'full_name',
 'address',
 'json_string',
 'dob_string_format',
 'score',
 'department',
 'salary',
 'hobbies_list',
 'user_id',
 'txn_date',
 'txn_value',
 'street',
 'city',
 'state']

In [9]:
df.groupBy("department").count()

DataFrame[department: string, count: bigint]

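In [9] only builds the plan and returns a DataFrame; once an action such as .show() runs, Spark must:

    * Bring all rows with the same department into the same partition
    
    * Write shuffle data to local disk and send it over the network between executors
    
    * Pay disk + memory + CPU (serialization) costs → expensive

For count(), Spark softens this by first computing partial counts inside each partition, so only one record per department per partition is shuffled.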

❗ Why are shuffles slow?

    * Network IO
    
    * Disk spill
    
    * Serialization cost
    
    * Memory pressure

🧠 How to reduce shuffle impact?

    * Filter early
    
    * Reduce columns before groupBy
    
    * Use broadcast joins
    
    * Tune spark.sql.shuffle.partitions to the data size (lower it for small data)
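Spark also shrinks the shuffle on its own for aggregations like count(): it computes partial counts inside each partition before the exchange (the plan shows a HashAggregate with `partial_count` below the Exchange). A pure-Python sketch of how many records cross the shuffle with and without that step, using hypothetical department values spread over three partitions:

```python
from collections import Counter
from typing import Dict, List


def records_shuffled(partitions: List[List[str]], partial_agg: bool) -> int:
    """Count records crossing the shuffle for groupBy(key).count()."""
    if partial_agg:
        # Each partition first collapses to one (key, partial_count) record per distinct key
        return sum(len(set(p)) for p in partitions)
    # Without partial aggregation every input row would be shuffled
    return sum(len(p) for p in partitions)


def final_counts(partitions: List[List[str]]) -> Dict[str, int]:
    """Counter(p) is the per-partition partial count; update() sums partials per key."""
    total: Counter = Counter()
    for p in partitions:
        total.update(Counter(p))
    return dict(total)


depts = [["Sales", "Sales", "HR", "Sales"], ["Finance", "Sales", "Finance"], ["HR", "HR"]]
print(records_shuffled(depts, partial_agg=False))  # 9
print(records_shuffled(depts, partial_agg=True))   # 5
print(final_counts(depts))
```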

## 🔥 4. Broadcast Joins: Huge Performance Boost
❓ When joining a large table with a small one

In [10]:
dim_df=df.select("id","salary")

In [11]:
from pyspark.sql.functions import when
# Small lookup table: one salary category per id
dim_df = dim_df.withColumn(
    "Sal_Cat",
    when(dim_df.salary > 80000, "Good")
    .when(dim_df.salary > 50000, "Medium")
    .otherwise("Low"),
).select("id", "Sal_Cat")

In [12]:
dim_df.show()

+---+-------+
| id|Sal_Cat|
+---+-------+
|  1|   Good|
|  2| Medium|
|  3|   Good|
|  4|   Good|
|  5| Medium|
|  6|   Good|
|  7|    Low|
|  8|   Good|
|  9|   Good|
| 10|    Low|
| 11| Medium|
| 12|   Good|
| 13|   Good|
| 14| Medium|
| 15|   Good|
| 16|   Good|
| 17|   Good|
| 18|   Good|
| 19|   Good|
| 20|   Good|
+---+-------+
only showing top 20 rows


In [13]:
df.columns

['id',
 'name',
 'full_name',
 'address',
 'json_string',
 'dob_string_format',
 'score',
 'department',
 'salary',
 'hobbies_list',
 'user_id',
 'txn_date',
 'txn_value',
 'street',
 'city',
 'state']

In [14]:
from pyspark.sql.functions import broadcast

df_new=df.join(broadcast(dim_df), "id")
df_new.columns

['id',
 'name',
 'full_name',
 'address',
 'json_string',
 'dob_string_format',
 'score',
 'department',
 'salary',
 'hobbies_list',
 'user_id',
 'txn_date',
 'txn_value',
 'street',
 'city',
 'state',
 'Sal_Cat']

Explanation of broadcast()

    * Tells Spark: “This table is small”
    
    * Spark sends a full copy of it to every executor
    
    * The large table is joined where it already sits, so it is never shuffled
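The idea behind broadcast() can be sketched in pure Python: the small table becomes an in-memory hash map that every task of the large table can probe, which is why the large side needs no shuffle. This is a simplified model of a broadcast hash join (no executors, networking, or memory limits), and the employee names are hypothetical.

```python
from typing import Dict, List, Tuple


def broadcast_hash_join(
    large_partitions: List[List[Tuple[int, str]]],
    small_table: List[Tuple[int, str]],
) -> List[List[Tuple[int, str, str]]]:
    """Inner join on the first field.

    The small side is built into a hash map once (the "broadcast" copy);
    each partition of the large side probes it locally, so large rows never move.
    Assumes unique keys on the small side.
    """
    lookup: Dict[int, str] = dict(small_table)
    joined = []
    for part in large_partitions:  # in Spark, each partition is one task on some executor
        joined.append([(k, v, lookup[k]) for k, v in part if k in lookup])
    return joined


large = [[(1, "Emp A"), (2, "Emp B")], [(3, "Emp C"), (99, "Emp D")]]
small = [(1, "Good"), (2, "Medium"), (3, "Good")]
print(broadcast_hash_join(large, small))
```

The output keeps the large side's partitioning: two input partitions give two output partitions, and id 99 is dropped because it has no match in the lookup table.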

When to use:

    * Lookup tables
    
    * Dimension tables
    
    * Reference data

When NOT to use:

    * If the table is large (the driver and every executor must hold a full copy)
    
    * If executor or driver memory is limited

## 🔥 5. Caching & Persistence (Use Carefully)
🔹 cache()

In [15]:
df.cache()

DataFrame[id: bigint, name: string, full_name: string, address: string, json_string: string, dob_string_format: string, score: bigint, department: string, salary: bigint, hobbies_list: array<string>, user_id: bigint, txn_date: timestamp_ntz, txn_value: double, street: string, city: string, state: string]

In [16]:
df_new.cache()

DataFrame[id: bigint, name: string, full_name: string, address: string, json_string: string, dob_string_format: string, score: bigint, department: string, salary: bigint, hobbies_list: array<string>, user_id: bigint, txn_date: timestamp_ntz, txn_value: double, street: string, city: string, state: string, Sal_Cat: string]

In [17]:
df.count()


20000

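Explanation:

    * Keeps the DataFrame's partitions in executor memory, spilling those that don't fit to disk (the default DataFrame storage level is memory-and-disk)
    
    * Lazy: nothing is stored until the first action (here, count()) computes the data
    
    * Speeds up later actions that reuse the same DataFrame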

🔹 persist()

In [18]:
df.persist()

26/01/01 12:16:35 WARN CacheManager: Asked to cache already cached data.


DataFrame[id: bigint, name: string, full_name: string, address: string, json_string: string, dob_string_format: string, score: bigint, department: string, salary: bigint, hobbies_list: array<string>, user_id: bigint, txn_date: timestamp_ntz, txn_value: double, street: string, city: string, state: string]

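persist() accepts an explicit storage level (for example StorageLevel.MEMORY_ONLY, MEMORY_AND_DISK, or DISK_ONLY); called without an argument, it behaves like cache(). The warning above appears because df was already cached in In [15]: to switch levels, call df.unpersist() first.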

❗ Common mistakes:

    * Caching everything ❌
    
    * Caching data that is never reused ❌

When to cache:

    * Same DataFrame used multiple times
    
    * After expensive transformations

## 🔥 6. collect(): The Silent Killer

In [19]:
df.collect()

[Row(id=1, name='Advik Halder', full_name='Janya Bala', address='H.No. 916, Chokshi Road, Mumbai, Himachal Pradesh, 383408', json_string='{"city": "Bengaluru", "zipcode": "724094", "is_capital": false}', dob_string_format='1974-10-29', score=74, department='Sales', salary=95186, hobbies_list=['Photography'], user_id=2, txn_date=datetime.datetime(2025, 11, 29, 13, 20, 36, 61574), txn_value=132.71083639903748, street='20/662\nDua Nagar', city='Coimbatore', state='Rajasthan'),
 Row(id=2, name='Omaja Baria', full_name='Manya Bhakta', address='492, Dass Road, Bengaluru, Mizoram, 103082', json_string='{"city": "Coimbatore", "zipcode": "494894", "is_capital": true}', dob_string_format='1978-04-26', score=33, department='Finance', salary=54210, hobbies_list=['Gaming', 'Cooking', 'Photography', 'Reading'], user_id=3, txn_date=datetime.datetime(2025, 11, 28, 13, 20, 36, 61679), txn_value=829.31653998402, street='H.No. 913, Chander Path', city='Bengaluru', state='Uttar Pradesh'),
 Row(id=3, name=

What it does:

    * Brings ALL data to driver memory

Why dangerous:

    * Driver OOM crash
    
    * Works on small data, fails in production
    
Safe alternatives:

In [20]:
df.show(3)

+---+------------+---------------+--------------------+--------------------+-----------------+-----+----------+------+--------------------+-------+--------------------+------------------+--------------------+----------+-------------+
| id|        name|      full_name|             address|         json_string|dob_string_format|score|department|salary|        hobbies_list|user_id|            txn_date|         txn_value|              street|      city|        state|
+---+------------+---------------+--------------------+--------------------+-----------------+-----+----------+------+--------------------+-------+--------------------+------------------+--------------------+----------+-------------+
|  1|Advik Halder|     Janya Bala|H.No. 916, Choksh...|{"city": "Bengalu...|       1974-10-29|   74|     Sales| 95186|       [Photography]|      2|2025-11-29 13:20:...|132.71083639903748|   20/662\nDua Nagar|Coimbatore|    Rajasthan|
|  2| Omaja Baria|   Manya Bhakta|492, Dass Road, B...|{"city": 

In [21]:
df.take(10)

[Row(id=1, name='Advik Halder', full_name='Janya Bala', address='H.No. 916, Chokshi Road, Mumbai, Himachal Pradesh, 383408', json_string='{"city": "Bengaluru", "zipcode": "724094", "is_capital": false}', dob_string_format='1974-10-29', score=74, department='Sales', salary=95186, hobbies_list=['Photography'], user_id=2, txn_date=datetime.datetime(2025, 11, 29, 13, 20, 36, 61574), txn_value=132.71083639903748, street='20/662\nDua Nagar', city='Coimbatore', state='Rajasthan'),
 Row(id=2, name='Omaja Baria', full_name='Manya Bhakta', address='492, Dass Road, Bengaluru, Mizoram, 103082', json_string='{"city": "Coimbatore", "zipcode": "494894", "is_capital": true}', dob_string_format='1978-04-26', score=33, department='Finance', salary=54210, hobbies_list=['Gaming', 'Cooking', 'Photography', 'Reading'], user_id=3, txn_date=datetime.datetime(2025, 11, 28, 13, 20, 36, 61679), txn_value=829.31653998402, street='H.No. 913, Chander Path', city='Bengaluru', state='Uttar Pradesh'),
 Row(id=3, name=

In [22]:
df.limit(4).collect()

[Row(id=1, name='Advik Halder', full_name='Janya Bala', address='H.No. 916, Chokshi Road, Mumbai, Himachal Pradesh, 383408', json_string='{"city": "Bengaluru", "zipcode": "724094", "is_capital": false}', dob_string_format='1974-10-29', score=74, department='Sales', salary=95186, hobbies_list=['Photography'], user_id=2, txn_date=datetime.datetime(2025, 11, 29, 13, 20, 36, 61574), txn_value=132.71083639903748, street='20/662\nDua Nagar', city='Coimbatore', state='Rajasthan'),
 Row(id=2, name='Omaja Baria', full_name='Manya Bhakta', address='492, Dass Road, Bengaluru, Mizoram, 103082', json_string='{"city": "Coimbatore", "zipcode": "494894", "is_capital": true}', dob_string_format='1978-04-26', score=33, department='Finance', salary=54210, hobbies_list=['Gaming', 'Cooking', 'Photography', 'Reading'], user_id=3, txn_date=datetime.datetime(2025, 11, 28, 13, 20, 36, 61679), txn_value=829.31653998402, street='H.No. 913, Chander Path', city='Bengaluru', state='Uttar Pradesh'),
 Row(id=3, name=

## 🔥 7. Avoid UDFs Whenever Possible
❌ Python UDF

In [23]:
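from pyspark.sql.functions import udf
slow_upper = udf(lambda s: s.upper() if s is not None else None)  # hypothetical Python UDF (default return type: string)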

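Why bad:

    * Rows are serialized, sent to a separate Python worker process, processed one at a time, and sent back to the JVM
    
    * Catalyst treats the UDF as a black box, so it cannot optimize the logic inside it
    
    * Slower execution than the equivalent built-in function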

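✅ Prefer built-in functions

Spark's built-in functions (pyspark.sql.functions) are:

    * Executed inside the JVM, with no Python round trip
    
    * Eligible for whole-stage code generation
    
    * Catalyst-aware

If custom Python logic is unavoidable:

    * pandas_udf (vectorized: processes Apache Arrow batches as pandas Series, usually much faster than a plain Python UDF)
    
    * Built-in functions, when one exists, are still the best choice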

## 🔥 8. Filter Early, Reduce Data Early (Golden Rule)

❌ Bad practice:

In [24]:
df.groupBy("city").count().filter(df.city == "Chennai").show()

+-------+-----+
|   city|count|
+-------+-----+
|Chennai| 3974|
+-------+-----+



✅ Good practice:

In [25]:
df.filter(df.city == "Chennai").select("id","city").show(5)

+---+-------+
| id|   city|
+---+-------+
| 12|Chennai|
| 21|Chennai|
| 31|Chennai|
| 40|Chennai|
| 43|Chennai|
+---+-------+
only showing top 5 rows


In [26]:
df.filter(df.city == "Chennai").groupBy("city").count().show()

+-------+-----+
|   city|count|
+-------+-----+
|Chennai| 3974|
+-------+-----+



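Why?

    * Less data shuffled
    
    * Faster execution
    
    * Less memory usage

For a simple filter on a grouping column, as in the bad example above, Catalyst can often push the filter below the groupBy by itself, so it is worth comparing both versions with .explain(). Filtering early is still the safer habit, because not every predicate can be pushed down automatically.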
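A pure-Python sketch of why the order matters when the optimizer does not reorder things for you: both orders give the same answer, but filtering first means far fewer rows enter the aggregation (and, in Spark, the shuffle behind it). The rows are hypothetical.

```python
from collections import Counter
from typing import Dict, List, Tuple

# Hypothetical (id, city) rows standing in for the Employees data
rows: List[Tuple[int, str]] = [
    (1, "Coimbatore"), (2, "Bengaluru"), (3, "Chennai"),
    (4, "Chennai"), (5, "Mumbai"), (6, "Bengaluru"),
]


def aggregate_then_filter(rows, city) -> Tuple[Dict[str, int], int]:
    """groupBy("city").count() first, then keep one city: every row enters the aggregation."""
    counts = Counter(c for _, c in rows)
    return {k: v for k, v in counts.items() if k == city}, len(rows)


def filter_then_aggregate(rows, city) -> Tuple[Dict[str, int], int]:
    """Filter first: only matching rows enter the aggregation."""
    kept = [r for r in rows if r[1] == city]
    return dict(Counter(c for _, c in kept)), len(kept)


late, late_input = aggregate_then_filter(rows, "Chennai")
early, early_input = filter_then_aggregate(rows, "Chennai")
print(late == early, late_input, early_input)  # True 6 2
```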

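## 🔥 9. File Format Matters More Than Code
❌ CSV

    * No embedded schema (column types must be inferred or declared)
    
    * Uncompressed by default (gzip-compressed CSV works, but a gzip file cannot be split across tasks)
    
    * Row-based text that must be parsed in full → slow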

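✅ Parquet

    * Columnar: only the columns a query needs are read
    
    * Compressed
    
    * Stores its schema in the file
    
    * Faster reads
    
    * Predicate pushdown: filters are passed to the reader, which can skip row groups whose statistics rule them out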

In [28]:
df.write.mode("overwrite").parquet("Emp_wtn.parquet")  # "overwrite" lets the cell be re-run; the default mode fails if the path exists

## 🔥 10. Understanding .explain() (Very Important)

In [29]:
df.explain()

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [id#0L,name#1,full_name#2,address#3,json_string#4,dob_string_format#5,score#6L,department#7,salary#8L,hobbies_list#9,user_id#10L,txn_date#11,txn_value#12,street#13,city#14,state#15] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/bhuvaneshwaran/Desktop/Medium/parquetFiles/Employees.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string,full_name:string,address:string,json_string:string,dob_string_format...




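What it shows:

    * df.explain() (as above) prints only the physical plan
    
    * df.explain(True) also prints the parsed, analyzed, and optimized logical plans
    
    * Shuffles (Exchange nodes)
    
    * Join strategies (e.g. BroadcastHashJoin, SortMergeJoin)
    
    * Filters, including those pushed into the file scan (PushedFilters)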

Reading execution plans makes you a real Spark engineer.

## 🔥 11. Spark UI: Your Best Debug Tool

When Spark is running:

* Open http://localhost:4040 (if that port is taken, for example by another Spark application, Spark tries 4041, 4042, and so on)

* Check:

    * Stages
    
    * Tasks
    
    * Shuffle read/write
    
    * Execution time

### ❗ Common Beginner Mistakes

| Mistake             | Impact           |
| ------------------- | ---------------- |
| Using collect() on big data | Driver OOM crash |
| Too many partitions | Overhead         |
| Too few partitions  | Poor parallelism |
| Using CSV           | Slow IO          |
| Overusing UDFs      | Slow execution   |
| Ignoring Spark UI   | Blind debugging  |
