# Delta Lake Introduction

## 🛠️ TASK 1: Convert CSV to Delta Format

In [0]:
# Step 1: Import libraries
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Step 2: Create Spark session with Delta Lake support
builder = SparkSession.builder \
    .appName("DeltaLakeIntro") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Step 3: Create a sample Spark DataFrame directly
data = [
    (1, 'Alice', 'Engineering', 75000),
    (2, 'Bob', 'Sales', 65000),
    (3, 'Charlie', 'Engineering', 80000),
    (4, 'David', 'HR', 55000),
    (5, 'Eve', 'Sales', 70000)
]
columns = ['employee_id', 'name', 'department', 'salary']
df = spark.createDataFrame(data, columns)

# Step 4: Show the data
print("Original data:")
df.show()

# Step 5: Write to Delta managed table
df.write.format("delta").mode("overwrite").saveAsTable("employees_delta")
print("\n✅ Successfully converted to Delta managed table: employees_delta")

# Step 6: Read back from Delta managed table to verify
df_delta = spark.read.table("employees_delta")
print("\nData read from Delta Lake managed table:")
df_delta.show()

Original data:
+-----------+-------+-----------+------+
|employee_id|   name| department|salary|
+-----------+-------+-----------+------+
|          1|  Alice|Engineering| 75000|
|          2|    Bob|      Sales| 65000|
|          3|Charlie|Engineering| 80000|
|          4|  David|         HR| 55000|
|          5|    Eve|      Sales| 70000|
+-----------+-------+-----------+------+


✅ Successfully converted to Delta managed table: employees_delta

Data read from Delta Lake managed table:
+-----------+-------+-----------+------+
|employee_id|   name| department|salary|
+-----------+-------+-----------+------+
|          1|  Alice|Engineering| 75000|
|          2|    Bob|      Sales| 65000|
|          3|Charlie|Engineering| 80000|
|          4|  David|         HR| 55000|
|          5|    Eve|      Sales| 70000|
+-----------+-------+-----------+------+



## 🛠️ TASK 2: Create Delta Tables (SQL and PySpark)

### Method 1: Using PySpark

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Define schema explicitly
schema = StructType([
    StructField("product_id", IntegerType(), False),  # False = NOT NULL
    StructField("product_name", StringType(), False),
    StructField("category", StringType(), True),      # True = NULLABLE
    StructField("price", DoubleType(), False)
])

# Create sample data
products_data = [
    (1, "Laptop", "Electronics", 999.99),
    (2, "Mouse", "Electronics", 25.99),
    (3, "Desk", "Furniture", 299.99),
    (4, "Chair", "Furniture", 199.99)
]

# Create DataFrame
df_products = spark.createDataFrame(products_data, schema)

# Write as Delta managed table
df_products.write.format("delta").mode("overwrite").saveAsTable("products_delta")

print("✅ Delta table created using PySpark!")
df_products.show()

✅ Delta table created using PySpark!
+----------+------------+-----------+------+
|product_id|product_name|   category| price|
+----------+------------+-----------+------+
|         1|      Laptop|Electronics|999.99|
|         2|       Mouse|Electronics| 25.99|
|         3|        Desk|  Furniture|299.99|
|         4|       Chair|  Furniture|199.99|
+----------+------------+-----------+------+



### Method 2: Using SQL

In [0]:
# First, register the DataFrame as a temp view
df_products.createOrReplaceTempView("products_temp")

# Create Delta table using SQL (no LOCATION clause)
spark.sql("""
    CREATE OR REPLACE TABLE products_delta_sql
    USING DELTA
    AS SELECT * FROM products_temp
""")

print("✅ Delta table created using SQL!")

# Query the table
spark.sql("SELECT * FROM products_delta_sql").show()

✅ Delta table created using SQL!
+----------+------------+-----------+------+
|product_id|product_name|   category| price|
+----------+------------+-----------+------+
|         1|      Laptop|Electronics|999.99|
|         2|       Mouse|Electronics| 25.99|
|         3|        Desk|  Furniture|299.99|
|         4|       Chair|  Furniture|199.99|
+----------+------------+-----------+------+



