# 02 - Load and Create Tables (Delta & Iceberg)

Este notebook mostra um fluxo passo-a-passo para:
1. Criar uma SparkSession configurada para **Delta Lake** e **Apache Iceberg**;
2. Carregar um CSV de exemplo (NYC TLC Green Taxi);
3. Criar tabelas Delta e Iceberg;
4. Mostrar exemplos de `INSERT`, `UPDATE`, `DELETE` e `MERGE` para ambas tabelas.


## 1) Preparacao e variáveis úteis

Defina (no terminal antes de abrir o Jupyter) as variáveis de ambiente para garantir que os jars do Delta e Iceberg sejam carregados pelo kernel do PySpark:

```bash
export SPARK_PACKAGES="io.delta:delta-core_2.12:2.2.0,org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0"
PYSPARK_SUBMIT_ARGS="--packages $SPARK_PACKAGES pyspark-shell" jupyter lab
```

No Windows PowerShell use `setx`/`$env:SPARK_PACKAGES=...` conforme necessário.

In [None]:
# Imports e criação da SparkSession usando src.spark_config.build_spark
import os, sys
proj_root = os.path.abspath(os.path.join('..'))
sys.path.append(os.path.abspath('../src'))

try:
    from spark_config import build_spark
except Exception as e:
    # caso o import falhe (por exemplo pasta não no path), tente importar pelo pacote src
    try:
        sys.path.append(os.path.abspath('..'))
        from src.spark_config import build_spark
    except Exception:
        print("Não foi possível importar build_spark automaticamente. Verifique o PYTHONPATH e a estrutura do projeto.")
        raise

spark = build_spark('pyspark-datalakes-notebook')
print('Spark version:', spark.version)
spark.conf.get('spark.jars.packages') if 'spark.jars.packages' in spark.conf.getAll() else None


## 2) Baixar/Localizar dados de exemplo
Este notebook espera um CSV em `data/green_tripdata_sample.csv`. Se não existir, ele tentará baixar um subconjunto público (NYC TLC).

In [None]:
data_path = os.path.abspath(os.path.join('..','data','green_tripdata_sample.csv'))
os.makedirs(os.path.dirname(data_path), exist_ok=True)

if not os.path.exists(data_path):
    print('Arquivo não encontrado em', data_path)
    print('Tentando baixar uma amostra pública (se houver internet disponível)...')
    # tentativa de download com curl (fallback para python requests não garantido)
    url = 'https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2022-01.csv'
    cmd = f"curl -sS -o '{data_path}' '{url}' || wget -q -O '{data_path}' '{url}'"
    rc = os.system(cmd)
    if rc == 0 and os.path.exists(data_path):
        print('Download concluído:', data_path)
    else:
        print('Não foi possível baixar o arquivo automaticamente. Você pode colocar um CSV em', data_path)
else:
    print('Dados encontrados em', data_path)


In [None]:
# 3) Ler o CSV com inferSchema (cuidado: infer pode ser lento para arquivos grandes)
if os.path.exists(data_path):
    trips_raw = (
        spark.read
        .option('header', True)
        .option('inferSchema', True)
        .csv(data_path)
    )
    print('Número de linhas (amostra):', trips_raw.count())
    display_cols = trips_raw.columns[:12]
    print('Colunas (amostra):', display_cols)
    trips_raw.printSchema()
else:
    print('Arquivo CSV não disponível — pare aqui e adicione um CSV em data/green_tripdata_sample.csv')


## 4) Modelagem e criação do DataFrame final `trips_df`
Selecionamos apenas as colunas necessárias e normalizamos nomes. Também geramos um `trip_id` único.

In [None]:
from pyspark.sql.functions import col, expr, monotonically_increasing_id, lit
from pyspark.sql.types import StringType

