# Batch & Streaming Load - Demo

**Cel szkoleniowy:** Opanowanie metod ładowania danych: COPY INTO, Auto Loader i Structured Streaming.

**Zakres tematyczny:**
- COPY INTO: kiedy używać, parametry (FILEFORMAT, VALIDATION_MODE, PATTERN)
- Auto Loader (CloudFiles): file notification, checkpointing, schema inference
- Schema evolution w praktyce
- Structured Streaming: micro-batch architecture
- readStream() / writeStream()
- Triggering: once vs processingTime
- Zarządzanie checkpointami
- MERGE na streamingu

## Kontekst i wymagania

- **Dzień szkolenia**: Dzień 2 - Lakehouse & Delta Lake
- **Typ notebooka**: Demo
- **Wymagania techniczne**:
  - Databricks Runtime 13.0+ (zalecane: 14.3 LTS)
  - Unity Catalog włączony
  - Uprawnienia: CREATE TABLE, CREATE SCHEMA, SELECT, MODIFY
  - Klaster: Standard z minimum 2 workers

## Wstęp teoretyczny

**Cel sekcji:** Zrozumienie różnych metod ładowania danych do Delta Lake: batch vs streaming.

**Podstawowe pojęcia:**
- **COPY INTO**: SQL command dla batch loads z idempotency (incremental batch)
- **Auto Loader**: Databricks-managed solution dla incremental file ingestion z automatycznym schema inference
- **Structured Streaming**: Spark streaming API z micro-batch processing i exactly-once semantics
- **Checkpoint**: Location przechowujący offset/progress dla fault tolerance

**Dlaczego to ważne?**
Wybór metody ingestion ma wpływ na latency, throughput, cost i operacyjną złożoność. COPY INTO dla batch (hourly/daily), Auto Loader dla near real-time z małymi plikami, Structured Streaming dla pure streaming sources (Kafka, Event Hub).

## Izolacja per użytkownik

Uruchom skrypt inicjalizacyjny dla per-user izolacji katalogów i schematów:

In [None]:
%run ../00_setup

## Konfiguracja

Import bibliotek i ustawienie zmiennych środowiskowych:

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta
import time

# Wyświetl kontekst użytkownika
print("=== Kontekst użytkownika ===")
print(f"Katalog: {CATALOG}")
print(f"Schema Bronze: {BRONZE_SCHEMA}")
print(f"Użytkownik: {raw_user}")

# Ustaw katalog i schemat
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE SCHEMA {BRONZE_SCHEMA}")

# Ścieżki do danych i checkpointów
ORDERS_BATCH_JSON = f"{DATASET_BASE_PATH}/orders/orders_batch.json"
ORDERS_STREAMING_DIR = f"{DATASET_BASE_PATH}/orders"  # Folder dla plików streaming
CHECKPOINT_PATH = f"/tmp/{raw_user}/checkpoints"

print(f"\n=== Konfiguracja ===")
print(f"Orders Batch JSON: {ORDERS_BATCH_JSON}")
print(f"Orders Streaming Directory: {ORDERS_STREAMING_DIR}")
print(f"Checkpoint path: {CHECKPOINT_PATH}")

---

## Sekcja 1: COPY INTO - Batch Ingestion

**Wprowadzenie teoretyczne:**

COPY INTO to SQL command dla idempotent batch loads. Automatycznie śledzi załadowane pliki i pomija duplikaty. Idealny dla scheduled batch jobs (hourly, daily).

**Kluczowe pojęcia:**
- **Idempotency**: Wielokrotne wykonanie COPY INTO z tymi samymi plikami nie powoduje duplikatów
- **File tracking**: Delta Log przechowuje listę załadowanych plików
- **Pattern matching**: Możliwość filtrowania plików po nazwie (PATTERN)
- **Validation mode**: Kontrola zachowania przy błędach (PERMISSIVE, FAILFAST)

**Zastosowanie praktyczne:**
- Scheduled batch loads z cloud storage (S3, ADLS, GCS)
- Incremental data ingestion bez ręcznego trackowania offsetów
- ETL pipelines z retry logic

### Przykład 1.1: Podstawowy COPY INTO

**Cel:** Załadowanie plików JSON za pomocą COPY INTO

In [None]:
# Przykład 1.1 - COPY INTO basic

# Utwórz target table z poprawnymi typami danych
copy_into_table = f"{BRONZE_SCHEMA}.orders_copy_into"

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {copy_into_table} (
        order_id STRING,
        customer_id STRING,
        product_id STRING,
        store_id STRING,
        order_datetime TIMESTAMP,
        quantity INT,
        unit_price DOUBLE,
        discount_percent INT,
        total_amount DOUBLE,
        payment_method STRING,
        _metadata STRUCT<file_path: STRING, file_name: STRING, file_modification_time: TIMESTAMP>
    )
    USING DELTA
