In [1]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType , DoubleType
from pyspark.sql import SparkSession
import boto3
import os
from dotenv import load_dotenv

# Carrega o conteúdo do .env
load_dotenv(dotenv_path=".env")

# Usa as variáveis
access_key = os.getenv("MINIO_ROOT_USER")
secret_key = os.getenv("MINIO_ROOT_PASSWORD")

In [2]:
spark = ( 
 SparkSession
 .builder
 .master("spark://spark-master:7077")
 .appName('MinIO - Teste')
 .config('spark.hadoop.fs.s3a.endpoint','http://minio:9000')
 .config('spark.hadoop.fs.s3a.access.key','minioadmin')
 .config('spark.hadoop.fs.s3a.secret.key','minioadmin')
 .config('spark.hadoop.fs.s3a.path.style.access','true')
 .config('spark.hadoop.fs.s3a.connection.ssl.enabled','false')
 #.config('spark.hadoop.fs.s3a.impl','org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.3.4')
 .config('spark.jars.packages','io.delta:delta-spark_2.12:3.3.1')
 .getOrCreate()
)

print('Processo finalizado!')

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-acbf980e-fd8b-4e11-8083-c70ef936da91;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found io.delta#delta-spark_2.12;3.3.1 in central
	found io.delta#delta-storage;3.3.1 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar ...
	[SUCCESSFUL ] org.apache.hadoop#hadoop-aws;3.3.4!hadoop-aws.jar (848ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.3.1/delta-spark_2.12-3.3.1.jar ...
	[SUCCESSFUL ] io.delta#delta-spark_2.12;3.3.1!delta-spark_2.12.jar (7

Processo finalizado!


In [None]:
print('########### INICIANDO LEITURA DOS DADOS DA CAMADA BRONZE (INGESTÃO) ###########')

print('Configurando boto3 para acessar MinIO...')
#Configurando boto3 pra acessar o minio
s3 = boto3.client(
    's3',
    endpoint_url='http://minio:9000',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key
)
print('✅')

#definindo bucket da primeira camada do medalhão, que será a de ingestão
bucket = 'bronze'

#lendo os arquivos csv dentro do bucket, para que sempre que chegue um novo arquivo, ele seja incluído no df_total
print('Lendo os arquivos CSV existentes na camada bronze...')
resposta = s3.list_objects_v2(Bucket=bucket)
arquivos = [
    obj['Key'] for obj in resposta.get('Contents', [])
    if obj['Key'].endswith('.csv')
]
print(f'Arquivos: {arquivos}')
print('✅')

#Criando schema para o dataframe
schema = StructType([
    StructField("date", StringType(), True),
    StructField("datetime", StringType(), True),
    StructField("cash_type", StringType(), True),
    StructField("card", StringType(), True),
    StructField("money", DoubleType(), True),
    StructField("coffee_name", StringType(), True)
])

#Criando um DataFrame vazio com esse schema
df_total = spark.createDataFrame([], schema)

#Efetuando iteração para leitura de cada arquivo no MinIO
print('Iniciando a leitura dos arquivos...')
for arquivo in arquivos:
    caminho = f"s3a://{bucket}/{arquivo}"
    print(f"Lendo: {caminho}")
    df = spark.read.csv(caminho, header=True, schema=schema)
    df_total = df_total.union(df)
    print(f"{arquivo} finalizado!")
    
print('✅')
#Mostra os dados combinados
print('Plotando os dados em uma tabela...')
df_total.show(2)
print('✅ \n \n')

print('########### INICIANDO INGESTÃO DOS DADOS NA CAMADA SILVER (DADOS AGRUPADOS) ########### \n')

path_silver = 's3a://silver'
name_parquet_silver = 'coffe_sales.parquet'

print(f'Salvando df_total em {path_silver} ...')
df_total.coalesce(1).write.mode('overwrite').parquet(f'{path_silver}/{name_parquet_silver}')
print('✅')

print('########### INICIANDO O CONSUMO DOS DADOS DA CAMADA SILVER (DADOS AGRUPADOS) ########### \n')

print(f'Iniciando leitura dos dados: {path_silver}/{name_parquet_silver} ...')

df_coffe = spark.read.parquet(f'{path_silver}/{name_parquet_silver}')

print('Plotando os dados em uma tabela...')
df_coffe.show(2)

########### INICIANDO LEITURA DOS DADOS DA CAMADA BRONZE (INGESTÃO) ###########
Configurando boto3 para acessar MinIO...
✅
Lendo os arquivos CSV existentes na camada bronze...
Arquivos: ['index_1.csv', 'index_2.csv']
✅
Iniciando a leitura dos arquivos...
Lendo: s3a://bronze/index_1.csv
index_1.csv finalizado!
Lendo: s3a://bronze/index_2.csv
index_2.csv finalizado!
✅
Plotando os dados em uma tabela...
+----------+--------------------+---------+-------------------+-----+-------------+
|      date|            datetime|cash_type|               card|money|  coffee_name|
+----------+--------------------+---------+-------------------+-----+-------------+
|2024-03-01|2024-03-01 10:15:...|     card|ANON-0000-0000-0001| 38.7|        Latte|
|2024-03-01|2024-03-01 12:19:...|     card|ANON-0000-0000-0002| 38.7|Hot Chocolate|
+----------+--------------------+---------+-------------------+-----+-------------+
only showing top 2 rows

✅ 
 

########### INICIANDO INGESTÃO DOS DADOS NA CAMADA SILVER (DA

                                                                                

✅
########### INICIANDO O CONSUMO DOS DADOS DA CAMADA SILVER (DADOS AGRUPADOS) ########### 

Iniciando leitura dos dados: s3a://silver/coffe_sales.parquet ...


In [8]:
df_teste = spark.read.parquet('s3a://land/data.parquet')

df_teste.show()

25/06/02 00:38:17 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



25/06/02 01:04:18 ERROR TaskSchedulerImpl: Lost executor 1 on 172.18.0.6: worker lost: Not receiving heartbeat for 60 seconds
25/06/02 01:04:18 ERROR TaskSchedulerImpl: Lost executor 0 on 172.18.0.5: worker lost: Not receiving heartbeat for 60 seconds
