In [None]:
# Setup
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
import os

# Start (or reuse) the Spark session shared by every later cell
spark = (
    SparkSession.builder
    .appName("Data Sources and Formats")
    .getOrCreate()
)

# Employee records used throughout the notebook.
# The last element of each tuple is a Python list of skill names.
sample_data = [
    (1, "Alice", "Engineering", 75000, "2020-01-15", ["Python", "SQL"]),
    (2, "Bob", "Sales", 65000, "2019-03-20", ["CRM", "Excel"]),
    (3, "Charlie", "Engineering", 80000, "2018-06-10", ["Java", "Scala"]),
    (4, "Diana", "Marketing", 70000, "2021-02-28", ["Analytics", "Design"]),
    (5, "Eve", "Sales", 68000, "2017-11-05", ["Negotiation", "Presentation"]),
]

# Only column names are supplied; the column types are inferred from the data
df = spark.createDataFrame(
    sample_data,
    schema=["id", "name", "department", "salary", "hire_date", "skills"],
)

# Ensure the output directory exists (no error if it is already there)
os.makedirs("../data/formats", exist_ok=True)

# Display the rows and the inferred schema
print("Sample DataFrame:")
df.show()
df.printSchema()


In [None]:
# Writing CSV files
print("=== WRITING CSV FILES ===")

# CSV is a flat text format: Spark's CSV data source rejects complex column
# types, and `skills` is an array column. Join the array into one
# ';'-separated string so the DataFrame can be written as CSV.
# (After reading the file back, F.split("skills", ";") turns it into an array again.)
df_csv_ready = df.withColumn("skills", F.array_join("skills", ";"))

# Basic CSV write
df_csv_ready.write.mode("overwrite").option("header", "true").csv("../data/formats/employees.csv")
print("Basic CSV file written")

# CSV with custom options: pipe-delimited, explicit quote and escape characters
df_csv_ready.write.mode("overwrite") \
  .option("header", "true") \
  .option("delimiter", "|") \
  .option("quote", '"') \
  .option("escape", "\\") \
  .csv("../data/formats/employees_custom.csv")
print("Custom CSV file written")

# Reading CSV files
print("\n=== READING CSV FILES ===")

# Basic CSV read (no schema inference requested; compare the printed schema
# with the inferred one below)
df_csv_basic = spark.read.option("header", "true").csv("../data/formats/employees.csv")
print("Basic CSV read:")
df_csv_basic.show()
df_csv_basic.printSchema()

# CSV read with schema inference
df_csv_infer = spark.read.option("header", "true").option("inferSchema", "true").csv("../data/formats/employees.csv")
print("\nCSV read with schema inference:")
df_csv_infer.show()
df_csv_infer.printSchema()

# CSV read with custom delimiter (must match the delimiter used when writing)
df_csv_custom = spark.read.option("header", "true").option("delimiter", "|").csv("../data/formats/employees_custom.csv")
print("\nCSV read with custom delimiter:")
df_csv_custom.show()


In [None]:
# Writing JSON files
print("=== WRITING JSON FILES ===")

# Basic JSON write (JSON can represent the `skills` array column directly)
df.write.mode("overwrite").json("../data/formats/employees.json")
print("JSON file written")

# Reading JSON files
print("\n=== READING JSON FILES ===")

df_json = spark.read.json("../data/formats/employees.json")
print("JSON read:")
df_json.show()
df_json.printSchema()

# Create more complex JSON data
complex_data = [
    {
        "id": 1,
        "personal": {"name": "Alice", "age": 30, "email": "alice@company.com"},
        "job": {"title": "Engineer", "salary": 75000, "department": "Engineering"},
        "skills": ["Python", "SQL", "Spark"],
        "projects": [
            {"name": "Project A", "status": "completed"},
            {"name": "Project B", "status": "in_progress"}
        ]
    },
    {
        "id": 2,
        "personal": {"name": "Bob", "age": 25, "email": "bob@company.com"},
        "job": {"title": "Analyst", "salary": 65000, "department": "Sales"},
        "skills": ["Excel", "PowerBI"],
        "projects": [
            {"name": "Project C", "status": "completed"}
        ]
    }
]

# Explicit schema for the nested records.
# Without it, PySpark infers nested dicts as maps (one value type per map), so
# mixed-type dicts such as `personal` (strings plus an integer age) and `job`
# (strings plus an integer salary) would not keep their numeric fields typed.
# Declaring structs keeps `age` and `salary` as integers end to end.
complex_schema = StructType([
    StructField("id", LongType()),
    StructField("personal", StructType([
        StructField("name", StringType()),
        StructField("age", LongType()),
        StructField("email", StringType()),
    ])),
    StructField("job", StructType([
        StructField("title", StringType()),
        StructField("salary", LongType()),
        StructField("department", StringType()),
    ])),
    StructField("skills", ArrayType(StringType())),
    StructField("projects", ArrayType(StructType([
        StructField("name", StringType()),
        StructField("status", StringType()),
    ]))),
])

