**Q1**

In [1]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=02d47ebcfeac0cde6d499fdc172b9354653bfe1a022e9c7d600ce92df36da704
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [64]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list

In [65]:
spark = SparkSession.builder.appName("StudentClassData").getOrCreate()

In [66]:
students_columns = ["rollno", "name", "address", "age", "gender", "classid"]
students_df = spark.createDataFrame(students_data, students_columns)
students_df.show()

+------+--------+-------+---+------+-------+
|rollno|    name|address|age|gender|classid|
+------+--------+-------+---+------+-------+
|   101| Shalini| 123 St| 20|     F|      1|
|   102|    Anna| 456 St| 23|     M|      2|
|   103|    Abba| 789 St| 21|     M|      1|
|   104| Sandeep| 101 St| 22|     M|      3|
|   105|Vaishali| 202 St| 24|     F|      2|
+------+--------+-------+---+------+-------+



In [67]:
# Create the student dataframe
students_data = [
    (101, "Shalini", "123 St", 20, "F", 1),
    (102, "Anna", "456 St", 23, "M", 2),
    (103, "Abba", "789 St", 21, "M", 1),
    (104, "Sandeep", "101 St", 22, "M", 3),
    (105, "Vaishali", "202 St", 24, "F", 2)
]

In [68]:
class_columns = ["classid", "classname"]
class_df = spark.createDataFrame(class_data, class_columns)

In [69]:
# Create the class dataframe
class_data = [
    (1, "Math"),
    (2, "Science"),
    (3, "History")
]

In [70]:
print("Class DataFrame:")
class_df.show()

Class DataFrame:
+-------+---------+
|classid|classname|
+-------+---------+
|      1|     Math|
|      2|  Science|
|      3|  History|
+-------+---------+



In [71]:
# Display student names with gender = M and gender = F separately
male_students = students_df.filter(col("gender") == "M")
female_students = students_df.filter(col("gender") == "F")

In [72]:
print("Male Students:")
male_students.select("name").show()

Male Students:
+-------+
|   name|
+-------+
|   Anna|
|   Abba|
|Sandeep|
+-------+



In [73]:
print("Female Students:")
female_students.select("name").show()

Female Students:
+--------+
|    name|
+--------+
| Shalini|
|Vaishali|
+--------+



In [74]:
# Display students of a specific class
class_name = "Math"
class_id = class_df.filter(col("classname") == class_name).select("classid").collect()[0][0]
students_in_class = students_df.filter(col("classid") == class_id)

In [75]:
print(f"Students in class {class_name}:")
students_in_class.show()

Students in class Math:
+------+-------+-------+---+------+-------+
|rollno|   name|address|age|gender|classid|
+------+-------+-------+---+------+-------+
|   101|Shalini| 123 St| 20|     F|      1|
|   103|   Abba| 789 St| 21|     M|      1|
+------+-------+-------+---+------+-------+



In [76]:
# Display students whose age > 22
students_above_22 = students_df.filter(col("age") > 22)

In [77]:
print("Students whose age is greater than 22:")
students_above_22.show()

Students whose age is greater than 22:
+------+--------+-------+---+------+-------+
|rollno|    name|address|age|gender|classid|
+------+--------+-------+---+------+-------+
|   102|    Anna| 456 St| 23|     M|      2|
|   105|Vaishali| 202 St| 24|     F|      2|
+------+--------+-------+---+------+-------+



In [78]:
# Add new column 'grade' to student dataframe
students_df = students_df.withColumn("grade", col("rollno").cast("string").substr(-1, 1))
grade_mapping = {"1": "A", "2": "B", "3": "C", "4": "C", "5": "B"}
students_df = students_df.replace(grade_mapping, subset=["grade"])

In [79]:
print("Students DataFrame with Grade:")
students_df.show()

