In [1]:
import os
# Spark Session 
# Set Java 17 path
os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk-17"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "\\bin;" + os.environ["PATH"]

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyLocalSparkSession") \
    .master("local[*]") \
    .getOrCreate()

print("Spark version:", spark.version)


Spark version: 4.0.0


In [2]:
# Reading the data (With inferschema)
df_Flight_Data = spark.read.csv("Flight_Data.csv", header=True, inferSchema=True)
df_Flight_Data.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows


In [3]:
# schema info when infer schema is true 
df_Flight_Data.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [4]:
# Reading data without inferschema 
df_Flight_Data_No_infer = spark.read.csv("Flight_Data.csv", header=True, inferSchema=False)
df_Flight_Data_No_infer.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows


In [5]:
#Schema when infer is false 
df_Flight_Data_No_infer.printSchema()
# Here you will see the count is string 

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



# Dealing with Corroupted Record 

## Potential Interview Questions:
I. Have you worked with corrupted records?

II. When do you say that it’s a corrupted record?

III. What happens when we encounter corrupted records in different read modes?

IV. How can we print bad records?

V. Where do you store corrupted records and how can we access them later?) 
1 and 2) yes, Mostly the bad files are jsoj, csv
    Common bad records are missing key value from json file, extra comma or recroed in csv file
    value missing etc....
3) Permissive mode: Fill null in place of bad record
   fail fast: immidiately error out after it gets bad record
   DropMalInformed: Drop the courroupted record
4) Create schema for bad records that includes all data columns and one more for bad record , and then read the data again with passing that schema
5) .option('badrecords', 'Path') - To store the bad record 
 

In [6]:
permissive_flight_data = spark.read.format('csv')\
                        .option('header', 'true')\
                        .option('inferschema' , 'true')\
                        .option('mode','PERMISSIVE')\
                        .load('Flight_Data_Cour.csv')

permissive_flight_data.show()

# allows the record

+--------------------+-------------------+------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+--------------------+-------------------+------+
|       United States|            Romania| India|
|       United States|            Ireland| India|
|       United States|              India| India|
|               Egypt|      United States|    24|
|   Equatorial Guinea|      United States|     1|
|       United States|          Singapore|    25|
|       United States|            Grenada|    54|
|          Costa Rica|      United States|   477|
|             Senegal|      United States|    29|
|       United States|   Marshall Islands|    44|
|              Guyana|      United States|    17|
|       United States|       Sint Maarten|    53|
|               Malta|      United States|     1|
|             Bolivia|      United States|    46|
|            Anguilla|      United States|    21|
|Turks and Caicos ...|      United States|   136|
|       United States|        Afghanistan|     2|


In [7]:
failfast_flight_data = spark.read.format('csv')\
                        .option('header', 'true')\
                        .option('inferschema' , 'true')\
                        .option('mode','FAILFAST')\
                        .load('Flight_Data_Cour.csv')

failfast_flight_data.show()
# fail while loading

Py4JJavaError: An error occurred while calling o54.showString.
: org.apache.spark.SparkException: [FAILED_READ_FILE.NO_HINT] Encountered error while reading file file:///C:/Users/shant/Desktop/PySpark-DataBricks/Coading/Flight_Data_Cour.csv.  SQLSTATE: KD001
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:856)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:142)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:142)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:544)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:497)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:58)
	at org.apache.spark.sql.classic.Dataset.collectFromPlan(Dataset.scala:2244)
	at org.apache.spark.sql.classic.Dataset.$anonfun$head$1(Dataset.scala:1379)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2234)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
	at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
	at org.apache.spark.sql.classic.Dataset.head(Dataset.scala:1379)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2810)
	at org.apache.spark.sql.classic.Dataset.getRows(Dataset.scala:339)
	at org.apache.spark.sql.classic.Dataset.showString(Dataset.scala:375)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [United States,Romania, India].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.  SQLSTATE: 22023
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1525)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.throwMalformedRecordsDetectedInRecordParsingError(FailureSafeParser.scala:92)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:82)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:474)
	at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:594)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:608)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext0(FileScanRDD.scala:131)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:292)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext0(FileScanRDD.scala:131)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:140)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: United States,Romania, India,1 SQLSTATE: KD000
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedCSVRecordError(QueryExecutionErrors.scala:1322)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$convert$4(UnivocityParser.scala:350)
	... 30 more


In [None]:
dropmalinformed_flight_data = spark.read.format('csv')\
                        .option('header', 'true')\
                        .option('inferschema' , 'true')\
                        .option("mode", "DROPMALFORMED")\
                        .load('Flight_Data_Cour.csv')

dropmalinformed_flight_data.show()
# drop the miss informed record 

# Transformation and Actions
## Potential Interview questions
I. What is transformation and how many types of transformation do we have?

II. What happens when we use group by or join in transformation?

III. How jobs are created in Spark?




In [None]:
#1) 
# Transformation : Any process that we perform on data is called transformation 
# Action: .show(), .count() - kind of calling function
# Two type: narrow, wide 
# narrow - partitions are not dependant on each other, no shuffle, filter, select etc..
# wide - when depends on each other, shuffle , group by, join , and it is expensive 



In [None]:
#2) 
# wide transformation happens ,
# it is more expensive
# lots of shuffling happens between partitions


In [None]:
#3) 
"""
Bacically action creates job
Every action has its seperate job
job = 
.action 
        - count
        - show
        - collect
            - this creates job 
"""

# DAG and Lazy Evaluation 

Directed Ascyclic Graph:  A flow diagram for each job in spark
In DAG if it is gray , meaning it is already execuated and dont need to execute again 
Lazy Evaluation : Before execution each job is evaluated lazily and then out of many plans best plan will be choosen for execution.

# How to read Json file
## Potential Interview Questions: 
I. What is JSON data and how to read it in Spark?

II. What if JSON has 3 keys in all lines and 4 keys in one line?

III. What is multiline and line-delimited JSON?

IV. Which one works faster — multiline or line-delimited?

V. How to convert nested JSON into Spark DataFrame?

VI. What will happen if JSON has a corrupted JSON file or JSON record?

In [None]:
# 1) Json data is semi structred data, contains key value pair 

json_file = spark.read.format("json") \
    .option("multiline", "true") \
    .option("inferSchema", "true") \
    .option("mode", "PERMISSIVE") \
    .load("sample_flights.json") 
  

json_file.show()

In [None]:
# 2) Then it will fail , or depending the mode of the file reader api, it will either drop it, read it or fail at the time of reading the file 

In [None]:
# 3) Line deli - data in single line {} (.option("multiline", "false") \ )
#    Multi Line -  data in multiple line  ( .option("multiline", "true") \)
"""
example of line deli:
{key : value, key : value }
example of multi line: 
{
key : value,
key : value,
key : value
}
for performance line deli is good, as it reads the data from one line and moves it to anoter line, for multi line , it has to make sure the data is in one {} , so little hard to read 
"""