""")

print(f"✓ Utworzono target table: {copy_into_table}")

In [None]:
# Wykonaj COPY INTO
copy_result = spark.sql(f"""
    COPY INTO {copy_into_table}
    FROM (SELECT *, _metadata FROM '{ORDERS_BATCH_JSON}')
    FILEFORMAT = JSON
    FORMAT_OPTIONS ('multiLine' = 'true')
    COPY_OPTIONS ('mergeSchema' = 'true')
""")

print("=== COPY INTO Result ===")
display(copy_result)

In [None]:
# Sprawdź załadowane dane
loaded_count = spark.table(copy_into_table).count()
print(f"Załadowano {loaded_count} rekordów")

In [None]:
# Wyświetl przykładowe dane z metadata
print("=== Dane z metadata ===")
display(
    spark.table(copy_into_table)
    .select("order_id", "customer_id", "total_amount", "payment_method", "_metadata.file_name")
    .limit(5)
)

**Wyjaśnienie:**

COPY INTO:
- **_metadata column**: Automatycznie dodawana kolumna z file path, name, modification time
- **Idempotency**: Ponowne wykonanie tego samego COPY INTO nie załaduje duplikatów
- **mergeSchema**: Automatyczne dodawanie nowych kolumn (schema evolution)
- **File tracking**: Delta Log przechowuje hash załadowanych plików

### Przykład 1.2: COPY INTO z VALIDATION_MODE

**Cel:** Kontrola zachowania przy błędach w danych

In [None]:
# Przykład 1.2 - COPY INTO z VALIDATION_MODE

# VALIDATION_MODE pozwala testować ingestion bez zapisywania danych
# Użyteczne dla weryfikacji schema i quality przed faktycznym załadowaniem

validation_table = f"{BRONZE_SCHEMA}.orders_validation"

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {validation_table} (
        order_id STRING,
        customer_id STRING,
        product_id STRING,
        store_id STRING,
        order_datetime TIMESTAMP,
        quantity INT,
        unit_price DOUBLE,
        discount_percent INT,
        total_amount DOUBLE,
        payment_method STRING,
        _metadata STRUCT<file_path: STRING, file_name: STRING, file_modification_time: TIMESTAMP>
    )
    USING DELTA
""")

print(f"✓ Utworzono validation table: {validation_table}")

# Re-run tego samego COPY INTO - demonstracja idempotency
print("=== Ponowne wykonanie COPY INTO (idempotency test) ===")
copy_result_2 = spark.sql(f"""
    COPY INTO {copy_into_table}
    FROM (SELECT *, _metadata FROM '{ORDERS_BATCH_JSON}')
    FILEFORMAT = JSON
    FORMAT_OPTIONS ('multiLine' = 'true')
""")

display(copy_result_2)

# Sprawdź czy count się nie zmienił (idempotency)
new_count = spark.table(copy_into_table).count()
print(f"\nLiczba rekordów (po ponownym COPY INTO): {new_count}")
print(f"Czy idempotentny? {new_count == loaded_count}")

# Historia COPY INTO
print("\n=== Historia COPY INTO ===")
history = spark.sql(f"DESCRIBE HISTORY {copy_into_table}")
display(
    history
    .filter(F.col("operation") == "COPY INTO")
    .select("version", "timestamp", "operation", "operationMetrics")
)

In [None]:
# VALIDATION_MODE = N - validate tylko N rekordów (dry run)
print("=== VALIDATION_MODE - Test 10 rekordów ===")
validation_result = spark.sql(f"""
    COPY INTO {validation_table}
    FROM (SELECT *, _metadata FROM '{ORDERS_BATCH_JSON}')
    FILEFORMAT = JSON
    FORMAT_OPTIONS ('multiLine' = 'true')
    COPY_OPTIONS ('mergeSchema' = 'true')
    VALIDATION_MODE = 10
""")

display(validation_result)

In [None]:
# Po pozytywnej walidacji - faktyczne załadowanie
print("\n=== Faktyczne załadowanie danych ===")
load_result = spark.sql(f"""
    COPY INTO {validation_table}
    FROM (SELECT *, _metadata FROM '{ORDERS_BATCH_JSON}')
    FILEFORMAT = JSON
    FORMAT_OPTIONS ('multiLine' = 'true')
    COPY_OPTIONS ('mergeSchema' = 'true')
""")

display(load_result)

### Przykład 1.3: COPY INTO z PATTERN (Selective Ingestion)

**Cel:** Filtrowanie plików po nazwie - załaduj tylko pliki spełniające pattern

In [None]:
# Przykład 1.3 - COPY INTO z PATTERN

# PATTERN pozwala filtrować pliki po nazwie
# Przydatne gdy mamy wiele plików, ale chcemy załadować tylko konkretne

pattern_table = f"{BRONZE_SCHEMA}.orders_pattern"

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {pattern_table} (
        order_id STRING,
        customer_id STRING,
        product_id STRING,
        store_id STRING,
        order_datetime TIMESTAMP,
        quantity INT,
        unit_price DOUBLE,
        discount_percent INT,
        total_amount DOUBLE,
        payment_method STRING,
        _metadata STRUCT<file_path: STRING, file_name: STRING, file_modification_time: TIMESTAMP>
    )
    USING DELTA
