In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, year, month, when, lit, mean
from pyspark.sql.functions import coalesce
from pyspark.sql.functions import lower

In [2]:
# Connection settings for the MinIO (S3A) object store.
# Every key is a plain Hadoop property name: the entries are written directly
# onto the Hadoop configuration and are also passed as reader/writer options.
# NOTE(review): the credentials are hard-coded here; consider loading them from
# environment variables or a secrets store instead of committing them.
minio_config = {
    "fs.s3a.access.key": "admin",
    "fs.s3a.secret.key": "senhasegura",
    "fs.s3a.endpoint": "http://minio:9000",
    "fs.s3a.path.style.access": "true",
    # Previously listed twice; a dict literal silently keeps only the last one.
    "fs.s3a.connection.ssl.enabled": "false",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    # Was "spark.hadoop.fs.s3a.impl.disable.cache": the "spark.hadoop." prefix
    # is only meaningful in the Spark conf, not on the Hadoop configuration.
    "fs.s3a.impl.disable.cache": "true",
    # Was "fs.spark.hadoop.fs.s3a.attempts.maximum", which is not a property
    # name S3A reads.
    "fs.s3a.attempts.maximum": "5"
}

def create_spark_session() -> SparkSession:
    """Build (or reuse) the Spark session for the "SilverZone" application.

    The session is configured with the Delta Lake and hadoop-aws packages,
    the Delta SQL extension and catalog, and maps both the ``s3a`` and
    ``s3minio`` schemes to the S3A file system implementation. The log level
    of the underlying Spark context is set to WARN.

    Returns:
        The configured ``SparkSession``.
    """
    settings = {
        "spark.jars.packages": "io.delta:delta-core_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.3.1",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3minio.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }

    builder = SparkSession.builder.appName("SilverZone")
    for setting_name, setting_value in settings.items():
        builder = builder.config(setting_name, setting_value)

    session = builder.getOrCreate()
    session.sparkContext.setLogLevel("WARN")
    return session

# Session shared by every cell below.
spark = create_spark_session()

# Copy each MinIO setting onto the session's Hadoop configuration.
hadoop_conf = spark._jsc.hadoopConfiguration()
for option_name, option_value in minio_config.items():
    hadoop_conf.set(option_name, option_value)

# Load the two bronze Delta tables used to build the players table.
player_df, player_attr_df = (
    spark.read.format("delta").options(**minio_config).load(table_path)
    for table_path in (
        "s3a://bronze/soccer/player",
        "s3a://bronze/soccer/player_attributes",
    )
)

In [3]:
# Drop columns from the attributes table before it is joined to the player
# table (the original note describes them as duplicated columns).
airbyte_columns = [
    "_airbyte_extracted_at",
    "_airbyte_raw_id",
    "_airbyte_meta",
    "_airbyte_generation_id",
]
identifier_columns = ["player_fifa_api_id", "id"]
# NOTE(review): these dotted names refer to fields nested inside _airbyte_meta;
# dropping by string presumably only matches top-level column names, so they
# look redundant once _airbyte_meta itself is dropped. Confirm before removing.
airbyte_meta_fields = [
    "_airbyte_meta.changes.element.reason",
    "_airbyte_meta.changes",
    "_airbyte_meta.changes.element.field",
    "_airbyte_meta.changes.element.change",
    "_airbyte_meta.sync_id",
]
columns_to_drop = airbyte_columns + identifier_columns + airbyte_meta_fields

player_attr_df = player_attr_df.drop(*columns_to_drop)

In [4]:
# Join each player with its attribute rows. The left join keeps players that
# have no attribute rows; the right-hand copy of the join key is dropped.
players_enhanced = player_df.join(
    player_attr_df,
    player_df.player_api_id == player_attr_df.player_api_id,
    "left"
).drop(player_attr_df.player_api_id)

# Parse the timestamp strings into dates and discard the raw columns.
players_enhanced = players_enhanced.withColumn(
    "birth_date",
    to_date("birthday", "yyyy-MM-dd HH:mm:ss")
).withColumn(
    "attributes_date",
    to_date("date", "yyyy-MM-dd HH:mm:ss")
).drop("date", "birthday")

players_enhanced = players_enhanced.withColumn(
    "birth_year",
    year("birth_date")
)

# Convert height (cm to m) and weight (lbs to kg)
players_enhanced = players_enhanced.withColumn(
    "height_m",
    col("height") / 100
).withColumn(
    "weight_kg",
    col("weight") * 0.453592
)

# Body-mass index: weight in kg divided by the squared height in m
players_enhanced = players_enhanced.withColumn(
    "bmi",
    col("weight_kg") / (col("height_m") ** 2)
)

# Replace nulls in numeric columns with the column mean.
# NOTE(review): DataFrame.dtypes reports long columns as 'bigint', so they are
# not selected by this filter; confirm whether they should be imputed too.
# NOTE(review): filling an 'int' column with a float mean presumably widens it
# to double (this would include the birth_year partition column); confirm.
numeric_cols = [
    col_name
    for col_name, dtype in players_enhanced.dtypes
    if dtype in ['int', 'double']
]
if numeric_cols:
    # One aggregation computes every mean in a single Spark job instead of
    # launching a separate job (collect) per column.
    column_means = players_enhanced.agg(
        *[mean(col(col_name)).alias(col_name) for col_name in numeric_cols]
    ).first()
    for col_name in numeric_cols:
        players_enhanced = players_enhanced.withColumn(
            col_name,
            coalesce(col(col_name), lit(column_means[col_name]))
        )