In [None]:
# 4),6)  for performance line deli is good, as it reads the data from one line and moves it to anoter line, for multi line , it has to make sure the data is in one {} , so little hard to read 


In [None]:
#5 ) Nested Json reading :

Nested_json_file = spark.read.format("json") \
    .option("multiline", "true") \
    .option("inferSchema", "true") \
    .option("mode", "PERMISSIVE") \
    .load("nestedejson.json") 
  

Nested_json_file.show()
Nested_json_file.printSchema()

# SQL Engine

## Potential Interview questions

I. What is Catalyst Optimizer / Spark SQL Engine?

II. Why do we get Analysis Exception error?

III. What is Catalog?

IV. What is Physical Planning / Spark Plan?

V. Is Spark SQL Engine a compiler?

VI. How many phases are involved in Spark SQL Engine to convert a code into Java bytecode?

I. What is Catalyst Optimizer / Spark SQL Engine?

Catalyst Optimizer is the query optimization framework inside Spark SQL.
It converts logical plans into optimized physical plans for better performance.
It uses techniques like predicate pushdown, constant folding, and join optimization.

II. Why do we get Analysis Exception error?

This occurs when Spark cannot analyze the logical plan.
Common causes: missing columns, invalid paths, unsupported operations, or schema mismatches.
It happens before execution, during the query analysis phase.

III. What is Catalog?

The Catalog stores metadata about databases, tables, columns, and functions in Spark SQL.
It helps Spark resolve table names and manage schemas efficiently.
You can access it using spark.catalog API.

IV. What is Physical Planning / Spark Plan?

Physical Planning is the process where Spark decides how to execute a query.
The Catalyst Optimizer generates multiple physical plans and picks the most efficient one.
You can view it using .explain(mode="extended").

V. Is Spark SQL Engine a compiler?

Yes ✅ Spark SQL Engine acts like a compiler.
It converts high-level queries (SQL or DataFrame API) into optimized RDD operations.
Finally, these are compiled into Java bytecode and executed on the cluster.

VI. How many phases are involved in Spark SQL Engine to convert code into Java bytecode?

There are four main phases:

Parsing → converts query into an unresolved logical plan

Analysis + Optimization → applies Catalyst rules to optimize

Physical Planning & Code Generation → generates Java bytecode for execution

# RDD
Potential Interview questions

1) What is RDD
2) When do we need RDD
3) Features of RDD
4) What is dataframe/dataset
5) Why we should not use an RDD 

1) RDD: Rescilient Distrubuted Dataset (Data Structure) (Immutable)
   R :  In case of faliure, it can recover (wuth the help of DAG)
   D : it is distributed over the cluster
   D : Data
2) Good for unstructured, type safe (compile time error), when we want full control over our data 
3) Fault Tolarance, immutable, lazy, optimization
4) structured API/Dataset Column row format data - data frame, easy to read 
5) disadvantage- no optimization done by spark, complex code for RDD , and slow code, less readability

RDD - How to ? (Meaning we need to tell RDD, how to do? )
Dataframe - What to  ? (Meaning we need to tell dataframe what to do ? it will tell automatically )

# Parque File format 

I. What is Parquet file format? 

II. Why do we need Parquet? fast, optimized, OLAP, Structured, Encrypted 

III. How to read Parquet file? 

IV. What makes Parquet a default choice?

V. What encoding is done on data? RLP (Run lenght encoding) (will convert aaaaaavvvvvvbbce to a6v6b2c1e1 to make it faster)

VI. What compression techniques are used? GZIP, SNAPPY

VII. How to optimize the Parquet file? Use Analyze command 

VIII. What is row group, column, and pages ?  after reading file it will automatically group the data in chunk of row, column and these row and column chunk are in page

IX. How projection Pruning and predicate pushdown works ? 

Predicate Pushdown: Spark optimization where filter conditions are pushed down to the data source (e.g., Parquet, ORC) instead of filtering after data is loaded.
Projection Pruning : The columns which are not required are not involved or called (select id where age< 19 , so if in a group <19 is not available it will stop to scan the data  )

1) It is a file format, columanar based file format, it is structured file format, and it is in binary type. (Actually in hybrid not row or columnar)
   OLAP - Online Analytical processing (Used for analytical , as there is less need to update) (Columnar file format)
   OLTP - Online Transcation Processing (Used where we need to update the record), (Row based file format)

   

In [None]:
# Read parquet file

df_parquet = spark.read.parquet('part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet')
df_parquet.show()

# How to write data in spark ? 
## Potential Interview questions 

1) What are the modes available in dataframe writer?
2) What is partitionBy and bucketBy?
      A. WWhy we need these two ?
      B. When to use whicn ? 
4) How to write data into multiple partitions?

The general structure of how to write data in spark is 
df = spark.write.format("mention your format")\
    .option('header','true')\
    .option('mode', 'mention mode ')\
    .partitionBy('')\
    .bucketBy('')\
    .option('path','----')\
    .save()


1) Modes are append, overwrite, errorifexist, ignore
   1) Append: It will add the file in the location
   2) Overwrite:  delete the first file and saves the new file
   3) ErrorIfExist : check if location or file exists at the same location, if the file exists in the location it throws error(file exists/location dosent exist)
   4) Ignore: It will keep the existing file in the location and will not write new file (basically it ignores the new file and keep the existing one)
    

In [None]:
# Example:

# first lets read the file

employee_df = spark.read.format('csv')\
            .option('header','true')\
            .option('inferschema','true')\
            .option('mode','PERMISSIVE')\
            .load('employee_data.csv')

employee_df.show(20)

In [None]:
# Partition Example
# writeEmpdf = employee_df.write.format('csv') \
#     .option('header', 'true') \
#     .option('mode', 'overwrite') \
#     .option('path', 'writtenFile.csv') \
#     .partitionBy('ColumnName') \
#     .save()

# Advantages of Partition:
# 1) No need to scan the full data.
# 2) Helps scan data in chunks.
# 3) Fails if the partition column has very few distinct values (uneven partitioning).
# 4) Causes shuffling of data during write.

# Bucket Example
# writeEmpdf = employee_df.write \
#     .bucketBy(4, 'ColumnName') \
#     .sortBy('ColumnName') \
#     .option('header', 'true') \
#     .option('mode', 'overwrite') \
#     .saveAsTable('BucketedTable')

# Advantages of Bucketing:
# 1) No shuffling during write.
# 2) Faster for joins and aggregations.

# Partition vs Bucketing:
# Partition By                   | Bucket By
# ------------------------------ | ------------------------------
# Divides data by actual values  | Divides data by hash of column
# Creates dynamic number of parts| Creates fixed number of buckets
# Creates separate folders       | All bucket files stored in one folder
# Best for filter-heavy queries  | Best for join-heavy queries