""")

print(f"✓ Utworzono pattern table: {pattern_table}")

In [None]:
# Załaduj tylko pliki streaming 001, 002, 003 (pattern)
print("=== COPY INTO z PATTERN - tylko pliki 001-003 ===")
pattern_result = spark.sql(f"""
    COPY INTO {pattern_table}
    FROM (SELECT *, _metadata FROM '{ORDERS_STREAMING_DIR}')
    FILEFORMAT = JSON
    PATTERN = 'orders_stream_00[1-3].json'
    FORMAT_OPTIONS ('multiLine' = 'true')
    COPY_OPTIONS ('mergeSchema' = 'true')
""")

display(pattern_result)

In [None]:
# Sprawdź które pliki zostały załadowane
print("\n=== Załadowane pliki (metadata) ===")
display(
    spark.table(pattern_table)
    .select("_metadata.file_name")
    .distinct()
    .orderBy("file_name")
)

print(f"\nŁącznie załadowano {spark.table(pattern_table).count()} rekordów z plików 001-003")

### Przykład 1.4: COPY INTO - Test Idempotency

**Cel:** Demonstracja że ponowne wykonanie COPY INTO nie powoduje duplikatów

In [None]:
# Przykład 1.4 - Test idempotency

# Zapisz obecny count
count_before = spark.table(copy_into_table).count()
print(f"Liczba rekordów przed ponownym COPY INTO: {count_before}")

In [None]:
# Ponowne wykonanie COPY INTO
print("\n=== Ponowne wykonanie COPY INTO (idempotency test) ===")
copy_result_2 = spark.sql(f"""
    COPY INTO {copy_into_table}
    FROM (SELECT *, _metadata FROM '{ORDERS_BATCH_JSON}')
    FILEFORMAT = JSON
    FORMAT_OPTIONS ('multiLine' = 'true')
