# üìö Lezione 1: ETL Base in Databricks

## üìå Cos'√® ETL?

ETL significa **Extract, Transform, Load** (Estrai, Trasforma, Carica):

1. **Extract**: Leggi dati da fonti esterne (file, database, API, cloud storage)
2. **Transform**: Pulisci, filtra, aggrega, unisci i dati
3. **Load**: Salva i risultati in un sistema di destinazione (tabelle, data warehouse)

### üéØ Perch√© ETL in Databricks?
- **Scalabilit√†**: Spark processa grandi volumi distribuiti su cluster
- **Integrazione**: Accesso a cloud storage (Azure, AWS, GCP)
- **Delta Lake**: Formato ottimizzato con transazioni ACID
- **Unified Analytics**: Stesso ambiente per data engineering, ML e analytics

---

## üîó 1. EXTRACT - Leggere Dati

### Da File (CSV, JSON, Parquet)

In [None]:
# Leggere CSV
df_csv = spark.read.csv("/path/to/data.csv", header=True, inferSchema=True)

# Leggere JSON
df_json = spark.read.json("/path/to/data.json")

# Leggere Parquet (formato ottimizzato)
df_parquet = spark.read.parquet("/path/to/data.parquet")

# Esplorare
df_csv.printSchema()
df_csv.show(5)

### Da Tabelle Delta/Hive

In [None]:
# Leggere da tabella esistente
df_table = spark.read.table("catalog.schema.table_name")

# Oppure con SQL
df_sql = spark.sql("SELECT * FROM catalog.schema.table_name")

df_table.show()

### Da Azure Data Lake Storage (ADLS)

Per accedere a cloud storage, si usa il mounting:

In [None]:
# Configurazione OAuth per ADLS Gen2
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<client_id>",
    "fs.azure.account.oauth2.client.secret": "<client_secret>",
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant_id>/oauth2/token"
}

# Mount dello storage
dbutils.fs.mount(
    source = "abfss://<container>@<storage_account>.dfs.core.windows.net/",
    mount_point = "/mnt/datalake",
    extra_configs = configs
)

# Ora puoi leggere dal mount
df = spark.read.csv("/mnt/datalake/data.csv", header=True, inferSchema=True)

---

## üîÑ 2. TRANSFORM - Trasformare Dati

### Pulizia Base

In [None]:
from pyspark.sql.functions import col, when, lit

# Rimuovere righe con valori null
df_clean = df.filter(col('important_column').isNotNull())

# Rimuovere duplicati
df_clean = df.dropDuplicates(['id', 'email'])

# Rinominare colonne
df_clean = df.withColumnRenamed('old_name', 'new_name')

# Rimuovere colonne non necessarie
df_clean = df.drop('unwanted_col1', 'unwanted_col2')

df_clean.show()

### Creare Nuove Colonne

In [None]:
from pyspark.sql.functions import col, when, lit, concat

# Nuova colonna calcolata
df_transformed = df.withColumn('total_price', col('quantity') * col('unit_price'))

# Colonna con logica condizionale
df_transformed = df.withColumn('price_category',
    when(col('price') > 1000, 'Expensive')
    .when(col('price') > 100, 'Medium')
    .otherwise('Cheap')
)

# Colonna costante
df_transformed = df.withColumn('country', lit('Italy'))

# Concatenare colonne
df_transformed = df.withColumn('full_name', 
    concat(col('first_name'), lit(' '), col('last_name'))
)

df_transformed.show()

### Filtrare e Selezionare

In [None]:
# Filtrare righe
df_filtered = df.filter(col('age') > 18)
df_filtered = df.filter((col('age') > 18) & (col('country') == 'Italy'))

# Selezionare colonne specifiche
df_selected = df.select('customer_id', 'name', 'total_amount')

# Ordinare
df_sorted = df.orderBy(col('date').desc())

df_sorted.show()

### Aggregazioni

In [None]:
from pyspark.sql.functions import sum, avg, count, max, min

# Group by semplice
df_grouped = df.groupBy('category').count()

# Aggregazioni multiple
df_agg = df.groupBy('category').agg(
    count('*').alias('total_records'),
    sum('amount').alias('total_amount'),
    avg('amount').alias('avg_amount'),
    max('amount').alias('max_amount'),
    min('amount').alias('min_amount')
)

df_agg.show()

### Join (Unire Dataset)

In [None]:
# Supponiamo di avere due DataFrame
customers = spark.read.table("customers")
orders = spark.read.table("orders")

# Inner join (solo righe con match)
df_joined = customers.join(orders, 'customer_id', 'inner')

# Left join (tutte le righe di customers, anche senza ordini)
df_left = customers.join(orders, 'customer_id', 'left')

# Join con colonne diverse
df_joined = customers.join(orders, customers.id == orders.cust_id, 'inner')

df_joined.show()

---

## üíæ 3. LOAD - Salvare Risultati

### Salvare come Tabella Delta

In [None]:
# Creare catalog e schema (una tantum)
spark.sql("CREATE CATALOG IF NOT EXISTS my_catalog")
spark.sql("CREATE SCHEMA IF NOT EXISTS my_catalog.my_schema")

# Salvare tabella
df_result.write.mode('overwrite').saveAsTable('my_catalog.my_schema.customers_clean')

