In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [2]:
spark = (
    SparkSession.builder
                .master('local')
                .appName('Interview_training')
                .getOrCreate()
)

### Q1 While ingesting customer data from an external source, you notice duplicate entries. How would you remove duplicates and retain only the latest entry based on a timestamp column

In [3]:
data = [(101, "2023-12-01", 100), [101, "2023-12-02", 150], 
        [102, "2023-12-01", 200], [102, "2023-12-02", 250]]
schema = 'customer_id int, date string, amount int'
df = spark.createDataFrame(data, schema)
df = df.withColumn('date', col('date').cast(DateType()))
df.printSchema()
df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: integer (nullable = true)

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|        101|2023-12-01|   100|
|        101|2023-12-02|   150|
|        102|2023-12-01|   200|
|        102|2023-12-02|   250|
+-----------+----------+------+



In [4]:
df.orderBy(col('date').desc()).dropDuplicates(subset = ['customer_id']).show() #solution1
df.sort(desc('date')).dropDuplicates(subset = ['customer_id']).show() #solution2
df.orderBy(df['date'], ascending = [0]).dropDuplicates(subset = ['customer_id']).show() # solution3
df.orderBy(df['date'], ascending = [False]).dropDuplicates(subset = ['customer_id']).show() # solution4

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|        101|2023-12-02|   150|
|        102|2023-12-02|   250|
+-----------+----------+------+

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|        101|2023-12-02|   150|
|        102|2023-12-02|   250|
+-----------+----------+------+

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|        101|2023-12-02|   150|
|        102|2023-12-02|   250|
+-----------+----------+------+

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|        101|2023-12-02|   150|
|        102|2023-12-02|   250|
+-----------+----------+------+



In [5]:
#solution 5
Window1 = Window.partitionBy(col('customer_id')).orderBy(col('date').desc())
rn = row_number().over(Window1)
df.withColumn('rank', rn).where('rank = 1').show()

+-----------+----------+------+----+
|customer_id|      date|amount|rank|
+-----------+----------+------+----+
|        101|2023-12-02|   150|   1|
|        102|2023-12-02|   250|   1|
+-----------+----------+------+----+



In [6]:
#solution 6
df.createOrReplaceTempView('table1')

spark.sql('''
WITH cte1 as (
    SELECT customer_id
           ,date
           ,amount
           , ROW_NUMBER() OVER(PARTITION BY customer_id ORDER BY date DESC) as rank
FROM table1
)

SELECT customer_id,
        date,
        amount FROM cte1 
        where rank = 1 

''').show()

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|        101|2023-12-02|   150|
|        102|2023-12-02|   250|
+-----------+----------+------+



In [7]:
df.rdd.getNumPartitions()

1

In [8]:
import os

files = os.listdir('/')
print(files)

['$Recycle.Bin', 'AMD', 'Documents and Settings', 'DumpStack.log.tmp', 'hadoop', 'hadoop-2.7.1', 'hiberfil.sys', 'Intel', 'MSI', 'mylog.log', 'OneDriveTemp', 'pagefile.sys', 'PerfLogs', 'Program Files', 'Program Files (x86)', 'ProgramData', 'Recovery', 'RHDSetup.log', 'Riot Games', 'swapfile.sys', 'System Volume Information', 'Users', 'Windows']


### Q2 You need to calculate the total number of actions performed by users in a system. How would you calculate the top 5 most active users based on this information?

In [9]:
data  = [("user1", 5), ("user2", 8), ("user3", 2), ("user2", 9),
        ("user5", 5), ("user1", 8), ("user2", 2), ("user3", 9),
        ("user4", 5), ("user6", 8), ("user7", 2), ("user3", 8)]

schema = 'username string, actions int'

df = spark.createDataFrame(data, schema)
df.show()

df.groupBy('username').agg(sum(col('actions')).alias('avg_actions')).orderBy(col('avg_actions').desc()).limit(5).show()