""")

display(copy_result_2)

In [None]:
# Sprawdź czy count się nie zmienił (idempotency)
count_after = spark.table(copy_into_table).count()
print(f"\nLiczba rekordów po ponownym COPY INTO: {count_after}")
print(f"Czy idempotentny? {count_after == count_before} (żadne nowe rekordy nie zostały załadowane)")

In [None]:
# Historia COPY INTO
print("\n=== Historia COPY INTO ===")
history = spark.sql(f"DESCRIBE HISTORY {copy_into_table}")
display(
    history
    .filter(F.col("operation") == "COPY INTO")
    .select("version", "timestamp", "operation", "operationMetrics")
)

---

## Sekcja 2: Auto Loader (CloudFiles)

**Wprowadzenie teoretyczne:**

Auto Loader to Databricks-managed solution dla incremental file ingestion. Używa file notification (Event Grid/SQS) lub directory listing dla automatic discovery nowych plików. Idealny dla near real-time ingestion z małymi plikami.

**Kluczowe pojęcia:**
- **cloudFiles format**: Specjalny format Spark dla Auto Loader
- **Schema inference**: Automatyczne wykrywanie i ewolucja schematu
- **Checkpoint location**: Przechowuje progress i schema history
- **File notification**: Event-driven approach dla cloud storage

**Zastosowanie praktyczne:**
- Near real-time ingestion (latency: sekundy-minuty)
- Małe pliki arriving continuously
- Schema evolution bez manual intervention

### Przykład 2.1: Auto Loader - Basic Setup

**Cel:** Konfiguracja Auto Loader z schema inference

In [None]:
# Przykład 2.1 - Auto Loader basic

autoloader_table = f"{BRONZE_SCHEMA}.orders_autoloader"
autoloader_checkpoint = f"{CHECKPOINT_PATH}/autoloader_orders"

# Auto Loader z readStream - używa folderu (automatycznie znajdzie wszystkie pliki JSON)
orders_stream = (
    spark.readStream
    .format("cloudFiles")  # Auto Loader format
    .option("cloudFiles.format", "json")  # Source format
    .option("cloudFiles.schemaLocation", f"{autoloader_checkpoint}/schema")  # Schema tracking
    .option("cloudFiles.inferColumnTypes", "true")  # Infer types (not just strings)
    .option("multiLine", "true")
    .load(ORDERS_STREAMING_DIR)  # Folder - Auto Loader znajdzie wszystkie pliki JSON
)

print("=== Auto Loader Stream Schema ===")
orders_stream.printSchema()

# Zapis z trigger(once) dla demo (batch mode)
query = (
    orders_stream
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", autoloader_checkpoint)
    .trigger(once=True)  # Process all available data, then stop
    .table(autoloader_table)
)

# Czekaj na zakończenie
query.awaitTermination()

print(f"\n✓ Auto Loader completed")
print(f"Załadowano {spark.table(autoloader_table).count()} rekordów")

# Wyświetl dane
print("\n=== Załadowane dane ===")
display(spark.table(autoloader_table).limit(5))

In [None]:
# Zapis z trigger(once) dla demo (batch mode)
query = (
    orders_stream
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", autoloader_checkpoint)
    .trigger(once=True)  # Process all available data, then stop
    .table(autoloader_table)
)

# Czekaj na zakończenie
query.awaitTermination()

print(f"\n✓ Auto Loader completed")

In [None]:
# Sprawdź załadowane rekordy
print(f"Załadowano {spark.table(autoloader_table).count()} rekordów")

In [None]:
# Wyświetl dane
print("=== Załadowane dane ===")
display(spark.table(autoloader_table).limit(5))

**Wyjaśnienie:**

Auto Loader:
- **cloudFiles format**: Specjalny format dla Structured Streaming
- **trigger(once=True)**: Batch mode - process all files, then stop (użyteczne dla testing)
- **checkpointLocation**: Obowiązkowe - przechowuje progress i schema
- **Schema inference**: Automatyczne wykrywanie typów (inferColumnTypes=true)

W produkcji: używamy `trigger(processingTime='5 minutes')` dla continuous processing.

### Przykład 2.2: Auto Loader - Schema Evolution

**Cel:** Demonstracja automatycznej ewolucji schematu przy nowych plikach

In [None]:
# Przykład 2.2 - Schema Evolution

# Sprawdź schema location (gdzie Auto Loader przechowuje schema history)
schema_location = f"{autoloader_checkpoint}/schema"
print(f"Schema location: {schema_location}")

In [None]:
# Lista plików w schema location
print("=== Schema history files ===")
schema_files = dbutils.fs.ls(schema_location)
for file in schema_files:
    print(f"  {file.name}")

In [None]:
# Odczytaj schema history
print("=== Current Schema ===")
current_schema = spark.table(autoloader_table).schema
for field in current_schema.fields:
    print(f"  {field.name}: {field.dataType}")

# W przypadku nowych plików z dodatkowymi kolumnami,
# Auto Loader automatycznie zaktualizuje schemat
print("\n⚠️ Uwaga: Schema evolution działa automatycznie przy nowych plikach z dodatkowymi kolumnami")

---

## Sekcja 3: Structured Streaming - Continuous Processing

**Wprowadzenie teoretyczne:**

Structured Streaming to Spark API dla continuous data processing. Traktuje stream jako unbounded table z micro-batch execution. Zapewnia exactly-once semantics i fault tolerance.

**Kluczowe pojęcia:**
- **readStream / writeStream**: API dla streaming operations
- **Trigger**: Processing interval (once, processingTime, availableNow)
- **Output mode**: append, complete, update
- **Watermark**: Time-based windowing dla late data handling

**Zastosowanie praktyczne:**
- Real-time ETL z Kafka, Event Hub, Kinesis
- Continuous aggregations i windowing
- Exactly-once processing semantics

### Przykład 3.1: Structured Streaming - Basic Stream

**Cel:** Utworzenie basic streaming pipeline z transformacjami

In [None]:
# Przykład 3.1 - Structured Streaming basic

streaming_table = f"{BRONZE_SCHEMA}.orders_streaming"
streaming_checkpoint = f"{CHECKPOINT_PATH}/streaming_orders"

# ReadStream z transformacjami - używa folderu
orders_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{streaming_checkpoint}/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("multiLine", "true")
    .load(ORDERS_STREAMING_DIR)  # Folder - Auto Loader znajdzie wszystkie pliki JSON
)

# Transformacje na streamie (jak na batch DataFrame)
orders_transformed = (
    orders_stream
    .withColumn("order_date", F.to_date(F.col("order_datetime")))  # Ekstrakcja daty z timestamp
    .withColumn("payment_method_upper", F.upper(F.col("payment_method")))  # Normalizacja payment_method
    .withColumn("stream_processed_ts", F.current_timestamp())
    .filter(F.col("total_amount") > 0)  # Quality check - pozytywna kwota
)

print("=== Transformed Stream Schema ===")
orders_transformed.printSchema()

# WriteStream z trigger(once) dla demo
query = (
    orders_transformed
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", streaming_checkpoint)
    .trigger(once=True)
    .table(streaming_table)
)

# Czekaj na zakończenie
query.awaitTermination()

print(f"\n✓ Streaming pipeline completed")
print(f"Przetworzono {spark.table(streaming_table).count()} rekordów")

# Wyświetl dane z transformation
print("\n=== Przetworzone dane ===")
display(
    spark.table(streaming_table)
    .select("order_id", "order_date", "payment_method", "payment_method_upper", "total_amount", "stream_processed_ts")
    .limit(5)
)

In [None]:
# WriteStream z trigger(once) dla demo
query = (
    orders_transformed
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", streaming_checkpoint)
    .trigger(once=True)
    .table(streaming_table)
)

# Czekaj na zakończenie
query.awaitTermination()

print(f"\n✓ Streaming pipeline completed")

In [None]:
# Sprawdź przetworzone rekordy
print(f"Przetworzono {spark.table(streaming_table).count()} rekordów")

In [None]:
# Wyświetl dane z transformation
print("=== Przetworzone dane ===")
display(
    spark.table(streaming_table)
    .select("order_id", "order_date", "payment_method", "payment_method_upper", "total_amount", "stream_processed_ts")
    .limit(5)
)

**Wyjaśnienie:**

Structured Streaming:
- **Transformacje**: Możemy używać standardowych DataFrame API (withColumn, filter, join)
- **trigger(once=True)**: Batch mode - użyteczne dla testing i backfill
- **outputMode="append"**: Tylko nowe rekordy zapisywane (domyślne dla streaming)
- **checkpointLocation**: Fault tolerance - możliwość recovery po failure
- **Przykładowe transformacje**: ekstrakcja daty z timestamp, normalizacja payment_method, quality checks

### Przykład 3.2: Streaming Aggregations

**Cel:** Continuous aggregations na streaming data

In [None]:
# Przykład 3.2 - Streaming Aggregations

streaming_agg_table = f"{BRONZE_SCHEMA}.orders_streaming_agg"
agg_checkpoint = f"{CHECKPOINT_PATH}/streaming_agg"

# ReadStream - używa folderu
orders_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{agg_checkpoint}/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("multiLine", "true")
    .load(ORDERS_STREAMING_DIR)  # Folder - Auto Loader znajdzie wszystkie pliki JSON
)

# Agregacje: count i sum per payment_method
orders_agg = (
    orders_stream
    .withColumn("payment_method_upper", F.upper(F.col("payment_method")))
    .groupBy("payment_method_upper")
    .agg(
        F.count("order_id").alias("total_orders"),
        F.sum("total_amount").alias("total_revenue"),
        F.avg("total_amount").alias("avg_order_value")
    )
)

# WriteStream z outputMode="complete" dla agregacji
query = (
    orders_agg
    .writeStream
    .format("delta")
    .outputMode("complete")  # Complete mode dla groupBy bez watermark
    .option("checkpointLocation", agg_checkpoint)
    .trigger(once=True)
    .table(streaming_agg_table)
)

query.awaitTermination()

print(f"\n✓ Streaming aggregation completed")
print("\n=== Wyniki agregacji per payment method ===")
display(spark.table(streaming_agg_table).orderBy("payment_method_upper"))

In [None]:
# WriteStream z outputMode="complete" dla agregacji
query = (
    orders_agg
    .writeStream
    .format("delta")
    .outputMode("complete")  # Complete mode dla groupBy bez watermark
    .option("checkpointLocation", agg_checkpoint)
    .trigger(once=True)
    .table(streaming_agg_table)
)

query.awaitTermination()

print(f"\n✓ Streaming aggregation completed")

In [None]:
# Wyświetl wyniki agregacji per payment method
print("=== Wyniki agregacji per payment method ===")
display(spark.table(streaming_agg_table).orderBy("payment_method_upper"))

**Wyjaśnienie:**

Streaming Aggregations:
- **outputMode="complete"**: Cała tabela wynikowa zapisywana przy każdym micro-batch (wymagane dla groupBy bez watermark)
- **outputMode="update"**: Tylko zaktualizowane wiersze (użyteczne z watermark)
- **Stateful operations**: GroupBy/Aggregations wymagają state management (przechowywany w checkpoint)
- **Use case**: Agregacje per payment method (Cash, Credit Card, Debit Card, PayPal) w czasie rzeczywistym

W produkcji: używamy watermark dla windowed aggregations i outputMode="update".

### Przykład 3.3: Różne tryby Triggering

**Cel:** Demonstracja różnych trybów triggering: once, availableNow, processingTime

In [None]:
# Przykład 3.3a - trigger(once=True)
# Process all available data in single batch, then stop

trigger_once_table = f"{BRONZE_SCHEMA}.orders_trigger_once"
trigger_once_checkpoint = f"{CHECKPOINT_PATH}/trigger_once"

print("=== Trigger: once=True ===")
print("Use case: Testing, backfill, one-time ingestion")

orders_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{trigger_once_checkpoint}/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("multiLine", "true")
    .load(ORDERS_STREAMING_DIR)
)

query = (
    orders_stream
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", trigger_once_checkpoint)
    .trigger(once=True)  # Process all data once, then stop
    .table(trigger_once_table)
)

query.awaitTermination()
print(f"✓ Trigger(once) completed: {spark.table(trigger_once_table).count()} rekordów")

In [None]:
# Przykład 3.3b - trigger(availableNow=True)
# Process all available data in multiple micro-batches, then stop

trigger_available_table = f"{BRONZE_SCHEMA}.orders_trigger_available"
trigger_available_checkpoint = f"{CHECKPOINT_PATH}/trigger_available"

print("\n=== Trigger: availableNow=True ===")
print("Use case: Backfill z zachowaniem micro-batch semantics")

orders_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{trigger_available_checkpoint}/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("multiLine", "true")
    .load(ORDERS_STREAMING_DIR)
)

query = (
    orders_stream
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", trigger_available_checkpoint)
    .trigger(availableNow=True)  # Process all available data in micro-batches, then stop
    .table(trigger_available_table)
)

query.awaitTermination()
print(f"✓ Trigger(availableNow) completed: {spark.table(trigger_available_table).count()} rekordów")

In [None]:
# Przykład 3.3c - trigger(processingTime='10 seconds')
# Continuous processing z micro-batch co 10 sekund

trigger_continuous_table = f"{BRONZE_SCHEMA}.orders_trigger_continuous"
trigger_continuous_checkpoint = f"{CHECKPOINT_PATH}/trigger_continuous"

print("\n=== Trigger: processingTime='10 seconds' ===")
print("Use case: Near real-time continuous processing")
print("⚠️ Ten przykład uruchomi streaming query na 30 sekund, potem zatrzyma")

orders_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{trigger_continuous_checkpoint}/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("multiLine", "true")
    .load(ORDERS_STREAMING_DIR)
)

query = (
    orders_stream
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", trigger_continuous_checkpoint)
    .trigger(processingTime='10 seconds')  # Continuous: micro-batch co 10 sekund
    .table(trigger_continuous_table)
)

# Start query (non-blocking)
query_handle = query.start()

print("Query running...")
print("Czekam 30 sekund (pozwoli na ~3 micro-batches)...")
time.sleep(30)

# Stop query
query_handle.stop()
print(f"\n✓ Trigger(processingTime) stopped after 30s: {spark.table(trigger_continuous_table).count()} rekordów")

**Porównanie triggerów:**

| Trigger | Use Case | Zatrzyma się? | Micro-batches |
|---------|----------|---------------|---------------|
| `once=True` | Testing, backfill | ✅ Tak | 1 batch |
| `availableNow=True` | Backfill z micro-batch | ✅ Tak | Multiple |
| `processingTime='X'` | Continuous production | ❌ Nie | Infinite |

**Rekomendacje:**
- **once**: Testing w notebookach, one-time data load
- **availableNow**: Backfill historycznych danych z zachowaniem micro-batch semantics
- **processingTime**: Production continuous streaming (Kafka, Event Hub)

---

## Sekcja 4: MERGE na Streamingu (Upsert)

**Wprowadzenie teoretyczne:**

Structured Streaming może pisać do Delta z MERGE logic (upsert). Używamy foreachBatch dla custom write logic w każdym micro-batch.

**Zastosowanie:**
- CDC (Change Data Capture) streaming
- Upsert streaming events do dimension tables
- Deduplication w real-time

### Przykład 4.1: Streaming MERGE (Upsert)

**Cel:** Implementacja streaming upsert z MERGE INTO

In [None]:
# Przykład 4.1 - Streaming MERGE

from delta.tables import DeltaTable

# Target table dla upsert
upsert_table = f"{BRONZE_SCHEMA}.orders_upsert"
upsert_checkpoint = f"{CHECKPOINT_PATH}/streaming_upsert"

# Utwórz target table jeśli nie istnieje - poprawne typy danych
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {upsert_table} (
        order_id STRING,
        customer_id STRING,
        product_id STRING,
        store_id STRING,
        order_datetime TIMESTAMP,
        quantity INT,
        unit_price DOUBLE,
        discount_percent INT,
        total_amount DOUBLE,
        payment_method STRING,
        last_updated TIMESTAMP
    )
    USING DELTA
""")