# Spark Session Vs Spark Context 

Both provies entry to spark session.

before spark 1.0
for everything we need to create context 
example
1) sql - sql context
2) spark - sparl context
3) hive - hive context 

now after 1.0 everything is in spark session, no need to do seperately 

# Job, Stage, Task, Applicaion 

Read the book 

# How to create datafram in spark ?

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType

my_data = [(1,2),(1,2),(1,2),(1,2),(1,2),(1,2)]

schema = StructType([
    StructField("ID",  IntegerType(), True),
    StructField("Num", IntegerType(), True),
])

df = spark.createDataFrame(my_data, schema)
df.show()
df.printSchema()


In [None]:
# ✅ Your data
my_data = [
    (1, 2),
    (1, 2),
    (1, 2),
    (1, 2),
    (1, 2),
    (1, 2)
]

# ✅ Create DataFrame directly & rename columns
create_df = spark.createDataFrame(my_data).toDF("ID", "Num")

# ✅ Show DataFrame content
create_df.show()

# ✅ Print schema (optional)
create_df.printSchema()


# Hands On Transformations

## Lecture 1
- What is schema? - column name , data type
- What is dataframe? - tabular form of data , it is made of two things row and columns
- How to select columns? 
- How many ways to select columns?
- What is expression?

In [None]:
df_Flight_Data.printSchema()

In [None]:
# for now we will select flight data, as above cellse are getting error 
# how to select columns
df_Flight_Data.columns
df_Flight_Data.select("count").show()


In [None]:
# multiple ways to selecting columns
from pyspark.sql.functions import *
from pyspark.sql.types import *
 
df_Flight_Data.select("count").show(5) # String method
df_Flight_Data.select(col('count')).show(5) # col method
df_Flight_Data.select(col('count')+5).show(5)
df_Flight_Data.select(col('count') ,col('DEST_COUNTRY_NAME')).show(5)


In [None]:
# expression

# It lets you write SQL-like expressions inside PySpark code.
#df_Flight_Data.select(expr('DEST_COUNTRY_NAME'+'HIIII')).show(5) # getting analysis expression, so filing at catalyst level 


from pyspark.sql.functions import expr

df_Flight_Data.select(expr("DEST_COUNTRY_NAME || ' HIIII'")).show(5)


## Lecture 2

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
# Alies
df_Flight_Data.select(col('count').alias('Total Count')).show()

# Filter columns
df_Flight_Data.filter(col('DEST_COUNTRY_NAME') == 'United States').show()

#literal
df_Flight_Data.select('*', lit('Economy').alias('class')).show()

#Adding columns:
new_df = df_Flight_Data.withColumn('Price',lit('90000')).show()

# Rename Columns:
df_Flight_Data.withColumnRenamed('Count','Trip').show()

# Type casting
df_Flight_Data.withColumn('count',col('count').cast("string")).printSchema()

# Removing columns
df_Flight_Data.drop(col('count')).show()

# Lecture 3 
Union vs Union All 

In [None]:
# ✅ Create two simple DataFrames
data1 = [(1, "India"), (2, "USA"), (3, "UK")]
data2 = [(3, "UK"), (4, "Canada"), (5, "Germany")]
columns = ["ID", "Country"]

df1 = spark.createDataFrame(data1, columns)
df2 = spark.createDataFrame(data2, columns)

# ✅ UNION → Removes duplicates
df_union = df1.union(df2)
print("=== UNION (Removes duplicates) ===")
df_union.show()

# ✅ UNION ALL → Keeps duplicates (deprecated in Spark 3.x, use union instead)
df_union_all = df1.unionAll(df2)  # For Spark 3+, df1.union(df2)
print("=== UNION ALL (Keeps duplicates) ===")
df_union_all.show()


In [8]:
# Potential Interview Questions :

# 1️⃣ Difference between union and unionAll:
# ------------------------------------------------
# ➡ union      → Combines two DataFrames and REMOVES duplicate rows.
# ➡ unionAll   → Combines two DataFrames and KEEPS duplicate rows.
# ⚡ Note: In Spark 3.x, unionAll() is deprecated, and union() behaves like unionAll() by default.
#
# Example:
# df1.union(df2)           # Removes duplicates in older versions (<3.0)
# df1.unionAll(df2)        # Keeps duplicates (deprecated in Spark 3.x)
# In Spark 3.x, use:
# df1.union(df2)           # Always keeps duplicates unless you add .distinct()

# 2️⃣ What will happen if the number of columns is different while unioning?
# --------------------------------------------------------------------------
# ❌ Spark will throw an AnalysisException if the number of columns in df1 and df2 don't match.
#
# Example:
# df1 has 2 columns (ID, Name)
# df2 has 3 columns (ID, Name, Age)
# df1.union(df2)   # ❌ Will fail with: "Union can only be performed on tables with the same number of columns"

# 3️⃣ What if column names are different?
# ---------------------------------------
# ✅ Column names DON'T matter for union / unionAll.
# Spark unions based on POSITION, not NAME.
#
# Example:
# df1 columns → ["ID", "Name"]
# df2 columns → ["UserID", "FullName"]
# df1.union(df2)  ✅ Works fine, but column order must match.

# 4️⃣ What is unionByName?
# -------------------------
# ➡ unionByName matches columns by NAME instead of POSITION.
# This is useful when column names are the same but in a different order.
#
# Example:
# df1 → columns: ["ID", "Name"]
# df2 → columns: ["Name", "ID"]
#
# df1.unionByName(df2) ✅ Works perfectly.
# Spark aligns columns by name internally.
#
# ⚡ unionByName(ignoreNulls=True) can also handle missing columns gracefully by filling them with nulls.


# Lecture 4
## Repartation and Coalesce

## Potentail Interview Questions 

1) What is reapartation?
2) What is coalesce
3) Which one will you choose and why ?
4) Difference 

In [9]:
# Example: 

df_Flight_Data.show()


+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [10]:
# partition can be done but first , df need to be converted to RDD then repartatiojn is done 
# df -------------> RDD ----------------> Repartition 
# how ? 

df_Flight_Data.rdd.getNumPartitions() # this will give number of partition


1

In [11]:
# lets repartition 
df_Flight_Data_in_3_partition = df_Flight_Data.repartition(3)
df_Flight_Data_in_3_partition.rdd.getNumPartitions() # now data is in 3 partition 


3

In [12]:
# lets see how the data is devided in the partition
from pyspark.sql.functions import spark_partition_id, expr

df_Flight_Data_in_3_partition.withColumn('partitionID',spark_partition_id()).groupby('partitionID').count().show()
# so when done repartition the data is evenly distributed


+-----------+-----+
|partitionID|count|
+-----------+-----+
|          0|   85|
|          1|   85|
|          2|   85|
+-----------+-----+



