## Spark Iceberg


## Criando sessão spark
Esta célula cria uma **SparkSession** configurada para integrar com o **Iceberg** usando o **Hadoop Catalog**.  
Ela define os parâmetros de conexão com o **MinIO** como storage S3, incluindo credenciais, endpoint e estilo de acesso, permitindo que o Spark leia e escreva tabelas Iceberg diretamente no nosso Lakehouse.


In [1]:
from pyspark.sql import SparkSession

# Cria SparkSession com Iceberg + Hadoop Catalog
spark = (
    SparkSession.builder
    .appName("MinIO Parquet to Iceberg")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.hadoop_catalog.type", "hadoop")
    .config("spark.sql.catalog.hadoop_catalog.warehouse", "s3a://lakehouse/")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)




# Extrair Campos e Expandir Arrays

Esta célula transforma o dataset para um formato tabular mais fácil de analisar:  

1. **Extrair campos do cliente:** cria colunas separadas para `gender`, `age`, `email` e `satisfaction`.  
2. **Expandir array de itens:** cada item comprado se torna uma linha própria com `item_name`, `item_price` e `item_quantity`.  
3. **Expandir array de tags:** cada tag de item vira uma linha na coluna `item_tag`.  

O resultado é um dataframe totalmente normalizado pronto para converter em tabela iceberg



In [2]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType

# -------------------------------
# Ler dados do MinIO
# -------------------------------
raw_path = "s3a://raw/sales.parquet"
df = spark.read.parquet(raw_path)

# -------------------------------
# Achatar customer
# -------------------------------
if "customer" in df.columns:
    df = df.withColumn("customer_gender", F.col("customer.gender")) \
           .withColumn("customer_age", F.col("customer.age")) \
           .withColumn("customer_email", F.col("customer.email")) \
           .withColumn("customer_satisfaction", F.col("customer.satisfaction")) \
           .drop("customer")

# -------------------------------
# Explodir items (array -> linhas)
# -------------------------------
if "items" in df.columns:
    # Converter items de JSON string para StructType se for string
    if isinstance(df.schema["items"].dataType, StringType):
        items_schema = ArrayType(
            StructType([
                StructField("name", StringType(), True),
                StructField("tags", ArrayType(StringType(), True), True),
                StructField("price", DoubleType(), True),
                StructField("quantity", IntegerType(), True)
            ])
        )
        df = df.withColumn("items", F.from_json(F.col("items"), items_schema))

    # Explodir array items
    df = df.withColumn("item", F.explode_outer("items")) \
           .withColumn("item_name", F.col("item.name")) \
           .withColumn("item_tags", F.col("item.tags")) \
           .withColumn("item_price", F.col("item.price")) \
           .withColumn("item_quantity", F.col("item.quantity")) \
           .drop("items", "item")

# -------------------------------
# Explodir item_tags (array -> linhas)
# -------------------------------
if "item_tags" in df.columns:
    df = df.withColumn("item_tag", F.explode_outer("item_tags")) \
           .drop("item_tags")



# Criar/Atualizar Tabela Iceberg

Esta célula salva o DataFrame transformado como uma **tabela gerenciada Iceberg**:  

- O nome da tabela é `hadoop_catalog.sales`.  
- Caso a tabela já exista, ela será **substituída** com os novos dados.  
- A tabela fica armazenada no **catálogo Hadoop** configurado para o Iceberg, permitindo consultas SQL futuras via Spark.


In [3]:
# Nome da tabela Iceberg
table_name = "hadoop_catalog.sales"

# Cria/atualiza tabela Iceberg
df.writeTo(table_name).using("iceberg").createOrReplace()

# Consultar Todos os Dados

Esta célula executa uma **consulta SQL** na tabela Iceberg `hadoop_catalog.sales` e exibe as **10 primeiras linhas**:  

- `spark.sql(...)` permite escrever queries SQL diretamente no Spark.  
- `show(10, truncate=False)` garante que todas as colunas sejam exibidas **completas**, sem cortar valores longos.  
- Útil para **inspecionar os dados** após as transformações e salvar no Iceberg.


In [5]:
# Consulta todos os dados
spark.sql("SELECT * FROM hadoop_catalog.sales").show(30, truncate=False)

+------------------------+--------------------------+-------------+----------+--------------+---------------+------------+----------------+---------------------+-------------+----------+-------------+------------+
|_id                     |saleDate                  |storeLocation|couponUsed|purchaseMethod|customer_gender|customer_age|customer_email  |customer_satisfaction|item_name    |item_price|item_quantity|item_tag    |
+------------------------+--------------------------+-------------+----------+--------------+---------------+------------+----------------+---------------------+-------------+----------+-------------+------------+
|5bd761dcae323e45a93ccfe8|2015-03-23T21:06:49.506000|Denver       |true      |Online        |M              |42          |cauho@witwuta.sv|4                    |printer paper|40.01     |2            |office      |
|5bd761dcae323e45a93ccfe8|2015-03-23T21:06:49.506000|Denver       |true      |Online        |M              |42          |cauho@witwuta.sv|4    