print(f"✓ Utworzono upsert table: {upsert_table}")

# ForeachBatch function dla MERGE
def upsert_to_delta(microBatchDF, batchId):
    # Dodaj last_updated timestamp
    microBatchDF = microBatchDF.withColumn("last_updated", F.current_timestamp())
    
    # DeltaTable dla MERGE
    deltaTable = DeltaTable.forName(spark, upsert_table)
    
    # MERGE INTO logic
    (
        deltaTable.alias("target")
        .merge(
            microBatchDF.alias("source"),
            "target.order_id = source.order_id"
        )
        .whenMatchedUpdate(set = {
            "payment_method": "source.payment_method",
            "total_amount": "source.total_amount",
            "order_datetime": "source.order_datetime",
            "quantity": "source.quantity",
            "last_updated": "source.last_updated"
        })
        .whenNotMatchedInsert(values = {
            "order_id": "source.order_id",
            "customer_id": "source.customer_id",
            "product_id": "source.product_id",
            "store_id": "source.store_id",
            "order_datetime": "source.order_datetime",
            "quantity": "source.quantity",
            "unit_price": "source.unit_price",
            "discount_percent": "source.discount_percent",
            "total_amount": "source.total_amount",
            "payment_method": "source.payment_method",
            "last_updated": "source.last_updated"
        })
        .execute()
    )
    
    print(f"Batch {batchId}: Merged {microBatchDF.count()} records")