+--------+-------+
|username|actions|
+--------+-------+
|   user1|      5|
|   user2|      8|
|   user3|      2|
|   user2|      9|
|   user5|      5|
|   user1|      8|
|   user2|      2|
|   user3|      9|
|   user4|      5|
|   user6|      8|
|   user7|      2|
|   user3|      8|
+--------+-------+

+--------+-----------+
|username|avg_actions|
+--------+-----------+
|   user3|         19|
|   user2|         19|
|   user1|         13|
|   user6|          8|
|   user5|          5|
+--------+-----------+



### Q3 While processing sales transaction data, you nedd to identify the most recent transaction for each customer. How would you approach this task?

In [10]:
data = [(101, "2023-12-01", 100), (101, "2023-12-02", 150), 
        (102, "2023-12-01", 200), (102, "2023-12-02", 250),
       (103, "2024-12-20", 400), (103, "2022-01-02", 150), 
        (102, "2025-02-10", 200), (105, "2025-01-02", 250)]
schema = 'customer_id string, date string, amount int'
df = spark.createDataFrame(data, schema)
df = df.withColumn('date', col('date').cast(DateType()))

In [11]:
window_spec = Window.partitionBy(col('customer_id')).orderBy(col('date').desc())
df.withColumn('rank', dense_rank().over(window_spec)).where('rank = 1').drop('rank').show()

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|        101|2023-12-02|   150|
|        102|2025-02-10|   200|
|        103|2024-12-20|   400|
|        105|2025-01-02|   250|
+-----------+----------+------+



### Q4 You need to identify customers who haven't made any purchases in the last 30 days. How would you filter such customers?

In [12]:
data = [("cust1", "2025-03-01"),("cust2", "2025-03-11"), ("cust3", "2025-02-01"),  ("cust1", "2025-01-21")]
schema = 'customer string, date string'
df = spark.createDataFrame(data,schema)
df = df.withColumn('date', col('date').cast(DateType()))
df.printSchema()

df=df.withColumn('gap', date_diff(current_date(),  col('date')))

root
 |-- customer: string (nullable = true)
 |-- date: date (nullable = true)



In [13]:
df.where('gap > 30').show()

+--------+----------+---+
|customer|      date|gap|
+--------+----------+---+
|   cust3|2025-02-01| 40|
|   cust1|2025-01-21| 51|
+--------+----------+---+



### Q5 While analyzing customer reviews, you need to identify the most frequently used words in the feedback. How would you implement this? 

In [14]:
data = [("customer1", "The product is great"), ("customer2", "Great product, fast delivery"), ("customer3", "Not bad, could be better")]
columns = ["customer_id", "feedback"]

df = spark.createDataFrame(data,columns)
df.show()

df=df.withColumn('feedback', lower('feedback'))\
    .withColumn("feedback", regexp_replace(col("feedback"), ",", ""))\
    .withColumn('word', explode(split(col('feedback'), ' ')))

df.groupBy('word').agg(count(col('word')).alias('wordcount')).orderBy(desc('wordcount')).show()

+-----------+--------------------+
|customer_id|            feedback|
+-----------+--------------------+
|  customer1|The product is great|
|  customer2|Great product, fa...|
|  customer3|Not bad, could be...|
+-----------+--------------------+

+--------+---------+
|    word|wordcount|
+--------+---------+
|   great|        2|
| product|        2|
|   could|        1|
|     not|        1|
|      be|        1|
|    fast|        1|
|      is|        1|
|     bad|        1|
|     the|        1|
|delivery|        1|
|  better|        1|
+--------+---------+



### Q6 You need to calculate the cumulative sum of sales over time for each product. How would you approach this?

In [25]:
data = [("product1", "2023-12-01", 100), ("product2", "2023-12-02", 200),
        ("product1", "2023-12-03", 150), ("product2", "2023-12-04", 250),
        ("product3", "2023-12-01", 500), ("product4", "2023-12-02", 200),
        ("product1", "2024-10-03", 250), ("product2", "2023-03-04", 550)]
