In [1]:
# Delta Bronze layer -> Silver layer (Bronze tier)
# Merge all subdivisions into a single table; for example, the bronze_players table will contain every Bronze subdivision

In [2]:
from pyspark.sql.types import StructType
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import DataFrame
from pyspark.sql.streaming import DataStreamWriter
from minio import Minio
from delta.tables import *
import os

def minio_session_spark(endpoint=None, access_key=None, secret_key=None):
    """Build (or reuse) a local SparkSession set up for Delta Lake and MinIO via S3A.

    Each connection setting is resolved in this order: explicit argument,
    environment variable, then the value this notebook has always used.

    Args:
        endpoint: S3A endpoint of the MinIO server. Falls back to the
            ``MINIO_ENDPOINT`` environment variable, then ``"minio:9000"``.
        access_key: S3A access key. Falls back to ``MINIO_ACCESS_KEY``,
            then the notebook's original key.
        secret_key: S3A secret key. Falls back to ``MINIO_SECRET_KEY``,
            then the notebook's original secret.

    Returns:
        The SparkSession produced by ``SparkSession.builder...getOrCreate()``.
    """
    # NOTE(review): the literal fallbacks keep existing runs working, but
    # credentials should not live in the notebook. Provide them through the
    # environment and remove (and rotate) these literals.
    endpoint = endpoint or os.environ.get("MINIO_ENDPOINT", "minio:9000")
    access_key = access_key or os.environ.get("MINIO_ACCESS_KEY", "tcc_user")
    secret_key = secret_key or os.environ.get("MINIO_SECRET_KEY", "Acnmne@a9h!")

    # Build the comma-separated jar list explicitly so no stray whitespace or
    # line-continuation padding ends up inside the configuration value.
    jars = ",".join([
        "/home/jovyan/work/jars/hadoop-common-3.3.2.jar",
        "/home/jovyan/work/jars/hadoop-aws-3.3.2.jar",
        "/home/jovyan/work/jars/aws-java-sdk-bundle-1.11.874.jar",
    ])

    spark = (
        SparkSession.builder
            .master("local[*]")
            .appName("appMinIO")
            ### Config Fields
            .config('spark.sql.debug.maxToStringFields', 5000)
            .config('spark.debug.maxToStringFields', 5000)
            ### Optimize
            .config("delta.autoOptimize.optimizeWrite", "true")
            .config("delta.autoOptimize.autoCompact", "true")
            ### Delta Table
            .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            ## MinIO (path-style access, plain HTTP)
            .config("spark.hadoop.fs.s3a.endpoint", endpoint)
            .config("spark.hadoop.fs.s3a.access.key", access_key)
            .config("spark.hadoop.fs.s3a.secret.key", secret_key)
            .config("spark.hadoop.fs.s3a.path.style.access", "true")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
            ## Jars
            .config("spark.jars", jars)
            .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
            .getOrCreate()
    )
    return spark

In [3]:
# Create (or reuse) the Spark session configured for MinIO access.
spark = minio_session_spark()

# Report the Spark version the session is running.
print(f"Spark version = {spark.version}")

# Report the Hadoop version, queried from the JVM behind the session.
hadoop_version_info = spark._jvm.org.apache.hadoop.util.VersionInfo
print(f"Hadoop version = {hadoop_version_info.getVersion()}")

Spark version = 3.3.2
Hadoop version = 3.3.2


In [5]:
# Configure Minio connection.
# Settings come from the environment when present; the literals are only
# fallbacks so existing runs keep working.
# NOTE(review): credentials should not be stored in the notebook; supply them
# through the environment and remove (and rotate) the fallback values.
minio_endpoint = os.environ.get('MINIO_ENDPOINT', 'minio:9000')
access_key = os.environ.get('MINIO_ACCESS_KEY', 'tcc_fia')
secret_key = os.environ.get('MINIO_SECRET_KEY', 'tcc_fia_2024')
secure = False  # Set to True for HTTPS
minio_client = Minio(endpoint=minio_endpoint, access_key=access_key, secret_key=secret_key, secure=secure)

# Specify the Minio bucket and the per-subdivision paths inside it
minio_bucket = 'bronze'
minio_path_bronze_players = ['bronze_I/', 'bronze_II/','bronze_III/','bronze_IV/']
# minio_path_silver_players = ['silver_I/', 'silver_II/','silver_III/','silver_IV/']
# minio_path_ouro_players = ['gold_I/', 'gold_II/','gold_III/','gold_IV/']
# minio_path_platina_players = ['platinum_I/', 'platinum_II/','platinum_III/','platinum_IV/']
# minio_path_esmeralda_players = ['emerald_I/', 'emerald_II/','emerald_III/','emerald_IV/']
# minio_path_diamante_players = ['diamond_I/', 'diamond_II/','diamond_III/','diamond_IV/']

