# 📌 02D-Correcao-Datas-e-Partidas-Ausentes
📍 Objetivo: corrigir as Datas e Partidas ausentes no banco de dados.

Durante uma análise exploratória dos dados, foi notado que algumas partidas estavam ausentes da base, fezendo-se necessário o input manual.

## 🔶 Criação dos Dataframes

#### 🔹 Leitura do S3 e criação do Dataframe

In [0]:
# Caminho correto no S3 para armazenar os arquivos
#silver_parquet_path = "s3://mvp-brasileirao-2024/silver/parquet/brasil_seriea_2024_todas_partidas"
silver_delta_path_partidas = "s3://mvp-brasileirao-2024/silver/delta/brasil_seriea_2024_todas_partidas"

# Caminho correto no S3 para armazenar os arquivos
#silver_parquet_path = f"s3a://mvp-brasileirao-2024/silver/parquet/brasil_seriea_2024_estatistica_jogador"
silver_delta_path_estatisticas = f"s3a://mvp-brasileirao-2024/silver/delta/brasil_seriea_2024_estatistica_jogador"

df_estatisticas = spark.read.format("delta").load(silver_delta_path_estatisticas)
df_partidas = spark.read.format("delta").load(silver_delta_path_partidas)


## 🔶 Aplicar transformações (limpeza e padronização) 


#### 🔹 Converter a coluna "Data" para formato Date

In [0]:
from pyspark.sql.functions import to_date, col

# Converter a coluna "Data" para DateType no Spark
df_partidas = df_partidas.withColumn("Data", to_date(col("Data")))
df_estatisticas = df_estatisticas.withColumn("Data", to_date(col("Data")))

#### 🔹 Criar a Chave Única (Clube + Data)

In [0]:
from pyspark.sql.functions import concat_ws

# Criar a chave única nos dois DataFrames
df_estatisticas = df_estatisticas.withColumn("Chave", concat_ws("_", col("Clube"), col("Data")))
df_partidas = df_partidas.withColumn("Chave", concat_ws("_", col("Clube"), col("Data")))


#### 🔹 Encontrar registros exclusivos de cada DataFrame

Identificar registros que estão em df_estatisticas, mas não em df_partidas

In [0]:
df_estatisticas_unicos = df_estatisticas.join(df_partidas, "Chave", "left_anti")

# Exibir resultados
df_estatisticas_unicos.select("Clube", "Data", "Chave").show()


+----------+----------+--------------------+
|     Clube|      Data|               Chave|
+----------+----------+--------------------+
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Fluminense|2024-04-13|Fluminense_2024-0...|
|Bragantino|2024-04-13|Bragantino_2024-0...|
|Bragantino|2024-04-13|Bragantino_2024-0...|
|Bragantino|2024-04-13|Bragantino_2024-0...|
|Bragantino|2024-04-13|Bragantino_2024-0...|
|Bragantin

Identificar registros que estão em df_partidas, mas não em df_estatisticas

In [0]:
df_partidas_unicos = df_partidas.join(df_estatisticas, "Chave", "left_anti")

# Exibir resultados
df_partidas_unicos.select("Clube", "Data", "Chave").show(10)


+------------+----------+--------------------+
|       Clube|      Data|               Chave|
+------------+----------+--------------------+
| Corinthians|2024-07-02|Corinthians_2024-...|
|   Fortaleza|2024-07-04|Fortaleza_2024-07-04|
|   São Paulo|2024-07-12|São Paulo_2024-07-12|
|   São Paulo|2024-11-06|São Paulo_2024-11-06|
|      Cuiabá|2024-09-17|   Cuiabá_2024-09-17|
|Athletico-PR|2024-10-18|Athletico-PR_2024...|
|    Flamengo|2024-06-27| Flamengo_2024-06-27|
|      Grêmio|2024-04-28|   Grêmio_2024-04-28|
|       Bahia|2024-09-22|    Bahia_2024-09-22|
|    Cruzeiro|2024-07-04| Cruzeiro_2024-07-04|
+------------+----------+--------------------+
only showing top 10 rows




## 🔶 Aplicar ajuste nas datas 


#### 🔹 Cria coluna `Data_Original` a partir da coluna de Data

In [0]:
from pyspark.sql.functions import date_sub

# Criar a coluna "Data Original"
df_partidas_unicos = df_partidas_unicos.withColumn("Data Original", col("Data"))

#### 🔹 Subtrai 1 dia dessa `Data_Original` para ajuste

In [0]:
# Ajustar a data subtraindo 1 dia
df_partidas_unicos = df_partidas_unicos.withColumn("Data Ajustada", date_sub(col("Data"), 1))

#### 🔹 Criando a nova coluna agora ajustada

In [0]:
# Criar novamente a chave ajustada
df_partidas_unicos = df_partidas_unicos.withColumn("Chave Ajustada", 
    concat_ws("_", col("Clube"), col("Data Ajustada")))

#### 🔹 Criando os Dicionários com as Datas compativeis dos dois Dataframes

In [0]:
# Criar dicionários para mapeamento de correções
correcao_datas = dict(df_partidas_unicos.select("Chave", "Data Ajustada").rdd.collect())
correcao_chaves = dict(df_partidas_unicos.select("Chave", "Chave Ajustada").rdd.collect())

