# Importando Pacotes Necessários

In [6]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from minio import Minio

# Indicando variáveis 

In [5]:
# # Variáveis de ambiente
MINIO_ENDPOINT = "seu_end_point"
MINIO_ACCESS_KEY = "sua_acess_key"
MINIO_SECRET_KEY = "seu_secret_key"

# Conexão Minio
storage_options = {
    "AWS_ACCESS_KEY_ID": "sua_acess_key",
    "AWS_SECRET_ACCESS_KEY": "seu_secret_key",
    "AWS_ENDPOINT_URL":"seu_end_point",
    "AWS_REGION": "us-east-1",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

Conexão bem-sucedida. Buckets disponíveis:
hive-metastore-trusted
payments
pessoas-e-organizacoes
risk
serasa-parser
teste-felipe
teste-joao
teste-sync
tests-schemas
vini-teste


# Criando sessão Spark

In [3]:
spark = SparkSession.builder \
    .appName(# Dando um Nome para a instância 
        "Delta_Spark") \
        \
    .config(# Jars Necessários para utilizar os recursos
        "spark.jars", 
        "C:/Local/Onde/Esta/OSpark/jars/hadoop-aws-3.3.4.jar,"
        "C:/Local/Onde/Esta/OSpark/jars/aws-java-sdk-bundle-1.12.732.jar,"
        "C:/Local/Onde/Esta/OSpark/jars/delta-storage-3.2.0.jar,"
        "C:/Local/Onde/Esta/OSpark/jars/delta-spark_2.12-3.2.0.jar") \
        \
    .config(# Credenciais MiniO - MINIO_ACCESS_KEY
        "spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY) \
        \
    .config(# Credenciais MiniO - MINIO_SECRET_KEY
        "spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY) \
        \
    .config(# Credenciais MiniO - MINIO_ENDPOINT
        "spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT) \
        \
    .config(# Ativa o acesso baseado em caminho (path-style) ao invés de virtual-hosted style. 
        "fs.s3a.path.style.access", "true") \
        \
    .config(# Define que o Spark deve usar a implementação do sistema de arquivos S3A para acessar o S3
        "spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        \
    .config(# Usa a implementação de armazenamento de log S3SingleDriverLogStore para operações Delta Lake
        "spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
        \
    .config(# Ativa extensões Delta Lake no Spark SQL, adicionando funcionalidades específicas do Delta Lake
        "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        \
    .config(# Define o catálogo Delta Lake como o catálogo SQL padrão do Spark, permitindo a criação e gestão de tabelas Delta Lake usando comandos SQL
        "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        \
    .getOrCreate()

# Exemplo DataFrame

In [4]:
# Criando um DataFrame de Exemplo
df = spark.createDataFrame([("joao", 47), ("maria", 35), ("pedro", 51)]).toDF(
    "nome", "idade"
)

df.show()

+----------+---+
|first_name|age|
+----------+---+
|       bob| 47|
|        li| 23|
|   leonard| 51|
+----------+---+



# Salvando em um Bucket

In [6]:
# Salvando no MiniO
try:
    df.write \
      .option("mergeSchema", "true") \
      .mode("append") \
      .format("delta") \
      .save("s3a://seu_caminho", storage_options = storage_options)

    print("DataFrame salvo com sucesso!")
    
except Exception as e:
    print("Erro ao salvar o DataFrame como Delta Lake:")
    print(e)

DataFrame salvo como Delta Lake com sucesso!


# Lendo o arquivo salvo

In [4]:
df = spark.read.format("delta").load("s3a://seu_caminho")

df.show()

+----------+---+-------------+---------+
|first_name|age|nacionalidade|     sexo|
+----------+---+-------------+---------+
|   leonard| 51|   brasileiro|masculino|
|       bob| 47|   brasileiro|masculino|
|        li| 23|   brasileiro|masculino|
|   leonard| 51|   brasileiro|     NULL|
|       bob| 47|   brasileiro|     NULL|
|        li| 23|   brasileiro|     NULL|
+----------+---+-------------+---------+



# Finalizando Sessão

In [18]:
# Encerrando Spark
spark.stop()