## Question 1

In [0]:
# 1. Write PySpark code to get top 2 products by amount per user.

# +--------+----------+-------+
# |user_id |product_id|amount |
# +--------+----------+-------+
# |1       |101       |200    |
# |1       |102       |500    |
# |1       |103       |300    |
# |2       |101       |400    |
# |2       |102       |100    |
# +--------+----------+-------+

- Step 1: Created DataFrame using sample data given.
- Step 2: Window partitioned by user, ordered by amount desc.
- Step 3: created Rank using row_number and filtered rank < 3.

In [0]:
data=[
    (1,101,200),
    (1,102,500),
    (1,103,300),
    (2,101,400),
    (2,102,100)
]
columns=['user_id','product_id','amount']
df = spark.createDataFrame(data,columns)
df.display()

user_id,product_id,amount
1,101,200
1,102,500
1,103,300
2,101,400
2,102,100


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col,row_number
window_spec = Window.partitionBy("user_id").orderBy(col("amount").desc())
df_rank = df.withColumn("rank",row_number().over(window_spec))
df_rank.display()

user_id,product_id,amount,rank
1,102,500,1
1,103,300,2
1,101,200,3
2,101,400,1
2,102,100,2


In [0]:
df_final = df_rank.filter(col("rank") < 3).select("user_id","product_id")
df_final.display()

user_id,product_id
1,102
1,103
2,101
2,102


## Question 2

In [0]:
# 2. Given Sales data 

# +--------+-------+------+
# |region  |month  |sales |
# +--------+-------+------+
# |East    |Jan    |100   |
# |East    |Feb    |120   |
# |West    |Jan    |80    |
# +--------+-------+------+

# Transform to this 

# +--------+-----+-----+
# |region  |Jan  |Feb  |
# +--------+-----+-----+
# |East    |100  |120  |
# |West    |80   |NULL |
# +--------+-----+-----+

- Step 1: Created DataFrame using sample data given.
- Step 2: Using SQL **case `when`** and **GroupBY**
- Step 3: Using Pyspark Built in **PIVOT**  function

In [0]:
data=[
    ('East','Jan',100),
    ('East','Feb',120),
    ('West','Jan',80)
]
columns = ['region','month','sales']
df = spark.createDataFrame(data,columns)
df.display()

region,month,sales
East,Jan,100
East,Feb,120
West,Jan,80


**Group By** region and for each region if Month sum up all sales for partitcular month and create month column using **CASE WHEN** 

In [0]:
df.createOrReplaceTempView("df")
df_final = spark.sql("""
                     SELECT
  region,
  SUM(CASE WHEN month = 'Jan' THEN sales ELSE NULL END) AS Jan,
  SUM(CASE WHEN month = 'Feb' THEN sales ELSE NULL END) AS Feb
FROM df
GROUP BY region
                    """)
df_final.display()

region,Jan,Feb
East,100,120.0
West,80,


Using Pyspark built in **PIVOT** function

In [0]:
from pyspark.sql.functions import sum,when
df_pivot = df.groupBy("region").pivot("month").agg(sum("sales"))
df_pivot.display()

region,Feb,Jan
East,120.0,100
West,,80


## Question 3

In [0]:
# 3. Parse the JSON, explode the events array, and create a DataFrame with columns: user_id, event_type, timestamp

# # Sample DataFrame
# data = [
#     ('{"user_id":1,"events":[{"event_type":"click","timestamp":"2023-01-01T10:00:00"},{"event_type":"purchase","timestamp":"2023-01-01T10:05:00"}]}',),
#     ('{"user_id":2,"events":[{"event_type":"click","timestamp":"2023-01-02T11:00:00"}]}',)
# ]

- Step 1:  Created raw string DF using sampple given.
- Step 2: Defined schema for JSON.
- Step 3: Parsed json string with **from_json**.
- Step 4: using **Explode** converted event array to each row per user.
- Step 5: Selected event_type and timestamp from the event object.

In [0]:
data = [
    ('{"user_id":1,"events":[{"event_type":"click","timestamp":"2023-01-01T10:00:00"},{"event_type":"purchase","timestamp":"2023-01-01T10:05:00"}]}',),
    ('{"user_id":2,"events":[{"event_type":"click","timestamp":"2023-01-02T11:00:00"}]}',)
]
df = spark.createDataFrame(data,["json_str"])
df.display()