#### 🔹 Converter os dicionários em listas de tuplas para criar DataFrames

In [0]:
from pyspark.sql import Row

# Converter os dicionários em listas de tuplas para criar DataFrames
correcao_datas_df = spark.createDataFrame([Row(Chave=k, Data_Ajustada=v) for k, v in correcao_datas.items()])
correcao_chaves_df = spark.createDataFrame([Row(Chave=k, Chave_Ajustada=v) for k, v in correcao_chaves.items()])


#### 🔹 JOIN para substituir a Data e a Chave ajustada

In [0]:
from pyspark.sql.functions import coalesce, when

# Fazer o join para substituir a Data e a Chave ajustada
df_partidas_corrigido = df_partidas.alias("p") \
    .join(correcao_datas_df.alias("c"), col("p.Chave") == col("c.Chave"), "left") \
    .join(correcao_chaves_df.alias("k"), col("p.Chave") == col("k.Chave"), "left") \
    .withColumn("Data Ajustada", coalesce(col("c.Data_Ajustada"), col("p.Data"))) \
    .withColumn("Chave Ajustada", coalesce(col("k.Chave_Ajustada"), col("p.Chave"))) \
    .select("p.*", "Data Ajustada", "Chave Ajustada")  # Manter todas as colunas originais + ajustadas

# Criar a coluna "Status_Ajustes" com referência correta às colunas
df_partidas_corrigido = df_partidas_corrigido.withColumn(
    "Status_Ajustes",
    when(col("p.Chave") == col("Chave Ajustada"), "Sem modificação").otherwise("Ajustado -1 dia")
)

# Exibir os ajustes aplicados
df_partidas_corrigido.select("Clube", "Data", "Data Ajustada", "Chave", "Chave Ajustada", "Status_Ajustes").show(10)


+-----------+----------+-------------+--------------------+--------------------+---------------+
|      Clube|      Data|Data Ajustada|               Chave|      Chave Ajustada| Status_Ajustes|
+-----------+----------+-------------+--------------------+--------------------+---------------+
|  Juventude|2024-10-26|   2024-10-26|Juventude_2024-10-26|Juventude_2024-10-26|Sem modificação|
|     Cuiabá|2024-12-05|   2024-12-05|   Cuiabá_2024-12-05|   Cuiabá_2024-12-05|Sem modificação|
|     Cuiabá|2024-11-02|   2024-11-02|   Cuiabá_2024-11-02|   Cuiabá_2024-11-02|Sem modificação|
| Fluminense|2024-06-30|   2024-06-30|Fluminense_2024-0...|Fluminense_2024-0...|Sem modificação|
|  Palmeiras|2024-11-04|   2024-11-04|Palmeiras_2024-11-04|Palmeiras_2024-11-04|Sem modificação|
|Corinthians|2024-07-02|   2024-07-01|Corinthians_2024-...|Corinthians_2024-...|Ajustado -1 dia|
|  São Paulo|2024-10-05|   2024-10-05|São Paulo_2024-10-05|São Paulo_2024-10-05|Sem modificação|
|  Fortaleza|2024-07-04|   202

#### 🔹 Renomeia as colunas de Data

In [0]:
df_partidas_corrigido = df_partidas_corrigido.drop("Chave", "Chave Ajustada", "Data") \
    .withColumnRenamed("Data Ajustada", "Data")


## 💾 Salvar os dados na camada Silver (Parquet no S3)

#### 🔹 Salvando o arquivo no S3 camada Silver

In [0]:
# Caminho correto no S3 para armazenar os arquivos
silver_parquet_path = "s3a://mvp-brasileirao-2024/silver/parquet/brasil_seriea_2024_todas_partidas"
silver_delta_path = "s3a://mvp-brasileirao-2024/silver/delta/brasil_seriea_2024_todas_partidas"

# Remover completamente o Delta anterior para evitar conflito de esquema
#dbutils.fs.rm(silver_delta_path, recurse=True)

# Salvar em Parquet (para acessibilidade geral)
df_partidas_corrigido.write.mode("overwrite").parquet(silver_parquet_path)

# Salvar em Delta (agora com mergeSchema ativado)
df_partidas_corrigido.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save(silver_delta_path)

print("✅ Dados corrigidos e salvos na Silver sem conflitos!")

print("✅ Dados salvos corretamente em Parquet e Delta, em pastas separadas!")


✅ Dados corrigidos e salvos na Silver sem conflitos!
✅ Dados salvos corretamente em Parquet e Delta, em pastas separadas!


#### 🔹 Salvando o arquivo no DBFS

In [0]:
# Caminho no DBFS para armazenar os dados Silver temporariamente
dbfs_path_silver = "dbfs:/mnt/silver_temp/todas_partidas"

# Salvar no DBFS em formato Parquet
df_partidas_corrigido.write.mode("overwrite").parquet(dbfs_path_silver)

print("✅ Dados transformados e salvos na camada Silver no DBFS com sucesso!")

✅ Dados transformados e salvos na camada Silver no DBFS com sucesso!
