# PySpark Structured API - Getting Started Guide

## Introduction to DataFrames

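DataFrames are the core of Spark's Structured API. A DataFrame is a distributed table: rows organized into named columns with a schema. It resembles a pandas DataFrame, but its data is partitioned across the machines of a cluster, and transformations are evaluated lazily, only when an action such as `show()` or `count()` runs.

**Note**: In PySpark, we work with DataFrames. The strongly typed Dataset API is available only in Scala and Java.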

## 1. Creating Your First DataFrame

### From Python Collections

In [1]:
from pyspark.sql import SparkSession

# Initialize the Spark session (getOrCreate() returns the existing session if one is already running)
spark = SparkSession.builder \
    .appName("StructuredAPIDemo") \
    .getOrCreate()

# From a list of tuples
data = [
    (1, "Alice", 25, "Engineer"),
    (2, "Bob", 30, "Data Scientist"),
    (3, "Charlie", 35, "Manager"),
    (4, "Diana", 28, "Engineer"),
    (5, "Eve", 32, "Data Scientist")
]

df = spark.createDataFrame(data, ["id", "name", "age", "job"])
df.show()

+---+-------+---+--------------+
| id|   name|age|           job|
+---+-------+---+--------------+
|  1|  Alice| 25|      Engineer|
|  2|    Bob| 30|Data Scientist|
|  3|Charlie| 35|       Manager|
|  4|  Diana| 28|      Engineer|
|  5|    Eve| 32|Data Scientist|
+---+-------+---+--------------+
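
When `createDataFrame` receives plain Python tuples, it infers each column's Spark type from the Python values. The function below is a simplified plain-Python model of that mapping, not PySpark code; the name `infer_spark_type` is hypothetical and it covers only common scalar types. It shows why, with plain tuples, `id` and `age` come out as 64-bit `long` columns rather than `integer`.

```python
from datetime import date, datetime

def infer_spark_type(value):
    """Simplified model of how createDataFrame infers a column type from a
    Python value. Type names follow printSchema(); real inference also covers
    Decimal, bytes, lists, dicts, Row objects, and None."""
    # Order matters: bool is a subclass of int, datetime a subclass of date
    checks = [
        (bool, "boolean"),
        (int, "long"),
        (float, "double"),
        (str, "string"),
        (datetime, "timestamp"),
        (date, "date"),
    ]
    for py_type, spark_type in checks:
        if isinstance(value, py_type):
            return spark_type
    raise TypeError(f"type not covered by this sketch: {type(value).__name__}")

row = (1, "Alice", 25, "Engineer")
print([infer_spark_type(v) for v in row])  # ['long', 'string', 'long', 'string']
```

To get `IntegerType` instead, pass an explicit `StructType` schema, as in the CSV example below.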



### From pandas DataFrame

In [2]:
import pandas as pd

# Create pandas DataFrame
pandas_df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35]
})

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()

# Convert back to pandas: toPandas() collects every row to the driver, so use it only on small results
result_pandas = spark_df.toPandas()

+---+---+-------+
|age| id|   name|
+---+---+-------+
| 25|  1|  Alice|
| 30|  2|    Bob|
| 35|  3|Charlie|
+---+---+-------+



## 2. Reading Data from Files

### CSV Files

In [6]:
# Reading CSV with options
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("sample_data/data.csv")

# Alternative syntax
df = spark.read.csv("./sample_data/data.csv", header=True, inferSchema=True)

# With an explicit schema (better for production: no extra pass over the data to infer types, and types stay stable)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.read.csv("./sample_data/data.csv", header=True, schema=schema)

In [7]:
df.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
|  4|  Diana| 28|
|  5|    Eve| 32|
|  6|  Frank| 45|
|  7|  Grace| 29|
|  8|  Henry| 38|
|  9|    Ivy| 27|
| 10|   Jack| 41|
+---+-------+---+
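
With `inferSchema=True`, Spark reads the data an extra time to guess each column's type from its string values. The function below is a deliberately simplified plain-Python model of that guessing. The name `infer_csv_column_type` and the sample columns are hypothetical, and Spark's real inference also recognizes booleans, decimals, dates, and timestamps.

```python
def infer_csv_column_type(values):
    """Simplified model of inferSchema for one CSV column of raw strings:
    pick the narrowest of integer, long, double, string that fits every value."""
    def parses(cast, text):
        try:
            cast(text)
            return True
        except ValueError:
            return False

    present = [v for v in values if v != ""]  # empty fields are read as null
    if not present:
        return "string"  # this sketch treats an all-empty column as string
    if all(parses(int, v) for v in present):
        # 32-bit "integer" if every value fits, otherwise 64-bit "long"
        if all(-2**31 <= int(v) < 2**31 for v in present):
            return "integer"
        return "long"
    if all(parses(float, v) for v in present):
        return "double"
    return "string"

columns = {  # hypothetical raw CSV values
    "id": ["1", "2", "3"],
    "name": ["Alice", "Bob", "Charlie"],
    "age": ["25", "", "35"],
    "score": ["1.5", "2", "3.25"],
    "big": ["1", "9999999999"],
}
print({name: infer_csv_column_type(vals) for name, vals in columns.items()})
# {'id': 'integer', 'name': 'string', 'age': 'integer', 'score': 'double', 'big': 'long'}
```

An explicit `StructType` skips this pass entirely and guarantees the types do not drift when a new file contains an unexpected value.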



### JSON Files

In [27]:
# Reading JSON
df = spark.read.json("./sample_data/data.json")
df.show()

# Reading multiple JSON files (row order across files is not guaranteed)
df = spark.read.json(["./sample_data/file1.json", "./sample_data/file2.json"])
df.show()

# Reading from directory
df = spark.read.json("./sample_data/data_folder/")
df.show()

+---+-----------+---+-------+------+
|age| department| id|   name|salary|
+---+-----------+---+-------+------+
| 25|Engineering|  1|  Alice| 80000|
| 30|  Marketing|  2|    Bob| 65000|
| 35|Engineering|  3|Charlie| 90000|
| 28|      Sales|  4|  Diana| 70000|
| 32|Engineering|  5|    Eve| 85000|
| 45|  Marketing|  6|  Frank| 75000|
| 29|      Sales|  7|  Grace| 72000|
| 38|Engineering|  8|  Henry| 95000|
| 27|  Marketing|  9|    Ivy| 68000|
| 41|      Sales| 10|   Jack| 78000|
+---+-----------+---+-------+------+

+----------+--------+------+----------+--------+
|      date|order_id| price|   product|quantity|
+----------+--------+------+----------+--------+
|2024-01-18|       4| 350.0|   Monitor|       1|
|2024-01-19|       5| 89.99|    Webcam|       4|
|2024-01-20|       6| 120.0|Headphones|       2|
|2024-01-15|       1|1200.0|    Laptop|       2|
|2024-01-16|       2|  25.0|     Mouse|       5|
|2024-01-17|       3|  75.0|  Keyboard|       3|
+----------+--------+------+----------+--------+
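
By default `spark.read.json` expects JSON Lines: one complete JSON object per line (pass `multiLine=True` for a file holding one pretty-printed document). Spark merges the keys seen across all records into one schema and, for an inferred schema, orders the fields alphabetically, which is why `age` appears before `id` in the outputs above. A minimal plain-Python model of that merge using only the standard `json` module; the helper name `read_json_lines` and the sample records are hypothetical.

```python
import json

def read_json_lines(text):
    """Parse JSON Lines text and return (sorted column names, rows as dicts).

    Keys missing from a record become None, mirroring how Spark fills
    absent fields with null."""
    records = [json.loads(line) for line in text.splitlines() if line.strip()]
    columns = sorted({key for record in records for key in record})
    rows = [{column: record.get(column) for column in columns} for record in records]
    return columns, rows

sample = """
{"id": 1, "name": "Alice", "age": 25}
{"id": 2, "name": "Bob", "department": "Marketing"}
"""
columns, rows = read_json_lines(sample)
print(columns)  # ['age', 'department', 'id', 'name']
print(rows[1])  # {'age': None, 'department': 'Marketing', 'id': 2, 'name': 'Bob'}
```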

### Parquet Files (Recommended for Big Data)

In [28]:
# Reading Parquet
df = spark.read.parquet("./sample_data/data.parquet")
df.show()

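# Parquet is columnar, compressed, and stores its own schema, so Spark can read only the
# columns a query needs and skip data using file statistics: well suited to analytics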

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  1|  Alice| 25|Engineering| 80000|
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|Engineering| 90000|
|  4|  Diana| 28|      Sales| 70000|
|  5|    Eve| 32|Engineering| 85000|
|  6|  Frank| 45|  Marketing| 75000|
|  7|  Grace| 29|      Sales| 72000|
|  8|  Henry| 38|Engineering| 95000|
|  9|    Ivy| 27|  Marketing| 68000|
| 10|   Jack| 41|      Sales| 78000|
+---+-------+---+-----------+------+



### Other Formats

In [29]:
# Reading text files (one line per row)
df = spark.read.text("sample_data/data.txt")
df.show()

# Reading ORC
df = spark.read.orc("sample_data/data.orc")
df.show()

# Reading from databases (requires JDBC driver)
#df = spark.read \
#    .format("jdbc") \
#    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
#    .option("dbtable", "users") \
#    .option("user", "username") \
#    .option("password", "password") \
#    .load()

+--------------------+
|               value|
+--------------------+
|Apache Spark is a...|
|It provides high-...|
|Spark runs on Had...|
|It can access div...|
|PySpark is the Py...|
|DataFrames are th...|
|RDDs are the fund...|
|Spark provides bu...|
+--------------------+

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  1|  Alice| 25|Engineering| 80000|
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|Engineering| 90000|
|  4|  Diana| 28|      Sales| 70000|
|  5|    Eve| 32|Engineering| 85000|
|  6|  Frank| 45|  Marketing| 75000|
|  7|  Grace| 29|      Sales| 72000|
|  8|  Henry| 38|Engineering| 95000|
|  9|    Ivy| 27|  Marketing| 68000|
| 10|   Jack| 41|      Sales| 78000|
+---+-------+---+-----------+------+



## 3. Exploring DataFrames

In [30]:
# Show the first 20 rows (the default)
df.show()

# Show specific number of rows
df.show(5)

# Show without truncating long strings
df.show(truncate=False)

# Print schema
df.printSchema()
# Output (truncated to the first three columns):
# root
#  |-- id: integer (nullable = true)
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)

# Get column names
print(df.columns)  # ['id', 'name', 'age', ...]

# Get schema
print(df.schema)

# Count rows
print(df.count())  # 10

# Get summary statistics
df.describe().show()
df.summary().show()  # describe() statistics plus the 25%, 50%, and 75% percentiles

# First few rows as list of Row objects
rows = df.head(3)
first_row = df.first()

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  1|  Alice| 25|Engineering| 80000|
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|Engineering| 90000|
|  4|  Diana| 28|      Sales| 70000|
|  5|    Eve| 32|Engineering| 85000|
|  6|  Frank| 45|  Marketing| 75000|
|  7|  Grace| 29|      Sales| 72000|
|  8|  Henry| 38|Engineering| 95000|
|  9|    Ivy| 27|  Marketing| 68000|
| 10|   Jack| 41|      Sales| 78000|
+---+-------+---+-----------+------+

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  1|  Alice| 25|Engineering| 80000|
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|Engineering| 90000|
|  4|  Diana| 28|      Sales| 70000|
|  5|    Eve| 32|Engineering| 85000|
+---+-------+---+-----------+------+
only showing top 5 rows

+---+-------+---+-----------+------+
|id |name   |age|department |salary|
+---+-------+---+-----------+------+
|1  |Alice  
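
`describe()` reports count, mean, stddev, min, and max per column, and its stddev is the sample standard deviation (divisor n - 1), the same as Spark's `stddev`/`stddev_samp`. A plain-Python cross-check on the `age` column shown above, using only the standard `statistics` module:

```python
import statistics

ages = [25, 30, 35, 28, 32, 45, 29, 38, 27, 41]  # the age column of df

stats = {
    "count": len(ages),
    "mean": statistics.mean(ages),
    "stddev": statistics.stdev(ages),  # sample standard deviation (n - 1)
    "min": min(ages),
    "max": max(ages),
}
for name, value in stats.items():
    print(f"{name:>6}: {value}")
```

Using `statistics.pstdev` instead would give the population value (divisor n), which is what `stddev_pop` computes in Spark.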

## 4. Selecting and Projecting Data

In [31]:
from pyspark.sql.functions import col, expr

# Select specific columns
df.select("name", "age").show()

# Using col()
df.select(col("name"), col("age")).show()

# Select all columns
df.select("*").show()

# Select with expressions
df.select(
    col("name"),
    col("age"),
    (col("age") + 5).alias("age_plus_5")
).show()

# Using expr() for SQL-like expressions
df.select(
    expr("name"),
    expr("age * 2 as double_age")
).show()

# Select and rename
df.select(
    col("name").alias("employee_name"),
    col("age").alias("employee_age")
).show()

# SelectExpr - SQL expressions
df.selectExpr(
    "name",
    "age",
    "age + 5 as age_in_five_years"
).show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
|  Diana| 28|
|    Eve| 32|
|  Frank| 45|
|  Grace| 29|
|  Henry| 38|
|    Ivy| 27|
|   Jack| 41|
+-------+---+

+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
|  Diana| 28|
|    Eve| 32|
|  Frank| 45|
|  Grace| 29|
|  Henry| 38|
|    Ivy| 27|
|   Jack| 41|
+-------+---+

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  1|  Alice| 25|Engineering| 80000|
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|Engineering| 90000|
|  4|  Diana| 28|      Sales| 70000|
|  5|    Eve| 32|Engineering| 85000|
|  6|  Frank| 45|  Marketing| 75000|
|  7|  Grace| 29|      Sales| 72000|
|  8|  Henry| 38|Engineering| 95000|
|  9|    Ivy| 27|  Marketing| 68000|
| 10|   Jack| 41|      Sales| 78000|
+---+-------+---+-----------+------+

+-------+---+----------+
|   name|age|age_plus_5|
+-------+---+----------+
|  Alice| 25

## 5. Filtering Data

In [45]:
from pyspark.sql.functions import col

# Simple filter
df.filter(col("age") > 28).show()

# Alternative: using where (same as filter)
df.where(col("age") > 28).show()

# Using SQL string
df.filter("age > 28").show()

# Multiple conditions - AND
df.filter((col("age") > 25) & (col("department") == "Engineering")).show()

# Multiple conditions - OR
df.filter((col("age") > 30) | (col("department") == "Engineering")).show()

# NOT
df.filter(~(col("age") > 30)).show()

# IN operator
df.filter(col("department").isin(["Engineering", "Sales"])).show()

# NULL checks
df.filter(col("age").isNull()).show()
df.filter(col("age").isNotNull()).show()

# String operations
df.filter(col("name").startswith("A")).show()
df.filter(col("name").endswith("e")).show()
df.filter(col("name").contains("li")).show()

# Chaining filters
df.filter(col("age") > 25) \
  .filter(col("department") == "Engineering") \
  .show()

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|Engineering| 90000|
|  5|    Eve| 32|Engineering| 85000|
|  6|  Frank| 45|  Marketing| 75000|
|  7|  Grace| 29|      Sales| 72000|
|  8|  Henry| 38|Engineering| 95000|
| 10|   Jack| 41|      Sales| 78000|
+---+-------+---+-----------+------+

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|Engineering| 90000|
|  5|    Eve| 32|Engineering| 85000|
|  6|  Frank| 45|  Marketing| 75000|
|  7|  Grace| 29|      Sales| 72000|
|  8|  Henry| 38|Engineering| 95000|
| 10|   Jack| 41|      Sales| 78000|
+---+-------+---+-----------+------+

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|Engineering| 90000|
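
Comparisons against null evaluate to null rather than false, and `filter` keeps only rows whose condition is true. So a row with a null `age` is dropped by both `col("age") > 30` and `~(col("age") > 30)`; use `isNull()` to keep it explicitly. The snippet below is a plain-Python model of SQL's three-valued `AND`/`OR`/`NOT`, with `None` standing in for null; all function names are hypothetical.

```python
def sql_not(a):
    return None if a is None else not a

def sql_and(a, b):
    if a is False or b is False:
        return False  # false wins even if the other side is unknown
    if a is None or b is None:
        return None
    return True

def sql_or(a, b):
    if a is True or b is True:
        return True  # true wins even if the other side is unknown
    if a is None or b is None:
        return None
    return False

def greater_than(value, threshold):
    """SQL '>': any comparison involving null yields null."""
    return None if value is None else value > threshold

ages = [25, None, 35]
kept = [a for a in ages if greater_than(a, 30) is True]
kept_negated = [a for a in ages if sql_not(greater_than(a, 30)) is True]
print(kept)          # [35]
print(kept_negated)  # [25]  (the null row is in neither result)
```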

## 6. Adding and Modifying Columns

In [47]:
from pyspark.sql.functions import lit, when, col

# Add a new column with constant value
df_with_country = df.withColumn("country", lit("USA"))
df_with_country.show()

# Add column based on calculation
df_with_age_category = df.withColumn(
    "age_category",
    when(col("age") < 30, "Young")
    .when(col("age") < 40, "Middle")
    .otherwise("Senior")
)
df_with_age_category.show()

# Multiple when conditions
df_with_salary = df.withColumn(
    "estimated_salary",
    when(col("department") == "Engineering", 80000)
    .when(col("department") == "Marketing", 90000)
    .when(col("department") == "Sales", 100000)
    .otherwise(70000)
)
df_with_salary.show()

# Rename column
df_renamed = df.withColumnRenamed("name", "employee_name")
df_renamed.show()

# Drop column
df_dropped = df.drop("department")
df_dropped.show()

# Drop multiple columns
df_dropped_multi = df.drop("department", "age")
df_dropped_multi.show()

+---+-------+---+-----------+------+-------+
| id|   name|age| department|salary|country|
+---+-------+---+-----------+------+-------+
|  1|  Alice| 25|Engineering| 80000|    USA|
|  2|    Bob| 30|  Marketing| 65000|    USA|
|  3|Charlie| 35|Engineering| 90000|    USA|
|  4|  Diana| 28|      Sales| 70000|    USA|
|  5|    Eve| 32|Engineering| 85000|    USA|
|  6|  Frank| 45|  Marketing| 75000|    USA|
|  7|  Grace| 29|      Sales| 72000|    USA|
|  8|  Henry| 38|Engineering| 95000|    USA|
|  9|    Ivy| 27|  Marketing| 68000|    USA|
| 10|   Jack| 41|      Sales| 78000|    USA|
+---+-------+---+-----------+------+-------+

+---+-------+---+-----------+------+------------+
| id|   name|age| department|salary|age_category|
+---+-------+---+-----------+------+------------+
|  1|  Alice| 25|Engineering| 80000|       Young|
|  2|    Bob| 30|  Marketing| 65000|      Middle|
|  3|Charlie| 35|Engineering| 90000|      Middle|
|  4|  Diana| 28|      Sales| 70000|       Young|
|  5|    Eve| 32|En
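
A `when(...).when(...).otherwise(...)` chain behaves like SQL `CASE WHEN`: conditions are tested in order, the first true one wins, and a condition that evaluates to null counts as not true. That is why a null `age` would fall through to `"Senior"` in the `age_category` example. A plain-Python model of the chain; the helpers `case_when` and `less_than` are hypothetical.

```python
def case_when(branches, otherwise=None):
    """Return a function that evaluates (condition, value) pairs in order.

    Each condition returns True, False, or None (SQL null); only a result
    that is exactly True selects the branch."""
    def evaluate(row):
        for condition, value in branches:
            if condition(row) is True:
                return value
        return otherwise  # without .otherwise(...), Spark returns null
    return evaluate

def less_than(column, limit):
    # SQL comparison: a null input gives a null result
    return lambda row: None if row[column] is None else row[column] < limit

age_category = case_when(
    [(less_than("age", 30), "Young"), (less_than("age", 40), "Middle")],
    otherwise="Senior",
)

for row in [{"age": 25}, {"age": 35}, {"age": 45}, {"age": None}]:
    print(row["age"], age_category(row))
```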

## 7. Sorting Data

In [48]:
from pyspark.sql.functions import col, asc, desc

# Sort ascending (default)
df.orderBy("age").show()
df.sort("age").show()  # same as orderBy

# Sort descending
df.orderBy(col("age").desc()).show()
df.orderBy(desc("age")).show()

# Multiple columns
df.orderBy(col("department"), col("age").desc()).show()

# Using asc and desc
df.orderBy(asc("department"), desc("age")).show()

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  1|  Alice| 25|Engineering| 80000|
|  9|    Ivy| 27|  Marketing| 68000|
|  4|  Diana| 28|      Sales| 70000|
|  7|  Grace| 29|      Sales| 72000|
|  2|    Bob| 30|  Marketing| 65000|
|  5|    Eve| 32|Engineering| 85000|
|  3|Charlie| 35|Engineering| 90000|
|  8|  Henry| 38|Engineering| 95000|
| 10|   Jack| 41|      Sales| 78000|
|  6|  Frank| 45|  Marketing| 75000|
+---+-------+---+-----------+------+

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  1|  Alice| 25|Engineering| 80000|
|  9|    Ivy| 27|  Marketing| 68000|
|  4|  Diana| 28|      Sales| 70000|
|  7|  Grace| 29|      Sales| 72000|
|  2|    Bob| 30|  Marketing| 65000|
|  5|    Eve| 32|Engineering| 85000|
|  3|Charlie| 35|Engineering| 90000|
|  8|  Henry| 38|Engineering| 95000|
| 10|   Jack| 41|      Sales| 78000|
|  6|  Frank| 45|  Marketing| 75000|
+---+-------+---+-----------+------+
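
When the sort column contains nulls, Spark places them first for ascending sorts (`NULLS FIRST`) and last for descending ones (`NULLS LAST`). The column methods `asc_nulls_last()`, `asc_nulls_first()`, `desc_nulls_first()`, and `desc_nulls_last()` override that default. A plain-Python model of the default placement, with `None` standing in for null; the `spark_sort` helper is hypothetical.

```python
def spark_sort(values, descending=False):
    """Sort like orderBy(asc(...)) / orderBy(desc(...)) with default null placement."""
    nulls = [v for v in values if v is None]
    non_nulls = sorted((v for v in values if v is not None), reverse=descending)
    # ascending -> NULLS FIRST, descending -> NULLS LAST
    return non_nulls + nulls if descending else nulls + non_nulls

ages = [30, None, 25, 45]
print(spark_sort(ages))                   # [None, 25, 30, 45]
print(spark_sort(ages, descending=True))  # [45, 30, 25, None]
```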


## 8. Grouping and Aggregations

In [52]:
from pyspark.sql.functions import col, count, sum, avg, min, max, collect_list, countDistinct
# Note: importing sum, min, and max this way shadows Python's built-ins in this notebook

# Simple groupBy with count
df.groupBy("department").count().show()

# Multiple aggregations
df.groupBy("department").agg(
    count("*").alias("count"),
    avg("age").alias("avg_age"),
    min("age").alias("min_age"),
    max("age").alias("max_age")
).show()

# Without groupBy - aggregating entire DataFrame
df.agg(
    count("*").alias("total_count"),
    avg("age").alias("overall_avg_age")
).show()

# Example with more data
sales_data = [
    ("Electronics", "Laptop", 1000, 2),
    ("Electronics", "Phone", 500, 5),
    ("Electronics", "Tablet", 300, 3),
    ("Clothing", "Shirt", 50, 10),
    ("Clothing", "Pants", 80, 7),
    ("Clothing", "Shoes", 100, 4)
]

sales_df = spark.createDataFrame(
    sales_data, 
    ["category", "product", "price", "quantity"]
)

sales_df.show()

# Group by category
category_stats = sales_df.groupBy("category").agg(
    count("product").alias("num_products"),
    sum(col("price") * col("quantity")).alias("total_revenue"),
    avg("price").alias("avg_price")
)
category_stats.show()

# Collect list of items
products_by_category = sales_df.groupBy("category").agg(
    collect_list("product").alias("products")
)
products_by_category.show(truncate=False)

# Count distinct
df.agg(countDistinct("department").alias("distinct_departments")).show()

+-----------+-----+
| department|count|
+-----------+-----+
|      Sales|    3|
|Engineering|    4|
|  Marketing|    3|
+-----------+-----+

+-----------+-----+------------------+-------+-------+
| department|count|           avg_age|min_age|max_age|
+-----------+-----+------------------+-------+-------+
|      Sales|    3|32.666666666666664|     28|     41|
|Engineering|    4|              32.5|     25|     38|
|  Marketing|    3|              34.0|     27|     45|
+-----------+-----+------------------+-------+-------+

+-----------+---------------+
|total_count|overall_avg_age|
+-----------+---------------+
|         10|           33.0|
+-----------+---------------+

+-----------+-------+-----+--------+
|   category|product|price|quantity|
+-----------+-------+-----+--------+
|Electronics| Laptop| 1000|       2|
|Electronics|  Phone|  500|       5|
|Electronics| Tablet|  300|       3|
|   Clothing|  Shirt|   50|      10|
|   Clothing|  Pants|   80|       7|
|   Clothing|  Shoes|  100
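
Aggregate functions skip nulls: `count("*")` counts rows, `count("price")` counts non-null prices, `avg` divides by the number of non-null values, and `sum` over only nulls returns null. The plain-Python model below applies those rules to the `sales_data` list from this cell plus one hypothetical extra row with a null price, and recomputes `total_revenue` per category. The extra row and the helper name `aggregate_by_category` are illustrative assumptions.

```python
from collections import defaultdict

sales_data = [
    ("Electronics", "Laptop", 1000, 2),
    ("Electronics", "Phone", 500, 5),
    ("Electronics", "Tablet", 300, 3),
    ("Clothing", "Shirt", 50, 10),
    ("Clothing", "Pants", 80, 7),
    ("Clothing", "Shoes", 100, 4),
    ("Clothing", "Hat", None, 1),  # hypothetical extra row with a null price
]

def aggregate_by_category(rows):
    """Plain-Python model of groupBy("category").agg(...) with SQL null handling."""
    groups = defaultdict(list)
    for category, _product, price, quantity in rows:
        groups[category].append((price, quantity))

    stats = {}
    for category, items in groups.items():
        prices = [p for p, _ in items if p is not None]
        # price * quantity is null when either side is null; sum() skips nulls
        revenues = [p * q for p, q in items if p is not None and q is not None]
        stats[category] = {
            "count_star": len(items),     # count("*") counts every row
            "count_price": len(prices),   # count("price") counts non-null prices
            "total_revenue": sum(revenues) if revenues else None,
            "avg_price": sum(prices) / len(prices) if prices else None,
        }
    return stats

for category, values in aggregate_by_category(sales_data).items():
    print(category, values)
```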

## 9. Joins

In [54]:
# Sample data for joins
employees = [
    (1, "Alice", 101),
    (2, "Bob", 102),
    (3, "Charlie", 101),
    (4, "Diana", 103)
]

departments = [
    (101, "Engineering"),
    (102, "Marketing"),
    (103, "Sales")
]

emp_df = spark.createDataFrame(employees, ["emp_id", "name", "dept_id"])
dept_df = spark.createDataFrame(departments, ["dept_id", "dept_name"])

emp_df.show()
dept_df.show()

# Inner join (default)
result = emp_df.join(dept_df, "dept_id")
result.show()

# Explicit join condition (keeps both dept_id columns; drop one, e.g. .drop(dept_df.dept_id), to avoid ambiguity)
result = emp_df.join(dept_df, emp_df.dept_id == dept_df.dept_id)
result.show()

# Left join
result = emp_df.join(dept_df, "dept_id", "left")
result.show()

# Right join
result = emp_df.join(dept_df, "dept_id", "right")
result.show()

# Full outer join ("outer", "full", and "full_outer" are equivalent)
result = emp_df.join(dept_df, "dept_id", "outer")
result.show()

# Self join (aliases distinguish the two copies; with this condition every row at least matches itself)
df_aliased = df.alias("df1")
df2_aliased = df.alias("df2")
self_join = df_aliased.join(
    df2_aliased, 
    col("df1.age") == col("df2.age")
).select("df1.name", "df2.name", "df1.age")
self_join.show()

+------+-------+-------+
|emp_id|   name|dept_id|
+------+-------+-------+
|     1|  Alice|    101|
|     2|    Bob|    102|
|     3|Charlie|    101|
|     4|  Diana|    103|
+------+-------+-------+

+-------+-----------+
|dept_id|  dept_name|
+-------+-----------+
|    101|Engineering|
|    102|  Marketing|
|    103|      Sales|
+-------+-----------+

+-------+------+-------+-----------+
|dept_id|emp_id|   name|  dept_name|
+-------+------+-------+-----------+
|    103|     4|  Diana|      Sales|
|    101|     3|Charlie|Engineering|
|    101|     1|  Alice|Engineering|
|    102|     2|    Bob|  Marketing|
+-------+------+-------+-----------+

+------+-------+-------+-------+-----------+
|emp_id|   name|dept_id|dept_id|  dept_name|
+------+-------+-------+-------+-----------+
|     4|  Diana|    103|    103|      Sales|
|     1|  Alice|    101|    101|Engineering|
|     3|Charlie|    101|    101|Engineering|
|     2|    Bob|    102|    102|  Marketing|
+------+-------+-------+-------+-----------+
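
With this data every `dept_id` matches, so the inner, left, right, and outer joins all return the same rows. The join types only differ when a key is missing on one side. The plain-Python model below uses a hypothetical employee `Zoe` in department 104 (which does not exist) and a hypothetical department 105 `Legal` (with no employees); `None` stands in for the nulls Spark fills in, and the helper `join_on_dept` is also hypothetical.

```python
employees = [(1, "Alice", 101), (2, "Bob", 102), (5, "Zoe", 104)]  # 104 has no department
departments = [(101, "Engineering"), (102, "Marketing"), (105, "Legal")]  # 105 has no employees

def join_on_dept(emps, depts, how="inner"):
    """Equi-join on dept_id, returning (dept_id, emp_id, name, dept_name) tuples.

    Assumes dept_id is unique in depts, as in this example."""
    dept_by_id = {dept_id: dept_name for dept_id, dept_name in depts}
    matched_dept_ids = set()
    rows = []
    for emp_id, name, dept_id in emps:
        if dept_id in dept_by_id:
            matched_dept_ids.add(dept_id)
            rows.append((dept_id, emp_id, name, dept_by_id[dept_id]))
        elif how in ("left", "outer"):
            rows.append((dept_id, emp_id, name, None))  # no matching department
    if how in ("right", "outer"):
        for dept_id, dept_name in depts:
            if dept_id not in matched_dept_ids:
                rows.append((dept_id, None, None, dept_name))  # no matching employee
    return rows

for how in ("inner", "left", "right", "outer"):
    print(how, join_on_dept(employees, departments, how))
```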

## 10. Working with Strings

In [56]:
from pyspark.sql.functions import (
    lower, upper, trim, ltrim, rtrim, 
    concat, concat_ws, substring, length,
    split, regexp_replace
)

text_data = [
    (1, "  Hello World  "),
    (2, "PYSPARK"),
    (3, "Data Engineering")
]

text_df = spark.createDataFrame(text_data, ["id", "text"])

# String transformations
result = text_df.select(
    col("text"),
    lower(col("text")).alias("lowercase"),
    upper(col("text")).alias("uppercase"),
    trim(col("text")).alias("trimmed"),
    length(col("text")).alias("length")
)
result.show(truncate=False)

# Substring (1-indexed)
text_df.select(substring(col("text"), 1, 5).alias("first_5")).show()

# Concatenate
df.select(
    concat(col("name"), lit(" - "), col("department")).alias("name_job")
).show()

# Concatenate with separator
df.select(
    concat_ws(" | ", col("name"), col("department")).alias("combined")
).show()

# Split string
text_df.select(
    split(col("text"), " ").alias("words")
).show(truncate=False)

# Replace
text_df.select(
    regexp_replace(col("text"), "Hello", "Hi").alias("replaced")
).show(truncate=False)

+----------------+----------------+----------------+----------------+------+
|text            |lowercase       |uppercase       |trimmed         |length|
+----------------+----------------+----------------+----------------+------+
|  Hello World   |  hello world   |  HELLO WORLD   |Hello World     |15    |
|PYSPARK         |pyspark         |PYSPARK         |PYSPARK         |7     |
|Data Engineering|data engineering|DATA ENGINEERING|Data Engineering|16    |
+----------------+----------------+----------------+----------------+------+

+-------+
|first_5|
+-------+
|    Hel|
|  PYSPA|
|  Data |
+-------+

+--------------------+
|            name_job|
+--------------------+
| Alice - Engineering|
|     Bob - Marketing|
|Charlie - Enginee...|
|       Diana - Sales|
|   Eve - Engineering|
|   Frank - Marketing|
|       Grace - Sales|
| Henry - Engineering|
|     Ivy - Marketing|
|        Jack - Sales|
+--------------------+

+--------------------+
|            combined|
+-------------------
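
Two details trip up people coming from Python: `substring(col, pos, len)` counts positions from 1, and `split` (like `regexp_replace`) treats its pattern as a regular expression, so a literal `.` or `|` must be escaped. Plain-Python equivalents for positive `pos` only; the helper names are hypothetical, and Python's `re` is close to, but not identical to, the Java regex dialect Spark uses.

```python
import re

def spark_substring(text, pos, length):
    """Model of substring(col, pos, len) for pos >= 1 (1-based positions)."""
    start = pos - 1  # convert the 1-based position to a 0-based index
    return text[start:start + length]

def spark_split(text, pattern):
    """Model of split(col, pattern): the pattern is a regular expression."""
    return re.split(pattern, text)

print(repr(spark_substring("  Hello World  ", 1, 5)))  # '  Hel'
print(spark_split("Data Engineering", " "))           # ['Data', 'Engineering']
print(spark_split("a.b.c", r"\."))                    # ['a', 'b', 'c']
```

In PySpark the escaped form is written the same way, for example `split(col("text"), r"\.")`.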

## 11. Working with Dates and Timestamps

In [57]:
from pyspark.sql.functions import (
    current_date, current_timestamp, date_add, date_sub,
    year, month, dayofmonth, dayofweek, hour, minute,
    to_date, to_timestamp, datediff, months_between
)

# Create sample date data
from datetime import datetime, date

date_data = [
    (1, date(2024, 1, 15), datetime(2024, 1, 15, 10, 30, 0)),
    (2, date(2024, 2, 20), datetime(2024, 2, 20, 14, 45, 30)),
    (3, date(2024, 3, 10), datetime(2024, 3, 10, 9, 15, 45))
]

date_df = spark.createDataFrame(date_data, ["id", "date", "timestamp"])

# Extract date parts (dayofweek: 1 = Sunday through 7 = Saturday)
date_df.select(
    col("date"),
    year(col("date")).alias("year"),
    month(col("date")).alias("month"),
    dayofmonth(col("date")).alias("day"),
    dayofweek(col("date")).alias("day_of_week")
).show()

# Date arithmetic
date_df.select(
    col("date"),
    date_add(col("date"), 7).alias("next_week"),
    date_sub(col("date"), 7).alias("last_week")
).show()

# Current date and timestamp
df.select(
    current_date().alias("today"),
    current_timestamp().alias("now")
).show(1, truncate=False)

# Parse string to date
string_dates = [("2024-01-15",), ("2024-02-20",)]
string_df = spark.createDataFrame(string_dates, ["date_string"])
string_df.select(
    to_date(col("date_string"), "yyyy-MM-dd").alias("date")
).show()

# Date difference
date_df.select(
    datediff(current_date(), col("date")).alias("days_ago")
).show()

+----------+----+-----+---+-----------+
|      date|year|month|day|day_of_week|
+----------+----+-----+---+-----------+
|2024-01-15|2024|    1| 15|          2|
|2024-02-20|2024|    2| 20|          3|
|2024-03-10|2024|    3| 10|          1|
+----------+----+-----+---+-----------+

+----------+----------+----------+
|      date| next_week| last_week|
+----------+----------+----------+
|2024-01-15|2024-01-22|2024-01-08|
|2024-02-20|2024-02-27|2024-02-13|
|2024-03-10|2024-03-17|2024-03-03|
+----------+----------+----------+

+----------+-----------------------+
|today     |now                    |
+----------+-----------------------+
|2026-01-28|2026-01-28 17:39:06.285|
+----------+-----------------------+
only showing top 1 row

+----------+
|      date|
+----------+
|2024-01-15|
|2024-02-20|
+----------+

+--------+
|days_ago|
+--------+
|     744|
|     708|
|     689|
+--------+
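
`dayofweek` numbers days from 1 = Sunday to 7 = Saturday, unlike Python's `isoweekday()` (1 = Monday to 7 = Sunday), and `datediff(end, start)` is the whole-day difference `end - start`. A plain-Python check against the outputs above, assuming `current_date()` was 2026-01-28 when the cell ran (as the `today` column shows); the helper names are hypothetical.

```python
from datetime import date

def spark_dayofweek(d):
    """Map Python's isoweekday (Mon=1..Sun=7) to Spark's dayofweek (Sun=1..Sat=7)."""
    return d.isoweekday() % 7 + 1

def spark_datediff(end, start):
    """Number of days from start to end, as datediff(end, start) returns."""
    return (end - start).days

dates = [date(2024, 1, 15), date(2024, 2, 20), date(2024, 3, 10)]
run_date = date(2026, 1, 28)  # current_date() when the notebook cell ran

print([spark_dayofweek(d) for d in dates])           # [2, 3, 1]
print([spark_datediff(run_date, d) for d in dates])  # [744, 708, 689]
```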



## 12. Window Functions

Window functions perform calculations across rows related to the current row.
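
The ranking functions `row_number`, `rank`, and `dense_rank` only differ when the `orderBy` values tie, which never happens in the sales data below. A plain-Python model over one partition that is already sorted by the ordering column, using hypothetical tied values; the helper `ranking_columns` is also hypothetical. In Spark, which tied row gets the lower `row_number` is not deterministic unless the ordering breaks the tie.

```python
def ranking_columns(sorted_values):
    """Return (value, row_number, rank, dense_rank) for one sorted partition."""
    rows = []
    rank = dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = row_number  # rank jumps to the row position (gaps after ties)
            dense_rank += 1    # dense_rank just increments (no gaps)
            previous = value
        rows.append((value, row_number, rank, dense_rank))
    return rows

for row in ranking_columns([1200, 900, 900, 700]):
    print(row)
# (1200, 1, 1, 1)
# (900, 2, 2, 2)
# (900, 3, 2, 2)
# (700, 4, 4, 3)
```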

In [59]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, avg, sum, row_number, rank, dense_rank, lag, lead

# Sample sales data
sales_data = [
    ("2024-01", "Electronics", 1000),
    ("2024-01", "Clothing", 500),
    ("2024-01", "Food", 300),
    ("2024-02", "Electronics", 1200),
    ("2024-02", "Clothing", 600),
    ("2024-02", "Food", 400),
    ("2024-03", "Electronics", 900),
    ("2024-03", "Clothing", 700),
    ("2024-03", "Food", 350)
]

sales_df = spark.createDataFrame(sales_data, ["month", "category", "sales"])

sales_df.show()

# Define window specification
window_spec = Window.partitionBy("category").orderBy("month")

# Row number
sales_df.withColumn("row_num", row_number().over(window_spec)).show()

# Rank (with gaps)
rank_window = Window.partitionBy("month").orderBy(desc("sales"))
sales_df.withColumn("rank", rank().over(rank_window)).show()

# Dense rank (no gaps)
sales_df.withColumn("dense_rank", dense_rank().over(rank_window)).show()

# Running total
sales_df.withColumn(
    "running_total", 
    sum("sales").over(window_spec)
).show()

# Lag and lead (previous and next values)
sales_df.select(
    col("month"),
    col("category"),
    col("sales"),
    lag("sales", 1).over(window_spec).alias("prev_month_sales"),
    lead("sales", 1).over(window_spec).alias("next_month_sales")
).show()

# Moving average (previous, current, next)
moving_window = Window.partitionBy("category") \
    .orderBy("month") \
    .rowsBetween(-1, 1)

sales_df.withColumn(
    "moving_avg",
    avg("sales").over(moving_window)
).show()

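# Range-based window: rows whose sales value is within ±100 of the current row's
range_window = Window.partitionBy("category") \
    .orderBy("sales") \
    .rangeBetween(-100, 100)

sales_df.withColumn(
    "nearby_sales_total",
    sum("sales").over(range_window)
).show()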

+-------+-----------+-----+
|  month|   category|sales|
+-------+-----------+-----+
|2024-01|Electronics| 1000|
|2024-01|   Clothing|  500|
|2024-01|       Food|  300|
|2024-02|Electronics| 1200|
|2024-02|   Clothing|  600|
|2024-02|       Food|  400|
|2024-03|Electronics|  900|
|2024-03|   Clothing|  700|
|2024-03|       Food|  350|
+-------+-----------+-----+

+-------+-----------+-----+-------+
|  month|   category|sales|row_num|
+-------+-----------+-----+-------+
|2024-01|       Food|  300|      1|
|2024-02|       Food|  400|      2|
|2024-03|       Food|  350|      3|
|2024-01|Electronics| 1000|      1|
|2024-02|Electronics| 1200|      2|
|2024-03|Electronics|  900|      3|
|2024-01|   Clothing|  500|      1|
|2024-02|   Clothing|  600|      2|
|2024-03|   Clothing|  700|      3|
+-------+-----------+-----+-------+

+-------+-----------+-----+----+
|  month|   category|sales|rank|
+-------+-----------+-----+----+
|2024-02|Electronics| 1200|   1|
|2024-02|   Clothing|  600|   2|
|
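
The offsets mean different things in the two frame types: `rowsBetween(-1, 1)` takes one physical row on each side, while `rangeBetween(-100, 100)` takes every row whose ordering value lies within ±100 of the current row's value. Once a window has `orderBy`, its default frame runs from the start of the partition to the current row, tied rows included, which is why `sum("sales").over(window_spec)` above is a running total. A plain-Python model of the three frames for a single partition; the helper names are hypothetical.

```python
def rows_frame_avg(values, before, after):
    """avg over rowsBetween(-before, after): neighbours by position."""
    result = []
    for i in range(len(values)):
        frame = values[max(0, i - before): i + after + 1]
        result.append(sum(frame) / len(frame))
    return result

def range_frame_sum(values, low, high):
    """sum over rangeBetween(low, high) when ordering by the values themselves."""
    return [sum(v for v in values if current + low <= v <= current + high)
            for current in values]

def running_total(order_keys, values):
    """Default frame once orderBy is set: unbounded preceding to current row.

    Rows that tie on the ordering key are peers and share the same total."""
    return [sum(v for k, v in zip(order_keys, values) if k <= current)
            for current in order_keys]

months = ["2024-01", "2024-02", "2024-03"]
electronics = [1000, 1200, 900]
print(running_total(months, electronics))          # [1000, 2200, 3100]
print(rows_frame_avg(electronics, 1, 1))           # moving average, previous/current/next
print(range_frame_sum([500, 600, 700], -100, 100)) # Clothing: [1100, 1800, 1300]
```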

## 13. Handling Missing Data

In [64]:
from pyspark.sql.functions import coalesce, when, isnan

# Sample data with nulls
data_with_nulls = [
    (1, "Alice", 25, 80000),
    (2, "Bob", None, 90000),
    (3, "Charlie", 35, None),
    (4, None, 28, 85000),
    (5, "Eve", None, None)
]

null_df = spark.createDataFrame(
    data_with_nulls, 
    ["id", "name", "age", "salary"]
)

null_df.show()

# Drop rows with any null values
null_df.dropna().show()

# Drop rows where all values are null
null_df.dropna(how='all').show()

# Drop rows with nulls in specific columns
null_df.dropna(subset=['age']).show()

# Fill null values
null_df.fillna(0).show()  # Fill all numeric nulls with 0

# Fill with different values for different columns
null_df.fillna({'age': 0, 'salary': 50000}).show()

# Fill with type-appropriate values: a string for a string column, a number for a numeric column
null_df.fillna({'name': 'Unknown', 'age': 30}).show()


+---+-------+----+------+
| id|   name| age|salary|
+---+-------+----+------+
|  1|  Alice|  25| 80000|
|  2|    Bob|null| 90000|
|  3|Charlie|  35|  null|
|  4|   null|  28| 85000|
|  5|    Eve|null|  null|
+---+-------+----+------+

+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
|  1|Alice| 25| 80000|
+---+-----+---+------+

+---+-------+----+------+
| id|   name| age|salary|
+---+-------+----+------+
|  1|  Alice|  25| 80000|
|  2|    Bob|null| 90000|
|  3|Charlie|  35|  null|
|  4|   null|  28| 85000|
|  5|    Eve|null|  null|
+---+-------+----+------+

+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|  1|  Alice| 25| 80000|
|  3|Charlie| 35|  null|
|  4|   null| 28| 85000|
+---+-------+---+------+

+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|  1|  Alice| 25| 80000|
|  2|    Bob|  0| 90000|
|  3|Charlie| 35|     0|
|  4|   null| 28| 85000|
|  5|    Eve|  0|     0|
+---+-------+---+------+

+---+-------
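
`dropna` decides row by row: `how="any"` drops a row with at least one null, `how="all"` drops only a row that is entirely null, `thresh=n` keeps rows with at least `n` non-null values (and overrides `how`), and `subset` restricts which columns are checked. A plain-Python model over the `data_with_nulls` rows from this cell; the standalone `dropna` function here is a hypothetical helper, not the DataFrame method.

```python
data_with_nulls = [
    (1, "Alice", 25, 80000),
    (2, "Bob", None, 90000),
    (3, "Charlie", 35, None),
    (4, None, 28, 85000),
    (5, "Eve", None, None),
]
columns = ["id", "name", "age", "salary"]

def dropna(rows, columns, how="any", thresh=None, subset=None):
    """Keep rows with at least `thresh` non-null values among the checked columns."""
    indexes = [columns.index(c) for c in (subset or columns)]
    if thresh is None:
        # how="any" needs every checked value present; how="all" needs at least one
        thresh = len(indexes) if how == "any" else 1
    return [row for row in rows
            if sum(row[i] is not None for i in indexes) >= thresh]

print([r[0] for r in dropna(data_with_nulls, columns)])                  # [1]
print([r[0] for r in dropna(data_with_nulls, columns, how="all")])       # [1, 2, 3, 4, 5]
print([r[0] for r in dropna(data_with_nulls, columns, subset=["age"])])  # [1, 3, 4]
print([r[0] for r in dropna(data_with_nulls, columns, thresh=3)])        # [1, 2, 3, 4]
```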

## 14. Writing Data

In [66]:
# Write to CSV
df.write.mode("overwrite").csv("sample_data/output.csv", header=True)

# Write to JSON
df.write.mode("overwrite").json("sample_data/output.json")

# Write to Parquet (recommended)
df.write.mode("overwrite").parquet("sample_data/output.parquet")

# Write modes:
# - "overwrite": Replace existing data
# - "append": Add to existing data
# - "ignore": Don't write if data exists
# - "error" or "errorifexists" (default): Throw an error if data exists

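# Partitioned write: one subdirectory per department value
df.write.mode("overwrite").partitionBy("department").parquet("sample_data/output_partitioned")

# Write a single part file (the output is still a directory; coalesce(1) funnels all data through one task)
df.coalesce(1).write.mode("overwrite").csv("sample_data/output_single.csv", header=True)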
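
`partitionBy("department")` writes one subdirectory per distinct value, named `column=value` (Hive-style), and leaves the partition column out of the data files; readers recover it from the path and can skip directories a filter rules out. A plain-Python sketch that only computes that directory layout for the `department` values in `df`; it writes no files, and the helper name `partition_layout` is hypothetical.

```python
from collections import Counter

def partition_layout(base_path, column, values):
    """Map each Hive-style partition directory to the number of rows it receives."""
    counts = Counter(values)
    return {f"{base_path}/{column}={value}": n for value, n in sorted(counts.items())}

departments = ["Engineering", "Marketing", "Engineering", "Sales", "Engineering",
               "Marketing", "Sales", "Engineering", "Marketing", "Sales"]
layout = partition_layout("sample_data/output_partitioned", "department", departments)
for path, n_rows in layout.items():
    print(path, n_rows)
# sample_data/output_partitioned/department=Engineering 4
# sample_data/output_partitioned/department=Marketing 3
# sample_data/output_partitioned/department=Sales 3
```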


## 15. SQL Queries

In [69]:
# Register DataFrame as temporary view
df.createOrReplaceTempView("employees")

# Run SQL queries
result = spark.sql("""
    SELECT department, COUNT(*) as count, AVG(age) as avg_age
    FROM employees
    WHERE age > 25
    GROUP BY department
    ORDER BY count DESC
""")
result.show()

# Mix SQL and DataFrame API
sql_result = spark.sql("SELECT * FROM employees WHERE age > 28")
final_result = sql_result.groupBy("department").count()
final_result.show()

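# Register a global temporary view (shared by all SparkSessions in the same application,
# stored in the global_temp database, and dropped when the application ends)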
df.createOrReplaceGlobalTempView("global_employees")
spark.sql("SELECT * FROM global_temp.global_employees").show()

+-----------+-----+------------------+
| department|count|           avg_age|
+-----------+-----+------------------+
|Engineering|    3|              35.0|
|  Marketing|    3|              34.0|
|      Sales|    3|32.666666666666664|
+-----------+-----+------------------+

+-----------+-----+
| department|count|
+-----------+-----+
|      Sales|    2|
|Engineering|    3|
|  Marketing|    2|
+-----------+-----+

+---+-------+---+-----------+------+
| id|   name|age| department|salary|
+---+-------+---+-----------+------+
|  1|  Alice| 25|Engineering| 80000|
|  2|    Bob| 30|  Marketing| 65000|
|  3|Charlie| 35|Engineering| 90000|
|  4|  Diana| 28|      Sales| 70000|
|  5|    Eve| 32|Engineering| 85000|
|  6|  Frank| 45|  Marketing| 75000|
|  7|  Grace| 29|      Sales| 72000|
|  8|  Henry| 38|Engineering| 95000|
|  9|    Ivy| 27|  Marketing| 68000|
| 10|   Jack| 41|      Sales| 78000|
+---+-------+---+-----------+------+
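
The first query in this cell is plain ANSI SQL, so one way to sanity-check its logic outside Spark is to run the same statement with Python's built-in `sqlite3` module on the same ten rows. This is only a cross-check under SQLite's dialect, not a substitute for Spark SQL. Note that every department has `count` 3 here, so the `ORDER BY count DESC` leaves row order unspecified in both engines.

```python
import sqlite3

rows = [
    (1, "Alice", 25, "Engineering", 80000), (2, "Bob", 30, "Marketing", 65000),
    (3, "Charlie", 35, "Engineering", 90000), (4, "Diana", 28, "Sales", 70000),
    (5, "Eve", 32, "Engineering", 85000), (6, "Frank", 45, "Marketing", 75000),
    (7, "Grace", 29, "Sales", 72000), (8, "Henry", 38, "Engineering", 95000),
    (9, "Ivy", 27, "Marketing", 68000), (10, "Jack", 41, "Sales", 78000),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (id INTEGER, name TEXT, age INTEGER, "
             "department TEXT, salary INTEGER)")
conn.executemany("INSERT INTO employees VALUES (?, ?, ?, ?, ?)", rows)

# Same statement as the spark.sql(...) call above
result = conn.execute("""
    SELECT department, COUNT(*) as count, AVG(age) as avg_age
    FROM employees
    WHERE age > 25
    GROUP BY department
    ORDER BY count DESC
""").fetchall()
conn.close()

for department, count, avg_age in result:
    print(department, count, round(avg_age, 2))
```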