json_str
"{""user_id"":1,""events"":[{""event_type"":""click"",""timestamp"":""2023-01-01T10:00:00""},{""event_type"":""purchase"",""timestamp"":""2023-01-01T10:05:00""}]}"
"{""user_id"":2,""events"":[{""event_type"":""click"",""timestamp"":""2023-01-02T11:00:00""}]}"


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType

schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("events", ArrayType(
        StructType([
            StructField("event_type", StringType()),
            StructField("timestamp", StringType())
        ])
    ))
])
from pyspark.sql.functions import from_json, col
from pyspark.sql.functions import from_json, col

df_parsed = df.withColumn("parsed", from_json(col("json_str"), schema))
df_parsed.display()

In [0]:
from pyspark.sql.functions import explode

df_exploded = df_parsed.select(
    col("parsed.user_id").alias("user_id"),
    explode(col("parsed.events")).alias("event")
)
df_exploded.display()

user_id,event
1,"List(click, 2023-01-01T10:00:00)"
1,"List(purchase, 2023-01-01T10:05:00)"
2,"List(click, 2023-01-02T11:00:00)"


In [0]:
df_final = df_exploded.select(
    "user_id",
    col("event.event_type"),
    col("event.timestamp")
)
df_final.display()

user_id,event_type,timestamp
1,click,2023-01-01T10:00:00
1,purchase,2023-01-01T10:05:00
2,click,2023-01-02T11:00:00


## Question 4

In [0]:
# 4. Merge (Upsert) New JSON Data into Existing Table

# user_id	name	age
# 1		Alice	30
# 2		Bob		25

# New data : 
# {"user_id":1, "name":"Alice", "age":31}
# {"user_id":3, "name":"Charlie", "age":22}

- Step 1: Create DataFrame and save as Delta table.
- Step 2: Read new JSON data into new DataFrame.
- Step 3: Merge on user_id (update + insert).

In [0]:
%sql
DROP TABLE IF EXISTS df_test

In [0]:
schema = StructType([
    StructField("user_id",IntegerType()),
    StructField("name",StringType()),
    StructField("age",IntegerType())
])
data = [
    (1, "Alice", 30),
    (2, "Bob", 25)
]
df = spark.createDataFrame(data,schema)
df.write.format("delta").mode("overwrite").saveAsTable("df_test")
df = spark.read.table("df_test")
df.display()

user_id,name,age
1,Alice,30
2,Bob,25


In [0]:
# New JSON
data = [
    ('{"user_id":1, "name":"Alice", "age":31}',),
    ('{"user_id":3, "name":"Charlie", "age":22}',)
]
df_new_raw = spark.createDataFrame(data, ["json_str"])
df_new_raw.display()

json_str
"{""user_id"":1, ""name"":""Alice"", ""age"":31}"
"{""user_id"":3, ""name"":""Charlie"", ""age"":22}"


In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("name", StringType()),
    StructField("age", IntegerType())
])
df_json = df_new_raw.select(from_json("json_str", schema).alias("parsed")).select("parsed.*")
df_json.display()
df_json.createOrReplaceTempView("df_raw")

user_id,name,age
1,Alice,31
3,Charlie,22


In [0]:
df_merge = spark.sql("""
                     MERGE INTO df_test tgt
USING df_raw src
on tgt.user_id = src.user_id
WHEN MATCHED THEN
  UPDATE SET tgt.name = src.name, tgt.age = src.age
WHEN NOT MATCHED THEN
  INSERT (user_id, name, age)
  VALUES (src.user_id, src.name, src.age)
  """)
df_merge.display()

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,1,0,1


In [0]:
%sql
DROP TABLE IF EXISTS df_test

In [0]:
df_updated = spark.read.table("df_test")
df_updated.display()

user_id,name,age
3,Charlie,22
1,Alice,31
2,Bob,25


## Question 5

In [0]:
# 5.Find Patients with Increasing Sugar level

# Problem:
# You have patient monthly Sugar level details:
# Find users whose Sugar levels are strictly increasing every month.

# user_id	month	Value
# 1		2023-01	100
# 1		2023-02	130
# 1		2023-03	160
# 2		2023-01	170
# 2		2023-02	180
# 2		2023-03	150

