
# Chapter 4: Advanced Data Engineering Techniques

Welcome to Chapter 4! In this notebook, you'll practice advanced PySpark data engineering techniques, including UDFs, Spark SQL, partitioning, working with nested data, and more. This chapter is standalone: all data is freshly loaded and cleaned before you begin practicing.

## What You'll Do
- Practice advanced data engineering techniques on real-world data
- Learn with generic sample syntax, then apply concepts to your actual DataFrames
- Tackle interview-style and real-world analytics questions



## Important Instructions
- Sample syntax is for illustration only and uses generic DataFrame names (e.g., `df`, `input_df`).
- Always use the actual DataFrame names provided in the practice questions (e.g., `customers_df`, `orders_df`).
- Do not copy-paste the sample code for the practice question. Try to solve it yourself using the actual DataFrame.
- This is for your own practice, so type the commands even if the question is similar to the example.
- Don't execute the sample syntax as written: it uses placeholder names and may modify your data.
- Avoid using AI for code completion.
- Experiment with a few variations of your own to deepen your understanding.



## Data Preparation (Run This First)
This section downloads, loads, and cleans all datasets so you can start advanced practice without running previous chapters.


In [None]:

# Download the data
!wget -O customers.csv https://raw.githubusercontent.com/icyanide9/de-practice/refs/heads/main/customers.csv
!wget -O products.csv https://raw.githubusercontent.com/icyanide9/de-practice/refs/heads/main/products.csv
!wget -O orders.csv https://raw.githubusercontent.com/icyanide9/de-practice/refs/heads/main/orders.csv
!wget -O order_items.csv https://raw.githubusercontent.com/icyanide9/de-practice/refs/heads/main/order_items.csv
!wget -O employees.csv https://raw.githubusercontent.com/icyanide9/de-practice/refs/heads/main/employees.csv
!wget -O transactions.csv https://raw.githubusercontent.com/icyanide9/de-practice/refs/heads/main/transactions.csv

from pyspark.sql import SparkSession
# Note: the wildcard import below shadows Python built-ins such as sum, min, max, and round
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("PySpark Advanced Practice").getOrCreate()

# Load DataFrames
df_map = {}
for name in ["customers", "products", "orders", "order_items", "employees", "transactions"]:
    df_map[name] = spark.read.csv(f"{name}.csv", header=True, inferSchema=True)

customers_df = df_map["customers"]
products_df = df_map["products"]
orders_df = df_map["orders"]
order_items_df = df_map["order_items"]
employees_df = df_map["employees"]
transactions_df = df_map["transactions"]

# Data cleaning (minimal, for advanced topics):
customers_df = customers_df.dropDuplicates().dropna(subset=["customer_id", "name", "email"])
products_df = products_df.dropDuplicates().dropna(subset=["product_id", "product_name", "price"])
orders_df = orders_df.dropDuplicates().dropna(subset=["order_id", "customer_id", "order_amount"])
order_items_df = order_items_df.dropDuplicates().dropna(subset=["order_item_id", "order_id", "product_id"])
employees_df = employees_df.dropDuplicates().dropna(subset=["employee_id", "name"])
transactions_df = transactions_df.dropDuplicates().dropna(subset=["transaction_id", "customer_id", "amount"])



## Table Overview
- **customers_df**: customer_id, name, email, phone, address, registration_date, status
- **products_df**: product_id, product_name, category, price, stock_quantity
- **orders_df**: order_id, customer_id, order_date, order_amount, order_status, payment_method
- **order_items_df**: order_item_id, order_id, product_id, quantity, item_total
- **employees_df**: employee_id, name, department, hire_date, salary, manager_id
- **transactions_df**: transaction_id, customer_id, transaction_date, amount, transaction_type, location, created_at



### 1. User-Defined Functions (UDFs) & Built-in Functions

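**Concept:** UDFs let you apply custom Python logic to column values. Prefer a built-in function whenever one exists: Python UDFs move each row between the JVM and a Python worker and are opaque to Spark's optimizer, so they are usually slower.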

**Sample Syntax (Generic):**
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

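def my_func(x):
    # Return None for NULL inputs instead of raising an AttributeError
    return x.upper() if x is not None else None

my_udf = udf(my_func, StringType())
# DataFrames are immutable: assign the result to keep the new column
df = df.withColumn("new_col", my_udf(df.col1))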
```

**Practice:**
- Create a UDF to mask email addresses in `customers_df`.
- Use a built-in function to convert all product names to uppercase in `products_df`.

**Expected Output:**
- DataFrames with transformed columns.

**Additional Challenge:**
- Create a UDF to categorize transactions as "High" or "Low" based on amount in `transactions_df`.


In [None]:
#practice here


### 2. Spark SQL

**Concept:** Register DataFrames as temporary views and query them with SQL. A temporary view is scoped to the current SparkSession, and `spark.sql` returns an ordinary DataFrame.

**Sample Syntax (Generic):**
```python
df.createOrReplaceTempView("my_table")
spark.sql("SELECT col1, COUNT(*) FROM my_table GROUP BY col1")
```

**Practice:**
- Register `orders_df` as a temp view and write a SQL query to find the total order amount per customer.

**Expected Output:**
- A DataFrame with customer_id and total order amount.

**Additional Challenge:**
- Write a SQL query to find the top 3 products by total quantity sold in `order_items_df`.


In [None]:
#practice here


### 3. Data Partitioning, Bucketing, and Caching

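**Concept:** Partitioning and bucketing control how data is distributed and laid out, which can reduce shuffling and speed up queries. Caching keeps a DataFrame in memory (spilling to disk if needed) for reuse; `cache()` is lazy and only takes effect once an action runs.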

**Sample Syntax (Generic):**
```python
# Repartition
df = df.repartition(4, "col1")
# Cache
df.cache()
# Bucketing (when writing to disk)
df.write.bucketBy(4, "col1").saveAsTable("bucketed_table")
```

**Practice:**
- Repartition `orders_df` by `customer_id`.
- Cache `products_df` for repeated use.

**Expected Output:**
- DataFrames partitioned/cached as specified.

**Additional Challenge:**
- Write `order_items_df` to disk using bucketing by `product_id` (if supported in your environment).


In [None]:
#practice here


### 4. Data Writing and Saving

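**Concept:** Save DataFrames to disk in various formats and modes. Spark writes each output as a directory of part files rather than a single file, and the save mode (`overwrite`, `append`, `ignore`, or `error`) controls what happens when the path already exists.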

**Sample Syntax (Generic):**
```python
df.write.csv("output.csv", mode="overwrite")
df.write.parquet("output.parquet", partitionBy=["col1"])
```

**Practice:**
- Save `customers_df` as a Parquet file partitioned by `status`.
- Save `orders_df` as a CSV file.

**Expected Output:**
- Files written to disk in the specified format.

**Additional Challenge:**
- Save `transactions_df` as a JSON file, partitioned by `transaction_type`.


In [None]:
#practice here


### 5. Error Handling and Data Quality Checks

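**Concept:** Handle corrupt records and schema mismatches when reading, and validate data quality afterwards. The reader's `mode` option accepts `PERMISSIVE` (the default), `DROPMALFORMED`, and `FAILFAST`.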

**Sample Syntax (Generic):**
```python
# Handle corrupt records when reading
df = spark.read.option("mode", "DROPMALFORMED").csv("file.csv")
# Data validation
from pyspark.sql.functions import col
invalid_rows = df.filter(col("col1").isNull())
```

**Practice:**
- Find and count rows in `orders_df` with null `order_amount`.
- Drop rows in `products_df` with negative price.

**Expected Output:**
- DataFrames with invalid rows identified or removed.

**Additional Challenge:**
- Write a UDF to flag invalid email addresses in `customers_df`.


In [None]:
#practice here


### 6. Exploding and Flattening Nested Data

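**Concept:** Use `explode` and `posexplode` to turn each element of an array (or each entry of a map) into its own row. Structs are flattened differently, by selecting their fields (for example, `struct_col.*`).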

**Sample Syntax (Generic):**
```python
from pyspark.sql.functions import explode
# Assume df has a column 'arr_col' which is an array
df_exploded = df.withColumn("element", explode(df.arr_col))
```

**Practice:**
- Create a DataFrame with an array column and use `explode` to flatten it.

**Expected Output:**
- A DataFrame with one row per array element.

**Additional Challenge:**
- Use `posexplode` to get both position and value from an array column.


In [None]:
#practice here


### 7. Working with Complex/Nested Data Types

**Concept:** Handle arrays, structs, and maps in DataFrames. Access and manipulate nested fields.

**Sample Syntax (Generic):**
```python
from pyspark.sql.functions import col
# Access a field in a struct
df.select(col("struct_col.field1"))
# Access an element in an array (0-based index)
df.select(col("array_col")[0])
# Access a map value by key
df.select(col("map_col")["key1"])
```

**Practice:**
- Create a DataFrame with a struct column and select nested fields.
- Create a DataFrame with a map column and access values by key.

**Expected Output:**
- DataFrames with selected nested fields or map values.

**Additional Challenge:**
- Flatten a DataFrame with multiple levels of nested structs.


In [None]:
#practice here


### 8. Date/Time and Timezone Handling

**Concept:** Parse strings into timestamps, convert timestamps between time zones, and compute differences between them.

**Sample Syntax (Generic):**
```python
from pyspark.sql.functions import to_timestamp, from_utc_timestamp
# Convert string to timestamp
df = df.withColumn("ts", to_timestamp(df.col1, "yyyy-MM-dd HH:mm:ss"))
# Convert UTC to local timezone
df = df.withColumn("local_ts", from_utc_timestamp(df.ts, "Asia/Kolkata"))
```

**Practice:**
- Convert `created_at` in `transactions_df` to a timestamp and extract the hour.
- Convert UTC timestamps to IST in any DataFrame.

**Expected Output:**
- DataFrames with new timestamp or timezone-adjusted columns.

**Additional Challenge:**
- Calculate the time difference in hours between two timestamp columns.


In [None]:
#practice here