In [13]:
# Coalesec
# first lets do repartation into 8 then merge them in 3 
df_Flight_Data.rdd.getNumPartitions()

1

In [14]:
df_Flight_Data_in_8_partitions = df_Flight_Data.repartition(8)

In [15]:
df_Flight_Data_in_8_partitions.withColumn('partitio_id',spark_partition_id()).groupby('partitio_id').count().show()

+-----------+-----+
|partitio_id|count|
+-----------+-----+
|          0|   32|
|          1|   31|
|          2|   32|
|          3|   32|
|          4|   32|
|          5|   32|
|          6|   32|
|          7|   32|
+-----------+-----+



In [16]:
df_Flight_Data_in_8_partitions_in_3_coalesce=df_Flight_Data_in_8_partitions.coalesce(3)

In [17]:
df_Flight_Data_in_8_partitions_in_3_coalesce.withColumn('Partition_id',spark_partition_id()).groupby('Partition_id').count().show()
# so this is not even

+------------+-----+
|Partition_id|count|
+------------+-----+
|           0|   64|
|           1|   95|
|           2|   96|
+------------+-----+



# Lecture 5
## If-else/when-otherwise/case when 

## potential interview questions
1) case when in saprk
2) otherwise in saprk
3) how to deal with null values
4) case when , if else with multiple and or conditions 

In [18]:
# lets read the data
people_df = spark.read.format('csv')\
            .option('header','true')\
            .option('inferschema','true')\
            .option('mode','PERMISSIVE')\
            .load('people_data.csv')
people_df.show()



+---------------+----+-----------------+--------------+--------+-------------------+
|           name| age|           gender|       country|  salary|         occupation|
+---------------+----+-----------------+--------------+--------+-------------------+
|    Casey Patel|NULL|           Female|          NULL| 74144.0|   Business Analyst|
|      Casey Lee|55.0|             Male|        Mexico|120302.0|  Financial Analyst|
|   Alex Johnson|34.0|           Female|         Japan| 99453.0|       Data Analyst|
|   Reese Garcia|62.0|Prefer not to say|United Kingdom| 89439.0|       Data Analyst|
| Cameron Harris|21.0|           Female|     Australia|109597.0|Mechanical Engineer|
|   Riley Garcia|42.0|             NULL|         India| 79898.0|         Accountant|
|   Jessie Clark|43.0|       Non-binary| United States|175647.0|  Software Engineer|
| Reese Williams|45.0|             Male|         Japan|    NULL|     Data Scientist|
|Shanaya Jackson|44.0|             NULL|         India| 53003.0| 

In [19]:
# if age is > 18 adult or else no
from pyspark.sql.functions import when, col  # (optionally lit)
# 1,2) 
people_df.withColumn('Adult',when(col('age')>25,'YES')
                            .when(col('age')<25,'NO')
                            .otherwise('Bad Value')).show()
                            

+---------------+----+-----------------+--------------+--------+-------------------+---------+
|           name| age|           gender|       country|  salary|         occupation|    Adult|
+---------------+----+-----------------+--------------+--------+-------------------+---------+
|    Casey Patel|NULL|           Female|          NULL| 74144.0|   Business Analyst|Bad Value|
|      Casey Lee|55.0|             Male|        Mexico|120302.0|  Financial Analyst|      YES|
|   Alex Johnson|34.0|           Female|         Japan| 99453.0|       Data Analyst|      YES|
|   Reese Garcia|62.0|Prefer not to say|United Kingdom| 89439.0|       Data Analyst|      YES|
| Cameron Harris|21.0|           Female|     Australia|109597.0|Mechanical Engineer|       NO|
|   Riley Garcia|42.0|             NULL|         India| 79898.0|         Accountant|      YES|
|   Jessie Clark|43.0|       Non-binary| United States|175647.0|  Software Engineer|      YES|
| Reese Williams|45.0|             Male|         J

In [20]:
#3) Null valuse 
people_df.withColumn('age', when(col('age').isNull(), 19).otherwise(col('age'))).show()

+---------------+----+-----------------+--------------+--------+-------------------+
|           name| age|           gender|       country|  salary|         occupation|
+---------------+----+-----------------+--------------+--------+-------------------+
|    Casey Patel|19.0|           Female|          NULL| 74144.0|   Business Analyst|
|      Casey Lee|55.0|             Male|        Mexico|120302.0|  Financial Analyst|
|   Alex Johnson|34.0|           Female|         Japan| 99453.0|       Data Analyst|
|   Reese Garcia|62.0|Prefer not to say|United Kingdom| 89439.0|       Data Analyst|
| Cameron Harris|21.0|           Female|     Australia|109597.0|Mechanical Engineer|
|   Riley Garcia|42.0|             NULL|         India| 79898.0|         Accountant|
|   Jessie Clark|43.0|       Non-binary| United States|175647.0|  Software Engineer|
| Reese Williams|45.0|             Male|         Japan|    NULL|     Data Scientist|
|Shanaya Jackson|44.0|             NULL|         India| 53003.0| 

In [21]:
#4) 
people_df.withColumn('Senority',when((col('age')>18) & (col('age')<30), 'Adult')
                               .when((col('age')>30)&(col('age')<60),'Mid')
                               .otherwise(col('age'),'Not Applicable')).show

TypeError: Column.otherwise() takes 2 positional arguments but 3 were given

# Lecture 6
## Unique and Sorted Records 

## Potential Interview Questions 

1) How to find unique rows ?
2) How to drop duplicate ?
3) How to sort data ?


In [None]:
#1) 
people_df.distinct().show()

In [None]:
people_df.distinct().count()

In [None]:
#2)
people_df.drop_duplicates().show()

In [None]:
#3) 
people_df.sort(col('salary').desc()).show()

# Lecture 7 
## Agg funcitons

## Potential Int questions 
1) count  -- count is both action and transformation (interview tip )
2) min
3) max
4) avg


In [None]:
people_df.count()

In [22]:
from pyspark.sql.functions import when, col  # (optionally lit)


people_df.agg({"salary": "max"}).show()

+-----------+
|max(salary)|
+-----------+
|   175647.0|
+-----------+



In [23]:
from pyspark.sql.functions import max, sum, min, avg
people_df.select(max('salary').alias('Maximum Sal'), sum('salary').alias('Sum Of Salary'),min('salary').alias('Minimum salary'), avg('salary').alias('Average Salary')).show()

+-----------+-------------+--------------+-----------------+
|Maximum Sal|Sum Of Salary|Minimum salary|   Average Salary|
+-----------+-------------+--------------+-----------------+
|   175647.0|    8000392.0|       46624.0|95242.76190476191|
+-----------+-------------+--------------+-----------------+



# Lecture 8
## Group By
## Potential Interview questions