### Creating an Empty Table with Schema

In [0]:
# SQL approach
spark.sql("""
    CREATE TABLE IF NOT EXISTS orders_delta (
        order_id INT NOT NULL,
        customer_name STRING NOT NULL,
        order_date DATE,
        total_amount DOUBLE
    )
    USING DELTA
""")

print("✅ Empty Delta table created with schema!")

✅ Empty Delta table created with schema!


## 🛠️ TASK 3: Test Schema Enforcement

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define strict schema
schema = StructType([
    StructField("user_id", IntegerType(), False),
    StructField("username", StringType(), False),
    StructField("age", IntegerType(), False)
])

# Create initial data
initial_data = [
    (1, "alice", 25),
    (2, "bob", 30)
]

df_initial = spark.createDataFrame(initial_data, schema)
df_initial.write.format("delta").mode("overwrite").saveAsTable("users_delta")

print("✅ Initial table created")
df_initial.show()

✅ Initial table created
+-------+--------+---+
|user_id|username|age|
+-------+--------+---+
|      1|   alice| 25|
|      2|     bob| 30|
+-------+--------+---+



### Test 1: Try Adding Data with WRONG Data Type

In [0]:
# This will FAIL because age is string instead of integer
wrong_type_data = [
    (3, "charlie", "twenty-five")  # age as string - WRONG!
]

try:
    # Try to create DataFrame with wrong type
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    wrong_schema = StructType([
        StructField("user_id", IntegerType(), False),
        StructField("username", StringType(), False),
        StructField("age", StringType(), False)  # String instead of Int
    ])
    
    df_wrong = spark.createDataFrame(wrong_type_data, wrong_schema)
    df_wrong.write.format("delta").mode("append").save("users_delta")
    print("❌ This shouldn't succeed!")
    
except Exception as e:
    print("✅ Schema enforcement worked! Error caught:")
    print(f"   {str(e)[:100]}...")

✅ Schema enforcement worked! Error caught:
   Path must be absolute: users_delta/_delta_log

JVM stacktrace:
java.lang.IllegalArgumentException
	a...


### Test 2: Try Adding Data with MISSING Column

In [0]:
# This will FAIL because we're missing the 'age' column
missing_column_data = [
    (4, "david")  # Missing age column
]

try:
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    incomplete_schema = StructType([
        StructField("user_id", IntegerType(), False),
        StructField("username", StringType(), False)
        # Missing age field!
    ])
    
    df_incomplete = spark.createDataFrame(missing_column_data, incomplete_schema)
    df_incomplete.write.format("delta").mode("append").save("users_delta")
    print("❌ This shouldn't succeed!")
    
except Exception as e:
    print("✅ Schema enforcement worked! Error caught:")
    print(f"   {str(e)[:100]}...")

✅ Schema enforcement worked! Error caught:
   Path must be absolute: users_delta/_delta_log

JVM stacktrace:
java.lang.IllegalArgumentException
	a...


### Test 3: Correctly Add Data (SHOULD WORK)

In [0]:
# This WILL WORK because schema matches
correct_data = [
    (3, "charlie", 28),
    (4, "david", 35)
]

df_correct = spark.createDataFrame(correct_data, schema)
df_correct.write.format("delta").mode("append").saveAsTable("users_delta")

print("✅ Data appended successfully!")

# Verify
df_final = spark.read.table("users_delta")
df_final.show()

✅ Data appended successfully!
+-------+--------+---+
|user_id|username|age|
+-------+--------+---+
|      3| charlie| 28|
|      4|   david| 35|
|      1|   alice| 25|
|      2|     bob| 30|
+-------+--------+---+



## 🛠️ TASK 4: Handle Duplicate Inserts

### Problem Setup

In [0]:
# Create orders table
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType

orders_schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_name", StringType(), False),
    StructField("order_date", DateType(), True),
    StructField("total_amount", DoubleType(), True)
])

