In [116]:
from pyspark.sql import SparkSession

def create_spark_session() -> SparkSession:
    """Build (or fetch the already-running) Spark session for the silver layer.

    The session is named "SilverLayer", has Hive support enabled, registers the
    Delta Lake SQL extension and catalog, and maps both the ``s3a`` and the
    ``s3minio`` URI schemes to Hadoop's ``S3AFileSystem``.

    Returns:
        The session produced by ``SparkSession.builder.getOrCreate()``.
    """
    # Settings are applied in insertion order, one ``.config`` call per entry.
    session_settings = {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3minio.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }

    builder = SparkSession.builder.appName("SilverLayer").enableHiveSupport()
    for setting_key, setting_value in session_settings.items():
        builder = builder.config(setting_key, setting_value)

    return builder.getOrCreate()

# Module-level session handle; create_spark_session() ends in getOrCreate(),
# so an already-running session is reused instead of a second one being started.
spark = create_spark_session()

 

In [117]:
import os

# Hadoop S3A options for the MinIO object store; passed to the Delta writers
# via ``.options(**minio_delta_options)``.
#
# Credentials and endpoint are read from the environment so real secrets do not
# have to live in the notebook. The fallbacks are the values that used to be
# hard-coded here and keep existing local runs working.
# TODO: drop the credential fallbacks once MINIO_ACCESS_KEY / MINIO_SECRET_KEY
# are provided by the runtime environment.
minio_delta_options = {
    "fs.s3a.access.key": os.environ.get("MINIO_ACCESS_KEY", "admin"),
    "fs.s3a.secret.key": os.environ.get("MINIO_SECRET_KEY", "senhasegura"),
    "fs.s3a.endpoint": os.environ.get("MINIO_ENDPOINT", "http://minio:9000"),
    # Path-style addressing (http://host:port/bucket/key) instead of
    # virtual-host-style bucket names.
    "fs.s3a.path.style.access": "true",
    "fs.s3a.connection.ssl.enabled": "false",
}

In [118]:
# Read the three bronze-zone Parquet datasets: teams, team attributes, matches.
df_team = spark.read.parquet("s3a://bucket-bronze-zone/team/")
df_team_attributes = spark.read.parquet("s3a://bucket-bronze-zone/team_attributes/")
df_match = spark.read.parquet("s3a://bucket-bronze-zone/match/")

# Print each table's column list, each followed by a blank line.
for table_label, table_df in (
    ("Team", df_team),
    ("Team_attributes", df_team_attributes),
    ("Match", df_match),
):
    print(f'{table_label}: {table_df.columns}\n')


Team: ['_airbyte_raw_id', '_airbyte_extracted_at', '_airbyte_meta', '_airbyte_generation_id', 'id', 'team_api_id', 'team_long_name', 'team_short_name', 'team_fifa_api_id']

Team_attributes: ['_airbyte_raw_id', '_airbyte_extracted_at', '_airbyte_meta', '_airbyte_generation_id', 'id', 'date', 'team_api_id', 'defencePressure', 'buildUpPlaySpeed', 'defenceTeamWidth', 'team_fifa_api_id', 'defenceAggression', 'buildUpPlayPassing', 'buildUpPlayDribbling', 'defencePressureClass', 'buildUpPlaySpeedClass', 'chanceCreationPassing', 'defenceTeamWidthClass', 'chanceCreationCrossing', 'chanceCreationShooting', 'defenceAggressionClass', 'buildUpPlayPassingClass', 'defenceDefenderLineClass', 'buildUpPlayDribblingClass', 'chanceCreationPassingClass', 'buildUpPlayPositioningClass', 'chanceCreationCrossingClass', 'chanceCreationShootingClass', 'chanceCreationPositioningClass']

Match: ['_airbyte_raw_id', '_airbyte_extracted_at', '_airbyte_meta', '_airbyte_generation_id', 'id', 'BSA', 'BSD', 'BSH', 'BWA',

In [119]:
# Rename columns to their Portuguese names.
# Author's note: apply these renames only after the columns have been used in
# the queries.
df_team = df_team.withColumnRenamed(existing="team_short_name", new="time")
df_team_attributes = df_team_attributes.withColumnRenamed(existing="date", new="data")
# Placeholder left for the match table (not active):
# df_match = df_match.withColumnRenamed("date", "data").withColumnRenamed("XXX", "xxxx")

# Last expression of the cell: shows the team column names after the rename.
df_team.columns

['_airbyte_raw_id',
 '_airbyte_extracted_at',
 '_airbyte_meta',
 '_airbyte_generation_id',
 'id',
 'team_api_id',
 'team_long_name',
 'time',
 'team_fifa_api_id']

In [120]:
#Verificando dados ausentes
from pyspark.sql.functions import col, sum, mean


# Null count per column of the team table: each column is turned into a 0/1
# "is null" flag and the flags are summed. `sum` is pyspark.sql.functions.sum
# (imported in this cell), not the Python builtin.
null_count_exprs_team = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df_team.columns
]
df_team.select(null_count_exprs_team).show()