1) How group by works ? - groups the data based on the mentioned column
2) how to implement it in spark ? 

In [24]:
people_df.show()

+---------------+----+-----------------+--------------+--------+-------------------+
|           name| age|           gender|       country|  salary|         occupation|
+---------------+----+-----------------+--------------+--------+-------------------+
|    Casey Patel|NULL|           Female|          NULL| 74144.0|   Business Analyst|
|      Casey Lee|55.0|             Male|        Mexico|120302.0|  Financial Analyst|
|   Alex Johnson|34.0|           Female|         Japan| 99453.0|       Data Analyst|
|   Reese Garcia|62.0|Prefer not to say|United Kingdom| 89439.0|       Data Analyst|
| Cameron Harris|21.0|           Female|     Australia|109597.0|Mechanical Engineer|
|   Riley Garcia|42.0|             NULL|         India| 79898.0|         Accountant|
|   Jessie Clark|43.0|       Non-binary| United States|175647.0|  Software Engineer|
| Reese Williams|45.0|             Male|         Japan|    NULL|     Data Scientist|
|Shanaya Jackson|44.0|             NULL|         India| 53003.0| 

In [25]:
# group by occupation and take the sum of salary
from pyspark.sql.functions import col

people_df.groupby('occupation').agg(sum('salary')).show()

+--------------------+-----------+
|          occupation|sum(salary)|
+--------------------+-----------+
|   Financial Analyst|   324887.0|
|               Nurse|   491628.0|
|             Teacher|   571511.0|
|                NULL|   471463.0|
|Marketing Specialist|   177853.0|
|    Business Analyst|   584234.0|
|      Data Scientist|   872635.0|
|  Operations Analyst|   269706.0|
|        Data Analyst|   526066.0|
|       HR Specialist|   454296.0|
|       Sales Manager|   348607.0|
| Mechanical Engineer|   367932.0|
|          Accountant|   555216.0|
|  Research Assistant|   501655.0|
|   Software Engineer|   838459.0|
|     Product Manager|   644244.0|
+--------------------+-----------+



In [26]:
# Business wants to know country wise payments for every occupation
people_df.groupby('country','occupation').agg(sum('salary')).show()

+--------------+--------------------+-----------+
|       country|          occupation|sum(salary)|
+--------------+--------------------+-----------+
|        Mexico|          Accountant|   119477.0|
|        Mexico|       Sales Manager|   143227.0|
|     Australia|       HR Specialist|   101495.0|
|       Germany|  Research Assistant|    61203.0|
|       Germany|      Data Scientist|   134020.0|
|        Brazil|  Research Assistant|    71191.0|
|     Australia| Mechanical Engineer|   109597.0|
|        Mexico|               Nurse|    83077.0|
|         Japan|       HR Specialist|    79223.0|
|     Australia|               Nurse|    80009.0|
|       Germany|                NULL|   147362.0|
|          NULL|               Nurse|    79867.0|
|United Kingdom|       Sales Manager|   115069.0|
| United States|   Software Engineer|   175647.0|
|        France|                NULL|    86290.0|
|       Germany|Marketing Specialist|       NULL|
|United Kingdom|        Data Analyst|   224755.0|


# Lecture 9
## Join

## Potential Interview questions

1) How join works? Matches the ID or specific key to other table and based on the match it will return the data.
2) Why do we need joins? ---> If some data is needed and it is not in single table , then joins can be used to get the data from two or more tables. 
3) What do we do after joining tow tables ? -- see if the table explodes if it is then make sure to use distinct 
4) What if two tables have same column name ? -- Ambigious error / mention df and then column then it willnot error out
5) Join on tow or more columns ?  df1.join(df2, ((expr1) & (expr2)), jointype)
6) Types of join?  7 - inner, outer, left, right, left anti, left semi, cross

In [27]:
# # Not sure why it is throwing error
# # Creating demo data for join operation:



# # -------------------------------
# # 1️⃣ Customer DataFrame
# # -------------------------------
# customer_data = [
#     (1,'manish','patna',"30-05-2022"),
#     (2,'vikash','kolkata',"12-03-2023"),
#     (3,'nikita','delhi',"25-06-2023"),
#     (4,'rahul','ranchi',"24-03-2023"),
#     (5,'mahesh','jaipur',"22-03-2023"),
#     (6,'prantosh','kolkata',"18-10-2022"),
#     (7,'raman','patna',"30-12-2022"),
#     (8,'prakash','ranchi',"24-02-2023"),
#     (9,'ragini','kolkata',"03-03-2023"),
#     (10,'raushan','jaipur',"05-02-2023")
# ]

# customer_schema = ['customer_id','customer_name','address','date_of_joining']

# customer_df = spark.createDataFrame(customer_data, schema=customer_schema)

# # -------------------------------
# # 2️⃣ Sales DataFrame
# # -------------------------------
# sales_data = [
#     (1,22,10,"01-06-2022"),
#     (1,27,5,"03-02-2023"),
#     (2,5,3,"01-06-2023"),
#     (5,22,1,"22-03-2023"),
#     (7,22,4,"03-02-2023"),
#     (9,5,6,"03-03-2023"),
#     (2,1,12,"15-06-2023"),
#     (1,56,2,"25-06-2023"),
#     (5,12,5,"15-04-2023"),
#     (11,12,76,"12-03-2023")
# ]

# sales_schema = ['customer_id','product_id','quantity','date_of_purchase']

# sales_df = spark.createDataFrame(sales_data, schema=sales_schema)

# # -------------------------------
# # 3️⃣ Product DataFrame
# # -------------------------------
# product_data = [
#     (1, 'fanta',20),
#     (2, 'dew',22),
#     (5, 'sprite',40),
#     (7, 'redbull',100),
#     (12,'mazza',45),
#     (22,'coke',27),
#     (25,'limca',21),
#     (27,'pepsi',14),
#     (56,'sting',10)
# ]

# product_schema = ['id','name','price']

# product_df = spark.createDataFrame(product_data, schema=product_schema)

# # -------------------------------
# customer_df.show()
# sales_df.show()
# product_df.show()


    

In [28]:
#Join syntax

#df1.join(df2, expression, jointype)

# Lecture 10 
## Join Strategy

## Potential interview questions 

1) what are the join strategy in Spark
       A. Shuffle sort-merge
       B. Shuffle hash join
       C. Broadcast Jpoin
       D. Cartesion join
       E. Broadcast nested loop join  
3) Why joins are expensive/ wide dependency transformation ?
4) Difference between shuffle hash join and sort merge join ?
5) when do we need brodcast join ? 

# Lecture 11
## Broadcast Hash join 

