In [1]:
## PYSPARK INTERVIEW QUESTIONS - ANSH LAMBA

In [2]:
# findspark is initialised here, ahead of the pyspark imports further down the notebook.
# NOTE(review): presumably this is what makes the local Spark installation importable — confirm against the findspark docs.
import findspark
findspark.init()

In [3]:
import os
import sys

# Point both PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON at the interpreter that is
# running this kernel. sys.executable is detected automatically, so nothing has
# to be edited by hand here.
python_path = sys.executable

os.environ.update(
    {
        "PYSPARK_PYTHON": python_path,
        "PYSPARK_DRIVER_PYTHON": python_path,
    }
)

In [4]:
# Echo the interpreter path exported to the PYSPARK_* variables so it can be checked by eye.
python_path

'D:\\jupyter_notebooks\\pythonVirtualEnv\\Scripts\\python.exe'

In [10]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

from pyspark import SparkContext, SparkConf 
from pyspark.conf import SparkConf 
from pyspark.sql import SparkSession, HiveContext,DataFrame
from pyspark.sql.window import Window
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StringType, StructField, StringType,LongType,DecimalType,DateType,TimestampType, IntegerType,DoubleType


In [6]:
## SparkSession
# Local session (all cores) with the Delta Lake and spark-excel packages,
# the Delta SQL extension/catalog, and Hive support enabled.
spark = (
    SparkSession.builder
    .appName('example-pyspark-read-and-write-from-hive')
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0,com.crealytics:spark-excel_2.12:3.3.3_0.20.3")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "2g")
    .config("spark.sql.execution.arrow.pyspark.enabled", "false")
    .enableHiveSupport()
    .getOrCreate()
)



# .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
#                         .config("hive.metastore.warehouse.dir", "/user/hive/warehouse") \
#                         .config("hive.metastore.uris", "thrift://192.168.66.104:9083") \
#                         .config("spark.hadoop.fs.defaultFS", "hdfs://192.168.66.101:9000") \
#                         .config("spark.executor.extraLibraryPath", "/usr/lib/x86_64-linux-gnu/") \
#                         .config("spark.driver.extraLibraryPath", "/usr/lib/x86_64-linux-gnu/") \
#                         .config("spark.driver.extraClassPath", "/usr/lib/x86_64-linux-gnu/jni/") \
#                         .config("spark.executor.extraClassPath", "/usr/lib/x86_64-linux-gnu/jni/") \
                        

In [7]:
### Q1 While ingesting customer data from an external source, you notice duplicate entries. How would you remove duplicates and 
### retain only the latest entry based on a timestamp column?

In [22]:
# Sample rows: two dated sales records for each of two products.
data = [
    ("101", "2023-12-01", 100),
    ("101", "2023-12-02", 150),
    ("102", "2023-12-01", 200),
    ("102", "2023-12-02", 250),
]
columns = ["product_id", "date", "sales"]

# Only column names are supplied, so the column types are inferred from the values.
df = spark.createDataFrame(data, schema=columns)
df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- sales: long (nullable = true)



In [19]:
# Show the sample rows; truncate=False keeps long values from being cut off.
df.show(truncate=False)

+----------+----------+-----+
|product_id|date      |sales|
+----------+----------+-----+
|101       |2023-12-01|100  |
|101       |2023-12-02|150  |
|102       |2023-12-01|200  |
|102       |2023-12-02|250  |
+----------+----------+-----+



In [25]:
# Convert the string `date` column to a date type so that "latest" is decided by
# comparing dates rather than text. The de-duplication itself (keep the newest
# row per product_id) is done in a following cell.
df = df.withColumn("date", f.col("date").cast("date"))


In [None]:
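## dropDuplicates(["product_id"]) keeps exactly one row per product_id.

## Sorting beforehand usually makes the newest row the survivor on small, single-partition data,
## but Spark does not guarantee that the sort order is preserved when duplicates are dropped.
## For a result that is reliable at scale, rank the rows with row_number() over a window
## partitioned by product_id and ordered by date descending, then keep rank 1.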



In [27]:
# Keep only the newest row per product_id.
# A row_number() window states explicitly which row survives. Sorting and then
# calling dropDuplicates(subset=...) gives no guarantee that the sort order is
# respected when the duplicates are collapsed.
# If two rows of a product share the same date, which of them gets rank 1 is arbitrary.
latest_first = Window.partitionBy('product_id').orderBy(f.col('date').desc())

df = (
    df.withColumn('_row_num', f.row_number().over(latest_first))
      .filter(f.col('_row_num') == 1)
      .drop('_row_num')  # helper column only; keep the original three columns
)

In [28]:
# Display the de-duplicated result: one row per product_id.
df.show()

+----------+----------+-----+
|product_id|      date|sales|
+----------+----------+-----+
|       101|2023-12-02|  150|
|       102|2023-12-02|  250|
+----------+----------+-----+



In [30]:
## QUESTION 02

In [None]:
## While processing data from multiple files with inconsistent schemas, you need to merge them into a single DataFrame. 

## How would you handle this inconsistency in PySpark?


In [None]:
def read_parquet_with_merged_schema(path):
    """Read Parquet files with differing schemas into a single DataFrame.

    The reader is configured with the ``mergeSchema`` option switched on.

    Args:
        path: Location of the Parquet files; passed unchanged to ``load``.

    Returns:
        The DataFrame returned by the Parquet reader.
    """
    return (
        spark.read.format('parquet')
        .option('mergeSchema', True)
        .load(path)
    )


# Example call. The location is a placeholder, so the call is left commented out
# to keep the notebook runnable from top to bottom:
# df = read_parquet_with_merged_schema('hdfs://yourlocation')

In [32]:
## Question 03

## 4. You are working with a real-time data pipeline, and you notice missing values in your streaming data Column - Category. How would you handle 
## null or missing values in such a scenario?

## df_stream = spark.readStream.schema("id INT, value STRING").csv("path/to/stream")

In [None]:
def fill_missing_category(frame, placeholder='N/A'):
    """Replace null values in the ``Category`` column with a placeholder.

    Args:
        frame: DataFrame that contains a ``Category`` column.
        placeholder: Value written wherever ``Category`` is null.

    Returns:
        A new DataFrame; ``frame`` itself is not modified.
    """
    # The DataFrame method is spelled `fillna` (lower-case "na"); `fillNa` does not exist.
    return frame.fillna({'Category': placeholder})


# Example call. No DataFrame with a `Category` column is defined in this notebook,
# so the call is left commented out:
# df_stream = fill_missing_category(df_stream)

In [40]:
## 5. You need to calculate the total number of actions performed by users in a system. How would you calculate the top 5 most active users 
## based on this information?**

In [74]:
# Sample rows of (user_id, actions); user2 appears twice on purpose so the
# per-user totals have something to add up.
data = [
    ("user1", 5),
    ("user2", 8),
    ("user3", 2),
    ("user4", 10),
    ("user2", 3),
]
columns = ["user_id", "actions"]

df = spark.createDataFrame(data, schema=columns)
# df.show()  # enable to print the raw rows

+-------+-------+
|user_id|actions|
+-------+-------+
|  user1|      5|
|  user2|      8|
|  user3|      2|
|  user4|     10|
|  user2|      3|
+-------+-------+



In [75]:
# Total actions per user, highest first, capped at the five most active users.
# f.sum is spelled out because a bare `sum` only resolves to Spark's aggregate
# thanks to the wildcard import; without it the name is Python's builtin sum.
# user_id is a secondary sort key so that users with equal totals come out in a
# stable order, which matters when a tie straddles the limit.
df = (
    df.groupBy('user_id')
      .agg(f.sum('actions').alias("totalActions"))
      .orderBy(f.col("totalActions").desc(), f.col("user_id").asc())
      .limit(5)
)


In [76]:
#df=df.orderBy(f.col("totalActions"),descending=False)

In [77]:
# Display the per-user totals, most active users first.
df.show()

+-------+------------+
|user_id|totalActions|
+-------+------------+
|  user2|          11|
|  user4|          10|
|  user1|           5|
|  user3|           2|
+-------+------------+



In [None]:
## 6. While processing sales transaction data, you need to identify the most recent transaction for each customer. How would you approach this task?

## Hint: Window Function

In [96]:
# Sample rows: two dated transactions for each of two customers.
data = [
    ("cust1", "2023-12-01", 100),
    ("cust2", "2023-12-02", 150),
    ("cust1", "2023-12-03", 200),
    ("cust2", "2023-12-04", 250),
]
columns = ["customer_id", "transaction_date", "sales"]

df = spark.createDataFrame(data, schema=columns)

In [80]:
# Display the raw transaction rows.
df.show()

+-----------+----------------+-----+
|customer_id|transaction_date|sales|
+-----------+----------------+-----+
|      cust1|      2023-12-01|  100|
|      cust2|      2023-12-02|  150|
|      cust1|      2023-12-03|  200|
|      cust2|      2023-12-04|  250|
+-----------+----------------+-----+



In [97]:
# Convert transaction_date from string to a date type so the window below orders
# transactions by date rather than by text.
df = df.withColumn("transaction_date", f.col("transaction_date").cast("date"))

In [98]:
# Rank each customer's transactions from newest to oldest.
recency = Window.partitionBy('customer_id').orderBy(f.col('transaction_date').desc())

# Keep the rank-1 rows. dense_rank() gives tied rows the same rank, so if a
# customer has several transactions on their latest date, all of them are kept.
df = (
    df.withColumn('flag', f.dense_rank().over(recency))
      .filter(f.col('flag') == 1)
      .select("customer_id", "transaction_date", "sales")
)

# show() returns None, so it is called on its own line; chaining it onto the
# assignment would leave `df` set to None. No row count is passed, so the output
# is not tied to the two customers in the sample.
df.show(truncate=False)

+-----------+----------------+-----+
|customer_id|transaction_date|sales|
+-----------+----------------+-----+
|cust1      |2023-12-03      |200  |
|cust2      |2023-12-04      |250  |
+-----------+----------------+-----+



In [None]:
### 7. You need to identify customers who haven’t made any purchases in the last 30 days. How would you filter such customers?**

In [114]:
# Sample rows: one last-purchase date (as a string) per customer.
data = [
    ("cust1", "2025-04-01"),
    ("cust2", "2024-11-20"),
    ("cust3", "2024-03-25"),
]
columns = ["customer_id", "last_purchase_date"]

df = spark.createDataFrame(data, schema=columns)

In [115]:
# Display the raw customer rows.
df.show()

+-----------+------------------+
|customer_id|last_purchase_date|
+-----------+------------------+
|      cust1|        2025-04-01|
|      cust2|        2024-11-20|
|      cust3|        2024-03-25|
+-----------+------------------+



In [104]:
# last_purchase_date is a string at this point, so it has to be converted to a date
# before any date arithmetic.
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- last_purchase_date: string (nullable = true)



In [116]:
# Convert last_purchase_date from string to a date type for the day-difference calculation.
df = df.withColumn("last_purchase_date", f.col("last_purchase_date").cast("date"))


In [119]:
## Customers with no purchase in the last 30 days:
## `gap` is the number of days from last_purchase_date to today, and only rows
## with gap > 30 are kept. The result changes with the day the cell is run.
df = (
    df
    .withColumn("gap", f.datediff(f.current_date(), f.col("last_purchase_date")))
    .filter(f.col("gap") > 30)
)

In [120]:
# Display the customers whose gap exceeds 30 days, together with the computed gap.
df.show()

+-----------+------------------+---+
|customer_id|last_purchase_date|gap|
+-----------+------------------+---+
|      cust2|        2024-11-20|140|
|      cust3|        2024-03-25|380|
+-----------+------------------+---+