# Player classification. The first matching rule wins.
# Work-rate labels are compared case-insensitively so that "High", "high" and
# "HIGH" are all treated alike.
players_enhanced = players_enhanced.withColumn(
    "position_category",
    when(col("gk_diving") > 50, "Goalkeeper")
    .when(lower(col("defensive_work_rate")) == "high", "Defender")
    .when(lower(col("attacking_work_rate")) == "high", "Forward")
    .otherwise("Midfielder")
)

# Write the Silver table, overwriting any existing data at the target path.
players_enhanced.write.format("delta") \
    .options(**minio_config) \
    .partitionBy("birth_year", "position_category") \
    .mode("overwrite") \
    .save("s3a://silver/soccer/players_enhanced")

print("Tabela players_enhanced criada na camada Silver!")

Tabela players_enhanced criada na camada Silver!


In [5]:
from pyspark.sql.functions import concat_ws

# First calendar month that counts as part of a new season.
# NOTE(review): July is an assumption (seasons starting in the second half of
# the year and finishing before July of the next); confirm for the leagues in
# this dataset.
SEASON_START_MONTH = 7


def _row_mean(column_names):
    """Return a Column holding the per-row arithmetic mean of the named columns.

    The columns are added together and divided by how many there are, so the
    result is NULL for a row where any of the inputs is NULL.
    """
    columns = [col(name) for name in column_names]
    return sum(columns[1:], columns[0]) / len(columns)


match_df = spark.read.format("delta").options(**minio_config).load("s3a://bronze/soccer/match")

# Parse the timestamp string into a date and discard the raw column.
matches_cleaned = match_df.withColumn(
    "match_date",
    to_date("date", "yyyy-MM-dd HH:mm:ss")
).drop("date")

# Season and year. A season spans two calendar years, so a match played before
# SEASON_START_MONTH belongs to the season that started in the previous year
# (using the plain calendar year would label it with the following season).
matches_cleaned = matches_cleaned.withColumn(
    "season_start_year",
    when(month("match_date") >= SEASON_START_MONTH, year("match_date"))
    .otherwise(year("match_date") - 1)
).withColumn(
    "season_end_year",
    col("season_start_year") + 1
).withColumn(
    "season_formatted",
    concat_ws("/", col("season_start_year"), col("season_end_year"))
)

# Match result. "Draw" requires the two scores to be equal; when a score is
# missing the result is left NULL instead of being reported as a draw.
matches_cleaned = matches_cleaned.withColumn(
    "match_result",
    when(col("home_team_goal") > col("away_team_goal"), "Home Win")
    .when(col("home_team_goal") < col("away_team_goal"), "Away Win")
    .when(col("home_team_goal") == col("away_team_goal"), "Draw")
)

# Total goals
matches_cleaned = matches_cleaned.withColumn(
    "total_goals",
    col("home_team_goal") + col("away_team_goal")
)

# Pre-match odds, averaged over the three odds columns listed for each outcome
odds_columns_home = ["B365H", "BWH", "IWH"]
odds_columns_draw = ["B365D", "BWD", "IWD"]
odds_columns_away = ["B365A", "BWA", "IWA"]

matches_cleaned = matches_cleaned.withColumn(
    "avg_home_odds",
    _row_mean(odds_columns_home)
).withColumn(
    "avg_draw_odds",
    _row_mean(odds_columns_draw)
).withColumn(
    "avg_away_odds",
    _row_mean(odds_columns_away)
)

# Write the Silver table, overwriting any existing data at the target path.
matches_cleaned.write.format("delta") \
    .options(**minio_config) \
    .partitionBy("season_start_year", "league_id") \
    .mode("overwrite") \
    .save("s3a://silver/soccer/matches_cleaned")

print("Tabela matches_cleaned criada na camada Silver!")

Tabela matches_cleaned criada na camada Silver!


In [6]:
# Show the schema, row count and partition columns of each Silver table.
# The partition columns are the ones declared here, not read from the tables.
silver_tables = {
    "players_enhanced": ["birth_year", "position_category"],
    "matches_cleaned": ["season_start_year", "league_id"]
}

for table in silver_tables:
    partitions = silver_tables[table]
    silver_df = spark.read.format("delta").load(f"s3a://silver/soccer/{table}")

    print(f"\nSchema da tabela {table}:")
    silver_df.printSchema()

    # Counting triggers a Spark job over the table.
    row_count = silver_df.count()
    print(f"\nContagem de registros: {row_count}")
    print(f"\nPartições: {partitions}")


Schema da tabela players_enhanced:
root
 |-- _airbyte_raw_id: string (nullable = true)
 |-- _airbyte_extracted_at: timestamp (nullable = true)
 |-- _airbyte_meta: struct (nullable = true)
 |    |-- sync_id: long (nullable = true)
 |    |-- changes: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- field: string (nullable = true)
 |    |    |    |-- change: string (nullable = true)
 |    |    |    |-- reason: string (nullable = true)
 |-- _airbyte_generation_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- height: long (nullable = true)
 |-- weight: long (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_api_id: long (nullable = true)
 |-- player_fifa_api_id: long (nullable = true)
 |-- curve: long (nullable = true)
 |-- vision: long (nullable = true)
 |-- agility: long (nullable = true)
 |-- balance: long (nullable = true)
 |-- jumping: long (nullable = true)
 |-- marking: long (nullable = true)
 |-- s