# Last expression of the cell: total row count, for comparison with the null counts.
df_team.count()

+---------------+---------------------+-------------+----------------------+---+-----------+--------------+----+----------------+
|_airbyte_raw_id|_airbyte_extracted_at|_airbyte_meta|_airbyte_generation_id| id|team_api_id|team_long_name|time|team_fifa_api_id|
+---------------+---------------------+-------------+----------------------+---+-----------+--------------+----+----------------+
|              0|                    0|            0|                     0|  0|          0|             0|   0|              11|
+---------------+---------------------+-------------+----------------------+---+-----------+--------------+----+----------------+



299

In [121]:
from pyspark.sql.types import NumericType

# Pick the numeric columns directly from the schema fields.
numeric_cols_team = [
    field.name
    for field in df_team.schema.fields
    if isinstance(field.dataType, NumericType)
]

# Compute the mean of every numeric column in a single aggregation.
# - Skipped entirely when there are no numeric columns.
# - A column with no non-null values has a mean of None, which fillna() does
#   not accept as a replacement value, so such columns are left out.
# - float() normalises the replacement value (e.g. a Decimal mean) to a type
#   that fillna() accepts.
mean_values_team = {}
if numeric_cols_team:
    mean_row_team = df_team.select(
        [mean(col(c)).alias(c) for c in numeric_cols_team]
    ).collect()[0]
    mean_values_team = {
        column_name: float(column_mean)
        for column_name, column_mean in mean_row_team.asDict().items()
        if column_mean is not None
    }

# Replace nulls in the numeric columns with the corresponding mean.
# NOTE(review): the recorded outputs show the 11 missing team_fifa_api_id values
# being filled by this step; a mean is not a real identifier, so confirm that
# imputing id columns is intended before relying on them downstream.
if mean_values_team:
    df_team = df_team.fillna(mean_values_team)

# Re-run the per-column null count to verify the result of the fill.
df_team.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_team.columns]).show()


+---------------+---------------------+-------------+----------------------+---+-----------+--------------+----+----------------+
|_airbyte_raw_id|_airbyte_extracted_at|_airbyte_meta|_airbyte_generation_id| id|team_api_id|team_long_name|time|team_fifa_api_id|
+---------------+---------------------+-------------+----------------------+---+-----------+--------------+----+----------------+
|              0|                    0|            0|                     0|  0|          0|             0|   0|               0|
+---------------+---------------------+-------------+----------------------+---+-----------+--------------+----+----------------+



In [122]:
# Check for missing data: null count per column of the team-attributes table.
# Each column becomes a 0/1 "is null" flag which is then summed.
null_count_exprs_team_attributes = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df_team_attributes.columns
]
df_team_attributes.select(null_count_exprs_team_attributes).show()

+---------------+---------------------+-------------+----------------------+---+----+-----------+---------------+----------------+----------------+----------------+-----------------+------------------+--------------------+--------------------+---------------------+---------------------+---------------------+----------------------+----------------------+----------------------+-----------------------+------------------------+-------------------------+--------------------------+---------------------------+---------------------------+---------------------------+------------------------------+
|_airbyte_raw_id|_airbyte_extracted_at|_airbyte_meta|_airbyte_generation_id| id|data|team_api_id|defencePressure|buildUpPlaySpeed|defenceTeamWidth|team_fifa_api_id|defenceAggression|buildUpPlayPassing|buildUpPlayDribbling|defencePressureClass|buildUpPlaySpeedClass|chanceCreationPassing|defenceTeamWidthClass|chanceCreationCrossing|chanceCreationShooting|defenceAggressionClass|buildUpPlayPassingClass|de

In [123]:
# Pick the numeric columns directly from the schema fields.
numeric_cols_team_attributes = [
    field.name
    for field in df_team_attributes.schema.fields
    if isinstance(field.dataType, NumericType)
]

# Compute the mean of every numeric column in a single aggregation.
# - Skipped entirely when there are no numeric columns.
# - A column with no non-null values has a mean of None, which fillna() does
#   not accept as a replacement value, so such columns are left out.
# - float() normalises the replacement value (e.g. a Decimal mean) to a type
#   that fillna() accepts.
mean_values_team_attributes = {}
if numeric_cols_team_attributes:
    mean_row_team_attributes = df_team_attributes.select(
        [mean(col(c)).alias(c) for c in numeric_cols_team_attributes]
    ).collect()[0]
    mean_values_team_attributes = {
        column_name: float(column_mean)
        for column_name, column_mean in mean_row_team_attributes.asDict().items()
        if column_mean is not None
    }