Students DataFrame with Grade:
+------+--------+-------+---+------+-------+-----+
|rollno|    name|address|age|gender|classid|grade|
+------+--------+-------+---+------+-------+-----+
|   101| Shalini| 123 St| 20|     F|      1|    A|
|   102|    Anna| 456 St| 23|     M|      2|    B|
|   103|    Abba| 789 St| 21|     M|      1|    C|
|   104| Sandeep| 101 St| 22|     M|      3|    C|
|   105|Vaishali| 202 St| 24|     F|      2|    B|
+------+--------+-------+---+------+-------+-----+



In [80]:
# Display class names and their associated students
merged_df = students_df.join(class_df, "classid")
merged_df.groupBy("classname").agg(collect_list("name").alias("students")).show(truncate=False)

+---------+----------------+
|classname|students        |
+---------+----------------+
|Science  |[Anna, Vaishali]|
|Math     |[Shalini, Abba] |
|History  |[Sandeep]       |
+---------+----------------+



**Q2**

In [81]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [82]:
# Initialize Spark session
spark = SparkSession.builder.appName("EmployeeDepartmentData").getOrCreate()

In [83]:
emp_columns = ["eno", "ename", "gender", "designation", "city", "salary", "dno"]
emp_df = spark.createDataFrame(emp_data, emp_columns)

In [84]:
# Create the emp dataframe
emp_data = [
    (1, "Alice", "F", "Developer", "New York", 90000, 101),
    (2, "Bob", "M", "Manager", "San Francisco", 120000, 102),
    (3, "Charlie", "M", "Analyst", "Los Angeles", 75000, 103),
    (4, "David", "M", "Developer", "New York", 85000, 101),
    (5, "Eve", "F", "Manager", "Boston", 110000, 104)
]

In [103]:
emp_df.show()

+---+-----+------+-----------+-------------+------+---+
|eno|ename|gender|designation|         city|salary|dno|
+---+-----+------+-----------+-------------+------+---+
|  2|  Bob|     M|    Manager|San Francisco|130000|102|
|  5|  Eve|     F|    Manager|       Boston|120000|104|
|  6|Frank|     M|    Analyst|      Chicago| 60000|105|
|  7|Grace|     F|  Developer|San Francisco| 95000|101|
|  8|Henry|     M|    Manager|     New York|130000|104|
+---+-----+------+-----------+-------------+------+---+



In [85]:
dept_columns = ["dno", "dname"]
dept_df = spark.createDataFrame(dept_data, dept_columns)

In [86]:
# Create the dept dataframe
dept_data = [
    (101, "IT"),
    (102, "HR"),
    (103, "Finance"),
    (104, "Sales"),
    (105, "Marketing")
]

In [104]:
dept_df.show()

+---+----------+
|dno|     dname|
+---+----------+
|101|        IT|
|102|        HR|
|103|   Finance|
|104|     Sales|
|105| Marketing|
|106|Operations|
|107|     Legal|
|108|       R&D|
+---+----------+



In [87]:
# Print the schema for both dataframes
print("Schema of emp DataFrame:")
emp_df.printSchema()

Schema of emp DataFrame:
root
 |-- eno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- dno: long (nullable = true)



In [88]:
print("Schema of dept DataFrame:")
dept_df.printSchema()

Schema of dept DataFrame:
root
 |-- dno: long (nullable = true)
 |-- dname: string (nullable = true)



In [89]:
# Filter emp dataframe based on designation
print("Employees with designation 'Developer':")
emp_df.filter(col("designation") == "Developer").show()

Employees with designation 'Developer':
+---+-----+------+-----------+--------+------+---+
|eno|ename|gender|designation|    city|salary|dno|
+---+-----+------+-----------+--------+------+---+
|  1|Alice|     F|  Developer|New York| 90000|101|
|  4|David|     M|  Developer|New York| 85000|101|
+---+-----+------+-----------+--------+------+---+



