In [2]:
# =====================================================
# LeetCode-Style Practice Dataset for PySpark
# Target: PySpark (Spark 3.x)
# Purpose: Same logical data as SQL Server dataset
# =====================================================

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window

spark = SparkSession.builder \
    .appName("LeetCodeStyleSQLDataset") \
    .getOrCreate()

# ---------------------
# Departments
# ---------------------
departments_schema = StructType([
    StructField("DeptId", IntegerType(), False),
    StructField("DeptName", StringType(), True)
])

departments_data = [
    (1, "IT"),
    (2, "HR"),
    (3, "Sales")
]

Departments = spark.createDataFrame(departments_data, departments_schema)

# ---------------------
# Employees
# ---------------------
employees_schema = StructType([
    StructField("EmpId", IntegerType(), False),
    StructField("EmpName", StringType(), True),
    StructField("Salary", IntegerType(), True),
    StructField("DeptId", IntegerType(), True),
    StructField("ManagerId", IntegerType(), True)
])

employees_data = [
    (1, "Alice", 90000, 1, None),
    (2, "Bob", 80000, 1, 1),
    (3, "Charlie", 80000, 1, 1),
    (4, "David", 60000, 2, None),
    (5, "Eva", 70000, None, None),
    (6, "Frank", 90000, 3, None)
]

Employees = spark.createDataFrame(employees_data, employees_schema)

# ---------------------
# Customers
# ---------------------
customers_schema = StructType([
    StructField("CustomerId", IntegerType(), False),
    StructField("CustomerName", StringType(), True)
])

customers_data = [
    (1, "John"),
    (2, "Jane"),
    (3, "Alex")
]

Customers = spark.createDataFrame(customers_data, customers_schema)

# ---------------------
# Products
# ---------------------
products_schema = StructType([
    StructField("ProductId", IntegerType(), False),
    StructField("ProductName", StringType(), True),
    StructField("Price", IntegerType(), True)
])

products_data = [
    (1, "Laptop", 1000),
    (2, "Phone", 500),
    (3, "Tablet", 300)
]

Products = spark.createDataFrame(products_data, products_schema)

# ---------------------
# Orders
# ---------------------
orders_schema = StructType([
    StructField("OrderId", IntegerType(), False),
    StructField("CustomerId", IntegerType(), True),
    StructField("ProductId", IntegerType(), True),
    StructField("OrderDate", StringType(), True),
    StructField("Quantity", IntegerType(), True)
])

orders_data = [
    (1, 1, 1, "2024-01-01", 1),
    (2, 1, 2, "2024-01-02", 2),
    (3, 2, 2, "2024-01-03", 1),
    (4, 2, 3, "2024-01-10", 3),
    (5, 3, 1, "2024-01-11", 1)
]

Orders = spark.createDataFrame(orders_data, orders_schema)

# ---------------------
# Logs
# ---------------------
logs_schema = StructType([
    StructField("LogId", IntegerType(), True)
])

logs_data = [(1,), (1,), (2,), (3,), (3,), (3,), (5,), (6,)]

Logs = spark.createDataFrame(logs_data, logs_schema)

# ---------------------
# Stadium
# ---------------------
stadium_schema = StructType([
    StructField("VisitDate", StringType(), True),
    StructField("People", IntegerType(), True)
])

stadium_data = [
    ("2024-01-01", 10),
    ("2024-01-02", 120),
    ("2024-01-03", 130),
    ("2024-01-04", 140),
    ("2024-01-05", 20)
]

Stadium = spark.createDataFrame(stadium_data, stadium_schema)

# ---------------------
# Users
# ---------------------
users_schema = StructType([
    StructField("UserId", IntegerType(), False),
    StructField("Banned", StringType(), True)
])

users_data = [
    (1, "No"),
    (2, "Yes"),
    (3, "No"),
    (4, "No")
]

Users = spark.createDataFrame(users_data, users_schema)

# ---------------------
# Trips
# ---------------------
trips_schema = StructType([
    StructField("TripId", IntegerType(), False),
    StructField("ClientId", IntegerType(), True),
    StructField("DriverId", IntegerType(), True),
    StructField("Status", StringType(), True),
    StructField("RequestDate", StringType(), True)
])

trips_data = [
    (1, 1, 3, "completed", "2024-01-01"),
    (2, 2, 3, "cancelled_by_driver", "2024-01-01"),
    (3, 1, 4, "cancelled_by_client", "2024-01-02"),
    (4, 3, 4, "completed", "2024-01-02")
]

Trips = spark.createDataFrame(trips_data, trips_schema)

# ---------------------
# Movies
# ---------------------
movies_schema = StructType([
    StructField("MovieId", IntegerType(), False),
    StructField("Title", StringType(), True)
])

movies_data = [
    (1, "Inception"),
    (2, "Interstellar"),
    (3, "Dunkirk")
]

Movies = spark.createDataFrame(movies_data, movies_schema)

# ---------------------
# MovieRatings
# ---------------------
ratings_schema = StructType([
    StructField("MovieId", IntegerType(), True),
    StructField("UserId", IntegerType(), True),
    StructField("Rating", IntegerType(), True)
])

ratings_data = [
    (1, 1, 5),
    (1, 2, 4),
    (2, 1, 5),
    (2, 3, 5),
    (3, 2, 3)
]

