# Databricks Spark Practice - 30 Questions

## Introduction

This notebook contains 30 comprehensive practice questions covering all major PySpark concepts. These questions are designed to be solved on Databricks and will help you master:

- SparkSession and basic operations
- Reading and writing data
- DataFrame transformations
- Aggregations and GroupBy
- Spark SQL
- Joins
- Window functions
- Complex data types
- Performance optimization
- Databricks-specific features

## Instructions

1. **In Databricks**: SparkSession is automatically available as `spark`
2. **For local testing**: Uncomment the SparkSession creation code in the setup cell
3. Run the data setup cells first to create the sample data
4. Complete each exercise in the provided code cells
5. Test your solutions by running the code and checking the outputs
6. Refer back to the PySpark module notebooks if you need help


## Data Setup

Run the cells below to set up all the sample data needed for the exercises.


In [0]:
# In Databricks, SparkSession is already available
# For local testing, uncomment the following:

# from pyspark.sql import SparkSession
# spark = SparkSession.builder \
#     .appName("Databricks Practice") \
#     .master("local[*]") \
#     .getOrCreate()

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, ArrayType
# Note: sum, max, and min imported below replace Python's built-ins of the same name for the rest of the notebook
from pyspark.sql.functions import col, when, lit, expr, sum, avg, count, max, min, row_number, rank, dense_rank, lead, lag, window

print("Setup complete! SparkSession ready.")


In [0]:
# Create employees DataFrame
employees_data = [
    (1, "Alice", 25, "Sales", 50000, "NYC", "2020-01-15"),
    (2, "Bob", 30, "IT", 60000, "LA", "2019-03-20"),
    (3, "Charlie", 35, "Sales", 70000, "Chicago", "2018-06-10"),
    (4, "Diana", 28, "IT", 55000, "NYC", "2021-02-14"),
    (5, "Eve", 32, "HR", 65000, "Houston", "2019-11-05"),
    (6, "Frank", 27, "Sales", 52000, "LA", "2022-01-08"),
    (7, "Grace", 29, "IT", 58000, "Chicago", "2020-09-12"),
    (8, "Henry", 31, "HR", 62000, "NYC", "2018-12-01"),
    (9, "Ivy", 26, "Sales", 51000, "Houston", "2021-07-22"),
    (10, "Jack", 33, "Finance", 75000, "LA", "2017-05-30")
]

employees_schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("hire_date", StringType(), True)
])

df_employees = spark.createDataFrame(employees_data, employees_schema)
print("Employees DataFrame created:")
df_employees.show()


In [0]:
# Create departments DataFrame
departments_data = [
    ("Sales", "John", 1000000),
    ("IT", "Sarah", 1500000),
    ("HR", "Mike", 800000),
    ("Finance", "Lisa", 1200000),
    ("Marketing", "Tom", 900000)
]

departments_schema = StructType([
    StructField("dept_name", StringType(), True),
    StructField("manager", StringType(), True),
    StructField("budget", IntegerType(), True)
])

df_departments = spark.createDataFrame(departments_data, departments_schema)
print("Departments DataFrame created:")
df_departments.show()


In [0]:
# Create sales DataFrame
sales_data = [
    (1, "2024-01-15", 1000, "Product A"),
    (1, "2024-02-20", 1500, "Product B"),
    (2, "2024-01-10", 2000, "Product A"),
    (3, "2024-02-05", 1200, "Product C"),
    (1, "2024-03-12", 1800, "Product A"),
    (4, "2024-01-25", 900, "Product B"),
    (2, "2024-02-28", 2200, "Product C"),
    (5, "2024-03-01", 1100, "Product A"),
    (3, "2024-03-15", 1300, "Product B")
]

sales_schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("sale_date", StringType(), True),
    StructField("amount", IntegerType(), True),
    StructField("product", StringType(), True)
])

df_sales = spark.createDataFrame(sales_data, sales_schema)
print("Sales DataFrame created:")
df_sales.show()


In [0]:
# Create products DataFrame
products_data = [
    ("Product A", "Electronics", 500),
    ("Product B", "Clothing", 300),
    ("Product C", "Electronics", 800),
    ("Product D", "Food", 50)
]

products_schema = StructType([
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("base_price", IntegerType(), True)
])

df_products = spark.createDataFrame(products_data, products_schema)
print("Products DataFrame created:")
df_products.show()


---

## Questions

### Questions 1-5: Basic DataFrame Operations


### Question 1: Filter and Select

Filter `df_employees` to show only employees from the 'Sales' department, and select only the columns: `name`, `age`, and `salary`.


In [0]:

display(df_employees.filter(col("department") == "Sales").select("name", "age", "salary"))


### Question 2: Sort Data

Sort `df_employees` by `salary` in descending order and show the top 5 employees.


In [0]:
display(df_employees.orderBy(col("salary").desc()).limit(5))


### Question 3: Add Calculated Column

Add a new column `annual_bonus` to `df_employees` that is 10% of the salary. Display the result with columns: `name`, `salary`, and `annual_bonus`.


In [0]:
display(
    df_employees.withColumn("annual_bonus", col("salary") * 0.10)
    .select("name", "salary", "annual_bonus")
)

### Question 4: Conditional Logic

Create a new column `salary_category` in `df_employees` that categorizes salaries as:
- "High" if salary >= 65000
- "Medium" if salary >= 55000 and < 65000
- "Low" if salary < 55000

Show `name`, `salary`, and `salary_category`.


In [0]:
display(
    df_employees.withColumn(
        "salary_category",
        when(col("salary") >= 65000, "High")
        .when(col("salary") >= 55000, "Medium")
        .otherwise("Low")
    ).select("name", "salary", "salary_category")
)

### Question 5: Remove Duplicates and Null Handling

Filter `df_employees` to remove any rows where `age` is null, then remove duplicate rows based on all columns. Count the total number of rows remaining.


In [0]:
filtered_df = df_employees.filter(col("age").isNotNull()).dropDuplicates()
row_count = filtered_df.count()
row_count

---

### Questions 6-10: Aggregations and GroupBy


### Question 6: Basic Aggregation

Calculate the average salary for each department in `df_employees`. Show department and average salary, sorted by average salary in descending order.


In [0]:
display(
    df_employees.groupBy("department")
    .agg(avg("salary").alias("average_salary"))
    .orderBy(col("average_salary").desc())
)

### Question 7: Multiple Aggregations

For each department, calculate:
- Total number of employees
- Average salary
- Maximum salary
- Minimum salary

Display the results sorted by department name.


In [0]:
display(
    df_employees.groupBy("department")
    .agg(
        count("emp_id").alias("total_employees"),
        avg("salary").alias("average_salary"),
        max("salary").alias("max_salary"),
        min("salary").alias("min_salary")
    )
    .orderBy("department")
)

### Question 8: GroupBy with Filter

Find the total sales amount (`amount`) for each employee (`emp_id`) in `df_sales`, but only include employees who have total sales greater than 2000. Show `emp_id` and total sales amount.


In [0]:
display(
    df_sales.groupBy("emp_id")
    .agg(sum("amount").alias("total_sales"))
    .filter(col("total_sales") > 2000)
    .select("emp_id", "total_sales")
)
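
To check the output of the cell above by hand, the same totals can be computed in plain Python, with no Spark session needed. This is only a cross-check sketch: the pairs are copied from `sales_data` in the Data Setup section, `expected_q8` is a hypothetical name, and the loop deliberately avoids the built-in `sum`, which the setup cell's PySpark import shadows.

```python
from collections import defaultdict

# (emp_id, amount) pairs copied from sales_data in the Data Setup section
sales = [(1, 1000), (1, 1500), (2, 2000), (3, 1200), (1, 1800),
         (4, 900), (2, 2200), (5, 1100), (3, 1300)]

# Equivalent of groupBy("emp_id").agg(sum("amount"))
totals = defaultdict(int)
for emp_id, amount in sales:
    totals[emp_id] += amount

# Equivalent of the filter applied after the aggregation (a SQL HAVING clause)
expected_q8 = {emp_id: total for emp_id, total in totals.items() if total > 2000}
print(expected_q8)  # {1: 4300, 2: 4200, 3: 2500}
```

Employees 4 and 5 drop out because their totals (900 and 1100) do not exceed 2000.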

### Question 9: Count Distinct

Count the number of distinct cities where employees work in `df_employees`. Also, for each city, count how many employees work there.


In [0]:
distinct_city_count = df_employees.select("city").distinct().count()
print("Number of distinct cities:", distinct_city_count)

display(
    df_employees.groupBy("city")
    .agg(count("emp_id").alias("employee_count"))
    .orderBy("city")
)


### Question 10: Aggregation with Conditions

Calculate the average age of employees for each department, but only include employees who are 30 years or older in the calculation.


In [0]:
display(
    df_employees.filter(col("age") >= 30)
    .groupBy("department")
    .agg(avg("age").alias("average_age_30plus"))
    .orderBy("department")
)

---

### Questions 11-15: Spark SQL


### Question 11: Create Temporary View and Query

Create a temporary view from `df_employees` called `employees_view` and write a SQL query to find all employees in the 'IT' department with salary greater than 55000. Show `name`, `age`, and `salary`.


In [0]:
df_employees.createOrReplaceTempView("employees_view")

result_df = spark.sql("""
    SELECT name, age, salary
    FROM employees_view
    WHERE department = 'IT' AND salary > 55000
""")

display(result_df)

### Question 12: SQL Aggregation

Using Spark SQL, write a query to find the department with the highest total salary. Show the department name and total salary.


In [0]:
df_employees.createOrReplaceTempView("employees_view")

result_df = spark.sql("""
    SELECT department, SUM(salary) AS total_salary
    FROM employees_view
    GROUP BY department
    ORDER BY total_salary DESC
    LIMIT 1
""")

display(result_df)

### Question 13: SQL with CASE Statement

Using Spark SQL, create a query that shows `name`, `salary`, and a new column `salary_band`:
- 'A' for salary >= 70000
- 'B' for salary >= 60000 and < 70000
- 'C' for salary < 60000


In [0]:
df_employees.createOrReplaceTempView("employees_view")

result_df = spark.sql("""
    SELECT
        name,
        salary,
        CASE
            WHEN salary >= 70000 THEN 'A'
            WHEN salary >= 60000 AND salary < 70000 THEN 'B'
            ELSE 'C'
        END AS salary_band
    FROM employees_view
""")

display(result_df)

### Question 14: SQL Subquery

Using Spark SQL, find all employees whose salary is greater than the average salary of all employees. Show `name`, `department`, and `salary`.


In [0]:
df_employees.createOrReplaceTempView("employees_view")

result_df = spark.sql("""
    SELECT name, department, salary
    FROM employees_view
    WHERE salary > (SELECT AVG(salary) FROM employees_view)
""")

display(result_df)
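
As a quick sanity check on the subquery result, the overall average and the matching employees can be worked out in plain Python. This is a sketch under the assumption that `df_employees` still holds the unmodified sample rows; `overall_average` and `above_average` are hypothetical names.

```python
from statistics import fmean

# name -> salary, copied from employees_data in the Data Setup section
salaries = {
    "Alice": 50000, "Bob": 60000, "Charlie": 70000, "Diana": 55000,
    "Eve": 65000, "Frank": 52000, "Grace": 58000, "Henry": 62000,
    "Ivy": 51000, "Jack": 75000,
}

# Equivalent of the scalar subquery: one average over every employee
overall_average = fmean(salaries.values())

# Equivalent of the outer WHERE clause
above_average = sorted(
    name for name, salary in salaries.items() if salary > overall_average
)

print(overall_average)  # 59800.0
print(above_average)    # ['Bob', 'Charlie', 'Eve', 'Henry', 'Jack']
```

Bob qualifies only narrowly: his salary of 60000 is just above the 59800 average.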

### Question 15: SQL Window Function

Using Spark SQL, rank employees within each department by their salary (highest salary gets rank 1). Show `name`, `department`, `salary`, and the rank as `salary_rank`.


In [0]:
df_employees.createOrReplaceTempView("employees_view")

result_df = spark.sql("""
    SELECT
        name,
        department,
        salary,
        RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS salary_rank
    FROM employees_view
""")

display(result_df)

---

### Questions 16-20: Joins


### Question 16: Inner Join

Perform an inner join between `df_employees` and `df_departments` on `department` = `dept_name`. Show `name`, `department`, `salary`, and `manager`.


In [0]:
result_df = df_employees.join(
    df_departments,
    df_employees["department"] == df_departments["dept_name"],
    "inner"
).select(
    "name",
    df_employees["department"],
    "salary",
    "manager"
)

display(result_df)

### Question 17: Left Join

Perform a left join between `df_employees` and `df_departments` on `department` = `dept_name`. This will show all employees even if their department doesn't exist in the departments table. Show `name`, `department`, and `manager`.


In [0]:
result_df = df_employees.join(
    df_departments,
    df_employees["department"] == df_departments["dept_name"],
    "left"
).select(
    "name",
    df_employees["department"],
    "manager"
)

display(result_df)

### Question 18: Multiple Table Join

Join `df_employees`, `df_sales`, and `df_products` to show:
- Employee name
- Sale date
- Sale amount
- Product name
- Product category

Use appropriate join types to include all sales records.


In [0]:
result_df = df_sales.join(
    df_employees,
    df_sales["emp_id"] == df_employees["emp_id"],
    "left"
).join(
    df_products,
    df_sales["product"] == df_products["product_name"],
    "left"
).select(
    df_employees["name"],
    df_sales["sale_date"],
    df_sales["amount"],
    df_sales["product"],
    df_products["category"]
)

display(result_df)

### Question 19: Left Semi Join

Use a left semi join to find all employees from `df_employees` who have made at least one sale (exist in `df_sales`). Show only the employee information: `name`, `department`, and `salary`.


In [0]:
result_df = df_employees.join(
    df_sales,
    df_employees["emp_id"] == df_sales["emp_id"],
    "left_semi"
).select(
    "name",
    "department",
    "salary"
)

display(result_df)

### Question 20: Anti Join

Use an anti join to find all employees from `df_employees` who have NOT made any sales (do not exist in `df_sales`). Show `name`, `department`, and `salary`.


In [0]:
result_df = df_employees.join(
    df_sales,
    df_employees["emp_id"] == df_sales["emp_id"],
    "left_anti"
).select(
    "name",
    "department",
    "salary"
)

display(result_df)
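
The semi join from Question 19 and the anti join above split the employees into two complementary groups. The plain-Python sketch below mirrors that logic with set membership, using values copied from the Data Setup section. The names `semi` and `anti` are hypothetical.

```python
# emp_id -> name, copied from employees_data in the Data Setup section
employees = {1: "Alice", 2: "Bob", 3: "Charlie", 4: "Diana", 5: "Eve",
             6: "Frank", 7: "Grace", 8: "Henry", 9: "Ivy", 10: "Jack"}

# emp_id column of sales_data; employee 1 appears three times
sales_emp_ids = [1, 1, 2, 3, 1, 4, 2, 5, 3]
ids_with_sales = set(sales_emp_ids)

# left_semi: keep an employee once if at least one matching sale exists
semi = [name for emp_id, name in employees.items() if emp_id in ids_with_sales]

# left_anti: keep an employee only if no matching sale exists
anti = [name for emp_id, name in employees.items() if emp_id not in ids_with_sales]

print(semi)  # ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve']
print(anti)  # ['Frank', 'Grace', 'Henry', 'Ivy', 'Jack']
```

Unlike an inner join, the semi join returns Alice once even though she has three sales, and neither join adds columns from `df_sales`.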

---

### Questions 21-25: Window Functions


### Question 21: Row Number

Use a window function to assign row numbers to employees within each department, ordered by salary in descending order. Show `name`, `department`, `salary`, and `row_number`.


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

result_df = df_employees.withColumn(
    "row_number",
    row_number().over(window_spec)
).select("name", "department", "salary", "row_number")

display(result_df)

### Question 22: Rank and Dense Rank

Calculate both `rank` and `dense_rank` for employees within each department based on salary. Show `name`, `department`, `salary`, `rank`, and `dense_rank`. Notice the difference between rank and dense_rank when there are ties.


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank

window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

result_df = df_employees.withColumn(
    "rank", rank().over(window_spec)
).withColumn(
    "dense_rank", dense_rank().over(window_spec)
).select("name", "department", "salary", "rank", "dense_rank")

display(result_df)
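
In the sample data no two employees in the same department share a salary, so `rank` and `dense_rank` come out identical in the cell above. The plain-Python sketch below uses a hypothetical list of salaries with one tie to show where the two functions diverge. It mimics the ranking rules and is not Spark code.

```python
# Hypothetical salaries for one department, sorted descending, with one tie
salaries = [70000, 60000, 60000, 50000]

ranks, dense_ranks = [], []
for position, salary in enumerate(salaries, start=1):
    if position > 1 and salary == salaries[position - 2]:
        # Tie with the previous row: both functions repeat the previous value
        ranks.append(ranks[-1])
        dense_ranks.append(dense_ranks[-1])
    else:
        # rank jumps to the row position, leaving a gap after ties;
        # dense_rank always advances by exactly one
        ranks.append(position)
        dense_ranks.append(dense_ranks[-1] + 1 if dense_ranks else 1)

for row in zip(salaries, ranks, dense_ranks):
    print(row)
# (70000, 1, 1)
# (60000, 2, 2)
# (60000, 2, 2)
# (50000, 4, 3)
```

After the tie, `rank` skips to 4 while `dense_rank` continues with 3.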

### Question 23: Running Total

Calculate a running total of sales amounts for each employee in `df_sales`, ordered by `sale_date`. Show `emp_id`, `sale_date`, `amount`, and `running_total`.


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

window_spec = Window.partitionBy("emp_id").orderBy("sale_date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

result_df = df_sales.withColumn(
    "running_total",
    sum("amount").over(window_spec)
).select("emp_id", "sale_date", "amount", "running_total")

display(result_df)
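
The expected running totals can be reproduced in plain Python to verify the window result. This sketch assumes the sample rows from the Data Setup section; `expected_q23` is a hypothetical name. It also relies on the same property the Spark solution does: `sale_date` is a string, but ISO `YYYY-MM-DD` strings sort in chronological order.

```python
from itertools import accumulate, groupby
from operator import itemgetter

# (emp_id, sale_date, amount) copied from sales_data in the Data Setup section
sales = [
    (1, "2024-01-15", 1000), (1, "2024-02-20", 1500), (2, "2024-01-10", 2000),
    (3, "2024-02-05", 1200), (1, "2024-03-12", 1800), (4, "2024-01-25", 900),
    (2, "2024-02-28", 2200), (5, "2024-03-01", 1100), (3, "2024-03-15", 1300),
]

# Equivalent of partitionBy("emp_id").orderBy("sale_date")
ordered = sorted(sales, key=itemgetter(0, 1))

expected_q23 = []
for emp_id, group in groupby(ordered, key=itemgetter(0)):
    rows = list(group)
    # Equivalent of the frame from unboundedPreceding to currentRow
    running_totals = accumulate(amount for _, _, amount in rows)
    for (_, sale_date, amount), running_total in zip(rows, running_totals):
        expected_q23.append((emp_id, sale_date, amount, running_total))

for row in expected_q23:
    print(row)
```

For employee 1 the running total grows 1000, 2500, 4300, and it restarts for each new `emp_id`.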

### Question 24: Lead and Lag

For each employee's sales in `df_sales`, show:
- Current sale amount
- Previous sale amount (lag)
- Next sale amount (lead)

Order by `emp_id` and `sale_date`. Show `emp_id`, `sale_date`, `amount`, `prev_amount`, and `next_amount`.


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead

window_spec = Window.partitionBy("emp_id").orderBy("sale_date")

result_df = df_sales.withColumn(
    "prev_amount", lag("amount", 1).over(window_spec)
).withColumn(
    "next_amount", lead("amount", 1).over(window_spec)
).select(
    "emp_id", "sale_date", "amount", "prev_amount", "next_amount"
).orderBy("emp_id", "sale_date")

display(result_df)
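
The first row of each partition has no previous row and the last has no next row, so `lag` and `lead` return null there. The plain-Python sketch below reproduces that behaviour with `None` for the sample rows. `expected_q24` is a hypothetical name.

```python
from itertools import groupby
from operator import itemgetter

# (emp_id, sale_date, amount) copied from sales_data in the Data Setup section
sales = [
    (1, "2024-01-15", 1000), (1, "2024-02-20", 1500), (2, "2024-01-10", 2000),
    (3, "2024-02-05", 1200), (1, "2024-03-12", 1800), (4, "2024-01-25", 900),
    (2, "2024-02-28", 2200), (5, "2024-03-01", 1100), (3, "2024-03-15", 1300),
]

ordered = sorted(sales, key=itemgetter(0, 1))

expected_q24 = []
for emp_id, group in groupby(ordered, key=itemgetter(0)):
    rows = list(group)
    for i, (_, sale_date, amount) in enumerate(rows):
        # None stands in for the null Spark returns at a partition edge
        prev_amount = rows[i - 1][2] if i > 0 else None
        next_amount = rows[i + 1][2] if i < len(rows) - 1 else None
        expected_q24.append((emp_id, sale_date, amount, prev_amount, next_amount))

for row in expected_q24:
    print(row)
```

Employees 4 and 5 have a single sale each, so both their `prev_amount` and `next_amount` are empty.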

### Question 25: Window Aggregation

For each sale in `df_sales`, calculate:
- Average sale amount for the same employee
- Maximum sale amount for the same employee
- Minimum sale amount for the same employee

Show `emp_id`, `sale_date`, `amount`, `avg_amount`, `max_amount`, and `min_amount`.


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, max, min

window_spec = Window.partitionBy("emp_id")

result_df = df_sales.withColumn(
    "avg_amount", avg("amount").over(window_spec)
).withColumn(
    "max_amount", max("amount").over(window_spec)
).withColumn(
    "min_amount", min("amount").over(window_spec)
).select(
    "emp_id", "sale_date", "amount", "avg_amount", "max_amount", "min_amount"
)

display(result_df)

---

### Questions 26-30: Advanced Topics


### Question 26: Pivot Operation

Pivot `df_sales` to show total sales amount for each employee (`emp_id`) by product. The result should have columns: `emp_id`, `Product A`, `Product B`, `Product C` (and `Product D` if applicable).


In [0]:
result_df = df_sales.groupBy("emp_id").pivot("product", ["Product A", "Product B", "Product C", "Product D"]).sum("amount")
display(result_df)

### Question 27: Union Operation

Create two DataFrames:
1. Employees from 'Sales' department
2. Employees from 'IT' department

Union them together and show the result with columns: `name`, `department`, `salary`.


In [0]:
df_sales_dept = df_employees.filter(col("department") == "Sales").select("name", "department", "salary")
df_it_dept = df_employees.filter(col("department") == "IT").select("name", "department", "salary")
result_df = df_sales_dept.union(df_it_dept)
display(result_df)

### Question 28: Complex Aggregation with Multiple Conditions

For each department in `df_employees`, calculate:
- Total number of employees
- Number of employees with salary > 60000
- Average salary for employees with salary > 60000
- Average salary for all employees

Show all results in a single query.


In [0]:
from pyspark.sql.functions import count, avg, sum, when

result_df = df_employees.groupBy("department").agg(
    count("*").alias("total_employees"),
    sum(when(col("salary") > 60000, 1).otherwise(0)).alias("employees_salary_gt_60000"),
    avg(when(col("salary") > 60000, col("salary"))).alias("avg_salary_gt_60000"),
    avg("salary").alias("avg_salary_all")
)

display(result_df)
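
The conditional average works because `when(...)` without `otherwise` yields null for non-matching rows, and `avg` skips nulls. A department with no salary above 60000 therefore gets a null average rather than 0. The plain-Python sketch below computes the expected values for the sample data, using `None` for null. `expected_q28` is a hypothetical name.

```python
from statistics import fmean

# (department, salary) copied from employees_data in the Data Setup section
employees = [
    ("Sales", 50000), ("IT", 60000), ("Sales", 70000), ("IT", 55000),
    ("HR", 65000), ("Sales", 52000), ("IT", 58000), ("HR", 62000),
    ("Sales", 51000), ("Finance", 75000),
]

by_dept = {}
for dept, salary in employees:
    by_dept.setdefault(dept, []).append(salary)

expected_q28 = {}
for dept, dept_salaries in by_dept.items():
    high = [s for s in dept_salaries if s > 60000]
    expected_q28[dept] = {
        "total_employees": len(dept_salaries),
        "employees_salary_gt_60000": len(high),
        # None plays the role of SQL NULL when no salary qualifies
        "avg_salary_gt_60000": fmean(high) if high else None,
        "avg_salary_all": fmean(dept_salaries),
    }

for dept, stats in expected_q28.items():
    print(dept, stats)
```

IT is the interesting case: its highest salary is exactly 60000, which does not satisfy `> 60000`, so the count is 0 and the conditional average is empty.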

### Question 29: Reading and Writing Data (Databricks)

**In Databricks:**
1. Write `df_employees` as Parquet to the Volumes path `/Volumes/workspace/default/databricks_practice/employees/` (this assumes a volume named `databricks_practice` exists in the `workspace.default` schema)
2. Read the data back from that path into a new DataFrame
3. Verify by showing the first 5 rows

**Note:**
- In Databricks, Volumes paths follow the format `/Volumes/<catalog_name>/<schema_name>/<volume_name>/<path>`
- For local testing, use a local path like `./data/output/employees/`


In [0]:
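# For Databricks (Volumes path); adjust the catalog, schema, and volume to ones you can write to
# For local testing, use a local path such as "./data/output/employees/" instead
output_path = "/Volumes/workspace/default/databricks_practice/employees/"

# Write as Parquet, replacing any previous output at this path
df_employees.write.mode("overwrite").parquet(output_path)

# Read the data back; Parquet files carry their own schema
df_employees_parquet = spark.read.parquet(output_path)
display(df_employees_parquet.limit(5))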

### Question 30: Complete ETL Pipeline

Create a complete ETL pipeline that:
1. **Extract**: Join `df_employees` and `df_sales` to get employee sales data
2. **Transform**: 
   - Calculate total sales per employee
   - Add a column `performance` that is "Excellent" if total sales > 3000, "Good" if > 2000, else "Average"
   - Join with `df_employees` to get employee details
3. **Load**: Select and display the final result with columns: `name`, `department`, `total_sales`, `performance`

Chain all operations together.


In [0]:
from pyspark.sql.functions import sum, when, col

result_df = (
    df_employees.join(df_sales, "emp_id", "inner")
    .groupBy("emp_id")
    .agg(sum("amount").alias("total_sales"))
    .withColumn(
        "performance",
        when(col("total_sales") > 3000, "Excellent")
        .when(col("total_sales") > 2000, "Good")
        .otherwise("Average")
    )
    .join(df_employees.select("emp_id", "name", "department"), "emp_id", "inner")
    .select("name", "department", "total_sales", "performance")
)

display(result_df)

---

## Additional Challenges (Optional)

If you've completed all 30 questions, try these advanced challenges:

1. **Performance Optimization**: Repartition `df_employees` by `department` and cache it. Measure the performance improvement.

2. **Complex Window Function**: Calculate the 3-month moving average of sales for each employee.

3. **Broadcast Join**: Use broadcast join hint for joining `df_employees` with `df_departments` (assuming departments is small).

4. **Date Operations**: Convert `hire_date` in `df_employees` to DateType and calculate the number of years each employee has been with the company.

5. **Array Operations**: Create an array column containing all cities where each department has employees, then explode it.
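
Challenge 2 leaves "3-month moving average" open to interpretation. The plain-Python sketch below takes one reading as an assumption: sum each employee's sales per calendar month, then average each month's total with the totals of up to two preceding calendar months, skipping months that have no sales. In Spark this would roughly correspond to a monthly aggregation followed by a range-based window frame over a month index. All names here are hypothetical.

```python
from collections import defaultdict
from statistics import fmean

# (emp_id, sale_date, amount) copied from sales_data in the Data Setup section
sales = [
    (1, "2024-01-15", 1000), (1, "2024-02-20", 1500), (2, "2024-01-10", 2000),
    (3, "2024-02-05", 1200), (1, "2024-03-12", 1800), (4, "2024-01-25", 900),
    (2, "2024-02-28", 2200), (5, "2024-03-01", 1100), (3, "2024-03-15", 1300),
]

# Step 1: total sales per employee per calendar month.
# The month is encoded as year * 12 + month so consecutive months differ by 1.
monthly_totals = defaultdict(int)
for emp_id, sale_date, amount in sales:
    year, month, _day = (int(part) for part in sale_date.split("-"))
    monthly_totals[(emp_id, year * 12 + month)] += amount
monthly_totals = dict(monthly_totals)

# Step 2: average each month with up to two preceding calendar months.
moving_avg = {}
for emp_id, month_idx in sorted(monthly_totals):
    window_totals = [
        monthly_totals[(emp_id, m)]
        for m in range(month_idx - 2, month_idx + 1)
        if (emp_id, m) in monthly_totals  # months without sales are skipped
    ]
    label = f"{(month_idx - 1) // 12}-{(month_idx - 1) % 12 + 1:02d}"
    moving_avg[(emp_id, label)] = round(fmean(window_totals), 2)

for key, value in moving_avg.items():
    print(key, value)
```

For employee 1 this gives 1000.0, 1250.0, and 1433.33 for January through March 2024. Treating missing months as zero instead would be an equally defensible reading and would change the results for employees with gaps.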

---

## Summary

Congratulations on completing the practice questions! These exercises covered:

✅ Basic DataFrame operations (filter, select, sort)

✅ Aggregations and GroupBy

✅ Spark SQL queries

✅ Various join types

✅ Window functions

✅ Advanced transformations

✅ ETL pipeline creation

Keep practicing and refer back to the PySpark module notebooks for detailed explanations!