In [90]:
# Filter emp dataframe based on salary
print("Employees with salary greater than 80000:")
emp_df.filter(col("salary") > 80000).show()

Employees with salary greater than 80000:
+---+-----+------+-----------+-------------+------+---+
|eno|ename|gender|designation|         city|salary|dno|
+---+-----+------+-----------+-------------+------+---+
|  1|Alice|     F|  Developer|     New York| 90000|101|
|  2|  Bob|     M|    Manager|San Francisco|120000|102|
|  4|David|     M|  Developer|     New York| 85000|101|
|  5|  Eve|     F|    Manager|       Boston|110000|104|
+---+-----+------+-----------+-------------+------+---+



In [91]:
# Show data of departments for female employees
print("Departments of female employees:")
emp_df.filter(col("gender") == "F").join(dept_df, "dno").select("ename", "dname").show()

Departments of female employees:
+-----+-----+
|ename|dname|
+-----+-----+
|Alice|   IT|
|  Eve|Sales|
+-----+-----+



In [92]:
# Increase salary of employees whose designation is 'Manager'
emp_df = emp_df.withColumn("salary", col("salary") + 10000).where(col("designation") == "Manager")

In [93]:
print("Updated employee data with increased salary for Managers:")
emp_df.show()

Updated employee data with increased salary for Managers:
+---+-----+------+-----------+-------------+------+---+
|eno|ename|gender|designation|         city|salary|dno|
+---+-----+------+-----------+-------------+------+---+
|  2|  Bob|     M|    Manager|San Francisco|130000|102|
|  5|  Eve|     F|    Manager|       Boston|120000|104|
+---+-----+------+-----------+-------------+------+---+



In [94]:
# Add 3 more records to each dataframe
additional_emp_data = [
    (6, "Frank", "M", "Analyst", "Chicago", 60000, 105),
    (7, "Grace", "F", "Developer", "San Francisco", 95000, 101),
    (8, "Henry", "M", "Manager", "New York", 130000, 104)
]

In [95]:
additional_dept_data = [
    (106, "Operations"),
    (107, "Legal"),
    (108, "R&D")
]

In [96]:
# Adding additional records to emp dataframe
additional_emp_df = spark.createDataFrame(additional_emp_data, emp_columns)
emp_df = emp_df.union(additional_emp_df)

In [97]:
# Adding additional records to dept dataframe
additional_dept_df = spark.createDataFrame(additional_dept_data, dept_columns)
dept_df = dept_df.union(additional_dept_df)

In [98]:
# Print the schema for both dataframes after adding new records
print("Schema of emp DataFrame after adding new records:")
emp_df.printSchema()

Schema of emp DataFrame after adding new records:
root
 |-- eno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- dno: long (nullable = true)



In [99]:
print("Schema of dept DataFrame after adding new records:")
dept_df.printSchema()

Schema of dept DataFrame after adding new records:
root
 |-- dno: long (nullable = true)
 |-- dname: string (nullable = true)



In [100]:
# Show the use of join in order to fetch unique records
print("Unique records by joining emp and dept on 'dno':")
unique_records_df = emp_df.join(dept_df, "dno").dropDuplicates()
unique_records_df.show()

Unique records by joining emp and dept on 'dno':
+---+---+-----+------+-----------+-------------+------+---------+
|dno|eno|ename|gender|designation|         city|salary|    dname|
+---+---+-----+------+-----------+-------------+------+---------+
|101|  7|Grace|     F|  Developer|San Francisco| 95000|       IT|
|102|  2|  Bob|     M|    Manager|San Francisco|130000|       HR|
|104|  5|  Eve|     F|    Manager|       Boston|120000|    Sales|
|104|  8|Henry|     M|    Manager|     New York|130000|    Sales|
|105|  6|Frank|     M|    Analyst|      Chicago| 60000|Marketing|
+---+---+-----+------+-----------+-------------+------+---------+