if 'trips_raw' in globals():
    # adaptamos os nomes conforme a disponibilidade no CSV
    cols = trips_raw.columns
    # heurística: procurar colunas de pickup/dropoff datetime
    pickup_col = next((c for c in cols if 'pickup' in c.lower()), None)
    dropoff_col = next((c for c in cols if 'dropoff' in c.lower()), None)
    vendor_col = next((c for c in cols if 'vendor' in c.lower()), None)
    fare_col = next((c for c in cols if 'fare' in c.lower()), None)
    pass_col = next((c for c in cols if 'passenger' in c.lower()), None)
    dist_col = next((c for c in cols if 'distance' in c.lower()), None)

    print('Usando colunas:', pickup_col, dropoff_col, vendor_col, fare_col, pass_col, dist_col)

    trips_df = trips_raw.select(
        expr('cast(uuid() as string) as trip_id'),
        col(pickup_col).alias('pickup_datetime') if pickup_col is not None else lit(None).cast(StringType()).alias('pickup_datetime'),
        col(dropoff_col).alias('dropoff_datetime') if dropoff_col is not None else lit(None).cast(StringType()).alias('dropoff_datetime'),
        col(vendor_col).alias('vendor_id') if vendor_col is not None else lit('unknown').alias('vendor_id'),
        col(pass_col).cast('int').alias('passenger_count') if pass_col is not None else lit(1).alias('passenger_count'),
        col(dist_col).cast('double').alias('trip_distance') if dist_col is not None else lit(0.0).alias('trip_distance'),
        col(fare_col).cast('double').alias('fare_amount') if fare_col is not None else lit(0.0).alias('fare_amount')
    )
    print('trips_df schema:')
    trips_df.printSchema()
    print('Exemplo de linhas:')
    trips_df.show(5, truncate=False)
else:
    print('trips_raw não definido — carregue o CSV')


## 5) Criar tabela Delta (local path)
Usamos um diretório local (`/tmp/datalake/delta_trips`) para armazenar os arquivos Delta.

In [None]:
delta_path = '/tmp/datalake/delta_trips'
if 'trips_df' in globals():
    # sobrescreve para facilitar testes iterativos
    trips_df.write.format('delta').mode('overwrite').save(delta_path)
    # registrar no catálogo do Spark (opcional)
    spark.sql(f"CREATE TABLE IF NOT EXISTS default.delta_trips USING DELTA LOCATION '{delta_path}'")
    print('Tabela Delta criada em', delta_path)
    print('Contagem Delta:', spark.read.format('delta').load(delta_path).count())
else:
    print('trips_df não existe — não é possível criar a tabela Delta')


## 6) Criar tabela Iceberg (catalog `local_iceberg` configurado em spark_config)
Usamos o catálogo `local_iceberg` (type=hadoop) configurado no `spark_config.py`.

In [None]:
try:
    # criar/registrar tabela Iceberg via writeTo (Spark 3.4+ com Iceberg runtime)
    spark.sql('CREATE NAMESPACE IF NOT EXISTS local_iceberg.default')
except Exception:
    pass

try:
    if 'trips_df' in globals():
        # writeTo API pode exigir Spark 3.4+ e o runtime iceberg-spark
        trips_df.writeTo('local_iceberg.default.iceberg_trips').tableProperty('format-version','2').createOrReplace()
        print('Tabela Iceberg criada: local_iceberg.default.iceberg_trips')
        print('Contagem Iceberg:', spark.table('local_iceberg.default.iceberg_trips').count())
    else:
        print('trips_df não existe — não é possível criar a tabela Iceberg')
except Exception as e:
    print('Falha ao criar tabela Iceberg — verifique se o runtime iceberg-spark está disponível nos jars do Spark')
    print('Erro:', e)


## 7) Exemplos de operações na tabela Delta
Demonstramos `INSERT` (append), `UPDATE` e `DELETE` usando a API Delta.

In [None]:
from pyspark.sql import Row
from datetime import datetime