- Step 1: Create df using sample data
- Step 2: Create column prev_value by grouping based on user_id and taking user_id prev value using **lag**.
- Step 3: Create new column is_increasing and flag with value 1 if value > prev_value else 0
- Step 4: Get user_id whose sugar level did not increase and subtract them from df
- Step 5: Show all users with strictly increasing sugar levels excluding prev non increasing users

In [0]:
data = [
    (1,'2023-01',100),
    (1,'2023-02',130),
    (1,'2023-03',160),
    (2,'2023-01',170),
    (2,'2023-02',180),
    (2,'2023-03',150)
]
schema = StructType([
    StructField("user_id",IntegerType()),
    StructField("month",StringType()),
    StructField("Value",IntegerType())
])
df = spark.createDataFrame(data,schema)
df.display()

user_id,month,Value
1,2023-01,100
1,2023-02,130
1,2023-03,160
2,2023-01,170
2,2023-02,180
2,2023-03,150


In [0]:
from pyspark.sql.functions import lag,sum,count
window_spec = Window.partitionBy("user_id").orderBy("month")
df_prev_value = df.withColumn("prev_value", lag("value").over(window_spec))
df_prev_value.display()

user_id,month,Value,prev_value
1,2023-01,100,
1,2023-02,130,100.0
1,2023-03,160,130.0
2,2023-01,170,
2,2023-02,180,170.0
2,2023-03,150,180.0


In [0]:
df_flagged = df_prev_value.withColumn("is_increasing", (col("value") > col("prev_value")).cast("int"))
df_flagged.display()

user_id,month,Value,prev_value,is_increasing
1,2023-01,100,,
1,2023-02,130,100.0,1.0
1,2023-03,160,130.0,1.0
2,2023-01,170,,
2,2023-02,180,170.0,1.0
2,2023-03,150,180.0,0.0


In [0]:
df_not_increased = df_flagged.filter(col("prev_value").isNotNull() & (col("value") <= col("prev_value"))).select("user_id").distinct()
df_not_increased.display()

user_id
2


In [0]:
df_final  = df_flagged.select("user_id").distinct().subtract(df_not_increased)
df_final.display()

user_id
1


## Question 6

In [0]:
# 6. Calculate Length of Stay and Total Number of Tests During Hospitalization

# For each patient, calculate their hospital length of stay (in days) and the total number of lab tests done during their stay.

# | patient_id | name    | gender | dob        | admission_date | discharge_date |
# | ---------- | ------- | ------ | ---------- | -------------- | -------------- |
# | 1          | Alice   | F      | 1980-05-20 | 2023-07-01     | 2023-07-10     |
# | 2          | Bob     | M      | 1975-03-15 | 2023-07-02     | 2023-07-12     |
# | 3          | Charlie | M      | 1990-11-30 | 2023-07-05     | 2023-07-15     |

# | test_id | patient_id | test_type  | test_date  | test_result | normal_range_min | normal_range_max |
# | ------- | ---------- | ---------- | ---------- | ----------- | ---------------- | ---------------- |
# | 101     | 1          | Hemoglobin | 2023-07-02 | 13.5        | 12.0             | 16.0             |
# | 102     | 1          | Hemoglobin | 2023-07-08 | 11.0        | 12.0             | 16.0             |
# | 103     | 2          | Hemoglobin | 2023-07-03 | 14.0        | 12.0             | 16.0             |
# | 104     | 2          | Glucose    | 2023-07-04 | 180         | 70               | 110              |
# | 105     | 3          | Hemoglobin | 2023-07-06 | 15.0        | 12.0             | 16.0             |
# | 106     | 3          | Glucose    | 2023-07-10 | 95          | 70               | 110              |


- Step 1: Create df using sample data
- Step 2: Calculate Length of Stay using **datediff** between discharge_date, admission_date
- Step 3: joined patients_df and tests_df on patient_id
- Step 4: Filtered tests that happened during the hospital stay
- Step 5: **grouped** the filtered data by patient_id, name, and length_of_stay, and used count(*) to get the total number of tests per patient.

In [0]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import to_date,datediff

patient_data = [
    (1, "Alice", "F", "1980-05-20", "2023-07-01", "2023-07-10"),
    (2, "Bob", "M", "1975-03-15", "2023-07-02", "2023-07-12"),
    (3, "Charlie", "M", "1990-11-30", "2023-07-05", "2023-07-15")
]

