In [None]:
import os
from io import BytesIO

import pandas as pd
import pyarrow as pa
import pyarrow.csv as pv
import pyarrow.parquet as pq
from minio import Minio
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, upper, avg

In [None]:
# Names of the target bucket and of the object that will be created in it.
bucket_name, object_name = "lakehouse", "teste.parquet"

In [None]:
def get_minio_client() -> Minio:
    """Create a MinIO client for the object store used by this notebook.

    Connection settings are read from the environment so that credentials do
    not have to live in the notebook. Each variable falls back to the value
    that was previously hard-coded, so the behaviour is unchanged when none of
    them is set:

    * ``MINIO_ENDPOINT``   -- ``host:port`` of the server (default ``minio:9000``)
    * ``MINIO_ACCESS_KEY`` -- access key (default ``minioadmin``)
    * ``MINIO_SECRET_KEY`` -- secret key (default ``minioadmin``)

    Returns:
        A ``Minio`` client configured for a non-TLS (``secure=False``) connection.
    """
    return Minio(
        os.environ.get("MINIO_ENDPOINT", "minio:9000"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", "minioadmin"),
        secret_key=os.environ.get("MINIO_SECRET_KEY", "minioadmin"),
        secure=False,  # plain HTTP; matches the S3A settings in get_spark_session
    )

In [None]:
def get_spark_session(app_name: str) -> SparkSession:
    """Build (or reuse, via ``getOrCreate``) a SparkSession that can reach MinIO over S3A.

    The MinIO endpoint and credentials are read from the environment
    (``MINIO_ENDPOINT``, ``MINIO_ACCESS_KEY``, ``MINIO_SECRET_KEY``), falling
    back to the values that were previously hard-coded, so the behaviour is
    unchanged when none of them is set.

    Args:
        app_name: Application name passed to ``SparkSession.builder.appName``.

    Returns:
        A SparkSession attached to the ``spark://spark-master:7077`` master
        with the ``fs.s3a.*`` options pointing at the MinIO endpoint.
    """
    endpoint = os.environ.get("MINIO_ENDPOINT", "minio:9000")
    access_key = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
    secret_key = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

    spark = (
        SparkSession.builder.appName(app_name)
        .master("spark://spark-master:7077")
        # SSL is disabled below, so the endpoint is addressed over plain HTTP.
        .config("spark.hadoop.fs.s3a.endpoint", f"http://{endpoint}")
        .config("spark.hadoop.fs.s3a.access.key", access_key)
        .config("spark.hadoop.fs.s3a.secret.key", secret_key)
        # Use the static access/secret key pair configured above.
        .config("spark.hadoop.fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
        # Path-style requests (bucket in the URL path, not in the host name).
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        # The S3A connector jars are taken from local files. Alternative that
        # resolves them by Maven coordinates instead: set "spark.jars.packages" to
        # "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.788".
        .config("spark.jars","/opt/spark/jars/aws-java-sdk-bundle-1.12.788.jar,/opt/spark/jars/hadoop-aws-3.3.4.jar")
        .getOrCreate()
    )
    return spark


In [None]:
# Instantiate a MinIO client and create the bucket only if it does not exist yet.
client = get_minio_client()
# bucket_exists() checks by name. list_buckets() yields Bucket objects, so a
# `name in client.list_buckets()` membership test never matches a plain string
# and make_bucket() would fail on every run after the first.
if not client.bucket_exists(bucket_name):
    client.make_bucket(bucket_name)

In [None]:
# Build a small sample DataFrame and serialise it to Parquet in memory
# (via pyarrow) so it can be uploaded to the bucket without touching disk.
sample_columns = {
    "id": [1, 2, 3, 4],
    "value": [10, 20, 30, 40],
    "category": ["blue", "blue", "red", "red"],
}
df = pd.DataFrame(sample_columns)

table = pa.Table.from_pandas(df)
parquet_buffer = BytesIO()
pq.write_table(table, parquet_buffer)
# Rewind so the next reader starts at the first byte of the payload.
parquet_buffer.seek(0)

In [None]:
# Upload the in-memory Parquet payload to the bucket.
# Rewind first: the upload reads from the buffer's current position, so a
# re-run of this cell would otherwise start at end-of-buffer and be unable to
# supply the `length` bytes announced below.
parquet_buffer.seek(0)
client.put_object(
    bucket_name=bucket_name,
    object_name=object_name,
    data=parquet_buffer,
    length=parquet_buffer.getbuffer().nbytes,
)

In [None]:
# Start (or reuse) the Spark session used by the remaining cells.
spark = get_spark_session(app_name="teste")

In [None]:
# Read the Parquet object back from the bucket through the S3A connector.
source_uri = f"s3a://{bucket_name}/{object_name}"
spark_df = spark.read.parquet(source_uri)

In [None]:
# Print the first rows of the DataFrame (show()'s defaults, spelled out).
spark_df.show(n=20, truncate=True)

In [None]:
# Average of "value" for each "category".
grouped_by_category = spark_df.groupBy("category")
spark_df_avg = grouped_by_category.agg(avg("value"))

In [None]:
# Persist the per-category averages to the bucket, replacing any previous output.
spark_df_avg.write.mode("overwrite").parquet(f"s3a://{bucket_name}/teste_avg.parquet")