# ReadStream - używa wildcard dla plików streaming
orders_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{upsert_checkpoint}/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("multiLine", "true")
    .load(ORDERS_STREAMING_JSON)  # Wildcard dla orders_stream_*.json
)

# WriteStream z foreachBatch
query = (
    orders_stream
    .writeStream
    .foreachBatch(upsert_to_delta)  # Custom MERGE logic
    .option("checkpointLocation", upsert_checkpoint)
    .trigger(once=True)
    .start()
)

query.awaitTermination()

print(f"\n✓ Streaming MERGE completed")
print(f"Final count: {spark.table(upsert_table).count()}")

print("\n=== Upserted data ===")
display(spark.table(upsert_table).orderBy("order_id").limit(10))

In [None]:
# ForeachBatch function dla MERGE
def upsert_to_delta(microBatchDF, batchId):
    """
    Custom MERGE logic dla każdego micro-batch.
    Wykonuje UPSERT (UPDATE jeśli istnieje, INSERT jeśli nie).
    """
    # Dodaj last_updated timestamp
    microBatchDF = microBatchDF.withColumn("last_updated", F.current_timestamp())
    
    # DeltaTable dla MERGE
    deltaTable = DeltaTable.forName(spark, upsert_table)
    
    # MERGE INTO logic
    (
        deltaTable.alias("target")
        .merge(
            microBatchDF.alias("source"),
            "target.order_id = source.order_id"
        )
        .whenMatchedUpdate(set = {
            "payment_method": "source.payment_method",
            "total_amount": "source.total_amount",
            "order_datetime": "source.order_datetime",
            "quantity": "source.quantity",
            "last_updated": "source.last_updated"
        })
        .whenNotMatchedInsert(values = {
            "order_id": "source.order_id",
            "customer_id": "source.customer_id",
            "product_id": "source.product_id",
            "store_id": "source.store_id",
            "order_datetime": "source.order_datetime",
            "quantity": "source.quantity",
            "unit_price": "source.unit_price",
            "discount_percent": "source.discount_percent",
            "total_amount": "source.total_amount",
            "payment_method": "source.payment_method",
            "last_updated": "source.last_updated"
        })
        .execute()
    )
    
    print(f"Batch {batchId}: Merged {microBatchDF.count()} records")

