You are given a dataset of employees in an Indian company with columns: emp_id, name, department, salary, and city. Write a PySpark program to find the total salary paid in each department.
Sample Data:
emp_id, name, department, salary, city
101, Rajesh, IT, 75000, Bangalore
102, Priya, HR, 60000, Mumbai
103, Anil, IT, 80000, Hyderabad
104, Sneha, HR, 62000, Pune
105, Manish, Finance, 90000, Chennai
106, Suresh, IT, 78000, Bangalore

In [0]:
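from pyspark.sql import SparkSession

# Reuses the notebook's session if one already exists (Databricks provides `spark`)
spark = SparkSession.builder.getOrCreate()

data = [
    (101, "Rajesh", "IT", 75000, "Bangalore"),
    (102, "Priya", "HR", 60000, "Mumbai"),
    (103, "Anil", "IT", 80000, "Hyderabad"),
    (104, "Sneha", "HR", 62000, "Pune"),
    (105, "Manish", "Finance", 90000, "Chennai"),
    (106, "Suresh", "IT", 78000, "Bangalore")
]
# Named `columns` rather than `col` so it does not shadow pyspark.sql.functions.col
columns = ["emp_id", "name", "department", "salary", "city"]
df = spark.createDataFrame(data, columns)

# Approach 1: Spark SQL over a temporary view
df.createOrReplaceTempView("employee")
df1 = spark.sql("SELECT department, SUM(salary) AS total_salary FROM employee GROUP BY department")
display(df1)  # Databricks notebook helper; use df1.show() outside Databricks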

In [0]:
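# Approach 2: DataFrame API
# Import the functions module rather than `sum` itself, which would shadow Python's built-in sum()
from pyspark.sql import functions as F

df1 = df.groupBy("department").agg(F.sum("salary").alias("total_salary"))
display(df1)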
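Neither cell shows its output. As a quick cross-check of the numbers, the same group-and-sum can be reproduced in plain Python over the sample rows. This is only a sketch for verifying the totals, not a replacement for the Spark job, and the helper name `total_salary_by_department` is hypothetical.

```python
from collections import defaultdict

# Sample rows from the problem: (emp_id, name, department, salary, city)
employees = [
    (101, "Rajesh", "IT", 75000, "Bangalore"),
    (102, "Priya", "HR", 60000, "Mumbai"),
    (103, "Anil", "IT", 80000, "Hyderabad"),
    (104, "Sneha", "HR", 62000, "Pune"),
    (105, "Manish", "Finance", 90000, "Chennai"),
    (106, "Suresh", "IT", 78000, "Bangalore"),
]


def total_salary_by_department(rows):
    """Hypothetical helper: plain-Python equivalent of GROUP BY department + SUM(salary)."""
    totals = defaultdict(int)
    for _emp_id, _name, department, salary, _city in rows:
        totals[department] += salary
    return dict(totals)


print(total_salary_by_department(employees))
# {'IT': 233000, 'HR': 122000, 'Finance': 90000}
```

Spark does not guarantee the row order of a grouped result, so `display(df1)` may list the departments in any order; append `.orderBy("department")` if a stable order matters.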

You are given a dataset containing customer transactions on an Indian e-commerce platform with columns: cust_id, cust_name, city, purchase_amount, and product_category. Some records are missing purchase_amount. Write a PySpark program to fill missing purchase_amount values with the average purchase amount of that product category.

Sample Data:
cust_id, cust_name, city, purchase_amount, product_category
201, Aman, Delhi, 1500, Electronics
202, Kiran, Mumbai, , Fashion
203, Ravi, Bangalore, 2000, Electronics
204, Simran, Hyderabad, , Fashion
205, Vinay, Pune, 1800, Electronics
206, Pooja, Chennai, 1300, Grocery
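Expected Output (both Fashion amounts are missing in this sample, so no Fashion average can be computed from it; the output below assumes the Fashion average is 2000):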
201, Aman, Delhi, 1500, Electronics
202, Kiran, Mumbai, 2000, Fashion
203, Ravi, Bangalore, 2000, Electronics
204, Simran, Hyderabad, 2000, Fashion
205, Vinay, Pune, 1800, Electronics
206, Pooja, Chennai, 1300, Grocery
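In the sample, both Fashion rows lack a purchase amount, so averaging the known values per category leaves Fashion with no average at all, which is why the expected output has to assume one. The plain-Python sketch below mirrors Spark's `avg`, which ignores nulls, and shows the Fashion gaps staying empty. The helper `fill_with_category_mean` is hypothetical, and `None` stands in for a missing amount.

```python
from collections import defaultdict

# Sample rows from the problem; None marks a missing purchase_amount.
transactions = [
    (201, "Aman", "Delhi", 1500, "Electronics"),
    (202, "Kiran", "Mumbai", None, "Fashion"),
    (203, "Ravi", "Bangalore", 2000, "Electronics"),
    (204, "Simran", "Hyderabad", None, "Fashion"),
    (205, "Vinay", "Pune", 1800, "Electronics"),
    (206, "Pooja", "Chennai", 1300, "Grocery"),
]


def fill_with_category_mean(rows):
    """Hypothetical helper: replace None amounts with the mean of the known
    amounts in the same category, skipping nulls the way Spark's avg() does."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for _id, _name, _city, amount, category in rows:
        if amount is not None:
            sums[category] += amount
            counts[category] += 1
    # A category with no known amounts gets no mean (Spark's avg returns null).
    means = {category: sums[category] / counts[category] for category in counts}

    filled = []
    for cust_id, name, city, amount, category in rows:
        if amount is None:
            amount = means.get(category)  # stays None if the category has no mean
        filled.append((cust_id, name, city, amount, category))
    return filled


for row in fill_with_category_mean(transactions):
    print(row)
```

With a single known Fashion amount of 2000, the same helper would fill the remaining Fashion gap with 2000.0, which matches the expected output.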


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# purchase_amount arrives as raw text; None or a blank string means "missing".
# Unlike the sample, row 204 has a value (giving Fashion an average of 2000) and
# row 205 is a blank string, so it is filled with the Electronics average, 1750.
data = [
    (201, "Aman", "Delhi", "1500", "Electronics"),
    (202, "Kiran", "Mumbai", None, "Fashion"),
    (203, "Ravi", "Bangalore", "2000", "Electronics"),
    (204, "Simran", "Hyderabad", "2000", "Fashion"),
    (205, "Vinay", "Pune", " ", "Electronics"),
    (206, "Pooja", "Chennai", "1300", "Grocery")
]

schema = StructType([
    StructField("cust_id", IntegerType(), False),
    StructField("cust_name", StringType(), False),
    StructField("city", StringType(), False),
    StructField("purchase_amount", StringType(), True),
    StructField("product_category", StringType(), False)
])
df = spark.createDataFrame(data, schema)

# 1. Turn blank or whitespace-only strings into nulls, then cast the amounts to double
df_clean = df.withColumn(
    "purchase_amount",
    F.when(F.trim(F.col("purchase_amount")) == "", F.lit(None))
     .otherwise(F.col("purchase_amount"))
     .cast("double")
)

# 2. Average purchase per category (avg ignores nulls)
df_avg = df_clean.groupBy("product_category").agg(F.avg("purchase_amount").alias("avg_purchase"))

# 3. Attach each row's category average and use it only where the amount is missing
df_filled = (
    df_clean.join(df_avg, "product_category", "left")
    .withColumn("purchase_amount", F.coalesce(F.col("purchase_amount"), F.col("avg_purchase")))
    .drop("avg_purchase")
    .select(df.columns)  # restore the original column order after the join
)
display(df_filled)
df_filled.explain()  # physical plan of the final result, not of the raw input

You have a dataset of students from different Indian states with columns: student_id, student_name, state, score. Write a PySpark program to rank students within each state based on their scores in descending order.
Sample Data:
student_id, student_name, state, score
301, Rohit, Maharashtra, 85
302, Sneha, Karnataka, 92
303, Amit, Maharashtra, 90
304, Kunal, Karnataka, 88
305, Nidhi, Maharashtra, 78
306, Pavan, Karnataka, 80

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.window import Window

data = [
    (301, "Rohit", "Maharashtra", 85),
    (302, "Sneha", "Karnataka", 92),
    (303, "Amit", "Maharashtra", 90),
    (304, "Kunal", "Karnataka", 88),
    (305, "Nidhi", "Maharashtra", 78),
    (306, "Pavan", "Karnataka", 80)
]
# score is numeric so it sorts by value (as strings, "100" would sort before "92")
schema = StructType([
    StructField("student_id", IntegerType(), False),
    StructField("student_name", StringType(), False),
    StructField("state", StringType(), False),
    StructField("score", IntegerType(), False)
])
df = spark.createDataFrame(data, schema)

# Approach 1: Spark SQL window function
df.createOrReplaceTempView("Student")
df_sql = spark.sql("""
    SELECT *, DENSE_RANK() OVER (PARTITION BY state ORDER BY score DESC) AS rn
    FROM Student
""")
display(df_sql)

# Approach 2: DataFrame API with a window spec
windowSpec = Window.partitionBy("state").orderBy(F.col("score").desc())
df1 = df.withColumn("rn", F.dense_rank().over(windowSpec))
display(df1)
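The question asks for a "rank", while the code uses `dense_rank`. With no tied scores in the sample the two give the same numbers, but they diverge once ties appear. The plain-Python sketch below computes both within each state, ordered by score descending. The helper `rank_within_state` and the extra tied student (307, "Asha") are hypothetical, added only to show the difference.

```python
from itertools import groupby

# Sample rows from the problem: (student_id, student_name, state, score)
students = [
    (301, "Rohit", "Maharashtra", 85),
    (302, "Sneha", "Karnataka", 92),
    (303, "Amit", "Maharashtra", 90),
    (304, "Kunal", "Karnataka", 88),
    (305, "Nidhi", "Maharashtra", 78),
    (306, "Pavan", "Karnataka", 80),
]


def rank_within_state(rows):
    """Hypothetical helper: return (name, state, score, rank, dense_rank) per row,
    partitioned by state and ordered by score descending, like the Spark window."""
    result = []
    ordered = sorted(rows, key=lambda r: (r[2], -r[3]))  # group by state, highest score first
    for _state, group in groupby(ordered, key=lambda r: r[2]):
        rank = dense = 0
        prev_score = None
        for position, (_id, name, state, score) in enumerate(group, start=1):
            if score != prev_score:
                rank = position  # rank() jumps past the tied positions
                dense += 1       # dense_rank() moves to the next number
                prev_score = score
            result.append((name, state, score, rank, dense))
    return result


for row in rank_within_state(students):
    print(row)

# Hypothetical extra student tied with Sneha, only to show how the two ranks diverge.
with_tie = students + [(307, "Asha", "Karnataka", 92)]
for row in rank_within_state(with_tie):
    print(row)
```

With the tie, Kunal gets rank 3 but dense rank 2. In the Spark code, swapping `F.dense_rank()` for `F.rank()`, or for `F.row_number()` when every student needs a distinct position, selects the other numbering.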
