In [0]:
from pyspark.sql.functions import *

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Obtain (or start) the Spark session used by the listening-history example.
spark = SparkSession.builder.appName("DummyDataFrame").getOrCreate()

# Schema: three nullable string identifiers plus a nullable integer duration.
schema = (
    StructType()
    .add("user_id", StringType(), True)
    .add("artist_name", StringType(), True)
    .add("song_id", StringType(), True)
    .add("played_duration", IntegerType(), True)
)

# Sample play events, one tuple per (user, artist, song, played duration).
data = [
    ("U001", "Artist A", "S001", 180),
    ("U001", "Artist B", "S002", 200),
    ("U001", "Artist A", "S003", 150),
    ("U001", "Artist C", "S004", 300),
    ("U001", "Artist D", "S008", 300),
    ("U001", "Artist B", "S005", 50),
    ("U002", "Artist D", "S006", 240),
    ("U002", "Artist A", "S007", 140),
]

# Materialise the rows as a DataFrame with the explicit schema and preview it.
df = spark.createDataFrame(data=data, schema=schema)
df.show()


+-------+-----------+-------+---------------+
|user_id|artist_name|song_id|played_duration|
+-------+-----------+-------+---------------+
|   U001|   Artist A|   S001|            180|
|   U001|   Artist B|   S002|            200|
|   U001|   Artist A|   S003|            150|
|   U001|   Artist C|   S004|            300|
|   U001|   Artist D|   S008|            300|
|   U001|   Artist B|   S005|             50|
|   U002|   Artist D|   S006|            240|
|   U002|   Artist A|   S007|            140|
+-------+-----------+-------+---------------+



In [0]:
# Total played duration for every (user, artist) pair.
# NOTE: `sum` here is the Spark column function pulled in by the wildcard
# import from pyspark.sql.functions, not Python's builtin sum().
agg_df = (
    df.groupBy("user_id", "artist_name")
    .agg(sum("played_duration").alias("total_time"))
)

In [0]:
# Print agg_df: the summed played_duration for each user_id / artist_name pair.
agg_df.show()

+-------+-----------+----------+
|user_id|artist_name|total_time|
+-------+-----------+----------+
|   U001|   Artist A|       330|
|   U001|   Artist B|       250|
|   U001|   Artist C|       300|
|   U001|   Artist D|       300|
|   U002|   Artist D|       240|
|   U002|   Artist A|       140|
+-------+-----------+----------+



In [0]:
from pyspark.sql.window import Window

In [0]:
# Rank each user's artists by total listening time, longest first.
# row_number() hands out arbitrary positions to rows that tie on the sort key,
# so artist_name is used as a secondary key: ties (U001 has two artists at 300)
# are broken alphabetically and the ranking is identical on every run.
ranked = agg_df.withColumn(
    "rank",
    row_number().over(
        Window.partitionBy("user_id").orderBy(
            col("total_time").desc(),
            col("artist_name").asc(),
        )
    ),
)


In [0]:
# Print every artist with its per-user rank (rank 1 = largest total_time).
ranked.show()

+-------+-----------+----------+----+
|user_id|artist_name|total_time|rank|
+-------+-----------+----------+----+
|   U001|   Artist A|       330|   1|
|   U001|   Artist C|       300|   2|
|   U001|   Artist D|       300|   3|
|   U001|   Artist B|       250|   4|
|   U002|   Artist D|       240|   1|
|   U002|   Artist A|       140|   2|
+-------+-----------+----------+----+



In [0]:
# Keep only the rows ranked 1 to 3 within each user.
result = ranked.where(ranked["rank"] <= 3)

In [0]:
# Print the rows of `ranked` whose rank is 3 or lower.
result.show()

+-------+-----------+----------+----+
|user_id|artist_name|total_time|rank|
+-------+-----------+----------+----+
|   U001|   Artist A|       330|   1|
|   U001|   Artist C|       300|   2|
|   U001|   Artist D|       300|   3|
|   U002|   Artist D|       240|   1|
|   U002|   Artist A|       140|   2|
+-------+-----------+----------+----+



