# PYSPARK INTERVIEW QUESTIONS - ANSH LAMBA

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

#### **1. While ingesting customer data from an external source, you notice duplicate entries. How would you remove duplicates and retain only the latest entry based on a timestamp column?**

In [0]:
data = [("101", "2023-12-01", 100), ("101", "2023-12-02", 150), 
        ("102", "2023-12-01", 200), ("102", "2023-12-02", 250)]
columns = ["product_id", "date", "sales"]

df = spark.createDataFrame(data, columns)
df.display()



product_id,date,sales
101,2023-12-01,100
101,2023-12-02,150
102,2023-12-01,200
102,2023-12-02,250


In [0]:
# The date column is a string, so cast it to a date type

df = df.withColumn('date', col('date').cast(DateType()))
df.display()

product_id,date,sales
101,2023-12-01,100
101,2023-12-02,150
102,2023-12-01,200
102,2023-12-02,250


In [0]:
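# Keep the latest record per product_id: sort newest first, then drop duplicates.
# Caveat: dropDuplicates does not guarantee which row of each group survives,
# so this ordering trick can be unreliable on multi-partition data.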

df = df.orderBy(col('date').desc()).dropDuplicates(subset= ['product_id'])
df.display()


product_id,date,sales
101,2023-12-02,150
102,2023-12-02,250


#### **2. While processing data from multiple files with inconsistent schemas, you need to merge them into a single DataFrame. How would you handle this inconsistency in PySpark?**

When reading Parquet files, Spark can automatically merge the schemas of different files if the mergeSchema option is enabled (the ORC reader supports the same option). If file A has columns [ID, Name] and file B has [ID, Age], the resulting DataFrame will have [ID, Name, Age], with nulls where a file lacks a column.

In [0]:
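# mergeSchema tells the Parquet reader to combine the schemas of all files into one DataFrame

df = spark.read.format('parquet').option('mergeSchema', 'true').load('path/to/files')

# This option is not available for CSV or JSON sources

For CSV or JSON files, mergeSchema does not work. Instead, read each source into its own DataFrame and combine them with unionByName and allowMissingColumns=True, which fills the columns missing on either side with nulls.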

In [0]:
from pyspark.sql import Row

# DataFrame 1: Sales Data
data1 = [
    Row(product="Laptop", category="Electronics", revenue=1200),
    Row(product="Mouse", category="Electronics", revenue=25)
]
df1 = spark.createDataFrame(data1)

# DataFrame 2: Returns Data (Different columns)
data2 = [
    Row(product="Phone", category="Electronics", return_reason="Defective"),
    Row(product="Desk", category="Furniture", return_reason="Wrong size")
]
df2 = spark.createDataFrame(data2)

print("Schema of DF1 (Sales):")
df1.printSchema()

print("Schema of DF2 (Returns):")
df2.printSchema()

Schema of DF1 (Sales):
root
 |-- product: string (nullable = true)
 |-- category: string (nullable = true)
 |-- revenue: long (nullable = true)

Schema of DF2 (Returns):
root
 |-- product: string (nullable = true)
 |-- category: string (nullable = true)
 |-- return_reason: string (nullable = true)



In [0]:
# Merging the two DataFrames
combined_df = df1.unionByName(df2, allowMissingColumns= True)

combined_df.display()

product,category,revenue,return_reason
Laptop,Electronics,1200.0,
Mouse,Electronics,25.0,
Phone,Electronics,,Defective
Desk,Furniture,,Wrong size


#### **3. Key differences between MapReduce and Spark**

1. Data Processing Style (Memory vs. Disk)
MapReduce: It is disk-based. The output of the "Map" phase is written to local disk, and the output of the "Reduce" phase is written to HDFS. A multi-step job repeats this cycle for every step, so disk I/O (Input/Output) becomes the main bottleneck.

Spark: It is memory-based. It keeps intermediate data in memory (RAM) whenever possible and goes to disk mainly for shuffles, when data does not fit in memory, or when the final result is written.

2. Performance (Speed)
MapReduce: Slower due to frequent disk reads and writes. It is best suited for batch processing where time isn't the primary concern.

Spark: Often quoted as up to 100x faster than MapReduce for in-memory processing and about 10x faster on disk. These are best-case figures; the actual gain depends on the workload.

3. Iterative Algorithms & Machine Learning
MapReduce: Struggles with iterative tasks. Every time an algorithm needs to "loop" over data, MapReduce must reload the data from the disk.

Spark: Designed for iteration. A dataset can be cached in RAM and reused across many passes without being reloaded, which is why Spark is widely used for Machine Learning (MLlib).

4. Fault Tolerance
MapReduce: Achieves fault tolerance through replication and re-execution. Input and output data are replicated on disk (HDFS), so if a node fails, its tasks are rerun on another node using a replica.

Spark: Uses RDDs (Resilient Distributed Datasets) and a DAG (Directed Acyclic Graph). If a partition is lost, Spark looks at the "Lineage" (the history of transformations) and re-computes only the missing part from the original source.
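
The lineage idea can be illustrated without Spark at all. The toy model below is only a sketch: `ToyDataset`, `map`, `lose_partition`, `get_partition`, and `collect` are hypothetical names invented for this example and are not Spark APIs. Each dataset remembers its parent and the function that produced it, so a lost partition is rebuilt by replaying that function on the parent's matching partition rather than by restoring a replica.

```python
class ToyDataset:
    """Toy stand-in for an RDD: partitions plus the recipe (lineage) that built them."""

    def __init__(self, partitions, parent=None, fn=None):
        self.partitions = partitions  # list of lists; None marks a lost partition
        self.parent = parent          # dataset this one was derived from
        self.fn = fn                  # per-element transformation applied to the parent

    def map(self, fn):
        # Computed eagerly for simplicity (Spark is lazy), but the derivation is remembered
        return ToyDataset([[fn(x) for x in p] for p in self.partitions], parent=self, fn=fn)

    def lose_partition(self, i):
        # Simulate a node failure that wipes one partition
        self.partitions[i] = None

    def get_partition(self, i):
        if self.partitions[i] is None:
            if self.parent is None:
                raise RuntimeError("source data lost and no lineage to rebuild it")
            # Recompute only the missing partition from the parent's partition i
            self.partitions[i] = [self.fn(x) for x in self.parent.get_partition(i)]
        return self.partitions[i]

    def collect(self):
        return [x for i in range(len(self.partitions)) for x in self.get_partition(i)]


source = ToyDataset([[1, 2], [3, 4]])
doubled = source.map(lambda x: x * 2)
plus_one = doubled.map(lambda x: x + 1)

plus_one.lose_partition(1)   # partition 1 of the final dataset disappears
doubled.lose_partition(1)    # its parent partition is gone too
recovered = plus_one.collect()
print(recovered)
```

Only partition 1 is recomputed, walking back through `doubled` to `source`; partition 0 is returned as it was.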

#### **4. You are working with a real-time data pipeline, and you notice missing values in your streaming data Column - Category. How would you handle null or missing values in such a scenario?**
`df_stream = spark.readStream.schema("id INT, value STRING").csv("path/to/stream")`

In [0]:
# combined_df still has nulls: fill return_reason with 'N/A' and revenue with 10000.
# fillna can be applied the same way to a streaming DataFrame such as df_stream.

combined_df = combined_df.fillna({'revenue': 10000, 'return_reason': 'N/A'})
combined_df.display()

product,category,revenue,return_reason
Laptop,Electronics,1200,N/A
Mouse,Electronics,25,N/A
Phone,Electronics,10000,Defective
Desk,Furniture,10000,Wrong size


#### **5. You need to calculate the total number of actions performed by users in a system. How would you calculate the top 5 most active users based on this information?**

In [0]:
data = [("user1", 5), ("user2", 8), ("user3", 2), ("user4", 10), ("user2", 3)]
columns = ["user_id", "actions"]

df = spark.createDataFrame(data, columns)
df.display()

user_id,actions
user1,5
user2,8
user3,2
user4,10
user2,3


In [0]:
df = df.groupBy("user_id").agg(sum(col("actions")).alias('Total_actions'))
df.sort("Total_actions", ascending=False).limit(5).display()

user_id,Total_actions
user2,11
user4,10
user1,5
user3,2


#### **6. While processing sales transaction data, you need to identify the most recent transaction for each customer. How would you approach this task?**

In [0]:
data = [("cust1", "2023-12-01", 100), ("cust2", "2023-12-02", 150),
        ("cust1", "2023-12-03", 200), ("cust2", "2023-12-04", 250)]
columns = ["customer_id", "transaction_date", "sales"]
df = spark.createDataFrame(data, columns)
df.display()

customer_id,transaction_date,sales
cust1,2023-12-01,100
cust2,2023-12-02,150
cust1,2023-12-03,200
cust2,2023-12-04,250


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col

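# Order by transaction_date descending so the most recent record in each partition gets rank 1
# (dense_rank keeps ties; use row_number if exactly one row per customer is required)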
df_ranked = df.withColumn('rank', dense_rank().over(Window.partitionBy('customer_id').orderBy(col('transaction_date').desc())))
df_ranked.display()

customer_id,transaction_date,sales,rank
cust1,2023-12-03,200,1
cust1,2023-12-01,100,2
cust2,2023-12-04,250,1
cust2,2023-12-02,150,2


In [0]:
df_ranked.filter(df_ranked.rank == 1).display()

# Only the most recent record per customer remains

customer_id,transaction_date,sales,rank
cust1,2023-12-03,200,1
cust2,2023-12-04,250,1


#### **7. You need to identify customers who haven't made any purchases in the last 30 days. How would you filter such customers?**

In [0]:
data = [("cust1", "2025-12-01"), ("cust2", "2024-11-20"), ("cust3", "2024-11-25")]
columns = ["customer_id", "last_purchase_date"]

df = spark.createDataFrame(data, columns)

df.display()


customer_id,last_purchase_date
cust1,2025-12-01
cust2,2024-11-20
cust3,2024-11-25


In [0]:
df = df.withColumn("last_purchase_date", to_date(col("last_purchase_date"), "yyyy-MM-dd") )
df.display()

customer_id,last_purchase_date
cust1,2025-12-01
cust2,2024-11-20
cust3,2024-11-25


In [0]:
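# Days since the last purchase (date_diff needs Spark 3.5+; use datediff on older versions); keep those over 30
df.withColumn('date_diff', date_diff(current_date(), col('last_purchase_date'))).filter(col('date_diff')>30).display()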

customer_id,last_purchase_date,date_diff
cust1,2025-12-01,74
cust2,2024-11-20,450
cust3,2024-11-25,445


#### **8. While analyzing customer reviews, you need to identify the most frequently used words in the feedback. How would you implement this?**

In [0]:
data = [("customer1", "The product is great"), ("customer2", "Great product, fast delivery"), ("customer3", "Not bad, could be better")]
columns = ["customer_id", "feedback"]

df = spark.createDataFrame(data, columns)

df.display()

customer_id,feedback
customer1,The product is great
customer2,"Great product, fast delivery"
customer3,"Not bad, could be better"


In [0]:
df = df.withColumn('feedback',split(col('feedback'), ' '))
df.display()

customer_id,feedback
customer1,"List(The, product, is, great)"
customer2,"List(Great, product,, fast, delivery)"
customer3,"List(Not, bad,, could, be, better)"


In [0]:
df = df.withColumn('feedback',explode('feedback'))
df.display()

customer_id,feedback
customer1,The
customer1,product
customer1,is
customer1,great
customer2,Great
customer2,"product,"
customer2,fast
customer2,delivery
customer3,Not
customer3,"bad,"


In [0]:
df.groupby("feedback").agg(count('feedback').alias('word_count')).display()

feedback,word_count
product,1
is,1
The,1
great,1
delivery,1
"product,",1
Great,1
fast,1
be,1
"bad,",1


In [0]:
# 'great' and 'Great' are counted as two different words, so lower-case every word before counting
df = df.withColumn('feedback', lower(col('feedback')))
df.groupby("feedback").agg(count('feedback').alias('word_count')).orderBy('word_count', ascending=False).display()


feedback,word_count
great,2
product,1
is,1
the,1
delivery,1
"product,",1
fast,1
be,1
"bad,",1
could,1


#### **9. You need to calculate the cumulative sum of sales over time for each product. How would you approach this?**

In [0]:
data = [("product1", "2023-12-01", 100), ("product2", "2023-12-02", 200),
        ("product1", "2023-12-03", 150), ("product2", "2023-12-04", 250)]
columns = ["product_id", "date", "sales"]
df = spark.createDataFrame(data, columns)
df.display()

product_id,date,sales
product1,2023-12-01,100
product2,2023-12-02,200
product1,2023-12-03,150
product2,2023-12-04,250


In [0]:
from pyspark.sql.window import Window

# Running total of sales per product, ordered by date
w = Window.partitionBy('product_id').orderBy(col('date').asc())
df.withColumn('cumSum', sum(col('sales')).over(w)).display()

product_id,date,sales,cumSum
product1,2023-12-01,100,100
product1,2023-12-03,150,250
product2,2023-12-02,200,200
product2,2023-12-04,250,450


#### **10. While preparing a data pipeline, you notice some duplicate rows in a dataset. How would you remove the duplicates without affecting the original order?**

In [0]:
data = [("John", 25), ("Jane", 30), ("John", 25), ("Alice", 22)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
df.display()

name,age
John,25
Jane,30
John,25
Alice,22


In [0]:
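# Partition by every column so only fully identical rows count as duplicates.
# Note: the window shuffles the data, so the output order differs from the input order.
df.withColumn('rowNum', row_number().over(Window.partitionBy('name', 'age').orderBy(col('age').asc()))).\
    filter(col('rowNum')==1).display()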

name,age,rowNum
Alice,22,1
Jane,30,1
John,25,1


#### **11. You are working with user activity data and need to calculate the average session duration per user. How would you implement this?**

In [0]:
data = [("user1", "2023-12-01", 50), ("user1", "2023-12-02", 60), 
        ("user2", "2023-12-01", 45), ("user2", "2023-12-03", 75)]
columns = ["user_id", "session_date", "duration"]
df = spark.createDataFrame(data, columns)

df.display()

user_id,session_date,duration
user1,2023-12-01,50
user1,2023-12-02,60
user2,2023-12-01,45
user2,2023-12-03,75


In [0]:
df.groupBy('user_id').agg(avg( col('duration')).alias('avg_duration')  ).display()

user_id,avg_duration
user1,55.0
user2,60.0


#### **12. While analyzing sales data, you need to find the product with the highest sales for each month. How would you accomplish this?**

In [0]:
data = [("product1", "2023-12-01", 100), ("product2", "2023-12-01", 150), 
        ("product1", "2023-12-02", 200), ("product2", "2023-12-02", 250)]
columns = ["product_id", "date", "sales"]
df = spark.createDataFrame(data, columns)
df.display()

product_id,date,sales
product1,2023-12-01,100
product2,2023-12-01,150
product1,2023-12-02,200
product2,2023-12-02,250


In [0]:
from pyspark.sql.functions import col, to_date
df = df.withColumn('date', to_date(col('date')))

df.display()


product_id,date,sales
product1,2023-12-01,100
product2,2023-12-01,150
product1,2023-12-02,200
product2,2023-12-02,250


In [0]:
df.withColumn('month', month(col('date'))).\
    groupBy('product_id','month').agg(sum('sales').alias('SalesSum')).display()

product_id,month,SalesSum
product1,12,300
product2,12,400


#### **13. You are working with a large Delta table that is frequently updated by multiple users. The data is stored in partitions, and sometimes updates can cause inconsistent reads due to concurrent transactions. How would you ensure ACID compliance and avoid data corruption in PySpark?**

In [0]:
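# Source data containing the incoming changes
df = spark.read.format('parquet').load('path')

from delta.tables import DeltaTable

# Reference to the existing Delta table (forPath takes the SparkSession as its first argument)
deltatable = DeltaTable.forPath(spark, 'path')

# MERGE is committed as a single atomic transaction in the Delta log, so readers never see a partial update
deltatable.alias('tgt')\
    .merge(df.alias('src'), 'tgt.id == src.id')\
    .whenNotMatchedInsertAll()\
    .whenMatchedUpdateAll()\
    .execute()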

#### **14. You need to process a large dataset stored in PARQUET format and ensure that all columns have the right schema (Almost). How would you do this?**

In [0]:
# Parquet files store their schema in the file metadata, so no inferSchema option is needed
spark.read.format('parquet').load('path')


# We can also provide the schema explicitly, as shown below

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, DoubleType, TimestampType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", DoubleType(), True),
    StructField("created_at", TimestampType(), True)
])

df = spark.read.schema(schema).parquet("/data/input/")


#### **15. You are reading a CSV file and need to handle corrupt records gracefully by skipping them. How would you configure this in PySpark?**

In [0]:
# DROPMALFORMED silently skips rows that cannot be parsed
spark.read.format('csv').option('mode', 'DROPMALFORMED').load('path')

#### **22. You have a dataset containing the names of employees and their departments. You need to find the department with the most employees.**

In [0]:
data = [("Alice", "HR"), ("Bob", "Finance"), ("Charlie", "HR"), ("David", "Engineering"), ("Eve", "Finance")]
columns = ["employee_name", "department"]

df = spark.createDataFrame(data, columns)
df.display()

employee_name,department
Alice,HR
Bob,Finance
Charlie,HR
David,Engineering
Eve,Finance


In [0]:
df.groupBy('department').agg(count('employee_name').alias('NoOfEmployees')).sort('NoOfEmployees', ascending = False).display()

department,NoOfEmployees
HR,2
Finance,2
Engineering,1


#### **23. While processing sales data, you need to classify each transaction as either 'High' or 'Low' based on its amount. How would you achieve this using a when condition?**

In [0]:
data = [("product1", 200), ("product2", 300), ("product3", 50)]
columns = ["product_id", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

product_id,sales
product1,200
product2,300
product3,50


In [0]:
df.withColumn('rating', when( (col('sales')>=101) & (col('sales')<300), 'Medium')\
    .when(col('sales')>=300, 'High')\
        .otherwise('Low')).display()

product_id,sales,rating
product1,200,Medium
product2,300,High
product3,50,Low


#### **24. While analyzing a large dataset, you need to create a new column that holds a timestamp of when the record was processed. How would you implement this and what can be the best USE CASE?**

In [0]:
data = [("product1", 100), ("product2", 200), ("product3", 300)]
columns = ["product_id", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

product_id,sales
product1,100
product2,200
product3,300


In [0]:
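# Typical use case: an audit column recording when each row was loaded, e.g. for incremental loads and debugging
df.withColumn('ProcessedTime', current_timestamp()).display()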

product_id,sales,ProcessedTime
product1,100,2026-02-14T10:48:38.980Z
product2,200,2026-02-14T10:48:38.980Z
product3,300,2026-02-14T10:48:38.980Z


#### **25. You need to register this PySpark DataFrame as a temporary SQL object and run a query on it. How would you achieve this?**

In [0]:
data = [("product1", 100), ("product2", 200), ("product3", 300)]
columns = ["product_id", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

product_id,sales
product1,100
product2,200
product3,300


In [0]:
# createOrReplaceTempView avoids an error if this cell is re-run in the same session
df.createOrReplaceTempView('logisticsTBL')

In [0]:
%sql
Select * from logisticsTBL;

product_id,sales
product1,100
product2,200
product3,300


In [0]:
spark.sql("select * from logisticsTBL").display()

product_id,sales
product1,100
product2,200
product3,300


#### **26. You need to register this PySpark DataFrame as a temporary SQL object and run a query on it (FROM DIFFERENT NOTEBOOKS AS WELL)?**

In [0]:
df.createGlobalTempView('logistics')

spark.sql("select * from global_temp.logistics").display()
# The global_temp prefix is required; it lets other notebooks on the same cluster access the view

#### **27. You need to query data from a PySpark DataFrame using SQL, but the data includes a nested structure. How would you flatten the data for easier querying?**

In [0]:
data = [("product1", {"price": 100, "quantity": 2}), 
        ("product2", {"price": 200, "quantity": 3})]
columns = ["product_id", "product_info"]

df = spark.createDataFrame(data, columns)
df.display()

product_id,product_info
product1,"Map(price -> 100, quantity -> 2)"
product2,"Map(price -> 200, quantity -> 3)"


In [0]:
df_flat = df.select('product_id',
                    col('product_info.price').alias('price'),
                    col("product_info.quantity").alias('quantity'))
df_flat.display()

product_id,price,quantity
product1,100,2
product2,200,3


#### **28. You are ingesting data from an external API in JSON format where the schema is inconsistent. How would you handle this situation to ensure a robust pipeline?**

In [0]:
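# Read JSON with spark.read (not df.read); PERMISSIVE mode nulls out fields it cannot parse instead of failing
df = spark.read.format('json').option('mode', 'PERMISSIVE').load('path')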

#### **29. While reading data from Parquet, you need to optimize performance by partitioning the data based on a column. How would you implement this?**

In [0]:
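# Partition on write so later reads that filter on this column can skip whole directories (partition pruning)
df.write.format('parquet').mode('append').partitionBy('column').save('path')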