In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = (
    SparkSession.builder.appName("Delta_Iceberg_Poetry_Lab")
    .config("spark.sql.warehouse.dir", os.path.join(os.getcwd(), "warehouse/delta"))
    .enableHiveSupport() # Habilitar suporte a Hive é bom para catálogos
    .getOrCreate()
)

print("Spark Session iniciada com sucesso!")
print(f"Versão do Spark: {spark.version}")


print("\n--- Verificando Configurações ---")
sql_extensions = spark.conf.get("spark.sql.extensions")
spark_catalog = spark.conf.get("spark.sql.catalog.spark_catalog")
iceberg_catalog_warehouse = spark.conf.get("spark.sql.catalog.local_iceberg.warehouse")

print(f"Extensões SQL ativas: {sql_extensions}")
print(f"Catálogo Spark (Delta): {spark_catalog}")
print(f"Warehouse do Catálogo Iceberg: {iceberg_catalog_warehouse}")


print("\n--- Trabalhando com Delta Lake ---")

# Criar dados de exemplo
data_delta = [
    (1, "Alice", 34),
    (2, "Bob", 45),
    (3, "Charlie", 28)
]
columns_delta = ["id", "nome", "idade"]
df_delta = spark.createDataFrame(data_delta, columns_delta)

# Definir o nome da tabela Delta
delta_table_name = "funcionarios_delta"

# Salvar o DataFrame como uma tabela Delta (sobrescreve se existir)
print(f"Salvando dados na tabela Delta: {delta_table_name}")
df_delta.write.format("delta").mode("overwrite").saveAsTable(delta_table_name)

# Ler os dados da tabela Delta
print("Lendo dados da tabela Delta:")
spark.read.table(delta_table_name).show()

# Realizar uma atualização (operação UPDATE)
print("Atualizando a idade de Alice para 35...")
spark.sql(f"UPDATE {delta_table_name} SET idade = 35 WHERE nome = 'Alice'")

# Mostrar o resultado após a atualização
print("Dados após a atualização:")
spark.read.table(delta_table_name).show()


print("\n--- Trabalhando com Apache Iceberg ---")

# Criar dados de exemplo
data_iceberg = [
    (101, "Produto A", "Eletrônicos", 999.90),
    (102, "Produto B", "Livros", 79.90),
    (103, "Produto C", "Eletrônicos", 1500.00)
]
columns_iceberg = ["produto_id", "nome", "categoria", "preco"]
df_iceberg = spark.createDataFrame(data_iceberg, columns_iceberg)

# Definir o nome da tabela Iceberg (incluindo o catálogo)
iceberg_table_name = "local_iceberg.db.produtos_iceberg"

# Criar o namespace (database) se não existir
spark.sql("CREATE DATABASE IF NOT EXISTS local_iceberg.db")

# Salvar o DataFrame como uma tabela Iceberg
print(f"Salvando dados na tabela Iceberg: {iceberg_table_name}")
df_iceberg.write.format("iceberg").mode("overwrite").saveAsTable(iceberg_table_name)

# Ler os dados da tabela Iceberg
print("Lendo dados da tabela Iceberg:")
spark.read.table(iceberg_table_name).show()

# Adicionar uma nova coluna (evolução de esquema)
print("Adicionando a coluna 'em_estoque'...")
spark.sql(f"ALTER TABLE {iceberg_table_name} ADD COLUMN em_estoque BOOLEAN")

# Inserir novos dados com a coluna adicional
spark.sql(f"INSERT INTO {iceberg_table_name} VALUES (104, 'Produto D', 'Casa', 250.0, true)")

print("Dados após a evolução de esquema e inserção:")
spark.read.table(iceberg_table_name).show()



ModuleNotFoundError: No module named 'pyspark'