## Potential Interview questions 
1) why do we need broadcast hash join - Basically to avoide shuffeling
2) How does broadcast join works - Driver will broadcast the smaller table to all executors
3) Difference between broadcast hash join and shuffle hash join - hash tere will be shuffeling , broadcast there will not be shuffeling
4) How can we change the broadcast size of the table - set it while creating session
5) When the broadcast join will fail  - when the size of the table is too big 


In [29]:
# Syntax for broad cast join 
#Broadcast_df =  df.join(broadcast(df2), (expression))

# lecture 12 
## Window Function 

In [30]:
# window = window.partitionBy('dept').orderBy('salary')

# df = df.withcolumn('RowNumner',rownumber().over(window))\
#        .withcolumn('DenseRank',denserank().over(window))\
#        .withcolumn('Rank',rank().over(window))\
#        .show()




# Lecture 13 Lead and Lag 
## Potential interview questions

1) What is lead ? 
2) What is lag ? 


In [31]:
# lets read a df first

sales_df = spark.read.format('csv')\
            .option('header','true')\
            .option('inferschema','true')\
            .option('mode','permissive')\
            .load('sales_window_data.csv')

sales_df.show(30)

+----------+------------+----------+-----+
|product_id|product_name|sales_date|sales|
+----------+------------+----------+-----+
|      P001| Apex Widget|2025-06-01|  106|
|      P001| Apex Widget|2025-06-02|  100|
|      P001| Apex Widget|2025-06-03|  111|
|      P001| Apex Widget|2025-06-04|  123|
|      P001| Apex Widget|2025-06-05|  103|
|      P001| Apex Widget|2025-06-06|  104|
|      P001| Apex Widget|2025-06-07|  127|
|      P001| Apex Widget|2025-06-08|  119|
|      P001| Apex Widget|2025-06-09|  106|
|      P001| Apex Widget|2025-06-10|  119|
|      P001| Apex Widget|2025-06-11|  109|
|      P001| Apex Widget|2025-06-12|  110|
|      P001| Apex Widget|2025-06-13|  120|
|      P001| Apex Widget|2025-06-14|   96|
|      P001| Apex Widget|2025-06-15|   99|
|      P001| Apex Widget|2025-06-16|  115|
|      P001| Apex Widget|2025-06-17|  111|
|      P001| Apex Widget|2025-06-18|  128|
|      P001| Apex Widget|2025-06-19|  115|
|      P001| Apex Widget|2025-06-20|  110|
|      P002

In [32]:
# first lets get window for product ID and order it by date 
# then get pervious month sales 
from pyspark.sql import Window
from pyspark.sql.functions import lag, lead, col


window = Window.partitionBy('product_id').orderBy('sales_date')

# now lets calculate previous month sales 
previous_month_sales = sales_df.withColumn('Previous_Month_Sales',lag(col('sales'),1).over(window))
previous_month_sales.show()


+----------+------------+----------+-----+--------------------+
|product_id|product_name|sales_date|sales|Previous_Month_Sales|
+----------+------------+----------+-----+--------------------+
|      P001| Apex Widget|2025-06-01|  106|                NULL|
|      P001| Apex Widget|2025-06-02|  100|                 106|
|      P001| Apex Widget|2025-06-03|  111|                 100|
|      P001| Apex Widget|2025-06-04|  123|                 111|
|      P001| Apex Widget|2025-06-05|  103|                 123|
|      P001| Apex Widget|2025-06-06|  104|                 103|
|      P001| Apex Widget|2025-06-07|  127|                 104|
|      P001| Apex Widget|2025-06-08|  119|                 127|
|      P001| Apex Widget|2025-06-09|  106|                 119|
|      P001| Apex Widget|2025-06-10|  119|                 106|
|      P001| Apex Widget|2025-06-11|  109|                 119|
|      P001| Apex Widget|2025-06-12|  110|                 109|
|      P001| Apex Widget|2025-06-13|  12

In [33]:
# lets calculate next month sales

next_month_sales = sales_df.withColumn('Next_Month_Sales',lead(col('sales'),1).over(window))
next_month_sales.show()

+----------+------------+----------+-----+----------------+
|product_id|product_name|sales_date|sales|Next_Month_Sales|
+----------+------------+----------+-----+----------------+
|      P001| Apex Widget|2025-06-01|  106|             100|
|      P001| Apex Widget|2025-06-02|  100|             111|
|      P001| Apex Widget|2025-06-03|  111|             123|
|      P001| Apex Widget|2025-06-04|  123|             103|
|      P001| Apex Widget|2025-06-05|  103|             104|
|      P001| Apex Widget|2025-06-06|  104|             127|
|      P001| Apex Widget|2025-06-07|  127|             119|
|      P001| Apex Widget|2025-06-08|  119|             106|
|      P001| Apex Widget|2025-06-09|  106|             119|
|      P001| Apex Widget|2025-06-10|  119|             109|
|      P001| Apex Widget|2025-06-11|  109|             110|
|      P001| Apex Widget|2025-06-12|  110|             120|
|      P001| Apex Widget|2025-06-13|  120|              96|
|      P001| Apex Widget|2025-06-14|   9

In [34]:
# now how to calculate % increase of decrease 
# ((sales - previousMonthsales)/sales)*100
from pyspark.sql.functions import col, round


percentage_sales = previous_month_sales.withColumn('% Change', round(((col('sales') - col('Previous_Month_Sales')) / col('Previous_Month_Sales')) * 100, 2)).show()


+----------+------------+----------+-----+--------------------+--------+
|product_id|product_name|sales_date|sales|Previous_Month_Sales|% Change|
+----------+------------+----------+-----+--------------------+--------+
|      P001| Apex Widget|2025-06-01|  106|                NULL|    NULL|
|      P001| Apex Widget|2025-06-02|  100|                 106|   -5.66|
|      P001| Apex Widget|2025-06-03|  111|                 100|    11.0|
|      P001| Apex Widget|2025-06-04|  123|                 111|   10.81|
|      P001| Apex Widget|2025-06-05|  103|                 123|  -16.26|
|      P001| Apex Widget|2025-06-06|  104|                 103|    0.97|
|      P001| Apex Widget|2025-06-07|  127|                 104|   22.12|
|      P001| Apex Widget|2025-06-08|  119|                 127|    -6.3|
|      P001| Apex Widget|2025-06-09|  106|                 119|  -10.92|
|      P001| Apex Widget|2025-06-10|  119|                 106|   12.26|
|      P001| Apex Widget|2025-06-11|  109|         

# Lecture 14 : Flatten Json 

In [35]:
# read json file

restaurent_Data = spark.read.format('json')\
                .option('multiline','true')\
                .option('inferschema','true')\
                .load('resturant_json_data.json')

restaurent_Data.show()