from datetime import date
initial_orders = [
    (1, "Alice", date(2024, 1, 1), 100.00),
    (2, "Bob", date(2024, 1, 2), 150.00),
    (3, "Charlie", date(2024, 1, 3), 200.00)
]

df_orders = spark.createDataFrame(initial_orders, orders_schema)
df_orders.write.format("delta").mode("overwrite").saveAsTable("orders_delta")

print("Initial orders:")
df_orders.show()

Initial orders:
+--------+-------------+----------+------------+
|order_id|customer_name|order_date|total_amount|
+--------+-------------+----------+------------+
|       1|        Alice|2024-01-01|       100.0|
|       2|          Bob|2024-01-02|       150.0|
|       3|      Charlie|2024-01-03|       200.0|
+--------+-------------+----------+------------+



### Method 1: Using MERGE (Recommended)

In [0]:
from delta.tables import DeltaTable

# New orders (includes duplicate order_id = 2)
new_orders_data = [
    (2, "Bob", None, 150.00),    # Duplicate - should be ignored
    (4, "David", None, 250.00),  # New order - should be added
    (5, "Eve", None, 300.00)     # New order - should be added
]

df_new_orders = spark.createDataFrame(new_orders_data, orders_schema)

print("New orders to insert:")
df_new_orders.show()

# Load existing Delta table by name (Unity Catalog managed table)
delta_table = DeltaTable.forName(spark, "orders_delta")

# Merge: Insert only if order_id doesn't exist
delta_table.alias("existing").merge(
    df_new_orders.alias("new"),
    "existing.order_id = new.order_id"  # Match condition
).whenNotMatchedInsertAll().execute()  # Insert only non-matches

print("✅ Merge complete! Duplicates handled:")
spark.read.table("orders_delta").show()

New orders to insert:
+--------+-------------+----------+------------+
|order_id|customer_name|order_date|total_amount|
+--------+-------------+----------+------------+
|       2|          Bob|      NULL|       150.0|
|       4|        David|      NULL|       250.0|
|       5|          Eve|      NULL|       300.0|
+--------+-------------+----------+------------+

✅ Merge complete! Duplicates handled:
+--------+-------------+----------+------------+
|order_id|customer_name|order_date|total_amount|
+--------+-------------+----------+------------+
|       1|        Alice|2024-01-01|       100.0|
|       2|          Bob|2024-01-02|       150.0|
|       3|      Charlie|2024-01-03|       200.0|
|       4|        David|      NULL|       250.0|
|       5|          Eve|      NULL|       300.0|
+--------+-------------+----------+------------+



### Method 2: Using DataFrame Operations

In [0]:
# Register tables
# Minimal fix: use spark.read.table instead of .format('delta').load()
df_existing = spark.read.table("orders_delta")
df_existing.createOrReplaceTempView("existing_orders")

new_orders_data = [
    (5, "Eve", None, 300.00),      # Duplicate
    (8, "Henry", None, 450.00),    # New
    (9, "Ivy", None, 500.00)       # New
]

df_new = spark.createDataFrame(new_orders_data, orders_schema)
df_new.createOrReplaceTempView("new_orders")

# SQL MERGE statement
spark.sql("""
    MERGE INTO orders_delta AS existing
    USING new_orders AS new
    ON existing.order_id = new.order_id
    WHEN NOT MATCHED THEN INSERT *
""")

print("✅ SQL MERGE complete!")
spark.read.table("orders_delta").show()

✅ SQL MERGE complete!
+--------+-------------+----------+------------+
|order_id|customer_name|order_date|total_amount|
+--------+-------------+----------+------------+
|       1|        Alice|2024-01-01|       100.0|
|       2|          Bob|2024-01-02|       150.0|
|       3|      Charlie|2024-01-03|       200.0|
|       4|        David|      NULL|       250.0|
|       8|        Henry|      NULL|       450.0|
|       5|          Eve|      NULL|       300.0|
|       9|          Ivy|      NULL|       500.0|
+--------+-------------+----------+------------+