In [0]:
from pyspark.sql import SparkSession

# Obtain the Spark session for the department-budget exercise.
# NOTE(review): getOrCreate() hands back an already-running session when one
# exists, so the app name given here may not take effect — confirm if it matters.
spark = SparkSession.builder.appName("InterviewTask").getOrCreate()

# DataFrame 1: employee details, one row per employee.
columns_emp = ["emp_id", "name", "department", "salary"]
data_emp = [
    (101, "Alice", "IT", 6000),
    (102, "Bob", "CS", 7500),
    (103, "Cathy", "IT", 6200),
    (104, "David", "Mech", 5500),
    (105, "Eve", "CS", 7000),
    (106, "Frank", "ECE", 4500),
    (107, "Grace", "ECE", 4800),
]
df_emp = spark.createDataFrame(data_emp, schema=columns_emp)

# DataFrame 2: budget allotted to each department.
columns_dept = ["department", "budget"]
data_dept = [
    ("IT", 20000),
    ("CS", 25000),
    ("ECE", 15000),
    ("Mech", 10000),
    ("Civil", 12000),
]
df_dept = spark.createDataFrame(data_dept, schema=columns_dept)

In [0]:
# Print the department budget table.
df_dept.show()

+----------+------+
|department|budget|
+----------+------+
|        IT| 20000|
|        CS| 25000|
|       ECE| 15000|
|      Mech| 10000|
|     Civil| 12000|
+----------+------+



In [0]:
# Print the employee table.
df_emp.show()

+------+-----+----------+------+
|emp_id| name|department|salary|
+------+-----+----------+------+
|   101|Alice|        IT|  6000|
|   102|  Bob|        CS|  7500|
|   103|Cathy|        IT|  6200|
|   104|David|      Mech|  5500|
|   105|  Eve|        CS|  7000|
|   106|Frank|       ECE|  4500|
|   107|Grace|       ECE|  4800|
+------+-----+----------+------+



In [0]:
# Sum of salaries per department.
# NOTE: `sum` here is the Spark column function pulled in by the wildcard
# import from pyspark.sql.functions, not Python's builtin sum().
total_salary = (
    df_emp.groupBy("department")
    .agg(sum("salary").alias("totalsalary"))
)

In [0]:
# Print the summed salary for each department.
total_salary.show()

+----------+-----------+
|department|totalsalary|
+----------+-----------+
|        IT|      12200|
|        CS|      14500|
|      Mech|       5500|
|       ECE|       9300|
+----------+-----------+



In [0]:
# Attach each department's budget to its salary total. An inner join keeps only
# departments present in both frames, so a department that has a budget but no
# employees does not appear in the result.
df_joined = total_salary.join(df_dept, "department", "inner")
df_joined.show()

+----------+-----------+------+
|department|totalsalary|budget|
+----------+-----------+------+
|        IT|      12200| 20000|
|        CS|      14500| 25000|
|       ECE|       9300| 15000|
|      Mech|       5500| 10000|
+----------+-----------+------+



In [0]:
# Departments whose combined salaries are strictly greater than their budget.
df_filtered = df_joined.where(df_joined["totalsalary"] > df_joined["budget"])

In [0]:
# Print the over-budget departments; with the sample data none qualify, so only
# the header row is printed.
df_filtered.show()

+----------+-----------+------+
|department|totalsalary|budget|
+----------+-----------+------+
+----------+-----------+------+



In [0]:
# Label every department "Yes" when its salary total exceeds its budget and
# "No" in every other case (including when the comparison is null).
dfresult = df_joined.withColumn(
    "over_budget",
    when(df_joined["totalsalary"] > df_joined["budget"], "Yes").otherwise("No"),
)
dfresult.show()

+----------+-----------+------+-----------+
|department|totalsalary|budget|over_budget|
+----------+-----------+------+-----------+
|        IT|      12200| 20000|         No|
|        CS|      14500| 25000|         No|
|       ECE|       9300| 15000|         No|
|      Mech|       5500| 10000|         No|
+----------+-----------+------+-----------+