+----+-------+--------------------+-------------+-------------+-------------+------+
|code|message|         restaurants|results_found|results_shown|results_start|status|
+----+-------+--------------------+-------------+-------------+-------------+------+
|NULL|   NULL|                  []|            0|            0|            1|  NULL|
|NULL|   NULL|[{{{17066603}, b9...|         6835|           20|            1|  NULL|
|NULL|   NULL|                  []|            0|            0|            1|  NULL|
|NULL|   NULL|                  []|            0|            0|            1|  NULL|
|NULL|   NULL|[{{{17093124}, b9...|         8680|           20|            1|  NULL|
|NULL|   NULL|                  []|            0|            0|            1|  NULL|
|NULL|   NULL|                  []|            0|            0|            1|  NULL|
|NULL|   NULL|[{{{17580142}, b9...|          943|           20|            1|  NULL|
|NULL|   NULL|                  []|            0|            0|  

In [36]:
restaurent_Data.printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- restaurants: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant: struct (nullable = true)
 |    |    |    |-- R: struct (nullable = true)
 |    |    |    |    |-- res_id: long (nullable = true)
 |    |    |    |-- apikey: string (nullable = true)
 |    |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |    |-- cuisines: string (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- deeplink: string (nullable = true)
 |    |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- events_url: string (nullable = true)
 |    |    |    |-- featured_image: string (nullable = true)
 |    |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |    |-- has_table_booking: long (nullable = true)
 |    |    |    |-- i

In [38]:
# explode elements that you want 
from pyspark.sql.functions import explode

restaurent_Data.select('*',explode('restaurants').alias('restaurants_new')).printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- restaurants: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant: struct (nullable = true)
 |    |    |    |-- R: struct (nullable = true)
 |    |    |    |    |-- res_id: long (nullable = true)
 |    |    |    |-- apikey: string (nullable = true)
 |    |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |    |-- cuisines: string (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- deeplink: string (nullable = true)
 |    |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- events_url: string (nullable = true)
 |    |    |    |-- featured_image: string (nullable = true)
 |    |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |    |-- has_table_booking: long (nullable = true)
 |    |    |    |-- i

In [None]:
restaurent_Data.select('*',explode('restaurants').alias('restaurants_new')).printSchema()

In [54]:
restaurent_Data.select('*', explode('restaurants').alias('restaurants_new')) \
    .drop('restaurants') \
    .select('*','restaurants_new.restaurant.R.res_id',
            explode('restaurants_new.restaurant.establishment_types').alias('new_establishment_types')
           , 'restaurants_new.restaurant.name').drop('restaurants_new').printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- results_found: long (nullable = true)
 |-- results_shown: long (nullable = true)
 |-- results_start: string (nullable = true)
 |-- status: string (nullable = true)
 |-- res_id: long (nullable = true)
 |-- new_establishment_types: string (nullable = true)
 |-- name: string (nullable = true)



In [57]:
from pyspark.sql.functions import explode_outer, col

restaurent_Data.select('*', explode('restaurants').alias('restaurants_new')) \
    .drop('restaurants') \
    .select('*','restaurants_new.restaurant.R.res_id',
            explode_outer('restaurants_new.restaurant.establishment_types').alias('new_establishment_types')
           , 'restaurants_new.restaurant.name').drop('restaurants_new').show(truncate=False)

+----+-------+-------------+-------------+-------------+------+--------+-----------------------+------------------------------------+
|code|message|results_found|results_shown|results_start|status|res_id  |new_establishment_types|name                                |
+----+-------+-------------+-------------+-------------+------+--------+-----------------------+------------------------------------+
|NULL|NULL   |6835         |20           |1            |NULL  |17066603|NULL                   |The Coop                            |
|NULL|NULL   |6835         |20           |1            |NULL  |17059541|NULL                   |Maggiano's Little Italy             |
|NULL|NULL   |6835         |20           |1            |NULL  |17064405|NULL                   |Tako Cheena by Pom Pom              |
|NULL|NULL   |6835         |20           |1            |NULL  |17057797|NULL                   |Bosphorous Turkish Cuisine          |
|NULL|NULL   |6835         |20           |1            |NULL  

# Lecture 15 : Spark Submit 

## Potential Interview Questions
1) What is spark submit ?
2) How do you run your job ob spark cluster ?
3) Where is your spark cluster ?
4) What is deploy mode in spark cluster ?
5) What is master in spark submit ?
6) How do you provide memory config and why do you use that much memory ?
7) How do you update configuration like broadcast, threshold, timeouot, dynamic etc .. 

# Lecture 16 : Deployment Mode in Spark 
## Potential Interview Questions? 

1) Whar are all deployment modes in spark ?
2) What is edge node ?
3) Why do we need client and cluster modes ?
4) What will happen if i close my edge node ?

# Spark Deploy Modes — Client vs Cluster

| Aspect | Client mode | Cluster mode |
|---|---|---|
| Where the driver runs | On the submit node (your laptop/edge node) | On the cluster (YARN AM / K8s driver pod / Standalone worker) |
| If submit node disconnects | **Job dies** (driver is gone) | **Job keeps running** on the cluster |
| Driver logs | Your terminal (stdout/stderr) & local logs | Cluster logs (YARN container logs / `kubectl logs` / Standalone worker logs) |
| Network latency | Higher (driver far from executors) | Lower (driver near executors) |
| Driver OOM risk | Possible; sized on your machine | Possible; sized on cluster; often more stable |
| Best for | Dev, ad-hoc, interactive, easy local debugging | Production, scheduled jobs, resiliency, central logs |
| Example run | `spark-submit --master yarn --deploy-mode client app.jar` | `spark-submit --master yarn --deploy-mode cluster app.jar` |
| Fetch logs | (local terminal/files) | YARN: `yarn logs -applicationId <app_id>` · K8s: `kubectl logs <driver-pod>` |

### Handy configs (both modes)
- `--driver-memory 2g  --driver-cores 2`
- `--conf spark.dynamicAllocation.enabled=true`
- `--conf spark.dynamicAllocation.minExecutors=1`
- `--conf spark.dynamicAllocation.maxExecutors=10`
- `--conf spark.sql.adaptive.enabled=true`
- `--conf spark.sql.autoBroadcastJoinThreshold=100m`
- `--conf spark.sql.broadcastTimeout=3600`


# Lecture 18

## Potential Interview Questions
1) What is AQE ?
2) Why do we need AQE ?
   

In [None]:
1) Allow to change the query while runtime 
2) Cnahge query dynamically while run time 

Above spark 3.0 and above 

Features of AQE:
Dynamically Coalescing shuffeling partition
Dynamically switching join strategies
Dynamically Optimizing Skew join 


# Union / Union All / Union By Name

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("dept_id", IntegerType(), True)
])

# ✅ df1
data = [
    (10, 'Anil', 50000, 18),
    (11, 'Vikas', 75000, 16),
    (12, 'Nisha', 40000, 18),
    (13, 'Nidhi', 60000, 17),
    (14, 'Priya', 80000, 18),
    (15, 'Mohit', 45000, 18),
    (16, 'Rajesh', 90000, 10),
    (17, 'Raman', 55000, 16),
    (18, 'Sam', 65000, 17)
]
df1 = spark.createDataFrame(data, schema)