# Mode options:
# 'overwrite' - Sovrascrive tabella esistente
# 'append' - Aggiunge righe a tabella esistente
# 'error' - Fallisce se tabella esiste (default)
# 'ignore' - Non fa nulla se tabella esiste

### Salvare come File

In [None]:
# Salvare come Parquet (raccomandato per performance)
df_result.write.mode('overwrite').parquet('/mnt/datalake/output.parquet')

# Salvare come CSV
df_result.write.mode('overwrite').csv('/mnt/datalake/output.csv', header=True)

# Salvare come Delta (formato ottimizzato)
df_result.write.format('delta').mode('overwrite').save('/mnt/datalake/output_delta')

---

## üöÄ Esempio Completo: Pipeline ETL

### Scenario: Analisi Vendite E-commerce

In [None]:
from pyspark.sql.functions import col, when, sum, avg, count

# ========== EXTRACT ==========
# Leggi dati grezzi
orders_raw = spark.read.csv("/mnt/datalake/orders.csv", header=True, inferSchema=True)
customers_raw = spark.read.table("bronze.customers")

print("=== Dati Grezzi ===")
orders_raw.printSchema()
orders_raw.show(5)
print(f"Totale ordini: {orders_raw.count()}")

# ========== TRANSFORM ==========
# 1. Pulizia ordini
orders_clean = (
    orders_raw
    .filter(col('amount').isNotNull())  # Rimuovi null
    .filter(col('amount') > 0)          # Rimuovi valori negativi
    .dropDuplicates(['order_id'])       # Rimuovi duplicati
)

# 2. Arricchimento: aggiungi categoria di spesa
orders_enriched = orders_clean.withColumn('spending_level',
    when(col('amount') > 1000, 'High')
    .when(col('amount') > 100, 'Medium')
    .otherwise('Low')
)

# 3. Join con customers
orders_with_customers = orders_enriched.join(
    customers_raw.select('customer_id', 'customer_name', 'country'),
    'customer_id',
    'inner'
)

print("=== Dati Trasformati ===")
orders_with_customers.show(5)

# 4. Aggregazione: statistiche per paese
country_stats = orders_with_customers.groupBy('country').agg(
    count('*').alias('total_orders'),
    sum('amount').alias('total_revenue'),
    avg('amount').alias('avg_order_value')
).orderBy(col('total_revenue').desc())

print("=== Risultati Finali ===")
country_stats.show()

# ========== LOAD ==========
# Salva risultati in Silver/Gold layer
orders_with_customers.write.mode('overwrite').saveAsTable('silver.orders_enriched')
country_stats.write.mode('overwrite').saveAsTable('gold.country_revenue')

print("‚úÖ Pipeline ETL completata!")

---

## üéØ Architettura Medallion (Bronze-Silver-Gold)

Best practice per organizzare i dati in Databricks:

### üü§ Bronze Layer (Dati Grezzi)
- Dati come arrivano dalla fonte
- Nessuna trasformazione
- Schema originale preservato
```python
df_raw.write.saveAsTable('bronze.orders_raw')
```

### ü•à Silver Layer (Dati Puliti)
- Pulizia, deduplicazione, validazione
- Standardizzazione tipi di dati
- Join base
```python
df_clean.write.saveAsTable('silver.orders_clean')
```

### ü•á Gold Layer (Dati Aggregati)
- Dati pronti per analytics/BI
- Aggregazioni, metriche business
- Ottimizzati per query
```python
df_aggregated.write.saveAsTable('gold.daily_sales')
```

---

## üí° Best Practices ETL

### 1. **Usa sempre `inferSchema=True` con cautela**
```python
# ‚ö†Ô∏è Va bene per esplorare, ma lento su grandi dataset
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# ‚úÖ Meglio: definisci schema esplicito per performance
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])
df = spark.read.csv("data.csv", header=True, schema=schema)
```

### 2. **Partiziona i dati grandi**
```python
# Salva partizionato per query pi√π veloci
df.write.partitionBy('year', 'month').saveAsTable('my_table')
```

### 3. **Cache per riutilizzo**
```python
# Se usi lo stesso DataFrame molte volte
df_clean = df.filter(col('valid') == True).cache()
df_clean.count()  # Materializza cache
```

### 4. **Monitora sempre le trasformazioni**
```python
# Verifica conteggi a ogni step
print(f"Righe iniziali: {df_raw.count()}")
print(f"Dopo pulizia: {df_clean.count()}")
print(f"Dopo join: {df_joined.count()}")
```

### 5. **Delta Lake per produzione**
```python
# Sempre usa Delta per ACID transactions e time travel
df.write.format('delta').mode('overwrite').saveAsTable('my_table')
```

---

## üìö Prossimi Passi

Ora che conosci ETL base:
1. **Pratica**: Vedi [PRATICA.ipynb](../PRATICA.ipynb) per esercizi con Delta Live Tables
2. **Architettura**: Vedi [LEZIONE 2 APPUNTI.IPYNB](LEZIONE 2 APPUNTI.IPYNB) per Lakehouse Architecture
3. **Avanzato**: Vedi [LEZIONE 3 APPUNTI.IPYNB](LEZIONE 3 APPUNTI.IPYNB) per Unity Catalog e integrazione ADLS

### üîë Ricorda:
- **Extract**: `spark.read.*`
- **Transform**: `.filter()`, `.withColumn()`, `.groupBy()`, `.join()`
- **Load**: `.write.saveAsTable()` o `.write.parquet()`