columns = ["product_id", "date", "sales"]
df = spark.createDataFrame(data, columns)
df.show()
df.printSchema()

window_spec = Window.partitionBy(col('product_id')).orderBy('date')
df.withColumn('cum_sum', sum('sales').over(window_spec)).show()

+----------+----------+-----+
|product_id|      date|sales|
+----------+----------+-----+
|  product1|2023-12-01|  100|
|  product2|2023-12-02|  200|
|  product1|2023-12-03|  150|
|  product2|2023-12-04|  250|
|  product3|2023-12-01|  500|
|  product4|2023-12-02|  200|
|  product1|2024-10-03|  250|
|  product2|2023-03-04|  550|
+----------+----------+-----+

root
 |-- product_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- sales: long (nullable = true)

+----------+----------+-----+-------+
|product_id|      date|sales|cum_sum|
+----------+----------+-----+-------+
|  product1|2023-12-01|  100|    100|
|  product1|2023-12-03|  150|    250|
|  product1|2024-10-03|  250|    500|
|  product2|2023-03-04|  550|    550|
|  product2|2023-12-02|  200|    750|
|  product2|2023-12-04|  250|   1000|
|  product3|2023-12-01|  500|    500|
|  product4|2023-12-02|  200|    200|
+----------+----------+-----+-------+



### Q7 While analyzing sales data, you need to find the product with the highest sales for each month. How would you accomplish this?

In [32]:
df = spark.createDataFrame(data, columns)
df = df.withColumn('year', year('date')).withColumn('month', month('date'))
df.show()
window_spec = Window.partitionBy('year', 'month').orderBy(desc('sales'))
df = df.withColumn('dr', dense_rank().over(window_spec)).where("dr = 1 ")  
df.show()

+----------+----------+-----+----+-----+
|product_id|      date|sales|year|month|
+----------+----------+-----+----+-----+
|  product1|2023-12-01|  100|2023|   12|
|  product2|2023-12-02|  200|2023|   12|
|  product1|2023-12-03|  150|2023|   12|
|  product2|2023-12-04|  250|2023|   12|
|  product3|2023-12-01|  500|2023|   12|
|  product4|2023-12-02|  200|2023|   12|
|  product1|2024-10-03|  250|2024|   10|
|  product2|2023-03-04|  550|2023|    3|
+----------+----------+-----+----+-----+

+----------+----------+-----+----+-----+---+
|product_id|      date|sales|year|month| dr|
+----------+----------+-----+----+-----+---+
|  product2|2023-03-04|  550|2023|    3|  1|
|  product3|2023-12-01|  500|2023|   12|  1|
|  product1|2024-10-03|  250|2024|   10|  1|
+----------+----------+-----+----+-----+---+



### Q8 While processing sales data, you need to classify each transaction as either 'High' or 'Low' based on its amount. How would you achieve this using a when condition

In [40]:
data = [("product1", 100) , ("product2", 300), ("product3", 50)]
columns = ["product_id", "sales"]
df = spark.createDataFrame(data,columns)
df.show()

df.withColumn("classification", when(col('sales') >100, 'High')
                                .when(col('sales') > 50, 'Medium')
                                .otherwise('Low')).show()

df.createOrReplaceTempView('table1')
spark.sql('''
    Select product_id, sales,
            CASE
                WHEN sales > 100 THEN 'High'
                WHEN sales > 50 THEN 'Medium'
            ELSE 'Low' 
            END as classification
    FROM table1
''').show()

+----------+-----+
|product_id|sales|
+----------+-----+
|  product1|  100|
|  product2|  300|
|  product3|   50|
+----------+-----+

+----------+-----+--------------+
|product_id|sales|classification|
+----------+-----+--------------+
|  product1|  100|        Medium|
|  product2|  300|          High|
|  product3|   50|           Low|
+----------+-----+--------------+

+----------+-----+--------------+
|product_id|sales|classification|
+----------+-----+--------------+
|  product1|  100|        Medium|
|  product2|  300|          High|
|  product3|   50|           Low|
+----------+-----+--------------+