# ✅ df2 (same schema, can be used for union or unionAll)
data1 = [
    (19, 'Sohan', 50000, 18),
    (20, 'Sima', 75000, 17)
]
df2 = spark.createDataFrame(data1, schema)

# ✅ df3_wrong_col_order (same values but different column order, for unionByName)
wrong_column_data = [
    (19, 50000, 18, 'Sohan'),
    (20, 75000, 17, 'Sima')
]
# Note the changed column order in schema
wrong_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("salary", IntegerType(), True),
    StructField("dept_id", IntegerType(), True),
    StructField("name", StringType(), True)
])
df3_wrong_col_order = spark.createDataFrame(wrong_column_data, wrong_schema)

# ✅ df4_extra_column (extra column → to demonstrate schema mismatch error)
extra_column_data = [
    (19, 50000, 18, 'Sohan', 10),
    (20, 75000, 17, 'Sima', 20)
]
extra_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("salary", IntegerType(), True),
    StructField("dept_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("bonus", IntegerType(), True)
])
df4_extra_column = spark.createDataFrame(extra_column_data, extra_schema)

In [None]:


# # Union (same schema)
# df1.union(df2).show()

# # Union By Name (works even if columns are ordered differently)
# df1.unionByName(df3_wrong_col_order).show()

# # This will cause error: schema mismatch
# df1.union(df4_extra_column).show()  # ❌ Error: differing column count
# # so in this case we can select column manually and then we can perform the union operation

# Repartition and Coalesec 

In [None]:
df_Flight_Data.show()

In [None]:
df_Flight_Data.count()

In [None]:
# to get the partition we need to convert it to RDD first 
df_Flight_Data.rdd.getNumPartitions()

In [None]:
# now we will makle 4 partitions 
df_Flight_Data_partitions= df_Flight_Data.repartition(4)

In [None]:
# check how the allocation happned 
from pyspark.sql.functions import spark_partition_id

df_Flight_Data_partitions.withColumn("PartitionID",spark_partition_id()).groupBy("PartitionID").count().show()
# Evenly Disrtributed

In [None]:
# we can also create partition based on columns and also give number of partitions we need, if there is no value then null will be assigned 

partitioned_by_column= df_Flight_Data.repartition(300,'ORIGIN_COUNTRY_NAME')



In [None]:
partitioned_by_column.rdd.getNumPartitions()

In [None]:
partitioned_by_column.withColumn("PartitionID",spark_partition_id()).groupBy("PartitionID").count().show()
# we were having less values 255 and number of parttitions werer 300 , so null in remeaning values


# Coalsence

In [None]:
coalesce_flight_df = df_Flight_Data.repartition(8)

In [None]:
coalesce_flight_df.withColumn("PartitionID",spark_partition_id()).groupBy("PartitionID").count().show()


In [None]:
# reduce to 3 

three_col_partition = coalesce_flight_df.coalesce(3)


In [None]:
three_col_partition.withColumn("PartitionID",spark_partition_id()).groupBy("PartitionID").count().show()
# not evenly distributed, 

# Case When, When Otherwise/ Null values dealing 


In [None]:
df_Flight_Data.show()

In [None]:
from pyspark.sql.functions import when, col
df_Flight_Data.withColumn('Frequent Travel', when(col('count') > 100, 'Medium Yes')
                                             .when(col('count') > 200,'Too Frequent')
                                             .otherwise ('Need to Improve')).show()

In [None]:
# null values 


from pyspark.sql.functions import when, col, lit

df_Flight_Data.withColumn(
    'count', 
    when(col('count').isNull(), lit(1)).otherwise(col('count'))
).withColumn(
    'Null Details', 
    when(col('ORIGIN_COUNTRY_NAME') == 'Afghanistan', 'cancle Flight')
    .otherwise('KEEP FLIGHT')
).show()


In [None]:
df_Flight_Data.createOrReplaceTempView('FlightSQLWhenOtherWise')

In [None]:
spark.sql("""
SELECT  
  CASE 
    WHEN ORIGIN_COUNTRY_NAME = 'Afghanistan' THEN 'Cancel Flight'
    ELSE 'Keep Flight'
  END AS FlightDetails
FROM FlightSQLWhenOtherWise
""").show()


In [None]:
spark.sql("""
SELECT  
  CASE 
    WHEN ORIGIN_COUNTRY_NAME = 'Afghanistan' THEN 'Cancel Flight' 
    WHEN ORIGIN_COUNTRY_NAME = 'Sint Maarten' THEN 'Rerrange Flight'
    ELSE 'Keep Flight'
  END AS FlightDetails
FROM FlightSQLWhenOtherWise
""").show()


# Unbique Value, Drop Duplicates , Sort Data In Asc and Desc value. 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

data = [
    (10 ,'Anil',50000, 18),
    (11 ,'Vikas',75000, 16),
    (12 ,'Nisha',40000, 18),
    (13 ,'Nidhi',60000, 17),
    (14 ,'Priya',80000, 18),
    (15 ,'Mohit',45000, 18),
    (16 ,'Rajesh',90000, 10),
    (17 ,'Raman',55000, 16),
    (18 ,'Sam',65000, 17),
    (15 ,'Mohit',45000, 18),
    (13 ,'Nidhi',60000, 17),      
    (14 ,'Priya',90000, 18),  
    (18 ,'Sam',65000, 17)
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.createDataFrame(data, schema)
df.show()


In [None]:
# find the unique values
df.distinct().show()


In [None]:
df.select("id",'name').distinct().show()


In [None]:
# drop duplicates
df.drop_duplicates(subset=['id'])


In [None]:
# Sort 
df.sort(col('id'),asc).show()
df.sort(col('id'),desc).show()

# Aggregation 

In [None]:
# count, action and transformation 
df.select(count('id')).show() # for perticular column
df.count() # for all dataframe
df.select(count(*))


In [None]:
# min
df.select(sum('salary').alies ('Total Salary'), max('salary').alies('Max Salary'),min('Salary').alias('Min Salary')).show()

In [None]:
df.select(sum('salary').alies('Total Salary'),count('salary').alias('Count of Salary'), avg ('salary').alias('AverageSalary')).show()

# Group By|

In [None]:
df.groupBy(col('dept').agg(sum('salary').alias('Total Salary')))

df.groupBy


In [None]:
spark.sql("""

select Dept, sum(salary) as total_salary
from table
group by dept

""")

# Join 

In [None]:
# inner Join 

df1.join(df2, df1['id']= df2['id'], 'inner').show()

In [None]:
# left join 

df1.join(df2, df1['id']=df2['id'],'left').show()