print("✓ Funkcja upsert_to_delta zdefiniowana")

In [None]:
# ReadStream - używa folderu
orders_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{upsert_checkpoint}/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("multiLine", "true")
    .load(ORDERS_STREAMING_DIR)  # Folder - Auto Loader znajdzie wszystkie pliki JSON
)

print("✓ Stream configured dla MERGE")

In [None]:
# WriteStream z foreachBatch
query = (
    orders_stream
    .writeStream
    .foreachBatch(upsert_to_delta)  # Custom MERGE logic
    .option("checkpointLocation", upsert_checkpoint)
    .trigger(once=True)
    .start()
)

query.awaitTermination()

print(f"\n✓ Streaming MERGE completed")

In [None]:
# Sprawdź final count
print(f"Final count: {spark.table(upsert_table).count()}")

In [None]:
# Wyświetl upserted data
print("=== Upserted data ===")
display(spark.table(upsert_table).orderBy("order_id").limit(10))

**Wyjaśnienie:**

Streaming MERGE:
- **foreachBatch**: Custom function wykonywana na każdym micro-batch
- **MERGE INTO**: Upsert logic - UPDATE jeśli istnieje, INSERT jeśli nie
- **Idempotency**: Ponowne procesowanie tego samego batch ID daje ten sam rezultat
- **Use case**: CDC streaming, real-time dimension updates, deduplication

### Przykład 4.2: Zarządzanie Checkpointami

**Cel:** Demonstracja jak działa checkpoint location i jak go zarządzać

In [None]:
# Przykład 4.2 - Zarządzanie checkpointami

# Checkpoint location przechowuje state streaming query
# Jest krytyczny dla fault tolerance i exactly-once semantics

print("=== Checkpoint Locations ===")
print(f"Auto Loader: {autoloader_checkpoint}")
print(f"Streaming: {streaming_checkpoint}")
print(f"Upsert: {upsert_checkpoint}")

# Sprawdź strukturę checkpoint location
print(f"\n=== Struktura checkpoint (autoloader) ===")
try:
    checkpoint_files = dbutils.fs.ls(autoloader_checkpoint)
    for file in checkpoint_files:
        print(f"  {file.name}")
except Exception as e:
    print(f"Checkpoint nie istnieje lub jest pusty: {e}")

In [None]:
# Checkpoint zawiera:
# - offsets/: Offset każdego micro-batch (dla recovery)
# - commits/: Commited batches (dla exactly-once semantics)
# - metadata: Stream metadata (id, configuration)
# - sources/: Source-specific tracking (np. file tracking dla Auto Loader)

print("\n=== Checkpoint Best Practices ===")
print("✅ Zawsze używaj checkpoint location dla production streams")
print("✅ Przechowuj w external location (S3, ADLS, DBFS)")
print("✅ Backup przed schema changes")
print("⚠️ Nie usuwaj checkpoint - loss of progress!")
print("⚠️ Nie współdziel checkpoint między różnymi queries")

---

## Porównanie metod ingestion

| Feature | COPY INTO | Auto Loader | Structured Streaming |
|---------|-----------|-------------|---------------------|
| **Latency** | Minuty-godziny | Sekundy-minuty | Sub-sekundy |
| **Use case** | Scheduled batch | Near real-time files | Pure streaming (Kafka) |
| **Idempotency** | ✅ Built-in | ✅ Built-in | ⚠️ Requires checkpoint |
| **Schema evolution** | ⚠️ Manual | ✅ Automatic | ⚠️ Manual |
| **Complexity** | Low | Medium | High |
| **Cost** | Lowest | Medium | Highest |
| **File tracking** | Delta Log | Checkpoint | Checkpoint |