In [101]:
# Show department-wise list of employees
print("Department-wise list of employees:")
department_wise_employees_df = emp_df.join(dept_df, "dno").groupBy("dname").agg(collect_list("ename").alias("employees"))
department_wise_employees_df.show(truncate=False)

Department-wise list of employees:
+---------+------------+
|dname    |employees   |
+---------+------------+
|Sales    |[Eve, Henry]|
|HR       |[Bob]       |
|Marketing|[Frank]     |
|IT       |[Grace]     |
+---------+------------+



In [102]:
# List of employees whose salary is less than 20000 and designation is 'Analyst'
print("List of employees whose salary is less than 20000 and designation is 'Analyst':")
emp_df.filter((col("salary") < 20000) & (col("designation") == "Analyst")).show()

List of employees whose salary is less than 20000 and designation is 'Analyst':
+---+-----+------+-----------+----+------+---+
|eno|ename|gender|designation|city|salary|dno|
+---+-----+------+-----------+----+------+---+
+---+-----+------+-----------+----+------+---+



**Q3**

In [105]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, date_format, min, max, count

In [106]:
# Initialize Spark session
spark = SparkSession.builder.appName("ProductRevenueData").getOrCreate()

In [107]:
# Create the Product dataframe
product_data = [
    (1, "Product A", "Type 1", 100),
    (2, "Product B", "Type 2", 200),
    (3, "Product C", "Type 3", 300),
    (4, "Product D", "Type 1", 150),
    (5, "Product E", "Type 2", 250)
]

In [108]:
product_columns = ["product_id", "pname", "ptype", "price"]
product_df = spark.createDataFrame(product_data, product_columns)

In [109]:
# Create the Customer dataframe
customer_data = [
    (1, "Alice", "1234567890", "Pune"),
    (2, "Bob", "0987654321", "Mumbai"),
    (3, "Charlie", "5555555555", "Pune"),
    (4, "David", "4444444444", "Delhi"),
    (5, "Eve", "3333333333", "Pune")
]

In [113]:
customer_columns = ["cust_id", "cname", "mobileno", "city"]
customer_df = spark.createDataFrame(customer_data, customer_columns)

In [114]:
# Create the Orders dataframe
orders_data = [
    (1, "2013-08-01", 1, "COMPLETE"),
    (2, "2013-08-02", 2, "CLOSED"),
    (3, "2013-08-01", 3, "PENDING"),
    (4, "2013-08-05", 4, "COMPLETE"),
    (5, "2013-08-10", 5, "CLOSED")
]

In [115]:
orders_columns = ["order_id", "order_date", "order_customer_id", "order_status"]
orders_df = spark.createDataFrame(orders_data, orders_columns)

In [116]:
# Create the Order Items dataframe
order_items_data = [
    (1, 1, 2, 200),
    (2, 2, 1, 200),
    (3, 3, 3, 900),
    (4, 4, 4, 600),
    (5, 5, 2, 500)
]

In [117]:
order_items_columns = ["order_item_order_id", "order_item_product_id", "order_item_quantity", "order_item_subtotal"]
order_items_df = spark.createDataFrame(order_items_data, order_items_columns)

In [118]:
# Get details of all customers from Pune city
print("Customers from Pune:")
customers_from_pune = customer_df.filter(col("city") == "Pune")
customers_from_pune.show()

Customers from Pune:
+-------+-------+----------+----+
|cust_id|  cname|  mobileno|city|
+-------+-------+----------+----+
|      1|  Alice|1234567890|Pune|
|      3|Charlie|5555555555|Pune|
|      5|    Eve|3333333333|Pune|
+-------+-------+----------+----+



In [119]:
# Get details of orders with subtotal > some value in a specific month
subtotal_threshold = 500
month = "08"  # August
orders_with_high_subtotal = orders_df.join(order_items_df, orders_df["order_id"] == order_items_df["order_item_order_id"]) \
    .filter((order_items_df["order_item_subtotal"] > subtotal_threshold) & (date_format(orders_df["order_date"], "MM") == month))