patient_schema = StructType([
    StructField("patient_id", IntegerType()),
    StructField("name", StringType()),
    StructField("gender", StringType()),
    StructField("dob", StringType()),
    StructField("admission_date", StringType()),
    StructField("discharge_date", StringType())
])

test_data = [
    (101, 1, "Hemoglobin", "2023-07-02", 13.5, 12.0, 16.0),
    (102, 1, "Hemoglobin", "2023-07-08", 11.0, 12.0, 16.0),
    (103, 2, "Hemoglobin", "2023-07-03", 14.0, 12.0, 16.0),
    (104, 2, "Glucose", "2023-07-04", 180.0, 70.0, 110.0),
    (105, 3, "Hemoglobin", "2023-07-06", 15.0, 12.0, 16.0),
    (106, 3, "Glucose", "2023-07-10", 95.0, 70.0, 110.0)
]

test_schema = StructType([
    StructField("test_id", IntegerType()),
    StructField("patient_id", IntegerType()),
    StructField("test_type", StringType()),
    StructField("test_date", StringType()),
    StructField("test_result", DoubleType()),
    StructField("normal_range_min", DoubleType()),
    StructField("normal_range_max", DoubleType())
])


patients_df = spark.createDataFrame(patient_data, schema=patient_schema)
tests_df = spark.createDataFrame(test_data, schema=test_schema)


patients_df = patients_df.withColumn("admission_date", to_date("admission_date")).withColumn("discharge_date", to_date("discharge_date"))
tests_df = tests_df.withColumn("test_date", to_date("test_date"))

patients_df.display()
tests_df.display()

patient_id,name,gender,dob,admission_date,discharge_date
1,Alice,F,1980-05-20,2023-07-01,2023-07-10
2,Bob,M,1975-03-15,2023-07-02,2023-07-12
3,Charlie,M,1990-11-30,2023-07-05,2023-07-15


test_id,patient_id,test_type,test_date,test_result,normal_range_min,normal_range_max
101,1,Hemoglobin,2023-07-02,13.5,12.0,16.0
102,1,Hemoglobin,2023-07-08,11.0,12.0,16.0
103,2,Hemoglobin,2023-07-03,14.0,12.0,16.0
104,2,Glucose,2023-07-04,180.0,70.0,110.0
105,3,Hemoglobin,2023-07-06,15.0,12.0,16.0
106,3,Glucose,2023-07-10,95.0,70.0,110.0


In [0]:
patients_df = patients_df.withColumn("length_of_stay", datediff("discharge_date", "admission_date"))
patients_df.display()

patient_id,name,gender,dob,admission_date,discharge_date,length_of_stay
1,Alice,F,1980-05-20,2023-07-01,2023-07-10,9
2,Bob,M,1975-03-15,2023-07-02,2023-07-12,10
3,Charlie,M,1990-11-30,2023-07-05,2023-07-15,10


In [0]:
joined_df = patients_df.join(tests_df, "patient_id").filter(col("test_date").between(col("admission_date"), col("discharge_date")))
joined_df.display()

patient_id,name,gender,dob,admission_date,discharge_date,length_of_stay,test_id,test_type,test_date,test_result,normal_range_min,normal_range_max
1,Alice,F,1980-05-20,2023-07-01,2023-07-10,9,101,Hemoglobin,2023-07-02,13.5,12.0,16.0
1,Alice,F,1980-05-20,2023-07-01,2023-07-10,9,102,Hemoglobin,2023-07-08,11.0,12.0,16.0
2,Bob,M,1975-03-15,2023-07-02,2023-07-12,10,103,Hemoglobin,2023-07-03,14.0,12.0,16.0
2,Bob,M,1975-03-15,2023-07-02,2023-07-12,10,104,Glucose,2023-07-04,180.0,70.0,110.0
3,Charlie,M,1990-11-30,2023-07-05,2023-07-15,10,105,Hemoglobin,2023-07-06,15.0,12.0,16.0
3,Charlie,M,1990-11-30,2023-07-05,2023-07-15,10,106,Glucose,2023-07-10,95.0,70.0,110.0


In [0]:
df_final = joined_df.groupBy("patient_id","name","length_of_stay").agg(count("*").alias("total_tests"))
df_final.display()

patient_id,name,length_of_stay,total_tests
1,Alice,9,2
3,Charlie,10,2
2,Bob,10,2