In [23]:
# Load the first entry of minio_path_bronze_players from its Delta table
# and preview the rows.
df = spark.read.format('delta').load(
    f"s3a://{minio_bucket}/{minio_path_bronze_players[0]}"
)
df.show()

+----------+---------+--------+--------------------+------------+------+---------------+----+--------------------+------+-------+----+
|freshBlood|hotStreak|inactive|            leagueId|leaguePoints|losses|      queueType|rank|          summonerId|  tier|veteran|wins|
+----------+---------+--------+--------------------+------------+------+---------------+----+--------------------+------+-------+----+
|     false|     true|   false|f5526c78-08f1-438...|          67|     7|RANKED_SOLO_5x5|   I|3XiEGneD0XOL_w4Zv...|BRONZE|  false|  11|
|     false|    false|   false|f5108b66-535b-4ee...|          46|     9|RANKED_SOLO_5x5|   I|lNVCHVKkFMAsJeqA1...|BRONZE|  false|   9|
|     false|    false|   false|0d42becf-d119-4a1...|          64|     9|RANKED_SOLO_5x5|   I|-tGpNqAEWg87IsP27...|BRONZE|  false|  11|
|     false|    false|   false|f2dc4e41-d375-4d9...|          30|     2|RANKED_SOLO_5x5|   I|hhp0oCwIfVEcUAP6G...|BRONZE|  false|   4|
|      true|    false|   false|eb6870ad-26c9-451...|   

In [19]:
# Load the second entry of minio_path_bronze_players and inspect its schema:
# print the column count, then display the column names as the cell output.
df = spark.read.format('delta').load(
    f"s3a://{minio_bucket}/{minio_path_bronze_players[1]}"
)
print(len(df.columns))
df.columns

13


['freshBlood',
 'hotStreak',
 'inactive',
 'leagueId',
 'leaguePoints',
 'losses',
 'queueType',
 'rank',
 'summonerId',
 'summonerName',
 'tier',
 'veteran',
 'wins']

In [20]:
# Load the third entry of minio_path_bronze_players and inspect its schema:
# print the column count, then display the column names as the cell output.
df = spark.read.format('delta').load(
    f"s3a://{minio_bucket}/{minio_path_bronze_players[2]}"
)
print(len(df.columns))
df.columns

13


['freshBlood',
 'hotStreak',
 'inactive',
 'leagueId',
 'leaguePoints',
 'losses',
 'queueType',
 'rank',
 'summonerId',
 'summonerName',
 'tier',
 'veteran',
 'wins']

In [21]:
# Load the fourth entry of minio_path_bronze_players and inspect its schema:
# print the column count, then display the column names as the cell output.
df = spark.read.format('delta').load(
    f"s3a://{minio_bucket}/{minio_path_bronze_players[3]}"
)
print(len(df.columns))
df.columns

13


['freshBlood',
 'hotStreak',
 'inactive',
 'leagueId',
 'leaguePoints',
 'losses',
 'queueType',
 'rank',
 'summonerId',
 'summonerName',
 'tier',
 'veteran',
 'wins']

In [21]:
#BRONZE PLAYERS
# Merge every bronze subdivision into one DataFrame and persist it as a single
# Delta table in the silver bucket.
final_df = None
# Iterate the bronze path list itself so the paths that are read always match
# the table being built.
for subdivision_path in minio_path_bronze_players:
    df = (
        spark
        .read
        .format('delta')
        .load(f"s3a://{minio_bucket}/{subdivision_path}")
    )

    # Combine by column name rather than by position: the bronze_I preview in
    # this notebook shows no summonerName column while the other subdivisions
    # list it, so a positional union() would not line up. Columns missing from
    # one side are filled with nulls.
    if final_df is None:
        final_df = df
    else:
        final_df = final_df.unionByName(df, allowMissingColumns=True)

# Fail with a clear message instead of an AttributeError on None below.
if final_df is None:
    raise ValueError("minio_path_bronze_players is empty; nothing to write")

# Save the merged result as a Delta table, replacing data and schema
(
    final_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "True")
    .save("s3a://silver/bronze_players")
)

+----+
|rank|
+----+
|   I|
|  II|
| III|
|  IV|
+----+

