## **PySpark Fundamentals - Part 1: Setting Up and Core Concepts**

### What is PySpark?
PySpark is the Python API for Apache Spark, which is a distributed computing framework designed to process large datasets across clusters of computers. Think of it as a way to run Python code on multiple machines simultaneously, making it possible to handle datasets that are too large for a single computer's memory.
The key insight is that PySpark operates on the principle of "lazy evaluation" - it builds up a plan of what you want to do with your data, but doesn't actually execute anything until you explicitly ask for results. This allows Spark to optimize the entire workflow before running it.

### Setting Up PySpark
First, let's look at how to initialize a PySpark session:

In [None]:
# PySpark Setup and Initialization

from pyspark.sql import SparkSession
from pyspark.sql.types import *
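# Note: this wildcard import replaces Python built-ins such as sum, min, and max
# with Spark's column functions of the same name
from pyspark.sql.functions import *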

# Create a Spark session - this is your entry point to PySpark
# Think of it as opening a connection to the Spark cluster
spark = SparkSession.builder \
    .appName("MySparkApplication") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Set log level to reduce verbose output
spark.sparkContext.setLogLevel("WARN")

print(f"Spark Version: {spark.version}")
print(f"SparkContext (spark.sparkContext): {spark.sparkContext}")
print(f"SparkSession (spark): {spark}")

# Stop the session only when you are completely finished with Spark;
# the later examples call getOrCreate() again to start a fresh one
spark.stop()

### **Core Data Structures in PySpark**

PySpark has two main data structures you'll work with:
#### **1. RDD (Resilient Distributed Dataset)**
This is the lower-level abstraction. Think of it as a collection of objects distributed across your cluster. While powerful, it's more complex to work with and requires you to think about the underlying distributed nature of your data.

#### **2. DataFrame**

This is the higher-level abstraction that sits on top of RDDs. It's similar to a pandas DataFrame or a SQL table, with named columns and defined data types. This is what you'll use most of the time, and it's what we'll focus on.

The DataFrame abstraction is incredibly powerful because it allows Spark's Catalyst optimizer to understand your data's structure and optimize your queries automatically. When you write DataFrame operations, Spark can rearrange, combine, and optimize your operations in ways that would be impossible with raw RDDs.

Let me show you how to create and work with DataFrames in the next section. This will include reading data from various sources, understanding the DataFrame structure, and performing basic operations.

## **PySpark Fundamentals - Part 2: DataFrames and Basic Operations**

### **Creating DataFrames**
DataFrames are like tables in a database or spreadsheets - they have rows and columns with defined data types. Let's explore the different ways to create them:

In [None]:
# Creating DataFrames in PySpark

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import *

# Initialize Spark session
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()

# Method 1: Create DataFrame from Python list of tuples
# This is great for small test datasets or when prototyping
data = [
    ("Alice", 25, "Engineer", 75000.0),
    ("Bob", 30, "Manager", 85000.0),
    ("Charlie", 35, "Director", 120000.0),
    ("Diana", 28, "Analyst", 65000.0)
]

# Define column names - Spark will infer data types
columns = ["name", "age", "job_title", "salary"]
df_simple = spark.createDataFrame(data, columns)

print("Simple DataFrame created:")
df_simple.show()
df_simple.printSchema()  # This shows the structure and data types

print("\n" + "="*50 + "\n")

# Method 2: Create DataFrame with explicit schema
# This is better for production code as you control exactly what data types you want
# Think of schema as a blueprint that tells Spark exactly what to expect

schema = StructType([
    StructField("name", StringType(), True),      # True means nullable
    StructField("age", IntegerType(), True),
    StructField("job_title", StringType(), True),
    StructField("salary", DoubleType(), True)
])

df_with_schema = spark.createDataFrame(data, schema)
print("DataFrame with explicit schema:")
df_with_schema.show()
df_with_schema.printSchema()

print("\n" + "="*50 + "\n")

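# Method 3: Reading from files (most common in real applications)
# Write the sample data to CSV first so there is a file to read back.
# "employees_csv" is an illustrative output location; Spark writes a directory of part files.
csv_path = "employees_csv"
df_with_schema.write.mode("overwrite").option("header", "true").csv(csv_path)

# Without inferSchema (or an explicit schema), every CSV column is read as a string
df_from_csv = spark.read.option("header", "true").option("inferSchema", "true").csv(csv_path)

# JSON and Parquet use the same reader API (replace the placeholder paths with real files):
# df_from_json = spark.read.json("path/to/your/file.json")
# df_from_parquet = spark.read.parquet("path/to/your/file.parquet")

print("Sample employee data:")
df_from_csv.show()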

# Method 4: Create DataFrame from a list of dictionaries (useful for configuration data)
dict_data = [
    {"product": "Laptop", "price": 1200, "category": "Electronics"},
    {"product": "Chair", "price": 150, "category": "Furniture"},
    {"product": "Book", "price": 25, "category": "Education"}
]

df_from_dict = spark.createDataFrame(dict_data)
print("DataFrame from dictionary:")
df_from_dict.show()

### **Understanding DataFrames - Key Concepts**

Now that you've seen how to create DataFrames, let's understand what makes them special. Think of a DataFrame as a distributed table where the data is automatically split across multiple computers, but you can work with it as if it were a single table.

Here are the fundamental concepts you need to grasp:

#### **Lazy Evaluation:** 
When you write DataFrame operations, Spark doesn't immediately execute them. Instead, it builds what's called a "logical plan" - essentially a recipe of what you want to do. Only when you call an "action" (like .show(), .collect(), or .count()) does Spark actually execute the plan. This allows Spark to optimize your entire workflow before running it.
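
The idea can be modeled in a few lines of plain Python. This is only a conceptual sketch, not how Spark is implemented: `LazyTable`, `describe_plan`, and the sample rows are hypothetical names invented for illustration. Transformations merely append to a recorded plan, and the plan runs only when the `collect()` action is called.

```python
# Plain-Python model of lazy evaluation (NOT Spark's implementation).
# LazyTable and its methods are hypothetical names used only for illustration.

class LazyTable:
    def __init__(self, rows, plan=None):
        self._rows = rows            # source data, never modified
        self._plan = plan or []      # recorded steps, not yet executed

    def filter(self, predicate):
        # Transformation: record the step and return a NEW object
        return LazyTable(self._rows, self._plan + [("filter", predicate)])

    def select(self, *names):
        # Transformation: record the step and return a NEW object
        return LazyTable(self._rows, self._plan + [("select", names)])

    def describe_plan(self):
        # Inspect the recorded steps without running them
        return [step for step, _ in self._plan]

    def collect(self):
        # Action: only now is the recorded plan executed, step by step
        rows = self._rows
        for step, arg in self._plan:
            if step == "filter":
                rows = [r for r in rows if arg(r)]
            elif step == "select":
                rows = [{k: r[k] for k in arg} for r in rows]
        return rows


people = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30},
    {"name": "Charlie", "age": 35},
]

calls = []  # records every time the predicate actually runs


def under_35(row):
    calls.append(row["name"])
    return row["age"] < 35


pipeline = LazyTable(people).filter(under_35).select("name")
plan_before = pipeline.describe_plan()
calls_before_action = len(calls)   # the predicate has not been called yet
result = pipeline.collect()        # the action triggers execution
print(plan_before, calls_before_action, result)
```

Because the whole plan is known before anything runs, a real engine such as Spark has the opportunity to reorder or merge steps first; this sketch simply replays them in order.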

#### **Immutability:**
DataFrames are immutable, meaning you can't change them directly. Every operation creates a new DataFrame. This might seem inefficient, but it's actually what allows Spark to run operations in parallel safely and recover from failures.

#### **Partitioning:**
Your data is automatically split into chunks called partitions, distributed across different machines. You usually don't need to worry about this, but understanding it helps explain why Spark can process massive datasets efficiently.
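
To make the idea concrete, here is a plain-Python sketch of hash partitioning. It is a conceptual model under stated assumptions, not Spark's partitioner: the helper `partition_rows` is a hypothetical name, `zlib.crc32` stands in for Spark's own hash function, and the "machines" are just lists in one process.

```python
import zlib

# Plain-Python model of hash partitioning (a conceptual sketch, not Spark's partitioner).
def partition_rows(rows, key, num_partitions):
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        # crc32 gives a hash that is stable across runs; Spark uses its own hash function
        index = zlib.crc32(str(row[key]).encode("utf-8")) % num_partitions
        partitions[index].append(row)
    return partitions


rows = [
    {"name": "Alice", "department": "Tech", "salary": 75000.0},
    {"name": "Bob", "department": "Sales", "salary": 85000.0},
    {"name": "Charlie", "department": "Tech", "salary": 120000.0},
    {"name": "Diana", "department": "Finance", "salary": 65000.0},
    {"name": "Eve", "department": "Tech", "salary": 78000.0},
    {"name": "Frank", "department": "Sales", "salary": 150000.0},
]

partitions = partition_rows(rows, "department", 3)

# Each partition can be summed independently (in Spark, by different executors)...
partial_sums = [sum(r["salary"] for r in p) for p in partitions]
# ...and the partial results are then combined into the final answer
total_salary = sum(partial_sums)
print(partial_sums, total_salary)
```

Rows with the same key always land in the same partition, which is why operations such as grouping by that key can proceed without comparing rows across partitions.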

#### **Basic DataFrame Operations:**

In [None]:
# Basic DataFrame Operations

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Initialize Spark session
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()

# Create sample data for demonstration
data = [
    ("Alice", 25, "Engineer", 75000.0, "Tech"),
    ("Bob", 30, "Manager", 85000.0, "Sales"),
    ("Charlie", 35, "Director", 120000.0, "Tech"),
    ("Diana", 28, "Analyst", 65000.0, "Finance"),
    ("Eve", 32, "Engineer", 78000.0, "Tech"),
    ("Frank", 45, "VP", 150000.0, "Sales")
]

columns = ["name", "age", "job_title", "salary", "department"]
df = spark.createDataFrame(data, columns)

print("Original DataFrame:")
df.show()

print("\n" + "="*60 + "\n")

# 1. SELECTING COLUMNS
# Think of this like choosing which columns you want to see in your spreadsheet
print("1. Selecting specific columns:")

# Method 1: Using column names as strings
df_selected = df.select("name", "salary")
df_selected.show()

# Method 2: Using column objects (more flexible for complex operations)
df_selected2 = df.select(col("name"), col("salary"))
df_selected2.show()

# Method 3: Selecting all columns from original df
df_all = df.select("*")
print(f"Total columns in original DataFrame: {len(df.columns)}")

print("\n" + "="*60 + "\n")

# 2. FILTERING ROWS
# This is like applying a filter in Excel - only show rows that meet certain conditions
print("2. Filtering rows:")

# Find employees with salary > 75000
high_earners = df.filter(col("salary") > 75000)
print("Employees earning more than $75,000:")
high_earners.show()

# Multiple conditions - find Tech employees earning > 75000
tech_high_earners = df.filter(
    (col("department") == "Tech") & (col("salary") > 75000)
)
print("Tech employees earning more than $75,000:")
tech_high_earners.show()

# Using SQL-like string expressions (alternative syntax)
young_employees = df.filter("age < 35")
print("Employees younger than 35:")
young_employees.show()

print("\n" + "="*60 + "\n")

# 3. ADDING NEW COLUMNS
# Think of this as creating calculated fields in your spreadsheet
print("3. Adding new columns:")

# Add a column for annual bonus (10% of salary)
df_with_bonus = df.withColumn("annual_bonus", col("salary") * 0.10)
print("DataFrame with bonus column:")
df_with_bonus.show()

# Add multiple columns by chaining withColumn calls
df_enhanced = df.withColumn("annual_bonus", col("salary") * 0.10) \
                .withColumn("age_category", 
                           when(col("age") < 30, "Young")
                           .when(col("age") < 40, "Mid-Career")
                           .otherwise("Senior"))

print("DataFrame with multiple new columns:")
df_enhanced.show()

print("\n" + "="*60 + "\n")

# 4. RENAMING COLUMNS
# Simple way to change column names
print("4. Renaming columns:")

df_renamed = df.withColumnRenamed("job_title", "position") \
               .withColumnRenamed("salary", "annual_salary")

print("DataFrame with renamed columns:")
df_renamed.show()

print("\n" + "="*60 + "\n")

# 5. SORTING DATA
# Order your data by one or more columns
print("5. Sorting data:")

# Sort by salary in descending order
df_sorted = df.orderBy(col("salary").desc())
print("Employees sorted by salary (highest first):")
df_sorted.show()

# Sort by multiple columns
df_multi_sort = df.orderBy(col("department").asc(), col("salary").desc())
print("Employees sorted by department, then by salary within each department:")
df_multi_sort.show()

print("\n" + "="*60 + "\n")

# 6. BASIC STATISTICS AND AGGREGATIONS
print("6. Basic statistics:")

# Count total rows
total_count = df.count()
print(f"Total number of employees: {total_count}")

# Summary statistics (count, mean, stddev, min, max) for numeric and string columns
df.describe().show()

# Specific aggregations
avg_salary = df.agg(avg("salary")).collect()[0][0]  # collect() brings data to driver
max_age = df.agg(max("age")).collect()[0][0]
print(f"Average salary: ${avg_salary:,.2f}")
print(f"Maximum age: {max_age}")

print("\n" + "="*60 + "\n")

# 7. VIEWING DATA IN DIFFERENT WAYS
print("7. Different ways to examine your data:")

# Show first few rows
print("First 3 rows:")
df.show(3)

# Show rows without truncating long column values
# (show() still prints only the first 20 rows by default)
print("First rows with untruncated column values:")
df.show(truncate=False)

# Get column names and types
print("Column information:")
print(f"Columns: {df.columns}")
df.printSchema()

# Get a single row as a Row object
first_row = df.first()
print(f"First row: {first_row}")
print(f"First employee's name: {first_row['name']}")

# Understanding the difference between transformations and actions:
# Transformations (lazy - create new DataFrame): select, filter, withColumn, orderBy
# Actions (eager - trigger computation): show, count, collect, first, take

- These operations are the foundation of almost everything you'll do with PySpark DataFrames. Notice how each operation follows a pattern: you start with a DataFrame, apply a transformation, and get back a new DataFrame. This chaining pattern is central to how PySpark works and makes it possible to build complex data processing pipelines by combining simple operations.

- The key insight here is understanding the difference between transformations and actions. Transformations like select(), filter(), and withColumn() are lazy - they just build up a plan of what you want to do. Actions like show(), count(), and collect() actually trigger Spark to execute all the transformations you've defined. This design allows Spark to optimize your entire pipeline before running it, which often yields significant performance improvements.

- Think of it like planning a route on a map app - you can add multiple stops and change your route as much as you want, but the app only calculates the optimal path when you press "Start Navigation." Similarly, Spark waits until you ask for results before figuring out the most efficient way to get them.

## **PySpark Advanced - Part 3: Grouping, Aggregations, and Joins**

### **Grouping and Aggregations**

Think of grouping like creating pivot tables in Excel - you're organizing your data by certain categories and then calculating summaries for each group.
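
Before the PySpark version, the two-step pattern (bucket rows by a key, then collapse each bucket into one summary row) can be sketched in plain Python. This is an illustrative model only, with a handful of sample rows; it is not how Spark executes a `groupBy`.

```python
from collections import defaultdict

# (employee, department, revenue) rows; a small illustrative sample
rows = [
    ("Alice", "Tech", 150000),
    ("Bob", "Sales", 120000),
    ("Charlie", "Tech", 200000),
    ("Diana", "Marketing", 80000),
    ("Eve", "Sales", 110000),
]

# Step 1: group - bucket the revenue values by department
groups = defaultdict(list)
for employee, department, revenue in rows:
    groups[department].append(revenue)

# Step 2: aggregate - collapse each bucket into a single summary row
summary = {
    dept: {
        "total_revenue": sum(values),
        "avg_revenue": sum(values) / len(values),
        "record_count": len(values),
    }
    for dept, values in groups.items()
}

for dept, stats in sorted(summary.items()):
    print(dept, stats)
```

The output has one entry per department rather than one per input row, which is the defining property of a grouped aggregation.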

In [None]:
# Grouping and Aggregations in PySpark

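from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Reuse the active session, or create one if this cell runs on its own
spark = SparkSession.builder.appName("GroupingAndAggregations").getOrCreate()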

# Create a more comprehensive dataset for demonstration
sales_data = [
    ("Alice", "Tech", "Q1", 2023, 150000, 5),
    ("Alice", "Tech", "Q2", 2023, 180000, 6),
    ("Bob", "Sales", "Q1", 2023, 120000, 8),
    ("Bob", "Sales", "Q2", 2023, 140000, 9),
    ("Charlie", "Tech", "Q1", 2023, 200000, 4),
    ("Charlie", "Tech", "Q2", 2023, 220000, 5),
    ("Diana", "Marketing", "Q1", 2023, 80000, 12),
    ("Diana", "Marketing", "Q2", 2023, 95000, 15),
    ("Eve", "Sales", "Q1", 2023, 110000, 7),
    ("Eve", "Sales", "Q2", 2023, 130000, 8),
    ("Frank", "Tech", "Q1", 2023, 175000, 3),
    ("Frank", "Tech", "Q2", 2023, 190000, 4)
]

columns = ["employee", "department", "quarter", "year", "revenue", "deals_closed"]
df_sales = spark.createDataFrame(sales_data, columns)

print("Sales Dataset:")
df_sales.show()

print("\n" + "="*70 + "\n")

# 1. BASIC GROUPING - Group by single column
print("1. Basic Grouping - Revenue by Department:")

# Group by department and calculate total revenue
dept_revenue = df_sales.groupBy("department") \
                      .agg(sum("revenue").alias("total_revenue"),
                           avg("revenue").alias("avg_revenue"),
                           count("*").alias("record_count")) \
                      .orderBy(col("total_revenue").desc())

dept_revenue.show()

print("\n" + "="*70 + "\n")

# 2. MULTIPLE GROUPING COLUMNS
print("2. Multiple Grouping - Revenue by Department and Quarter:")

dept_quarter_stats = df_sales.groupBy("department", "quarter") \
                            .agg(sum("revenue").alias("total_revenue"),
                                 sum("deals_closed").alias("total_deals"),
                                 avg("revenue").alias("avg_revenue_per_person"),
                                 count("employee").alias("employee_count")) \
                            .orderBy("department", "quarter")

dept_quarter_stats.show()

print("\n" + "="*70 + "\n")

# 3. ADVANCED AGGREGATIONS
print("3. Advanced Aggregations with Multiple Functions:")

comprehensive_stats = df_sales.groupBy("department") \
    .agg(
        # Revenue statistics
        sum("revenue").alias("total_revenue"),
        avg("revenue").alias("avg_revenue"),
        min("revenue").alias("min_revenue"),
        max("revenue").alias("max_revenue"),
        
        # Deal statistics
        sum("deals_closed").alias("total_deals"),
        avg("deals_closed").alias("avg_deals_per_record"),  # one record per employee per quarter
        
        # Count statistics
        count("*").alias("total_records"),
        countDistinct("employee").alias("unique_employees"),
        
        # Advanced: Standard deviation
        stddev("revenue").alias("revenue_stddev"),
        
        # Custom aggregation: Revenue per deal
        (sum("revenue") / sum("deals_closed")).alias("revenue_per_deal")
    ) \
    .orderBy(col("total_revenue").desc())

comprehensive_stats.show()

print("\n" + "="*70 + "\n")

# 4. CONDITIONAL AGGREGATIONS
print("4. Conditional Aggregations:")

# Count how many high performers (>= 150k revenue) vs regular performers by department
performance_analysis = df_sales.groupBy("department") \
    .agg(
        sum(when(col("revenue") >= 150000, 1).otherwise(0)).alias("high_performers"),
        sum(when(col("revenue") < 150000, 1).otherwise(0)).alias("regular_performers"),
        avg("revenue").alias("avg_revenue"),
        # Calculate percentage of high performers
        (sum(when(col("revenue") >= 150000, 1).otherwise(0)) * 100.0 / count("*")).alias("high_performer_percentage")
    )

performance_analysis.show()

print("\n" + "="*70 + "\n")

# 5. COLLECT_LIST and COLLECT_SET - Gathering values
print("5. Collecting Values - Who works in each department:")

department_employees = df_sales.groupBy("department") \
    .agg(
        collect_set("employee").alias("unique_employees"),  # collect_set removes duplicates
        collect_list("employee").alias("all_employee_records"),  # collect_list keeps duplicates
        count("*").alias("total_records")
    )

department_employees.show(truncate=False)

print("\n" + "="*70 + "\n")

# 6. PIVOT OPERATIONS - Reshape data like Excel pivot tables
print("6. Pivot Operations - Department revenue by quarter:")

# Pivot: Turn quarter values into columns
pivoted_data = df_sales.groupBy("department") \
                      .pivot("quarter") \
                      .agg(sum("revenue")) \
                      .orderBy("department")

pivoted_data.show()

# You can also pivot with multiple aggregations
print("Multi-value pivot - Revenue and Deals by quarter:")
multi_pivot = df_sales.groupBy("department") \
                     .pivot("quarter") \
                     .agg(sum("revenue").alias("revenue"), 
                          sum("deals_closed").alias("deals"))

multi_pivot.show()

print("\n" + "="*70 + "\n")

# 7. ROLLUP and CUBE - Multi-dimensional aggregations
print("7. Rollup Operations - Hierarchical totals:")

# Rollup provides subtotals at different levels
rollup_stats = df_sales.rollup("department", "quarter") \
                      .agg(sum("revenue").alias("total_revenue"),
                           sum("deals_closed").alias("total_deals")) \
                      .orderBy("department", "quarter")

print("Rollup (includes subtotals and grand total):")
rollup_stats.show()

# Cube provides all possible combinations of subtotals
cube_stats = df_sales.cube("department", "quarter") \
                    .agg(sum("revenue").alias("total_revenue")) \
                    .orderBy("department", "quarter")

print("Cube (all possible subtotal combinations):")
cube_stats.show()

print("\n" + "="*70 + "\n")

# 8. WINDOW FUNCTIONS PREVIEW
print("8. Adding Running Totals with Window Functions:")

from pyspark.sql.window import Window

# Define a window: partition by department, order by quarter
window_spec = Window.partitionBy("department").orderBy("quarter")

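# Add running total of revenue within each department.
# Note: rows that share a quarter are peers under the default RANGE frame,
# so every row in the same quarter gets the same cumulative value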
df_with_running_total = df_sales.withColumn(
    "running_revenue_total", 
    sum("revenue").over(window_spec)
).withColumn(
    "revenue_rank_in_dept",
    rank().over(Window.partitionBy("department").orderBy(col("revenue").desc()))
)

print("Data with running totals and rankings:")
df_with_running_total.select("employee", "department", "quarter", "revenue", 
                            "running_revenue_total", "revenue_rank_in_dept") \
                     .orderBy("department", "quarter") \
                     .show()

# Key insights about grouping and aggregations:
print("\n" + "="*70)
print("KEY CONCEPTS:")
print("1. groupBy() creates groups, agg() performs calculations on each group")
print("2. Always use .alias() to name your aggregated columns clearly")
print("3. You can combine multiple aggregation functions in one agg() call")
print("4. Pivot reshapes data - turns row values into columns")
print("5. Rollup/Cube provide hierarchical subtotals")
print("6. Window functions let you do calculations across related rows")
print("="*70)

### **Joining DataFrames**

Joins are how you combine data from multiple DataFrames, similar to JOIN operations in SQL. This is crucial when your data is spread across multiple tables or datasets.

In [None]:
# DataFrame Joins in PySpark

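from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Reuse the active session, or create one if this cell runs on its own
spark = SparkSession.builder.appName("DataFrameJoins").getOrCreate()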

# Create sample datasets for demonstration
# Employee basic info
employees_data = [
    (1, "Alice", "Engineer", "Tech"),
    (2, "Bob", "Manager", "Sales"),
    (3, "Charlie", "Director", "Tech"),
    (4, "Diana", "Analyst", "Finance"),
    (5, "Eve", "Engineer", "Tech")
]
employees_df = spark.createDataFrame(employees_data, ["emp_id", "name", "job_title", "department"])

# Employee salary info (separate table)
salaries_data = [
    (1, 75000, "2023-01-01"),
    (2, 85000, "2023-01-01"),
    (3, 120000, "2023-01-01"),
    (4, 65000, "2023-01-01"),
    (6, 90000, "2023-01-01")  # Note: emp_id 6 doesn't exist in employees table
]
salaries_df = spark.createDataFrame(salaries_data, ["emp_id", "salary", "effective_date"])

# Department budget info
dept_budget_data = [
    ("Tech", 500000, "John Smith"),
    ("Sales", 300000, "Sarah Johnson"),
    ("Finance", 200000, "Mike Brown"),
    ("HR", 150000, "Lisa Davis")  # Note: No employees in HR department
]
dept_budget_df = spark.createDataFrame(dept_budget_data, ["department", "budget", "manager"])

print("SAMPLE DATASETS:")
print("\nEmployees:")
employees_df.show()
print("Salaries:")
salaries_df.show()
print("Department Budgets:")
dept_budget_df.show()

print("\n" + "="*80 + "\n")

# 1. INNER JOIN - Only matching records from both sides
print("1. INNER JOIN - Employees with salary information:")

inner_join_result = employees_df.join(salaries_df, "emp_id", "inner")
inner_join_result.show()

print("Notice: Only employees 1-4 appear because emp_id 5 has no salary, and emp_id 6 has no employee record")

print("\n" + "="*80 + "\n")

# 2. LEFT (OUTER) JOIN - All records from left table, matching from right
print("2. LEFT JOIN - All employees, with salary where available:")

left_join_result = employees_df.join(salaries_df, "emp_id", "left")
left_join_result.show()

print("Notice: All 5 employees appear, but Eve (emp_id 5) has null salary values")

print("\n" + "="*80 + "\n")

# 3. RIGHT (OUTER) JOIN - All records from right table, matching from left
print("3. RIGHT JOIN - All salary records, with employee info where available:")

right_join_result = employees_df.join(salaries_df, "emp_id", "right")
right_join_result.show()

print("Notice: emp_id 6 appears with salary but null employee information")

print("\n" + "="*80 + "\n")

# 4. FULL OUTER JOIN - All records from both tables
print("4. FULL OUTER JOIN - Everything from both tables:")

full_join_result = employees_df.join(salaries_df, "emp_id", "full")
full_join_result.show()

print("Notice: All employees AND all salary records appear, with nulls where data is missing")

print("\n" + "="*80 + "\n")

# 5. JOINING ON MULTIPLE CONDITIONS
print("5. Complex Join Conditions:")

# Let's create data with multiple join keys
projects_data = [
    (1, "Tech", "Project Alpha", "2023-Q1"),
    (2, "Sales", "Project Beta", "2023-Q1"),
    (3, "Tech", "Project Gamma", "2023-Q2"),
    (1, "Tech", "Project Delta", "2023-Q2")
]
projects_df = spark.createDataFrame(projects_data, ["emp_id", "department", "project_name", "quarter"])

# Join on multiple conditions
multi_condition_join = employees_df.join(
    projects_df, 
    (employees_df.emp_id == projects_df.emp_id) & 
    (employees_df.department == projects_df.department),
    "inner"
)

print("Employees matched to projects (by employee ID AND department):")
multi_condition_join.select(
    employees_df.name, 
    employees_df.department, 
    projects_df.project_name, 
    projects_df.quarter
).show()

print("\n" + "="*80 + "\n")

# 6. JOINING TABLES WITH DIFFERENT COLUMN NAMES
print("6. Joining on columns with different names:")

# Create a table where the join key has a different name
performance_data = [
    (1, 85, "Excellent"),
    (2, 78, "Good"),
    (3, 92, "Outstanding"),
    (4, 70, "Satisfactory")
]
performance_df = spark.createDataFrame(performance_data, ["employee_id", "score", "rating"])

# Join where left table has 'emp_id' and right table has 'employee_id'
different_names_join = employees_df.join(
    performance_df,
    employees_df.emp_id == performance_df.employee_id,
    "left"
)

print("Employees with performance ratings:")
different_names_join.select(
    "name", "job_title", "score", "rating"
).show()

print("\n" + "="*80 + "\n")

# 7. SELF JOINS - Joining a table with itself
print("7. Self Join - Finding employees in the same department:")

# Join employees table with itself to find colleagues
emp_alias1 = employees_df.alias("emp1")
emp_alias2 = employees_df.alias("emp2")

colleagues = emp_alias1.join(
    emp_alias2,
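    # Refer to columns through the alias names; emp_alias1.department and
    # emp_alias2.department would both point at the same underlying column
    (col("emp1.department") == col("emp2.department")) &
    (col("emp1.emp_id") != col("emp2.emp_id")),  # Don't match employee with themselves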
    "inner"
).select(
    col("emp1.name").alias("employee_1"),
    col("emp2.name").alias("employee_2"),
    col("emp1.department").alias("shared_department")
)

print("Employees who work in the same department:")
colleagues.show()

print("\n" + "="*80 + "\n")

# 8. ANTI JOIN - Records in left table that DON'T have matches in right table
print("8. Anti Join - Employees without salary records:")

employees_without_salary = employees_df.join(salaries_df, "emp_id", "left_anti")
employees_without_salary.show()

print("\n" + "="*80 + "\n")

# 9. SEMI JOIN - Records in left table that DO have matches in right table
print("9. Semi Join - Employees who have salary records:")

employees_with_salary = employees_df.join(salaries_df, "emp_id", "left_semi")
employees_with_salary.show()

print("Notice: This shows employee info only, even though we joined with salary table")

print("\n" + "="*80 + "\n")

# 10. COMPLEX MULTI-TABLE JOIN
print("10. Complex Multi-table Join - Complete Employee Information:")

# Join all three tables together
complete_info = employees_df \
    .join(salaries_df, "emp_id", "left") \
    .join(dept_budget_df, "department", "left") \
    .select(
        "name",
        "job_title", 
        "department",
        "salary",
        "budget",
        "manager"
    ).orderBy("name")

print("Complete employee information with department details:")
complete_info.show()

print("\n" + "="*80 + "\n")

# 11. HANDLING DUPLICATE COLUMN NAMES AFTER JOINS
print("11. Handling duplicate column names:")

# When joining tables with same column names, you need to be explicit
employees_with_dept_col = employees_df.select("emp_id", "name", "department")
projects_with_dept_col = projects_df.select("emp_id", "department", "project_name")

# This would cause ambiguous column reference - which department column?
# Solution: Select specific columns or rename before joining
clean_join = employees_with_dept_col.alias("emp").join(
    projects_with_dept_col.alias("proj"),
    col("emp.emp_id") == col("proj.emp_id"),
    "inner"
).select(
    col("emp.name"),
    col("emp.department").alias("employee_dept"),
    col("proj.department").alias("project_dept"),
    col("proj.project_name")
)

print("Handling duplicate column names:")
clean_join.show()

print("\n" + "="*80)
print("JOIN TYPE SUMMARY:")
print("• INNER: Only matching records from both tables")
print("• LEFT: All from left table + matching from right")
print("• RIGHT: All from right table + matching from left") 
print("• FULL: All records from both tables")
print("• LEFT_ANTI: Records in left that DON'T match right")
print("• LEFT_SEMI: Records in left that DO match right (left columns only)")
print("="*80)

### **Key Concepts**

- **Grouping and Aggregations** allow you to answer questions like "What's the total sales by region?" or "Who are the top performers in each department?" The pattern is always: group your data by some categories, then calculate summaries for each group.

- **Joins** let you bring together related information from different datasets. Think of them as ways to connect the dots between different pieces of information. The type of join you choose depends on what you want to include in your final result.

- The most important insight about joins is understanding what each type preserves:

    **Inner joins are conservative** - only keep perfect matches \
    **Left joins prioritize the left table** - keep everything from it \
    **Full outer joins are comprehensive** - keep everything from everywhere

- These operations form the backbone of most data analysis workflows. You'll typically read data from multiple sources, join them together to create a complete picture, then group and aggregate to generate insights.
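
What each join type preserves can be checked with a plain-Python model built on the employee and salary IDs from the join examples. This is a sketch under simplifying assumptions, not Spark's join algorithm: the `join` helper is a hypothetical name, each key appears at most once per side, and `None` plays the role of Spark's null.

```python
# Plain-Python model of join semantics on emp_id (illustration only).
employees = {1: "Alice", 2: "Bob", 3: "Charlie", 4: "Diana", 5: "Eve"}
salaries = {1: 75000, 2: 85000, 3: 120000, 4: 65000, 6: 90000}


def join(left, right, how):
    if how == "left_semi":
        # Left rows that have a match; right-side values are not returned
        return [(k, left[k]) for k in left if k in right]
    if how == "left_anti":
        # Left rows that have no match on the right
        return [(k, left[k]) for k in left if k not in right]
    if how == "inner":
        keys = [k for k in left if k in right]
    elif how == "left":
        keys = list(left)
    elif how == "right":
        keys = list(right)
    elif how == "full":
        keys = sorted(set(left) | set(right))
    else:
        raise ValueError(f"unsupported join type: {how}")
    # None stands in for null where one side has no matching row
    return [(k, left.get(k), right.get(k)) for k in keys]


inner = join(employees, salaries, "inner")
left = join(employees, salaries, "left")
right = join(employees, salaries, "right")
full = join(employees, salaries, "full")
semi = join(employees, salaries, "left_semi")
anti = join(employees, salaries, "left_anti")

for name, rows in [("inner", inner), ("left", left), ("right", right),
                   ("full", full), ("left_semi", semi), ("left_anti", anti)]:
    print(name, rows)
```

Real tables can contain repeated keys, in which case a join produces one output row per matching pair; the dictionary-based model above deliberately ignores that case.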

## **PySpark Advanced - Part 4: Window Functions and Complex Data Manipulation**

Window functions are one of PySpark's most powerful features. They let you perform calculations across a set of related rows without collapsing your data into groups. Think of them as a way to "look around" at neighboring rows while keeping all your original data intact.
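
As a rough mental model, the sketch below computes the three ranking values in plain Python for a single partition, using revenue figures that include a tie. It is an illustration only, not Spark's implementation, and `rank_rows` is a hypothetical helper name. Note that every input row appears in the output, which is the difference from a grouped aggregation.

```python
def rank_rows(values):
    """Return (value, rank, dense_rank, row_number) tuples, highest value first."""
    ordered = sorted(values, reverse=True)
    out = []
    rank = dense = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(ordered, start=1):
        if value != previous:
            rank = row_number  # after a tie, rank jumps to the row position
            dense += 1         # dense rank advances by exactly one
            previous = value
        out.append((value, rank, dense, row_number))
    return out


# Revenue values for one partition; 200000 appears twice to show tie handling
ranked = rank_rows([150000, 180000, 200000, 200000, 220000, 240000])

for value, rank, dense_rank, row_number in ranked:
    print(value, rank, dense_rank, row_number)
```

The two tied rows share a rank and a dense rank but still receive distinct row numbers; which tied row gets the lower row number is arbitrary unless the ordering includes a tie-breaking column.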

### **Window Functions Deep Dive**

In [None]:
# Window Functions in PySpark

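from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

# Reuse the active session, or create one if this cell runs on its own
spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()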

# Create a comprehensive sales dataset
sales_data = [
    ("Alice", "Tech", "2023-01", 150000, 5),
    ("Alice", "Tech", "2023-02", 180000, 6),
    ("Alice", "Tech", "2023-03", 200000, 7),
    ("Bob", "Sales", "2023-01", 120000, 8),
    ("Bob", "Sales", "2023-02", 140000, 9),
    ("Bob", "Sales", "2023-03", 160000, 10),
    ("Charlie", "Tech", "2023-01", 200000, 4),
    ("Charlie", "Tech", "2023-02", 220000, 5),
    ("Charlie", "Tech", "2023-03", 240000, 6),
    ("Diana", "Marketing", "2023-01", 80000, 12),
    ("Diana", "Marketing", "2023-02", 95000, 15),
    ("Diana", "Marketing", "2023-03", 110000, 18),
    ("Eve", "Sales", "2023-01", 110000, 7),
    ("Eve", "Sales", "2023-02", 130000, 8),
    ("Eve", "Sales", "2023-03", 150000, 9)
]

columns = ["employee", "department", "month", "revenue", "deals_closed"]
df = spark.createDataFrame(sales_data, columns)

print("Sales Dataset:")
df.orderBy("employee", "month").show()

print("\n" + "="*80 + "\n")

# 1. RANKING FUNCTIONS
print("1. Ranking Functions - Who are the top performers?")

# Define window for ranking within each department
dept_window = Window.partitionBy("department").orderBy(col("revenue").desc())

df_with_rankings = df.withColumn("revenue_rank", rank().over(dept_window)) \
                    .withColumn("revenue_dense_rank", dense_rank().over(dept_window)) \
                    .withColumn("revenue_row_number", row_number().over(dept_window))

print("Rankings within each department:")
df_with_rankings.select("employee", "department", "revenue", 
                       "revenue_rank", "revenue_dense_rank", "revenue_row_number") \
                .orderBy("department", "revenue_rank") \
                .show()

print("Key differences:")
print("- rank(): Gaps in ranking when there are ties (1, 2, 2, 4)")
print("- dense_rank(): No gaps in ranking (1, 2, 2, 3)")
print("- row_number(): Always unique numbers (1, 2, 3, 4)")

print("\n" + "="*80 + "\n")

# 2. RUNNING TOTALS AND CUMULATIVE CALCULATIONS
print("2. Running Totals - Track cumulative performance over time")

# Window ordered by month for each employee
employee_time_window = Window.partitionBy("employee").orderBy("month")

df_with_running_totals = df.withColumn(
    "cumulative_revenue", sum("revenue").over(employee_time_window)
).withColumn(
    "cumulative_deals", sum("deals_closed").over(employee_time_window)
).withColumn(
    "running_avg_revenue", avg("revenue").over(employee_time_window)
)

print("Running totals for each employee over time:")
df_with_running_totals.select("employee", "month", "revenue", "deals_closed",
                             "cumulative_revenue", "cumulative_deals", "running_avg_revenue") \
                     .orderBy("employee", "month") \
                     .show()

print("\n" + "="*80 + "\n")

# 3. LAG AND LEAD - Looking at previous and next rows
print("3. Lag and Lead - Compare with previous/next periods")

df_with_lag_lead = df.withColumn(
    "prev_month_revenue", lag("revenue", 1).over(employee_time_window)
).withColumn(
    "next_month_revenue", lead("revenue", 1).over(employee_time_window)
).withColumn(
    "revenue_growth", col("revenue") - lag("revenue", 1).over(employee_time_window)
).withColumn(
    "revenue_growth_pct", 
    ((col("revenue") - lag("revenue", 1).over(employee_time_window)) / 
     lag("revenue", 1).over(employee_time_window) * 100)
)

print("Month-over-month comparisons:")
df_with_lag_lead.select("employee", "month", "revenue", "prev_month_revenue", 
                       "revenue_growth", "revenue_growth_pct") \
                .orderBy("employee", "month") \
                .show()

print("\n" + "="*80 + "\n")

# 4. MOVING AVERAGES AND WINDOW FRAMES
print("4. Moving Averages - Smooth out trends")

# Define a window with a specific frame: current row and 1 preceding row
moving_avg_window = Window.partitionBy("employee") \
                         .orderBy("month") \
                         .rowsBetween(-1, 0)  # Previous row and current row

df_with_moving_avg = df.withColumn(
    "moving_avg_revenue_2months", avg("revenue").over(moving_avg_window)
).withColumn(
    "moving_sum_deals_2months", sum("deals_closed").over(moving_avg_window)
)

# 3-month moving average (current + 2 preceding)
moving_avg_3month_window = Window.partitionBy("employee") \
                                .orderBy("month") \
                                .rowsBetween(-2, 0)

df_with_moving_avg = df_with_moving_avg.withColumn(
    "moving_avg_revenue_3months", avg("revenue").over(moving_avg_3month_window)
)

print("Moving averages to smooth trends:")
df_with_moving_avg.select("employee", "month", "revenue", 
                         "moving_avg_revenue_2months", "moving_avg_revenue_3months") \
                 .orderBy("employee", "month") \
                 .show()

print("\n" + "="*80 + "\n")

# 5. PERCENTILES AND NTILE
print("5. Percentiles and Distribution Analysis")

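# Overall window for percentiles across all employees
# Note: with no partitionBy, Spark moves every row into a single partition to sort it.
# That is fine for a small demo but does not scale to large datasets.
overall_window = Window.orderBy("revenue")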

df_with_percentiles = df.withColumn(
    "revenue_percentile", percent_rank().over(overall_window)
).withColumn(
    "revenue_quartile", ntile(4).over(overall_window)  # Divide into 4 groups
).withColumn(
    "revenue_decile", ntile(10).over(overall_window)   # Divide into 10 groups
)

print("Percentile analysis:")
df_with_percentiles.select("employee", "month", "revenue", 
                          "revenue_percentile", "revenue_quartile", "revenue_decile") \
                  .orderBy("revenue") \
                  .show()

print("\n" + "="*80 + "\n")

# 6. FIRST_VALUE AND LAST_VALUE
print("6. First and Last Values - Benchmarking against period extremes")

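# Ascending order puts the minimum first and the maximum last.
# The frame must span the whole partition: with the default frame
# (unbounded preceding to current row), last() would return the current row's own revenue.
dept_month_window = Window.partitionBy("department", "month") \
                          .orderBy("revenue") \
                          .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)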

df_with_first_last = df.withColumn(
    "dept_min_revenue_this_month", first("revenue").over(dept_month_window)
).withColumn(
    "dept_max_revenue_this_month", last("revenue").over(dept_month_window)
).withColumn(
    "performance_vs_dept_min", col("revenue") - first("revenue").over(dept_month_window)
).withColumn(
    "performance_vs_dept_max", col("revenue") - last("revenue").over(dept_month_window)
)

print("Performance vs department extremes:")
df_with_first_last.select("employee", "department", "month", "revenue",
                         "dept_min_revenue_this_month", "dept_max_revenue_this_month",
                         "performance_vs_dept_min", "performance_vs_dept_max") \
                 .orderBy("department", "month", "revenue") \
                 .show()

print("\n" + "="*80 + "\n")

# 7. COMPLEX WINDOW ANALYSIS - Multiple windows in one query
print("7. Complex Analysis - Multiple Window Functions Together")

# Define multiple windows for different analyses
employee_window = Window.partitionBy("employee").orderBy("month")
dept_window = Window.partitionBy("department").orderBy(col("revenue").desc())
overall_window = Window.orderBy("revenue")

comprehensive_analysis = df.withColumn(
    # Time-based analysis for each employee
    "employee_running_total", sum("revenue").over(employee_window)
).withColumn(
    "employee_avg_so_far", avg("revenue").over(employee_window)
).withColumn(
    "month_over_month_change", col("revenue") - lag("revenue", 1).over(employee_window)
).withColumn(
    # Department ranking
    "dept_revenue_rank", rank().over(dept_window)
).withColumn(
    # Overall percentile
    "overall_percentile", percent_rank().over(overall_window)
).withColumn(
    # Performance indicators
    "is_top_performer_in_dept", when(rank().over(dept_window) <= 2, "Yes").otherwise("No")
).withColumn(
    "performance_tier", 
    when(percent_rank().over(overall_window) >= 0.8, "Top 20%")
    .when(percent_rank().over(overall_window) >= 0.6, "Top 40%") 
    .when(percent_rank().over(overall_window) >= 0.4, "Middle 20%")
    .when(percent_rank().over(overall_window) >= 0.2, "Bottom 40%")
    .otherwise("Bottom 20%")
)

print("Comprehensive performance analysis:")
comprehensive_analysis.select(
    "employee", "department", "month", "revenue",
    "employee_running_total", "month_over_month_change",
    "dept_revenue_rank", "performance_tier", "is_top_performer_in_dept"
).orderBy("employee", "month").show(truncate=False)

print("\n" + "="*80 + "\n")

# 8. WINDOW FRAMES - Different ways to define your "window"
print("8. Understanding Window Frames")

# Current row only
current_only = Window.partitionBy("employee").orderBy("month").rowsBetween(0, 0)

# Unbounded preceding to current (running total)
running_total_frame = Window.partitionBy("employee").orderBy("month").rowsBetween(Window.unboundedPreceding, 0)

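# Range-based frame (useful for time-based windows): rows are selected by their
# ORDER BY value rather than by position, so rows tied on "month" enter the frame together
# Note: This would work better with actual date columns; it is defined for comparison and not used below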
range_frame = Window.partitionBy("employee").orderBy("month").rangeBetween(Window.unboundedPreceding, 0)

df_frame_examples = df.withColumn(
    "current_revenue_only", sum("revenue").over(current_only)
).withColumn(
    "running_total_explicit", sum("revenue").over(running_total_frame)
).withColumn(
    "count_records_so_far", count("*").over(running_total_frame)
)

print("Different window frame examples:")
df_frame_examples.select("employee", "month", "revenue", 
                        "current_revenue_only", "running_total_explicit", "count_records_so_far") \
                 .orderBy("employee", "month") \
                 .show()

print("\n" + "="*80)
print("WINDOW FUNCTIONS KEY CONCEPTS:")
print("• Window = Partition (group) + Order + Frame (which rows to include)")
print("• Partition: Which records to group together")
print("• Order: How to sort within each partition") 
print("• Frame: Which rows to include in calculation (with orderBy, the default is unbounded preceding to current row; without orderBy, the whole partition)")
print("• Ranking: rank(), dense_rank(), row_number(), ntile()")
print("• Analytics: lag(), lead(), first(), last()")
print("• Aggregates: sum(), avg(), count(), etc. over windows")
print("="*80)
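
The partition, order, and frame components summarized above can be modeled in plain Python, which may help make frame boundaries concrete without a cluster. This is only a sketch of the semantics, not how Spark executes a window: `window_sum` and the sample rows are hypothetical, and the helper handles row-based frames only (the analogue of `rowsBetween`). It calls `builtins.sum`, `builtins.min`, and `builtins.max` explicitly because a wildcard import from `pyspark.sql.functions` rebinds those names.

```python
import builtins
from itertools import groupby
from operator import itemgetter


def window_sum(rows, partition_key, order_key, value_key, start=None, end=0):
    """Sum value_key over a row-based frame [start, end] around each row.

    start=None stands in for Window.unboundedPreceding and end=None for
    Window.unboundedFollowing; integer offsets are row positions relative
    to the current row, as with rowsBetween.
    """
    out = []
    # Partition + order: sort by (partition, order) and group by partition
    ordered = sorted(rows, key=itemgetter(partition_key, order_key))
    for _, group in groupby(ordered, key=itemgetter(partition_key)):
        part = list(group)
        for i, row in enumerate(part):
            # Frame: clamp the requested offsets to the partition boundaries
            lo = 0 if start is None else builtins.max(0, i + start)
            hi = len(part) if end is None else builtins.min(
                len(part), builtins.max(0, i + end + 1)
            )
            total = builtins.sum(r[value_key] for r in part[lo:hi])
            out.append({**row, "window_sum": total})
    return out


# Hypothetical sample rows (not the tutorial's sales_data)
rows = [
    {"employee": "Alice", "month": 1, "revenue": 100},
    {"employee": "Alice", "month": 2, "revenue": 150},
    {"employee": "Alice", "month": 3, "revenue": 120},
    {"employee": "Bob", "month": 1, "revenue": 80},
    {"employee": "Bob", "month": 2, "revenue": 90},
]

# Unbounded preceding to current row: a running total
running = window_sum(rows, "employee", "month", "revenue")
# Previous row to current row: a 2-row moving sum
two_month = window_sum(rows, "employee", "month", "revenue", start=-1)
# Unbounded preceding to unbounded following: the partition total on every row
whole = window_sum(rows, "employee", "month", "revenue", end=None)

print([r["window_sum"] for r in running])
print([r["window_sum"] for r in two_month])
print([r["window_sum"] for r in whole])
```

Changing only the frame arguments turns the same partition and ordering into a running total, a moving sum, or a per-partition total, which is the same role `rowsBetween` plays in the examples above.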

### **Advanced Data Manipulation Techniques**

Now let's explore some sophisticated data manipulation techniques that you'll use in real-world scenarios.

In [None]:
# Advanced Data Manipulation in PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

# Create complex sample data for demonstration
complex_data = [
    (1, "Alice Johnson", "alice.johnson@company.com", "2023-01-15", 
     "{'skills': ['Python', 'SQL', 'Machine Learning'], 'years_exp': 5, 'certifications': ['AWS', 'GCP']}", 
     "Engineering,Data Science", "New York,San Francisco"),
    (2, "Bob Smith", "bob.smith@company.com", "2023-02-20",
     "{'skills': ['Java', 'Spring', 'Microservices'], 'years_exp': 8, 'certifications': ['Oracle', 'Spring']}", 
     "Engineering,Architecture", "Chicago"),
    (3, "Carol Davis", "carol.davis@company.com", "2023-03-10",
     "{'skills': ['Marketing', 'Analytics', 'SEO'], 'years_exp': 6, 'certifications': ['Google Analytics', 'HubSpot']}", 
     "Marketing,Growth", "Los Angeles,Austin"),
    (4, "David Wilson", "david.wilson@company.com", "2023-01-05",
     "{'skills': ['Finance', 'Excel', 'PowerBI'], 'years_exp': 10, 'certifications': ['CPA', 'CFA']}", 
     "Finance,Analytics", "Boston")
]

columns = ["emp_id", "full_name", "email", "hire_date", "profile_json", "departments", "locations"]
df = spark.createDataFrame(complex_data, columns)

print("Original Complex Dataset:")
df.show(truncate=False)

print("\n" + "="*90 + "\n")

# 1. STRING MANIPULATION AND EXTRACTION
print("1. Advanced String Operations")

df_string_ops = df.withColumn(
    # Extract first and last names
    "first_name", split(col("full_name"), " ").getItem(0)
).withColumn(
    "last_name", split(col("full_name"), " ").getItem(1)
).withColumn(
    # Extract domain from email
    "email_domain", regexp_extract(col("email"), r"@(.+)", 1)
).withColumn(
    # Clean and standardize email
    "email_clean", lower(trim(col("email")))
).withColumn(
    # Extract year from hire date
    "hire_year", year(to_date(col("hire_date"), "yyyy-MM-dd"))
).withColumn(
    # Create initials
    "initials", concat(
        substring(col("first_name"), 1, 1),
        lit("."),
        substring(col("last_name"), 1, 1),
        lit(".")
    )
).withColumn(
    # Check if name contains specific pattern
    "has_common_lastname", when(col("last_name").rlike("Smith|Johnson|Davis"), "Yes").otherwise("No")
)

print("String manipulation results:")
df_string_ops.select("full_name", "first_name", "last_name", "email_domain", 
                    "initials", "hire_year", "has_common_lastname").show()

print("\n" + "="*90 + "\n")

# 2. WORKING WITH ARRAYS AND COMPLEX DATA
print("2. Array and Complex Data Operations")

# Split comma-separated values into arrays
df_arrays = df.withColumn(
    "departments_array", split(col("departments"), ",")
).withColumn(
    "locations_array", split(col("locations"), ",")
).withColumn(
    # Get array size
    "num_departments", size(split(col("departments"), ","))
).withColumn(
    "num_locations", size(split(col("locations"), ","))
).withColumn(
    # Check if array contains specific value
    "works_in_engineering", array_contains(split(col("departments"), ","), "Engineering")
).withColumn(
    # Get first element
    "primary_department", split(col("departments"), ",").getItem(0)
).withColumn(
    # Get all except first element (if more than one)
    "secondary_departments", 
    when(size(split(col("departments"), ",")) > 1,
         slice(split(col("departments"), ","), 2, 10))
    .otherwise(array())
)

print("Array operations:")
df_arrays.select("emp_id", "full_name", "departments_array", "num_departments", 
                "works_in_engineering", "primary_department", "secondary_departments").show(truncate=False)

print("\n" + "="*90 + "\n")

# 3. WORKING WITH JSON DATA
print("3. JSON Data Extraction")

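# Parse JSON strings to extract structured data
# Note: the sample strings use single quotes, which Spark's JSON parser accepts by default (allowSingleQuotes)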
df_json = df.withColumn(
    # Parse JSON string to struct
    "profile_struct", from_json(col("profile_json"), 
        StructType([
            StructField("skills", ArrayType(StringType()), True),
            StructField("years_exp", IntegerType(), True),
            StructField("certifications", ArrayType(StringType()), True)
        ])
    )
).withColumn(
    # Extract specific fields from JSON
    "skills_array", col("profile_struct.skills")
).withColumn(
    "years_experience", col("profile_struct.years_exp")
).withColumn(
    "certifications_array", col("profile_struct.certifications")
).withColumn(
    # Work with extracted arrays
    "num_skills", size(col("profile_struct.skills"))
).withColumn(
    "num_certifications", size(col("profile_struct.certifications"))
).withColumn(
    "has_python_skill", array_contains(col("profile_struct.skills"), "Python")
).withColumn(
    "primary_skill", col("profile_struct.skills").getItem(0)
)

print("JSON extraction results:")
df_json.select("emp_id", "full_name", "skills_array", "years_experience", 
              "num_skills", "has_python_skill", "primary_skill").show(truncate=False)

print("\n" + "="*90 + "\n")

# 4. CONDITIONAL LOGIC AND CASE STATEMENTS
print("4. Complex Conditional Logic")

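df_conditional = df_json.withColumn(
    # df_json was built from df, not df_arrays, so recreate the flag used by employee_category below
    "works_in_engineering", array_contains(split(col("departments"), ","), "Engineering")
).withColumn(
    # Multi-level conditional logic
    "experience_level",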
    when(col("years_experience") < 3, "Junior")
    .when(col("years_experience") < 7, "Mid-Level") 
    .when(col("years_experience") < 10, "Senior")
    .otherwise("Principal")
).withColumn(
    # Complex conditions with AND/OR
    "tech_leader",
    when(
        (col("years_experience") >= 5) & 
        (array_contains(col("skills_array"), "Python") | array_contains(col("skills_array"), "Java")) &
        (size(col("certifications_array")) >= 2),
        "Yes"
    ).otherwise("No")
).withColumn(
    # Nested conditions
    "employee_category",
    when(col("works_in_engineering") & (col("years_experience") >= 5), "Senior Engineer")
    .when(col("works_in_engineering") & (col("years_experience") < 5), "Junior Engineer")
    .when(~col("works_in_engineering") & (col("years_experience") >= 8), "Senior Non-Tech")
    .otherwise("Junior Non-Tech")
).withColumn(
    # Using case with multiple columns
    "compensation_band",
    when((col("experience_level") == "Principal") | (col("tech_leader") == "Yes"), "Band A")
    .when(col("experience_level") == "Senior", "Band B")
    .when(col("experience_level") == "Mid-Level", "Band C")
    .otherwise("Band D")
)

print("Conditional logic results:")
df_conditional.select("full_name", "years_experience", "experience_level", 
                     "tech_leader", "employee_category", "compensation_band").show()

print("\n" + "="*90 + "\n")

# 5. DATA TYPE CONVERSIONS AND CLEANING
print("5. Data Type Conversions and Cleaning")

# Create some messy data for cleaning demonstration
messy_data = [
    ("  John Doe  ", "123.45", "2023-01-15T10:30:00", "true", "NULL"),
    ("Jane Smith", "67.8", "2023-02-20T14:15:30", "false", ""),
    ("Bob Johnson", "invalid_number", "invalid_date", "maybe", "N/A"),
    ("", "0.00", "2023-03-10T09:45:00", "1", "none")
]

messy_df = spark.createDataFrame(messy_data, ["name", "score", "timestamp", "active", "notes"])

print("Original messy data:")
messy_df.show()

# Clean and convert the data
cleaned_df = messy_df.withColumn(
    # Clean strings
    "name_clean", trim(col("name"))
).withColumn(
    # Handle empty strings
    "name_final", when(trim(col("name")) == "", lit("Unknown")).otherwise(trim(col("name")))
).withColumn(
    # Safe numeric conversion
    "score_numeric", 
    when(col("score").rlike(r"^\d+\.?\d*$"), col("score").cast(DoubleType()))
    .otherwise(lit(0.0))
).withColumn(
    # Safe date conversion
    "timestamp_clean",
    when(col("timestamp").rlike(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$"),
         to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss"))
    .otherwise(lit(None).cast(TimestampType()))
).withColumn(
    # Standardize boolean values
    "active_boolean",
    when(lower(col("active")).isin("true", "1", "yes"), lit(True))
    .when(lower(col("active")).isin("false", "0", "no"), lit(False))
    .otherwise(lit(None).cast(BooleanType()))
).withColumn(
    # Handle various null representations
    "notes_clean",
    when(lower(trim(col("notes"))).isin("null", "", "n/a", "none"), lit(None))
    .otherwise(col("notes"))
)

print("Cleaned data:")
cleaned_df.show()
print("Schema after cleaning:")
cleaned_df.printSchema()

print("\n" + "="*90 + "\n")

# 6. EXPLODING ARRAYS - Converting arrays to separate rows
print("6. Exploding Arrays - Array to Rows Conversion")

# Using our earlier data with arrays
df_to_explode = df_json.select("emp_id", "full_name", "skills_array", "certifications_array")

# Explode skills array - each skill becomes a separate row
df_skills_exploded = df_to_explode.select(
    "emp_id", "full_name", 
    explode("skills_array").alias("individual_skill")
)

print("Skills exploded to separate rows:")
df_skills_exploded.show()

# Explode with position to keep track of array index
df_skills_with_pos = df_to_explode.select(
    "emp_id", "full_name",
    posexplode("skills_array").alias("skill_position", "individual_skill")
)

print("Skills exploded with positions:")
df_skills_with_pos.show()

print("\n" + "="*90 + "\n")

# 7. PIVOT OPERATIONS
print("7. Pivot Operations")

# Create data suitable for pivoting
performance_data = [
    ("Alice", "Q1", 85, 12),
    ("Alice", "Q2", 92, 15),
    ("Alice", "Q3", 88, 13),
    ("Bob", "Q1", 78, 10),
    ("Bob", "Q2", 85, 14),
    ("Bob", "Q3", 90, 16),
    ("Carol", "Q1", 95, 18),
    ("Carol", "Q2", 88, 16),
    ("Carol", "Q3", 92, 19)
]

perf_df = spark.createDataFrame(performance_data, ["employee", "quarter", "score", "sales"])

print("Original performance data:")
perf_df.show()

# Pivot to make quarters into columns
pivoted_perf = perf_df.groupBy("employee") \
                     .pivot("quarter") \
                     .agg(first("score").alias("score"), 
                          first("sales").alias("sales"))

print("Pivoted performance data:")
pivoted_perf.show()

print("\n" + "="*90 + "\n")

# 8. USER-DEFINED FUNCTIONS (UDFs)
print("8. User-Defined Functions for Custom Logic")

from pyspark.sql.functions import udf

import builtins  # the wildcard import above rebinds min() to Spark's column aggregate

# Define a complex function that would be hard to express with built-in functions
def calculate_risk_score(years_exp, num_skills, num_certs):
    """Calculate employee risk score based on multiple factors"""
    if years_exp is None or num_skills is None or num_certs is None:
        return 0
    
    # Call builtins.min explicitly: a bare min() here resolves to pyspark.sql.functions.min
    base_score = builtins.min(years_exp * 10, 100)  # Cap at 100
    skill_bonus = builtins.min(num_skills * 5, 50)   # Cap at 50
    cert_bonus = builtins.min(num_certs * 10, 30)    # Cap at 30
    
    total_score = base_score + skill_bonus + cert_bonus
    
    # Apply risk categories
    if total_score >= 150:
        return 1  # Low risk
    elif total_score >= 100:
        return 2  # Medium risk
    else:
        return 3  # High risk

# Wrap the Python function as a UDF that returns an integer
risk_score_udf = udf(calculate_risk_score, IntegerType())

# Apply UDF
df_with_risk = df_json.withColumn(
    "risk_score",
    risk_score_udf(col("years_experience"), col("num_skills"), col("num_certifications"))
).withColumn(
    "risk_category",
    when(col("risk_score") == 1, "Low Risk")
    .when(col("risk_score") == 2, "Medium Risk")
    .otherwise("High Risk")
)

print("Data with custom risk scoring:")
df_with_risk.select("full_name", "years_experience", "num_skills", "num_certifications",
                   "risk_score", "risk_category").show()

print("\n" + "="*90)
print("ADVANCED MANIPULATION KEY CONCEPTS:")
print("• String functions: split(), regexp_extract(), substring(), concat()")
print("• Array functions: array_contains(), size(), slice(), explode()")
print("• JSON functions: from_json(), to_json(), get_json_object()")
print("• Conditional logic: when().otherwise(), complex boolean conditions")
print("• Data cleaning: trim(), cast(), safe conversions with when()")
print("• UDFs: For complex logic that can't be expressed with built-in functions")
print("• Always prefer built-in functions over UDFs for performance")
print("="*90)
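
The body of a UDF is ordinary Python, so its logic can be checked without a Spark session before wrapping it with `udf()`. The sketch below repeats the risk-score function from the cell above and calls it on the experience, skill, and certification counts from the sample profiles. It uses `builtins.min` on the assumption that a wildcard import from `pyspark.sql.functions` has rebound `min` in the notebook.

```python
import builtins


def calculate_risk_score(years_exp, num_skills, num_certs):
    """Return 1 (low), 2 (medium), or 3 (high) risk; 0 if any input is missing."""
    if years_exp is None or num_skills is None or num_certs is None:
        return 0

    base_score = builtins.min(years_exp * 10, 100)  # cap at 100
    skill_bonus = builtins.min(num_skills * 5, 50)  # cap at 50
    cert_bonus = builtins.min(num_certs * 10, 30)   # cap at 30
    total_score = base_score + skill_bonus + cert_bonus

    if total_score >= 150:
        return 1  # low risk
    elif total_score >= 100:
        return 2  # medium risk
    return 3      # high risk


# Inputs taken from the sample profiles: (years_exp, num_skills, num_certs)
alice = calculate_risk_score(5, 3, 2)    # 50 + 15 + 20 = 85
bob = calculate_risk_score(8, 3, 2)      # 80 + 15 + 20 = 115
david = calculate_risk_score(10, 3, 2)   # 100 + 15 + 20 = 135
missing = calculate_risk_score(None, 3, 2)

print(alice, bob, david, missing)
```

Testing the function this way catches logic and name-resolution mistakes early; inside a Spark job the same errors surface only as failures on the workers.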

### **What You've Just Mastered**

- **Window Functions** let you compute running totals, rankings, and row-to-row comparisons while keeping every row of your original data, where a `groupBy` aggregation would collapse them. The key is understanding the three components: partition (which records to group), order (how to sort within groups), and frame (which rows to include in each calculation).

- **Advanced Data Manipulation** techniques let you handle messy real-world data, which is rarely clean or perfectly structured. JSON parsing, complex conditional logic, and custom functions are the tools that take you from toy datasets to production data systems.
The most important concept here is that PySpark gives you both high-level abstractions (DataFrame operations) and an escape hatch for logic they cannot express (UDFs). Prefer built-in functions wherever they can do the job: Spark can optimize them and run them inside the JVM, while a regular Python UDF has to serialize rows out to a Python worker and back, so built-ins are almost always faster and more reliable than custom code.

- **Window functions**, in particular, solve a class of problems that would otherwise require complex self-joins or multiple passes through your data. They're essential for time-series analysis, ranking problems, and any scenario where you need to compare each row with its neighbors or with aggregate statistics.
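
As a recap of the ranking functions, here is a plain-Python sketch of how `rank()`, `dense_rank()`, and `row_number()` differ when values tie. The helper `rank_variants` and the revenue figures are hypothetical, and the code only models the numbering rules for a single partition ordered by descending value; it is not Spark code.

```python
def rank_variants(values):
    """Return (rank, dense_rank, row_number) lists for values sorted descending."""
    ordered = sorted(values, reverse=True)
    ranks, dense, row_numbers = [], [], []
    for i, value in enumerate(ordered):
        # row_number: always the 1-based position, ties or not
        row_numbers.append(i + 1)
        if i > 0 and value == ordered[i - 1]:
            # A tie repeats the previous rank and dense rank
            ranks.append(ranks[-1])
            dense.append(dense[-1])
        else:
            # rank jumps to the position (leaving gaps after ties);
            # dense_rank advances by exactly one
            ranks.append(i + 1)
            dense.append(dense[-1] + 1 if dense else 1)
    return ranks, dense, row_numbers


# Hypothetical revenues with one tie for second place
revenues = [500, 400, 400, 300]
ranks, dense, row_numbers = rank_variants(revenues)

print("rank:      ", ranks)
print("dense_rank:", dense)
print("row_number:", row_numbers)
```

Note that in Spark the order `row_number()` assigns among tied rows is not deterministic unless the window's ordering includes a tie-breaking column.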