MovieRatings = spark.createDataFrame(ratings_data, ratings_schema)

print("LeetCode-style PySpark datasets loaded and registered.")


LeetCode-style PySpark datasets loaded and registered.


### Question 1 — Department Highest Salary

For each department, find the employee(s) who earn the **highest salary**.

Constraints:

- Multiple employees may share the same highest salary.
- Employees without a department must be excluded.

Expected columns:

- `DeptName`, `EmpName`, `Salary`

In [16]:
Window_spec = Window.partitionBy('DeptName').orderBy(desc('Salary'))

Q1 = Departments.alias('D').join(Employees.alias('E'), on = col('E.DeptID')==col('D.DeptId'),how = 'left').select(col('D.DeptName'), col('E.EmpName'), col('E.Salary'))
Q1 = Q1.withColumn('rn', row_number().over(Window_spec)).filter(col('rn')==1).drop('rn')
Q1.show()

+--------+-------+------+
|DeptName|EmpName|Salary|
+--------+-------+------+
|      HR|  David| 60000|
|      IT|  Alice| 90000|
|   Sales|  Frank| 90000|
+--------+-------+------+



### Question 2 — Top Customer by Total Spend

Using `Customers`, `Orders`, and `Products`, find the customer who has spent the **most money in total**.

Notes:

- Total spend = `Quantity * Price`
- Customers with no orders should not appear.
- Ties must be handled correctly.

Expected columns:

- `CustomerName`, `TotalSpend`

In [31]:
Window_spec = Window.orderBy(desc(col('TotalSpend')))
Q2 = Customers.alias('C').join(Orders.alias('O'), on = col('C.CustomerId')==col('O.CustomerId'), how='left')\
        .join(Products.alias('P'), on = col('P.ProductId')==col('O.ProductId'), how = 'inner')\
        .withColumn('TotalSpend', col('O.Quantity')*col('P.Price')).drop('CustomerId','OrderId','ProductId','OrderDate','Quantity','ProductName','Price')
Q2.withColumn('rnk', dense_rank().over(Window_spec)).distinct().filter(col('rnk')==1).drop('rnk').show()

+------------+----------+
|CustomerName|TotalSpend|
+------------+----------+
|        Alex|      1000|
|        John|      1000|
+------------+----------+



### Question 3 — Customers Who Bought All Products

Find customers who have purchased **every product** listed in the `Products` table.

Constraints:

- Quantity does not matter.
- Missing one product disqualifies the customer.

Expected columns:

- `CustomerName`

In [46]:
Q3 = Customers.alias('C').join(Orders.alias('O'), on = col('C.CustomerId')==col('O.CustomerId'), how='left')\
    .join(Products.alias('P'), on = col('P.ProductId')==col('O.ProductId'), how = 'inner')\
    .groupBy('CustomerName').agg(count(col('O.ProductId')).alias('OrderedProductsCount'))\
    .filter(col('OrderedProductsCount')==Products.count()).drop('OrderedProductsCount')
Q3.show()

+------------+
|CustomerName|
+------------+
+------------+



### Question 4 — Cancellation Rate by Day

Using `Trips` and `Users`, calculate the **daily cancellation rate**.

Rules:

- Exclude trips where either the client or the driver is banned.
- A trip is cancelled if its status is NOT `completed`.
- Cancellation rate = cancelled trips / total trips per day.

Expected columns:

- `RequestDate`, `CancellationRate`

In [80]:
Q4_1 = Trips.alias('T').join(Users.alias('U'), on = col('T.ClientId')==col('U.UserId'), how = 'inner')\
        .join(Users.alias('D'), on = col('T.DriverId')==col('D.UserId')).filter((col('U.Banned')=='No') & (col('D.Banned')=='No') & (col('status')=='completed'))\
        .groupBy('RequestDate').agg(count('TripId').alias('CompletedTrips'))
Q4_2 = Trips.groupBy('RequestDate').agg(count('TripId').alias('TotalTrips'))

In [93]:
Q4 = Q4_1.alias('C').join(Q4_2.alias('T'), on = col('C.RequestDate')==col('T.RequestDate'), how = 'inner')\
        .select(col('C.RequestDate'),col('CompletedTrips'), col('TotalTrips'))\
        .withColumn('CancellationRate', col('CompletedTrips')/col('TotalTrips')*100)\
        .drop('CompletedTrips','TotalTrips')
Q4.orderBy('RequestDate').show()

+-----------+----------------+
|RequestDate|CancellationRate|
+-----------+----------------+
| 2024-01-01|            50.0|
| 2024-01-02|            50.0|
+-----------+----------------+



### Question 5 — Average Salary by Department (Including Empty Departments)

Compute the **average salary per department**.

Constraints:

- Departments with no employees must still appear.
- Employees without a department must not affect averages.

Expected columns:

- `DeptName`, `AverageSalary`

In [97]:
Q5 = Departments.alias('D').join(Employees.alias('E'), on = col('D.DeptId')==col('E.DeptId'), how = 'left')\
        .groupBy('D.DeptName').agg(avg('Salary').alias('AverageSalary'))
Q5.show()

+--------+-----------------+
|DeptName|    AverageSalary|
+--------+-----------------+
|   Sales|          90000.0|
|      HR|          60000.0|
|      IT|83333.33333333333|
+--------+-----------------+