**Rekomendacje:**
- **COPY INTO**: Batch loads (hourly, daily), duże pliki, niskie koszty
- **Auto Loader**: Near real-time, małe pliki, schema evolution
- **Structured Streaming**: Pure streaming sources (Kafka), sub-second latency

---

## Best Practices

**COPY INTO:**
- Używaj dla scheduled batch jobs (daily, hourly)
- Zawsze dodawaj _metadata column dla audytu
- Używaj PATTERN dla filtrowania plików
- Monitor operationMetrics w DESCRIBE HISTORY

**Auto Loader:**
- Włącz schema inference (inferColumnTypes=true)
- Używaj trigger(availableNow=True) dla backfill
- Monitor schema evolution w schemaLocation
- Rozważ file notification dla dużej liczby plików (>10k)

**Structured Streaming:**
- Zawsze ustawiaj checkpointLocation
- Używaj trigger(processingTime) dla continuous streams
- Implementuj watermark dla windowed aggregations
- Monitor stream metrics (numInputRows, inputRowsPerSecond)

**Checkpoints:**
- Przechowuj w niezależnej lokalizacji (nie w table location)
- Backup przed schema changes
- Nie usuwaj checkpoint - loss of progress!
- Używaj external location (S3, ADLS) dla durability

---

## Troubleshooting

**Problem 1: "Stream stopped unexpectedly"**
**Rozwiązanie:**
- Sprawdź checkpoint location - czy istnieje i jest writable
- Sprawdź logi streaming query: `query.lastProgress`
- Monitor exceptions: `query.exception()`

**Problem 2: "Schema mismatch in Auto Loader"**
**Rozwiązanie:**
```python
# Włącz schema evolution
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("mergeSchema", "true")
```

**Problem 3: COPY INTO nie wykrywa nowych plików**
**Rozwiązanie:**
- COPY INTO śledzi tylko file path - zmiana zawartości nie jest wykrywana
- Użyj COPY_OPTIONS ('force' = 'true') dla re-ingestion

**Problem 4: Streaming aggregation state grows indefinitely**
**Rozwiązanie:**
```python
# Dodaj watermark dla time-based cleanup
.withWatermark("event_time", "1 hour")
```

---

## Podsumowanie

**W tym notebooku nauczyliśmy się:**

✅ **COPY INTO:**
- Idempotent batch loads z automatic file tracking
- Pattern matching dla selective ingestion
- _metadata column dla audytu

✅ **Auto Loader:**
- Near real-time file ingestion z cloudFiles format
- Automatic schema inference i evolution
- Checkpoint-based progress tracking

✅ **Structured Streaming:**
- Continuous processing z micro-batch architecture
- Transformacje i agregacje na streaming data
- foreachBatch dla custom write logic (MERGE)

✅ **Triggering modes:**
- trigger(once=True) - batch mode dla testing
- trigger(processingTime) - continuous processing
- trigger(availableNow=True) - backfill mode

**Kluczowe wnioski:**
1. Wybór metody ingestion zależy od latency requirements i source type
2. COPY INTO dla scheduled batch, Auto Loader dla near real-time files
3. Structured Streaming dla pure streaming sources (Kafka)
4. Checkpoint location jest krytyczny dla fault tolerance

**Następne kroki:**
- **Kolejny notebook**: 04_bronze_silver_gold_pipeline.ipynb
- **Warsztat praktyczny**: 02_ingestion_pipeline_workshop.ipynb
- **Delta Live Tables**: Declarative pipelines z automatic orchestration

---

## Cleanup

Posprzątaj zasoby utworzone podczas notebooka:

In [None]:
# Opcjonalne czyszczenie zasobów testowych
# UWAGA: Uruchom tylko jeśli chcesz usunąć wszystkie utworzone dane

# Usuń tabele
# spark.sql(f"DROP TABLE IF EXISTS {copy_into_table}")
# spark.sql(f"DROP TABLE IF EXISTS {validation_table}")
# spark.sql(f"DROP TABLE IF EXISTS {pattern_table}")
# spark.sql(f"DROP TABLE IF EXISTS {autoloader_table}")
# spark.sql(f"DROP TABLE IF EXISTS {streaming_table}")
# spark.sql(f"DROP TABLE IF EXISTS {streaming_agg_table}")
# spark.sql(f"DROP TABLE IF EXISTS {trigger_once_table}")
# spark.sql(f"DROP TABLE IF EXISTS {trigger_available_table}")
# spark.sql(f"DROP TABLE IF EXISTS {trigger_continuous_table}")
# spark.sql(f"DROP TABLE IF EXISTS {upsert_table}")

# Usuń checkpointy
# dbutils.fs.rm(CHECKPOINT_PATH, recurse=True)

# spark.catalog.clearCache()
# print("Zasoby zostały wyczyszczone")