# Workshop 2: Ingestion Pipeline - COPY INTO & Auto Loader

**Workshop Objectives:**
- Implement batch ingestion using COPY INTO
- Configure Auto Loader for streaming ingestion
- Handle various file formats (CSV, JSON, Parquet)
- Monitor and manage ingestion pipelines

**Duration:** 30 minutes

---

## Theoretical Introduction

**Section Objective:** Understanding data ingestion methods in Databricks Lakehouse

### COPY INTO - Batch Ingestion
- **Purpose**: Load data from external files into Delta tables
- **Idempotency**: Automatically tracks processed files, preventing duplicates
- **Use case**: Scheduled batch jobs, one-time data migrations
- **Supported formats**: CSV, JSON, Parquet, Avro, ORC, TEXT
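
The idempotency guarantee above can be pictured with a toy model in plain Python. This is only a sketch of the idea (remember which files were loaded and skip them on the next run), not how Databricks implements it; the `ToyCopyInto` class and the file names are hypothetical.

```python
class ToyCopyInto:
    """Toy model of idempotent batch loading: each file is loaded at most once."""

    def __init__(self):
        self.loaded_files = set()  # files already ingested into the "table"
        self.rows = []             # the "table" contents

    def run(self, files):
        """Load every file not seen before and return the number of rows added."""
        added = 0
        for path, file_rows in files.items():
            if path in self.loaded_files:
                continue  # already ingested: skip it to avoid duplicates
            self.rows.extend(file_rows)
            self.loaded_files.add(path)
            added += len(file_rows)
        return added


# Hypothetical source files and their rows
source = {"customers_1.csv": ["a", "b"], "customers_2.csv": ["c"]}

table = ToyCopyInto()
first_run = table.run(source)   # both files are new, so all rows are loaded
second_run = table.run(source)  # same files again, so nothing is loaded
print(first_run, second_run, len(table.rows))
```

Task 1.4 later checks the same behaviour against a real table by comparing row counts before and after a second `COPY INTO`.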

### Auto Loader - Streaming Ingestion
- **Purpose**: Incrementally process new files as they arrive
- **cloudFiles format**: Uses `.format("cloudFiles")` for streaming read
- **Schema inference**: Automatically detects and evolves schema
- **Use case**: Near real-time processing, continuous data pipelines
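
As a rough picture of what "detects and evolves schema" means, the plain-Python sketch below extends a known column list whenever a new batch contains unseen keys. It is a simplified illustration under that assumption, not Auto Loader's actual algorithm; `evolve_schema` and the sample records are hypothetical.

```python
def evolve_schema(known_columns, batch):
    """Return the column list extended with any new keys seen in the batch."""
    evolved = list(known_columns)
    for record in batch:
        for column in record:
            if column not in evolved:
                evolved.append(column)  # new columns are appended at the end
    return evolved


# First batch: the schema is inferred from scratch
schema_v1 = evolve_schema([], [{"customer_id": "C1", "email": "a@example.com"}])

# Second batch introduces a column that was not seen before
schema_v2 = evolve_schema(schema_v1, [{"customer_id": "C2", "loyalty_tier": "gold"}])

print(schema_v1)
print(schema_v2)
```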

### Key Differences

| Feature | COPY INTO | Auto Loader |
|---------|-----------|-------------|
| Processing | Batch | Streaming |
| File tracking | Built-in | Checkpoint-based |
| Schema evolution | Opt-in (`mergeSchema` copy option) | Automatic (inference and evolution) |
| Scalability | Thousands of files | Millions of files or more |
| Cost | Per execution | Per file |

---

## 📚 Environment Initialization

In [0]:
%run ../00_setup

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

SCHEMA = "bronze"
CHECKPOINT_PATH = "/Volumes/ecommerce_platform_trainer/default/dataset/workshop"


spark.sql(f"USE SCHEMA {SCHEMA}")

# Paths to the data files (DATASET_BASE_PATH is already defined in 00_setup)
CUSTOMERS_CSV = f"{DATASET_BASE_PATH}/customers/customers.csv"
ORDERS_JSON = f"{DATASET_BASE_PATH}/orders/orders_batch.json"
PRODUCTS_PARQUET = f"{DATASET_BASE_PATH}/products/products.parquet"

## Part 1: COPY INTO - Batch Ingestion

### Task 1.1: CSV File Ingestion

**Instructions:**
1. Prepare target table `bronze_customers_batch`
2. Use `COPY INTO` to load data from `customers.csv`
3. Verify the number of loaded records

**Hints:**
- The `CREATE TABLE` statement has no `LOCATION` clause, so no table path is needed
- Use the `CUSTOMERS_CSV` variable for the source file path
- Format options: set both `header` and `inferSchema` to `true`

In [0]:
# TODO: Create target table
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG}.{SCHEMA}.bronze_customers_batch (
        customer_id STRING,
        first_name STRING,
        last_name STRING,
        email STRING,
        phone STRING,
        city STRING,
        state STRING,
        country STRING,
        registration_date DATE,
        customer_segment STRING,
        _ingestion_timestamp TIMESTAMP
    )
    USING DELTA
""")

In [0]:
# TODO: COPY INTO from CSV file
spark.sql(f"""
    ____ INTO {CATALOG}.{SCHEMA}.bronze_customers_batch
    FROM (
        SELECT 
            customer_id,
            first_name,
            last_name,
            email,
            phone,
            city,
            state,
            country,
            TO_DATE(registration_date) as registration_date,
            customer_segment,
            current_timestamp() as _ingestion_timestamp
        FROM '{____}'  -- Complete with CUSTOMERS_CSV variable
    )
    FILEFORMAT = ____  -- Complete format (CSV)
    FORMAT_OPTIONS (
        'header' = '____',      -- Does file have header?
        'inferSchema' = '____'  -- Infer schema?
    )
    COPY_OPTIONS (
        'mergeSchema' = '____'  -- Merge schema?
    )
""")

In [0]:
# Verification
spark.sql(f"""
 SELECT COUNT(*) as total_records 
 FROM {CATALOG}.{SCHEMA}.bronze_customers_batch
""").show()

### Task 1.2: JSON File Ingestion

**Instructions:**
1. Prepare table `bronze_orders_batch`
2. Use `COPY INTO` to load data from `orders_batch.json`
3. Handle nested JSON structure

**Hints:**
- Use `DELTA` as table format
- Use the `ORDERS_JSON` variable for the source file path
- FILEFORMAT should be `JSON`

In [0]:
# TODO: Create table for orders
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG}.{SCHEMA}.bronze_orders_batch (
        order_id STRING,
        customer_id STRING,
        product_id STRING,
        store_id STRING,
        order_datetime TIMESTAMP,
        quantity INT,
        unit_price DOUBLE,
        discount_percent DOUBLE,
        total_amount DOUBLE,
        payment_method STRING,
        _ingestion_timestamp TIMESTAMP
    )
    USING ____  -- Complete format (DELTA)
""")

In [0]:
# TODO: COPY INTO from JSON file
spark.sql(f"""
    COPY INTO {CATALOG}.{SCHEMA}.bronze_orders_batch
    FROM (
        SELECT 
            order_id,
            customer_id,
            product_id,
            store_id,
            TO_TIMESTAMP(order_datetime) as order_datetime,
            quantity,
            unit_price,
            discount_percent,
            total_amount,
            payment_method,
            current_timestamp() as _ingestion_timestamp
        FROM '{____}'  -- Complete with ORDERS_JSON variable
    )
    FILEFORMAT = ____  -- Complete format (JSON)
""")

In [0]:
# Verification
spark.sql(f"""
 SELECT * FROM {CATALOG}.{SCHEMA}.bronze_orders_batch LIMIT 10
""").show()

### Task 1.3: Parquet File Ingestion

**Instructions:**
1. Prepare table `bronze_products_batch`
2. Use `COPY INTO` to load data from `products.parquet`
3. Add column with source file metadata

**Hints:**
- Use `_metadata.file_path` to get source file path
- FILEFORMAT should be `PARQUET`
- Source file: `products.parquet`

In [0]:
# TODO: Create table for products
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG}.{SCHEMA}.bronze_products_batch (
        product_id STRING,
        product_name STRING,
        subcategory_code STRING,
        brand STRING,
        unit_cost DOUBLE,
        list_price DOUBLE,
        weight_kg DOUBLE,
        status STRING,
        _source_file STRING,
        _ingestion_timestamp TIMESTAMP
    )
    USING DELTA
""")

In [0]:
# TODO: COPY INTO from Parquet file
spark.sql(f"""
    COPY INTO {CATALOG}.{SCHEMA}.bronze_products_batch
    FROM (
        SELECT 
            product_id,
            product_name,
            subcategory_code,
            brand,
            unit_cost,
            list_price,
            weight_kg,
            status,
            ____ as _source_file,  -- Use _metadata.file_path
            current_timestamp() as _ingestion_timestamp
        FROM '{____}'  -- Complete with PRODUCTS_PARQUET variable
    )
    FILEFORMAT = ____  -- Complete format (PARQUET)
""")

In [0]:
# Verification
spark.sql(f"""
    SELECT product_id, product_name, brand, list_price, _source_file 
    FROM {CATALOG}.{SCHEMA}.bronze_products_batch 
    LIMIT 10
""").show(truncate=False)

### Task 1.4: Idempotency - Re-running COPY INTO

**Instructions:**
1. Re-run `COPY INTO` for the same table
2. Verify that data was not duplicated
3. Check `COPY INTO` operation history

In [0]:
# Check record count before re-running COPY INTO
before_count = spark.sql(f"""
 SELECT COUNT(*) as count 
 FROM {CATALOG}.{SCHEMA}.bronze_customers_batch
""").collect()[0]["count"]

print(f"Record count before re-running COPY INTO: {before_count}")

In [0]:
# TODO: Re-execute COPY INTO
spark.sql(f"""
    COPY INTO {CATALOG}.{SCHEMA}.bronze_customers_batch
    FROM (
        SELECT 
            customer_id,
            first_name,
            last_name,
            email,
            phone,
            city,
            state,
            country,
            TO_DATE(registration_date) as registration_date,
            customer_segment,
            current_timestamp() as _ingestion_timestamp
        FROM '{CUSTOMERS_CSV}'
    )
    FILEFORMAT = CSV
    FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
""")

In [0]:
# Idempotency verification
after_count = spark.sql(f"""
 SELECT COUNT(*) as count 
 FROM {CATALOG}.{SCHEMA}.bronze_customers_batch
""").collect()[0]["count"]

print(f"Record count after re-running COPY INTO: {after_count}")
print(f"Was data duplicated? {before_count != after_count}")
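
The `COPY INTO` statements in Part 1 all share one shape: target table, source subquery, `FILEFORMAT`, then optional `FORMAT_OPTIONS` and `COPY_OPTIONS`. The sketch below assembles that shape from parameters. `build_copy_into` is a hypothetical helper, the table name and path are placeholders, and the values are interpolated directly into SQL, so it is only suitable for trusted inputs.

```python
def build_copy_into(table, source_path, file_format, select_sql="*",
                    format_options=None, copy_options=None):
    """Assemble a COPY INTO statement as a string (illustrative helper)."""

    def render(options):
        # Render {'header': 'true'} as 'header' = 'true'
        return ", ".join(f"'{key}' = '{value}'" for key, value in options.items())

    lines = [
        f"COPY INTO {table}",
        f"FROM (SELECT {select_sql} FROM '{source_path}')",
        f"FILEFORMAT = {file_format}",
    ]
    if format_options:
        lines.append(f"FORMAT_OPTIONS ({render(format_options)})")
    if copy_options:
        lines.append(f"COPY_OPTIONS ({render(copy_options)})")
    return "\n".join(lines)


statement = build_copy_into(
    table="my_catalog.bronze.bronze_customers_batch",  # placeholder table name
    source_path="/path/to/customers.csv",              # placeholder path
    file_format="CSV",
    format_options={"header": "true", "inferSchema": "true"},
)
print(statement)
```

In the notebook, the resulting string could be passed to `spark.sql(statement)`.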

---

## Part 2: Auto Loader - Streaming Ingestion

### Task 2.1: Configuring Auto Loader for CSV

**Instructions:**
1. Prepare checkpoint location
2. Use `.format("cloudFiles")` to create streaming read
3. Configure schema inference and evolution
4. Write stream to `bronze_customers_stream` table

**Hints:**
- Format: `cloudFiles`
- cloudFiles.format: `csv`
- Schema location: use `CHECKPOINT_PATH` + subfolder name
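- Use `current_timestamp()` for the ingestion time and the `_metadata.file_path` column for the source file (`input_file_name()` is deprecated on recent Databricks runtimes)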
- Output mode: `append`, mergeSchema: `true`

In [0]:
# TODO: Streaming read with Auto Loader
customers_stream = (
    spark.readStream
    .format("____")  # Complete format (cloudFiles)
    .option("cloudFiles.format", "____")  # Source file format (csv)
    .option("cloudFiles.schemaLocation", f"{CHECKPOINT_PATH}/____")  # Schema checkpoint
    .option("header", "true")
    .load(CUSTOMERS_CSV)
)

In [0]:
# TODO: Add metadata columns
from pyspark.sql.functions import current_timestamp, input_file_name

customers_enriched = (
 customers_stream
 .withColumn("_ingestion_timestamp", ____) # Add timestamp
 .withColumn("_source_file", ____) # Add source file path, e.g. F.col("_metadata.file_path")
)

In [0]:
# TODO: Write stream to Delta table
query_customers = (
 customers_enriched.writeStream
 .format("____") # Complete format (delta)
 .outputMode("____") # Write mode (append)
 .option("checkpointLocation", f"{____}/customers_stream") # Checkpoint
 .option("mergeSchema", "____") # Schema evolution
 .toTable(f"{CATALOG}.{SCHEMA}.bronze_customers_stream")
)

In [0]:
# Stream verification
import time
time.sleep(10) # Wait for processing

spark.sql(f"""
 SELECT COUNT(*) as total_records 
 FROM {CATALOG}.{SCHEMA}.bronze_customers_stream
""").show()

### Task 2.2: Auto Loader for JSON with Schema Hints

**Instructions:**
1. Use Auto Loader to read `orders_batch.json`
2. Add schema hints for columns with specific types
3. Configure rescue data column for invalid records

**Hints:**
- cloudFiles.format: `json`
- schemaHints example: `"order_datetime TIMESTAMP, total_amount DOUBLE"`
- rescuedDataColumn: `_rescued_data`
- Checkpoint subfolder: `orders_stream`
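
To make the rescued data column more concrete, here is a plain-Python sketch of the idea: values that fit the expected schema become typed columns, and anything that does not fit is kept as JSON in `_rescued_data` instead of being dropped. This is a simplified illustration, not Auto Loader's implementation; `apply_schema`, the Python type mapping, and the sample records are hypothetical.

```python
import json


def apply_schema(record, schema):
    """Split a record into typed columns plus a JSON string of rescued fields."""
    row, rescued = {}, {}
    for column, value in record.items():
        caster = schema.get(column)
        if caster is None:
            rescued[column] = value  # column is not part of the schema
            continue
        try:
            row[column] = caster(value)
        except (TypeError, ValueError):
            row[column] = None       # value does not fit the expected type
            rescued[column] = value
    row["_rescued_data"] = json.dumps(rescued) if rescued else None
    return row


# Expected schema, expressed with Python casters for illustration
schema = {"order_id": str, "quantity": int, "total_amount": float}

good = apply_schema({"order_id": "O1", "quantity": "2", "total_amount": "19.98"}, schema)
bad = apply_schema({"order_id": "O2", "quantity": "two", "coupon": "SPRING"}, schema)

print(good)
print(bad)
```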

In [0]:
# TODO: Auto Loader with schema hints
orders_stream = (
 spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "____") # JSON format
 .option("cloudFiles.schemaLocation", f"{CHECKPOINT_PATH}/orders_schema")
 .option("cloudFiles.schemaHints", "____") # Hint: "order_datetime TIMESTAMP, total_amount DOUBLE"
 .option("rescuedDataColumn", "____") # Column for rescued data (_rescued_data)
 .load(____) # Complete with ORDERS_JSON variable
)

In [0]:
# TODO: Write stream
query_orders = (
 orders_stream.writeStream
 .format("delta")
 .outputMode("____") # Append mode
 .option("checkpointLocation", f"{CHECKPOINT_PATH}/____") # Checkpoint
 .toTable(f"{CATALOG}.{SCHEMA}.bronze_orders_stream")
)

In [0]:
# Verification
time.sleep(10)
spark.sql(f"""
 SELECT * FROM {CATALOG}.{SCHEMA}.bronze_orders_stream LIMIT 10
""").show()

### Task 2.3: Monitoring Streaming Queries

**Instructions:**
1. Display active streaming queries
2. Check status and last progress of each query
3. Retrieve metrics: number of processed records, batch duration

**Hints:**
- Use `spark.streams.active` to get active streams
- Use `stream.lastProgress` to get last progress metrics
- Progress contains: `batchId`, `numInputRows`, `batchDuration`
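
The progress fields listed above can be condensed into a one-line summary. The sketch below assumes the progress is available as a plain dict with those three keys; `summarize_progress` is a hypothetical helper and the sample values are made up.

```python
def summarize_progress(progress):
    """Format batch id, row count and throughput from a progress dict."""
    if not progress:
        return "no batch completed yet"
    rows = progress.get("numInputRows", 0)
    duration_ms = progress.get("batchDuration")
    summary = f"batch {progress.get('batchId')}: {rows} rows"
    if duration_ms:
        # Convert milliseconds to seconds to report rows per second
        summary += f" in {duration_ms} ms ({rows / (duration_ms / 1000):.1f} rows/s)"
    return summary


# Made-up sample in the shape described in the hints
sample = {"batchId": 3, "numInputRows": 500, "batchDuration": 2000}
print(summarize_progress(sample))
print(summarize_progress(None))
```

In the notebook, the input would be `stream.lastProgress`, which is empty until the first batch has finished.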

In [0]:
# TODO: Display active streams
active_streams = spark.streams.____ # Complete method (active)

print(f"Number of active streams: {len(active_streams)}")
for stream in active_streams:
 print(f"\nStream ID: {stream.id}")
 print(f"Name: {stream.name}")
 print(f"Status: {stream.status}")

In [0]:
# TODO: Check last progress
if len(active_streams) > 0:
    last_progress = active_streams[0].____  # Complete attribute (lastProgress)

    # lastProgress is empty until the first batch has completed
    if last_progress:
        print(f"Batch ID: {last_progress['batchId']}")
        print(f"Processed records: {last_progress['numInputRows']}")
        print(f"Processing time: {last_progress['batchDuration']} ms")

### Task 2.4: Stopping Streaming Queries

**Instructions:**
1. Stop all active streaming queries
2. Verify that all streams are stopped

**Hints:**
- Use `stream.stop()` method to stop a stream
- Iterate over `spark.streams.active` to stop all
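
The stop loop can be wrapped in a small function that reports which streams it stopped. The sketch below uses a stand-in `FakeStream` class so it runs without a cluster; both `stop_all` and `FakeStream` are hypothetical, and the only assumption about real query objects is that they expose `name` and `stop()`.

```python
def stop_all(streams):
    """Stop every stream in the iterable and return the names that were stopped."""
    stopped = []
    for stream in list(streams):  # copy first: the active list shrinks as streams stop
        stream.stop()
        stopped.append(stream.name)
    return stopped


class FakeStream:
    """Stand-in for a streaming query, used only to exercise stop_all."""

    def __init__(self, name):
        self.name = name
        self.isActive = True

    def stop(self):
        self.isActive = False


fakes = [FakeStream("customers_stream"), FakeStream("orders_stream")]
stopped_names = stop_all(fakes)
print(stopped_names)
```

In the notebook, the call would be `stop_all(spark.streams.active)`.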

In [0]:
# TODO: Stop all streams
for stream in spark.streams.active:
    print(f"Stopping stream: {stream.name}")
    stream.____()  # Complete method (stop)

print("\nAll streams stopped!")

In [0]:
# Verification
print(f"Number of active streams: {len(spark.streams.active)}")

---

## Part 3: COPY INTO vs Auto Loader Comparison

### Task 3.1: Performance Analysis

**Instructions:**
1. Compare record counts loaded by COPY INTO vs Auto Loader
2. Check operation history for both methods
3. Identify use cases for each method

**Hints:**
- Use `DESCRIBE HISTORY` to check table history
- Compare `version`, `operation`, `operationMetrics` columns
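
Once the history rows are collected, the `operationMetrics` map can be aggregated in plain Python. The sketch below assumes each row is a dict, that metric values are strings, and that written rows are reported under a `numOutputRows` key; the helper name and the sample rows are made up for illustration.

```python
def rows_written(history_rows):
    """Sum numOutputRows across history entries, ignoring entries without it."""
    total = 0
    for row in history_rows:
        metrics = row.get("operationMetrics") or {}
        total += int(metrics.get("numOutputRows", 0))  # metric values are strings
    return total


# Made-up history in the shape selected in this task
sample_history = [
    {"version": 1, "operation": "COPY INTO", "operationMetrics": {"numOutputRows": "1000"}},
    {"version": 0, "operation": "CREATE TABLE", "operationMetrics": {}},
]
print(rows_written(sample_history))
```

In the notebook, the rows could come from `[r.asDict() for r in spark.sql("DESCRIBE HISTORY ...").collect()]`.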

In [0]:
# Record count comparison
copy_into_count = spark.sql(f"""
 SELECT 'COPY INTO' as method, COUNT(*) as records 
 FROM {CATALOG}.{SCHEMA}.bronze_customers_batch
""")

auto_loader_count = spark.sql(f"""
 SELECT 'Auto Loader' as method, COUNT(*) as records 
 FROM {CATALOG}.{SCHEMA}.bronze_customers_stream
""")

copy_into_count.union(auto_loader_count).show()

In [0]:
# TODO: COPY INTO operation history
spark.sql(f"""
 ____ HISTORY {CATALOG}.{SCHEMA}.bronze_customers_batch
""").select("version", "operation", "operationMetrics").show(truncate=False)

In [0]:
# TODO: Auto Loader operation history
spark.sql(f"""
 DESCRIBE HISTORY {CATALOG}.{SCHEMA}.bronze_customers_stream
""").select("version", "operation", "operationMetrics").show(truncate=False)

---

## Workshop Summary

**Achieved Objectives:**
- Batch ingestion implementation with COPY INTO
- Auto Loader configuration for streaming ingestion
- Handling different formats (CSV, JSON, Parquet)
- Pipeline monitoring and management

**When to Use COPY INTO:**
- Batch processing with defined schedule
- Known and stable data structure
- Need for control over loading process
- Out-of-the-box idempotency

**When to Use Auto Loader:**
- Near real-time processing
- Schema evolution and automatic inference
- Continuous monitoring for new files
- Scalability and cost efficiency

---

## Solutions

Below are the complete solutions for all workshop tasks. Use them to verify your work or if you get stuck.

In [0]:
# =============================================================================
# SOLUTIONS - Part 1: COPY INTO - Batch Ingestion
# =============================================================================

# Task 1.1: CSV File Ingestion
# -----------------------------------------------------------------------------
# Create target table
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG}.{SCHEMA}.bronze_customers_batch (
        customer_id STRING,
        first_name STRING,
        last_name STRING,
        email STRING,
        phone STRING,
        city STRING,
        state STRING,
        country STRING,
        registration_date DATE,
        customer_segment STRING,
        _ingestion_timestamp TIMESTAMP
    )
    USING DELTA
""")

# COPY INTO from CSV file
spark.sql(f"""
    COPY INTO {CATALOG}.{SCHEMA}.bronze_customers_batch
    FROM (
        SELECT 
            customer_id,
            first_name,
            last_name,
            email,
            phone,
            city,
            state,
            country,
            TO_DATE(registration_date) as registration_date,
            customer_segment,
            current_timestamp() as _ingestion_timestamp
        FROM '{CUSTOMERS_CSV}'
    )
    FILEFORMAT = CSV
    FORMAT_OPTIONS (
        'header' = 'true',
        'inferSchema' = 'true'
    )
    COPY_OPTIONS (
        'mergeSchema' = 'true'
    )
""")

# Task 1.2: JSON File Ingestion
# -----------------------------------------------------------------------------
# Create table for orders
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG}.{SCHEMA}.bronze_orders_batch (
        order_id STRING,
        customer_id STRING,
        product_id STRING,
        store_id STRING,
        order_datetime TIMESTAMP,
        quantity INT,
        unit_price DOUBLE,
        discount_percent DOUBLE,
        total_amount DOUBLE,
        payment_method STRING,
        _ingestion_timestamp TIMESTAMP
    )
    USING DELTA
""")

# COPY INTO from JSON file
spark.sql(f"""
    COPY INTO {CATALOG}.{SCHEMA}.bronze_orders_batch
    FROM (
        SELECT 
            order_id,
            customer_id,
            product_id,
            store_id,
            TO_TIMESTAMP(order_datetime) as order_datetime,
            quantity,
            unit_price,
            discount_percent,
            total_amount,
            payment_method,
            current_timestamp() as _ingestion_timestamp
        FROM '{ORDERS_JSON}'
    )
    FILEFORMAT = JSON
""")

# Task 1.3: Parquet File Ingestion
# -----------------------------------------------------------------------------
# Create table for products
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {CATALOG}.{SCHEMA}.bronze_products_batch (
        product_id STRING,
        product_name STRING,
        subcategory_code STRING,
        brand STRING,
        unit_cost DOUBLE,
        list_price DOUBLE,
        weight_kg DOUBLE,
        status STRING,
        _source_file STRING,
        _ingestion_timestamp TIMESTAMP
    )
    USING DELTA
""")

# COPY INTO from Parquet file
spark.sql(f"""
    COPY INTO {CATALOG}.{SCHEMA}.bronze_products_batch
    FROM (
        SELECT 
            product_id,
            product_name,
            subcategory_code,
            brand,
            unit_cost,
            list_price,
            weight_kg,
            status,
            _metadata.file_path as _source_file,
            current_timestamp() as _ingestion_timestamp
        FROM '{PRODUCTS_PARQUET}'
    )
    FILEFORMAT = PARQUET
""")

# =============================================================================
# SOLUTIONS - Part 2: Auto Loader - Streaming Ingestion
# =============================================================================

# Task 2.1: Configuring Auto Loader for CSV
# -----------------------------------------------------------------------------
from pyspark.sql.functions import current_timestamp, input_file_name

# Streaming read with Auto Loader
customers_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", f"{CHECKPOINT_PATH}/customers_schema")
    .option("header", "true")
    .load(CUSTOMERS_CSV)
)

# Add metadata columns
customers_enriched = (
    customers_stream
    .withColumn("_ingestion_timestamp", current_timestamp())
    .withColumn("_source_file", F.col("_metadata.file_path"))  # input_file_name() is deprecated on recent runtimes
)

# Write stream to Delta table
query_customers = (
    customers_enriched.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{CHECKPOINT_PATH}/customers_stream")
    .option("mergeSchema", "true")
    .toTable(f"{CATALOG}.{SCHEMA}.bronze_customers_stream")
)

# Task 2.2: Auto Loader for JSON with Schema Hints
# -----------------------------------------------------------------------------
orders_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{CHECKPOINT_PATH}/orders_schema")
    .option("cloudFiles.schemaHints", "order_datetime TIMESTAMP, total_amount DOUBLE")
    .option("rescuedDataColumn", "_rescued_data")
    .load(ORDERS_JSON)
)

# Write stream
query_orders = (
    orders_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{CHECKPOINT_PATH}/orders_stream")
    .toTable(f"{CATALOG}.{SCHEMA}.bronze_orders_stream")
)

# Task 2.3: Monitoring Streaming Queries
# -----------------------------------------------------------------------------
# Display active streams
active_streams = spark.streams.active

print(f"Number of active streams: {len(active_streams)}")
for stream in active_streams:
    print(f"\nStream ID: {stream.id}")
    print(f"Name: {stream.name}")
    print(f"Status: {stream.status}")

# Check last progress
if len(active_streams) > 0:
    last_progress = active_streams[0].lastProgress
    
    if last_progress:
        print(f"Batch ID: {last_progress['batchId']}")
        print(f"Processed records: {last_progress['numInputRows']}")
        print(f"Processing time: {last_progress['batchDuration']} ms")

# Task 2.4: Stopping Streaming Queries
# -----------------------------------------------------------------------------
for stream in spark.streams.active:
    print(f"Stopping stream: {stream.name}")
    stream.stop()

print("\nAll streams stopped!")

# =============================================================================
# SOLUTIONS - Part 3: COPY INTO vs Auto Loader Comparison
# =============================================================================

# Task 3.1: COPY INTO operation history
spark.sql(f"""
    DESCRIBE HISTORY {CATALOG}.{SCHEMA}.bronze_customers_batch
""").select("version", "operation", "operationMetrics").show(truncate=False)

---

## Resource Cleanup (optional)

In [0]:
# WARNING: Run only if you want to delete all created tables

# Uncomment the lines below to delete tables:
# spark.sql(f"DROP TABLE IF EXISTS {CATALOG}.{SCHEMA}.bronze_customers_batch")
# spark.sql(f"DROP TABLE IF EXISTS {CATALOG}.{SCHEMA}.bronze_orders_batch")
# spark.sql(f"DROP TABLE IF EXISTS {CATALOG}.{SCHEMA}.bronze_products_batch")
# spark.sql(f"DROP TABLE IF EXISTS {CATALOG}.{SCHEMA}.bronze_customers_stream")
# spark.sql(f"DROP TABLE IF EXISTS {CATALOG}.{SCHEMA}.bronze_orders_stream")

# Clean up checkpoints
# dbutils.fs.rm(CHECKPOINT_PATH, recurse=True)

print("Resource cleanup is commented out. Uncomment to delete tables.")