# Replace nulls in the numeric columns with the corresponding mean.
# NOTE(review): every numeric column is imputed, including identifier-like ones
# (id, team_api_id, team_fifa_api_id) if they are numeric and contain nulls;
# confirm that is intended.
if mean_values_team_attributes:
    df_team_attributes = df_team_attributes.fillna(mean_values_team_attributes)

# Re-run the per-column null count to verify the result of the fill.
df_team_attributes.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_team_attributes.columns]).show()

+---------------+---------------------+-------------+----------------------+---+----+-----------+---------------+----------------+----------------+----------------+-----------------+------------------+--------------------+--------------------+---------------------+---------------------+---------------------+----------------------+----------------------+----------------------+-----------------------+------------------------+-------------------------+--------------------------+---------------------------+---------------------------+---------------------------+------------------------------+
|_airbyte_raw_id|_airbyte_extracted_at|_airbyte_meta|_airbyte_generation_id| id|data|team_api_id|defencePressure|buildUpPlaySpeed|defenceTeamWidth|team_fifa_api_id|defenceAggression|buildUpPlayPassing|buildUpPlayDribbling|defencePressureClass|buildUpPlaySpeedClass|chanceCreationPassing|defenceTeamWidthClass|chanceCreationCrossing|chanceCreationShooting|defenceAggressionClass|buildUpPlayPassingClass|de

In [124]:
# Check for missing data: null count per column of the match table.
# Each column becomes a 0/1 "is null" flag which is then summed.
null_count_exprs_match = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df_match.columns
]
df_match.select(null_count_exprs_match).show()

+---------------+---------------------+-------------+----------------------+---+-----+-----+-----+----+----+----+-----+-----+-----+----+----+----+----+----+----+-----+-----+-----+----+----+----+----+----+----+----+----+----+-----+----+-----+-----+-----+-----+-----+-----+------+------+------+-------+---------+----------+----------+----------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--

In [125]:
# Pick the numeric columns directly from the schema fields.
numeric_cols_match = [
    field.name
    for field in df_match.schema.fields
    if isinstance(field.dataType, NumericType)
]

# Compute the mean of every numeric column in a single aggregation.
# - Skipped entirely when there are no numeric columns.
# - A column with no non-null values has a mean of None, which fillna() does
#   not accept as a replacement value, so such columns are left out.
# - float() normalises the replacement value (e.g. a Decimal mean) to a type
#   that fillna() accepts.
mean_values_match = {}
if numeric_cols_match:
    mean_row_match = df_match.select(
        [mean(col(c)).alias(c) for c in numeric_cols_match]
    ).collect()[0]
    mean_values_match = {
        column_name: float(column_mean)
        for column_name, column_mean in mean_row_match.asDict().items()
        if column_mean is not None
    }

# Replace nulls in the numeric columns with the corresponding mean.
# NOTE(review): every numeric column is imputed, including any identifier-like
# columns that are numeric and contain nulls; confirm that is intended.
if mean_values_match:
    df_match = df_match.fillna(mean_values_match)

# Re-run the per-column null count to verify the result of the fill.
df_match.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_match.columns]).show()

+---------------+---------------------+-------------+----------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-----+----+-----+-----+-----+-----+-----+-----+------+------+------+-------+---------+----------+----------+----------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------

In [126]:
# Rename match columns to their Portuguese names, applied in the listed order.
# NOTE(review): the match column listing recorded in this notebook is truncated,
# so the presence of avg_home_goals / avg_away_goals could not be confirmed;
# withColumnRenamed does nothing when the source column is missing.
match_column_renames = {
    "season": "ano",
    "home_team_api_id": "time_casa_api_id",
    "home_team_goal": "gols_time_casa",
    "avg_home_goals": "media_gols",
    "away_team_goal": "gols_time_visitante",
    "avg_away_goals": "gols_fora_de_casa",
}
for source_name, target_name in match_column_renames.items():
    df_match = df_match.withColumnRenamed(source_name, target_name)


In [127]:
# Write the three tables to the silver zone on MinIO as Delta tables.
# Each write overwrites the target path and receives the S3A settings from
# minio_delta_options as writer options.
silver_targets = (
    (df_match, "s3a://bucket-silver-zone/match/"),
    (df_team, "s3a://bucket-silver-zone/team/"),
    (df_team_attributes, "s3a://bucket-silver-zone/team_attributes/"),
)
for silver_df, silver_path in silver_targets:
    delta_writer = (
        silver_df.write
        .format("delta")
        .options(**minio_delta_options)
        .mode("overwrite")
    )
    delta_writer.save(silver_path)

print("Tabelas Delta gravadas na zona silver com sucesso no MinIO!")

# Shut the session down; cells that use `spark` cannot be re-run afterwards
# without creating the session again.
spark.stop()

Tabelas Delta gravadas na zona silver com sucesso no MinIO!
