# Exemplo de Conexão Spark com MinIO
Este notebook demonstra como conectar o Spark ao MinIO (armazenamento compatível com S3, acessado pelo conector S3A) para escrever e ler tabelas no formato Delta Lake.

In [2]:
import logging
import os
import sys

from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [3]:
# Quiet the Python-side loggers used by PySpark:
# py4j only reports errors, pyspark reports warnings and above.
for _logger_name, _level in {"py4j": logging.ERROR, "pyspark": logging.WARN}.items():
    logging.getLogger(_logger_name).setLevel(_level)

## Configuração do Spark com MinIO

In [3]:
# Build (or reuse) a SparkSession that talks to MinIO through the S3A connector
# and has Delta Lake enabled.
# Endpoint and credentials are read from the environment so secrets do not live in
# the notebook; the fallbacks target the `minio` host on port 9000 with MinIO's stock
# `minioadmin` credentials.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

spark = (
    SparkSession.builder
    .appName("MinIO Delta Example")
    # --- S3A connection to MinIO ---
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    # MinIO serves buckets under the path (http://host/bucket), not as virtual hosts.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # The endpoint above is plain http, so TLS is disabled.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    # --- Delta Lake ---
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.sql.warehouse.dir", "s3a://datalake/warehouse")
    # Resolved only when the JVM starts: changing these requires a kernel restart,
    # because getOrCreate() reuses an already-running session.
    .config(
        "spark.jars.packages",
        "io.delta:delta-core_2.12:2.3.0,"
        "org.apache.hadoop:hadoop-aws:3.3.4,"
        "org.apache.hadoop:hadoop-common:3.3.4",
    )
    # --- Upload buffering: stage multipart uploads on local disk under /tmp ---
    .config("spark.hadoop.fs.s3a.buffer.dir", "/tmp")
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")
    .getOrCreate()
)

# Reduce JVM-side log noise for the rest of the session.
spark.sparkContext.setLogLevel("WARN")

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.hadoop#hadoop-common added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dbf95f5d-7b03-46bb-bb1c-6838e4016532;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.apache.hadoop#hadoop-common;3.3.4 in central
	found org.apache.hadoop.thirdparty#hadoop-shaded-protobuf_3_7;1.1.1 in central
	found org.apache.hadoop#hadoop-annotations;3.3.4 in central
	found org.apache.hadoop.thirdparty#hadoop-shaded-guava;1.1.1 in central
	found com.google.guava#guava;27

25/04/04 02:17:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 48792)
Traceback (most recent call last):
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/bitnami/python/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/opt/bitnami/spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/opt/bitnami/spark/python/pyspark/accumulators.py", line 253, in poll
    i

In [4]:
# Small in-memory sample used to exercise the write/read round trip against MinIO.
# The schema is declared explicitly so `idade` is stored as an integer column.
schema = StructType(
    [
        StructField("nome", StringType(), nullable=True),
        StructField("idade", IntegerType(), nullable=True),
    ]
)

data = [
    ("João", 25),
    ("Maria", 30),
    ("Pedro", 35),
]

df = spark.createDataFrame(data, schema)
df.show()

25/04/04 02:17:53 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


[Stage 0:>                                                          (0 + 1) / 1]

+-----+-----+
| nome|idade|
+-----+-----+
| João|   25|
|Maria|   30|
|Pedro|   35|
+-----+-----+



                                                                                

In [5]:
# Persist the sample DataFrame to the `raw` bucket as a Delta table,
# replacing whatever a previous run left at that path.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .save("s3a://raw/exemplo_pessoas")
)

                                                                                

25/04/04 02:18:03 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

In [6]:
# Read the Delta table back from MinIO.
df_read = spark.read.format("delta").load("s3a://raw/exemplo_pessoas")

# Verify the round trip preserved every row. Row order is not guaranteed when
# reading from storage, so both sides are sorted before comparing. collect() is
# fine here only because the sample has a handful of rows.
assert sorted(df_read.collect()) == sorted(df.collect()), "Delta round trip changed the data"

df_read.show()

+-----+-----+
| nome|idade|
+-----+-----+
| João|   25|
|Maria|   30|
|Pedro|   35|
+-----+-----+



In [7]:
# Inspect the schema read back from the Delta table; it should match the schema
# declared when the sample DataFrame was created (nome: string, idade: integer).
df_read.printSchema()

root
 |-- nome: string (nullable = true)
 |-- idade: integer (nullable = true)



In [7]:
# Re-read the Delta table as an independent check, using the `spark` session
# configured at the top of the notebook.
# Rebuilding it with SparkSession.builder...getOrCreate() would just hand back that
# same running session: a different appName or a reduced config set here would be
# silently ignored, so the session is reused explicitly instead.
df = spark.read.format("delta").load("s3a://raw/exemplo_pessoas")
df.printSchema()

root
 |-- nome: string (nullable = true)
 |-- idade: integer (nullable = true)