if os.path.exists('/tmp/datalake/delta_trips'):
    # 1) INSERT (append)
    new_rows = [
        Row(trip_id='demo-1', pickup_datetime=str(datetime.utcnow()), dropoff_datetime=str(datetime.utcnow()), vendor_id='v_demo', passenger_count=1, trip_distance=1.2, fare_amount=10.0),
        Row(trip_id='demo-2', pickup_datetime=str(datetime.utcnow()), dropoff_datetime=str(datetime.utcnow()), vendor_id='v_demo', passenger_count=2, trip_distance=3.4, fare_amount=20.0)
    ]
    new_df = spark.createDataFrame(new_rows)
    new_df.write.format('delta').mode('append').save('/tmp/datalake/delta_trips')
    print('Inseridos rows na Delta')
    
    # 2) UPDATE
    try:
        from delta.tables import DeltaTable
        dt = DeltaTable.forPath(spark, '/tmp/datalake/delta_trips')
        dt.update("trip_id = 'demo-1'", {"fare_amount": "fare_amount * 1.5"})
        print('Atualizado demo-1 na Delta')
    except Exception as e:
        print('Erro no UPDATE Delta (verifique delta-spark estar presente):', e)

    # 3) DELETE
    try:
        dt.delete("trip_id = 'demo-2'")
        print('Deletado demo-2 da Delta')
    except Exception as e:
        print('Erro no DELETE Delta (verifique delta-spark):', e)

    print('Contagem final Delta:', spark.read.format('delta').load('/tmp/datalake/delta_trips').count())
else:
    print('Delta path não existe — pule esta seção')


## 8) Exemplos de operações na tabela Iceberg
Mostramos `INSERT` (append), `UPDATE` e `DELETE` usando a escrita DataFrame/SQL via catálogo `local_iceberg`.

In [None]:
try:
    # INSERT (append) para Iceberg
    new_rows_ice = [
        {'trip_id':'ice-1', 'pickup_datetime':None, 'dropoff_datetime':None, 'vendor_id':'v_ice', 'passenger_count':1, 'trip_distance':2.2, 'fare_amount':15.0},
        {'trip_id':'ice-2', 'pickup_datetime':None, 'dropoff_datetime':None, 'vendor_id':'v_ice', 'passenger_count':3, 'trip_distance':5.1, 'fare_amount':25.0}
    ]
    new_df_ice = spark.createDataFrame(new_rows_ice)
    try:
        new_df_ice.writeTo('local_iceberg.default.iceberg_trips').append()
        print('Inseridos rows na Iceberg')
    except Exception as e:
        print('Erro ao inserir via writeTo — pode faltar o runtime iceberg-spark nos jars:', e)

    # UPDATE via SQL (se suportado)
    try:
        spark.sql("UPDATE local_iceberg.default.iceberg_trips SET fare_amount = fare_amount * 1.2 WHERE trip_id = 'ice-1'")
        print('Atualizado ice-1 na Iceberg (SQL)')
    except Exception as e:
        print('UPDATE SQL falhou para Iceberg — verifique suporte no runtime e permissões:', e)

    # DELETE via SQL
    try:
        spark.sql("DELETE FROM local_iceberg.default.iceberg_trips WHERE trip_id = 'ice-2'")
        print('Deletado ice-2 da Iceberg (SQL)')
    except Exception as e:
        print('DELETE SQL falhou para Iceberg:', e)

    # Mostrar contagem (se tabela existir)
    try:
        cnt = spark.table('local_iceberg.default.iceberg_trips').count()
        print('Contagem Iceberg:', cnt)
    except Exception as e:
        print('Não foi possível ler a tabela Iceberg — talvez não esteja criada ou falta runtime:', e)

except Exception as e:
    print('Seção Iceberg falhou:', e)


## 9) Limpeza e leitura final
Leitura das tabelas para verificar o estado final.

In [None]:
print('Delta rows sample:')
try:
    display(spark.read.format('delta').load(delta_path).limit(5).toPandas())
except Exception as e:
    print('Não foi possível ler Delta para exibir (verifique delta jar):', e)

print('\nIceberg rows sample:')
try:
    display(spark.table('local_iceberg.default.iceberg_trips').limit(5).toPandas())
except Exception as e:
    print('Não foi possível ler Iceberg para exibir (verifique iceberg runtime):', e)