print(f"Orders with subtotal > {subtotal_threshold} in month {month}:")
orders_with_high_subtotal.show()

Orders with subtotal > 500 in month 08:
+--------+----------+-----------------+------------+-------------------+---------------------+-------------------+-------------------+
|order_id|order_date|order_customer_id|order_status|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|
+--------+----------+-----------------+------------+-------------------+---------------------+-------------------+-------------------+
|       3|2013-08-01|                3|     PENDING|                  3|                    3|                  3|                900|
|       4|2013-08-05|                4|    COMPLETE|                  4|                    4|                  4|                600|
+--------+----------+-----------------+------------+-------------------+---------------------+-------------------+-------------------+



In [120]:
# Print orders in ascending order of subtotal
print("Orders in ascending order of subtotal:")
orders_sorted_by_subtotal = orders_df.join(order_items_df, orders_df["order_id"] == order_items_df["order_item_order_id"]) \
    .orderBy(order_items_df["order_item_subtotal"])
orders_sorted_by_subtotal.show()

Orders in ascending order of subtotal:
+--------+----------+-----------------+------------+-------------------+---------------------+-------------------+-------------------+
|order_id|order_date|order_customer_id|order_status|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|
+--------+----------+-----------------+------------+-------------------+---------------------+-------------------+-------------------+
|       1|2013-08-01|                1|    COMPLETE|                  1|                    1|                  2|                200|
|       2|2013-08-02|                2|      CLOSED|                  2|                    2|                  1|                200|
|       5|2013-08-10|                5|      CLOSED|                  5|                    5|                  2|                500|
|       4|2013-08-05|                4|    COMPLETE|                  4|                    4|                  4|                600|
|       3|2013-0

In [121]:
# Print customer details with min order and max order amount
print("Customer details with min and max order amount:")
customer_order_amounts = orders_df.join(order_items_df, orders_df["order_id"] == order_items_df["order_item_order_id"]) \
    .groupBy("order_customer_id") \
    .agg(min("order_item_subtotal").alias("min_order_amount"), max("order_item_subtotal").alias("max_order_amount"))
customer_details_with_orders = customer_order_amounts.join(customer_df, customer_order_amounts["order_customer_id"] == customer_df["cust_id"])
customer_details_with_orders.show()

Customer details with min and max order amount:
+-----------------+----------------+----------------+-------+-------+----------+------+
|order_customer_id|min_order_amount|max_order_amount|cust_id|  cname|  mobileno|  city|
+-----------------+----------------+----------------+-------+-------+----------+------+
|                1|             200|             200|      1|  Alice|1234567890|  Pune|
|                2|             200|             200|      2|    Bob|0987654321|Mumbai|
|                5|             500|             500|      5|    Eve|3333333333|  Pune|
|                3|             900|             900|      3|Charlie|5555555555|  Pune|
|                4|             600|             600|      4|  David|4444444444| Delhi|
+-----------------+----------------+----------------+-------+-------+----------+------+



In [122]:
# Get orders which are either COMPLETE or CLOSED
print("Orders which are either COMPLETE or CLOSED:")
complete_or_closed_orders = orders_df.filter((col("order_status") == "COMPLETE") | (col("order_status") == "CLOSED"))
complete_or_closed_orders.show()

Orders which are either COMPLETE or CLOSED:
+--------+----------+-----------------+------------+
|order_id|order_date|order_customer_id|order_status|
+--------+----------+-----------------+------------+
|       1|2013-08-01|                1|    COMPLETE|
|       2|2013-08-02|                2|      CLOSED|
|       4|2013-08-05|                4|    COMPLETE|
|       5|2013-08-10|                5|      CLOSED|
+--------+----------+-----------------+------------+