# Convert to DataFrame and write
df_complex = spark.createDataFrame(complex_data, schema=complex_schema)
df_complex.write.mode("overwrite").json("../data/formats/employees_complex.json")

# Read complex JSON
df_complex_read = spark.read.json("../data/formats/employees_complex.json")
print("\nComplex JSON structure:")
df_complex_read.show(truncate=False)
df_complex_read.printSchema()

# Access nested fields with dot notation (struct.field)
print("\nAccessing nested fields:")
df_complex_read.select("id", "personal.name", "job.title", "job.salary").show()


In [None]:
# Writing Parquet files
print("=== WRITING PARQUET FILES ===")

# Plain Parquet output, replacing any previous run's files
df.write.parquet("../data/formats/employees.parquet", mode="overwrite")
print("Parquet file written")

# Same data, partitioned by the `department` column
df.write.parquet(
    "../data/formats/employees_partitioned.parquet",
    mode="overwrite",
    partitionBy="department",
)
print("Partitioned Parquet file written")

# Reading Parquet files
print("\n=== READING PARQUET FILES ===")

df_parquet = spark.read.parquet("../data/formats/employees.parquet")
print("Parquet read:")
df_parquet.show()
df_parquet.printSchema()

# Load the partitioned dataset from its root directory
df_partitioned = spark.read.parquet("../data/formats/employees_partitioned.parquet")
print("\nPartitioned Parquet read:")
df_partitioned.show()

# Compare the in-memory schema with the one read back from Parquet
print("\nParquet schema preservation:")
print("Original schema:", df.schema.simpleString())
print("Parquet schema:", df_parquet.schema.simpleString())
print("Schemas match:", df.schema == df_parquet.schema)

# Write one copy per compression codec
compressed_targets = {
    "snappy": "../data/formats/employees_snappy.parquet",
    "gzip": "../data/formats/employees_gzip.parquet",
}
for codec, target in compressed_targets.items():
    df.write.parquet(target, mode="overwrite", compression=codec)
print("\nParquet files with different compression written")


In [None]:
# Small sales dataset that all five exercises operate on
sales_data = [
    (1, "2024-01-15", "Product A", 100, 25.50, "Electronics"),
    (2, "2024-01-16", "Product B", 50, 15.75, "Books"),
    (3, "2024-01-17", "Product C", 75, 30.00, "Electronics"),
    (4, "2024-01-18", "Product D", 200, 5.25, "Books"),
    (5, "2024-01-19", "Product E", 25, 45.00, "Electronics"),
]

sales_df = spark.createDataFrame(
    sales_data,
    schema=["order_id", "date", "product", "quantity", "price", "category"],
)
print("Sales data for exercises:")
sales_df.show()

# Each exercise prints its heading, then its instructions one per line.
print("\n=== EXERCISE 1: CSV Operations ===")
print(
    "TODO: Write the sales_df to CSV with the following requirements:",
    "1. Include header",
    "2. Use semicolon as delimiter",
    "3. Write to '../data/formats/sales_exercise.csv'",
    "4. Read it back and verify the data",
    sep="\n",
)

# Your code here:
# sales_df.write...

print("\n=== EXERCISE 2: JSON with Nested Structure ===")
print(
    "TODO: Transform the sales data to have nested structure:",
    "1. Create 'order_info' struct with order_id and date",
    "2. Create 'product_info' struct with product, quantity, price",
    "3. Keep category as top-level field",
    "4. Write to JSON and read back",
    sep="\n",
)

# Your code here:
# nested_sales = sales_df.select(...)

print("\n=== EXERCISE 3: Parquet with Partitioning ===")
print(
    "TODO: Write sales data to Parquet with partitioning:",
    "1. Partition by 'category'",
    "2. Use snappy compression",
    "3. Write to '../data/formats/sales_partitioned.parquet'",
    "4. Read back and show partition information",
    sep="\n",
)

# Your code here:
# sales_df.write...

print("\n=== EXERCISE 4: Format Comparison ===")
print(
    "TODO: Compare file sizes and read performance:",
    "1. Write the same data to CSV, JSON, and Parquet",
    "2. Check file sizes (use os.path.getsize)",
    "3. Time the read operations",
    "4. Compare schemas after reading",
    sep="\n",
)

# Your code here:
# import time
# import os

print("\n=== EXERCISE 5: Schema Evolution ===")
print(
    "TODO: Handle schema evolution scenario:",
    "1. Write original sales_df to Parquet",
    "2. Create new version with additional 'discount' column",
    "3. Write new version to same location with mergeSchema option",
    "4. Read back and handle missing values",
    sep="\n",
)

# Your code here:
# sales_v2 = sales_df.withColumn("discount", F.lit(0.0))