In [123]:
# Get orders which are either COMPLETE or CLOSED and placed in the month of August 2013
print("Orders which are either COMPLETE or CLOSED and placed in August 2013:")
complete_or_closed_august_orders = complete_or_closed_orders.filter((date_format(col("order_date"), "yyyy-MM") == "2013-08"))
complete_or_closed_august_orders.show()

Orders which are either COMPLETE or CLOSED and placed in August 2013:
+--------+----------+-----------------+------------+
|order_id|order_date|order_customer_id|order_status|
+--------+----------+-----------------+------------+
|       1|2013-08-01|                1|    COMPLETE|
|       2|2013-08-02|                2|      CLOSED|
|       4|2013-08-05|                4|    COMPLETE|
|       5|2013-08-10|                5|      CLOSED|
+--------+----------+-----------------+------------+



In [124]:
# Get order items where order_item_subtotal is not equal to the product of order_item_quantity and product price
print("Order items where order_item_subtotal is not equal to quantity * product price:")
order_items_with_discrepancy = order_items_df.join(product_df, order_items_df["order_item_product_id"] == product_df["product_id"]) \
    .filter(order_items_df["order_item_subtotal"] != (order_items_df["order_item_quantity"] * product_df["price"]))
order_items_with_discrepancy.show()

Order items where order_item_subtotal is not equal to quantity * product price:
+-------------------+---------------------+-------------------+-------------------+----------+-----+-----+-----+
|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|product_id|pname|ptype|price|
+-------------------+---------------------+-------------------+-------------------+----------+-----+-----+-----+
+-------------------+---------------------+-------------------+-------------------+----------+-----+-----+-----+



In [125]:
# Get all orders placed on the first of every month
print("Orders placed on the first of every month:")
orders_on_first_day = orders_df.filter(date_format(col("order_date"), "dd") == "01")
orders_on_first_day.show()

Orders placed on the first of every month:
+--------+----------+-----------------+------------+
|order_id|order_date|order_customer_id|order_status|
+--------+----------+-----------------+------------+
|       1|2013-08-01|                1|    COMPLETE|
|       3|2013-08-01|                3|     PENDING|
+--------+----------+-----------------+------------+



In [126]:
# Get count by status from orders
print("Count by status from orders:")
count_by_status = orders_df.groupBy("order_status").agg(count("*").alias("status_count"))
count_by_status.show()

Count by status from orders:
+------------+------------+
|order_status|status_count|
+------------+------------+
|    COMPLETE|           2|
|      CLOSED|           2|
|     PENDING|           1|
+------------+------------+



In [127]:
# Get revenue for each order id from order items
print("Revenue for each order id from order items:")
revenue_per_order = order_items_df.groupBy("order_item_order_id").agg(sum("order_item_subtotal").alias("total_revenue"))
revenue_per_order.show()

Revenue for each order id from order items:
+-------------------+-------------+
|order_item_order_id|total_revenue|
+-------------------+-------------+
|                  1|          200|
|                  2|          200|
|                  5|          500|
|                  3|          900|
|                  4|          600|
+-------------------+-------------+



In [128]:
# Get daily product revenue (order_date and order_item_product_id are part of keys, order_item_subtotal is used for aggregation)
print("Daily product revenue:")
daily_product_revenue = orders_df.join(order_items_df, orders_df["order_id"] == order_items_df["order_item_order_id"]) \
    .groupBy("order_date", "order_item_product_id") \
    .agg(sum("order_item_subtotal").alias("daily_revenue"))
daily_product_revenue.show()

Daily product revenue:
+----------+---------------------+-------------+
|order_date|order_item_product_id|daily_revenue|
+----------+---------------------+-------------+
|2013-08-02|                    2|          200|
|2013-08-05|                    4|          600|
|2013-08-10|                    5|          500|
|2013-08-01|                    1|          200|
|2013-08-01|                    3|          900|
+----------+---------------------+-------------+

