# In [None]:
# ============================================================
# Notebook: etl_bronze_to_silver.ipynb
# Author: SV1 - Data Engineer
# Goal: Clean Bronze data into Silver (Transfermarkt)
# ============================================================

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    concat_ws,
    current_date,
    floor,
    lower,
    months_between,
    trim,
    when,
    year,
)

# -------------------------------
# 1. Initialize SparkSession
# -------------------------------
# Object-store connection settings come from the environment so credentials do
# not have to be edited into the notebook. The defaults reproduce the original
# hard-coded local values, so running without any variables set is unchanged.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "admin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "admin12345")

spark = (
    SparkSession.builder
    .appName("Transfermarkt ETL - Bronze to Silver")
    # Extra JVM packages pulled in for the s3a:// paths used below.
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262",
    )
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # Keep the SSL flag in step with the endpoint scheme ("false" for the
    # default http:// endpoint, "true" when an https:// endpoint is supplied).
    .config(
        "spark.hadoop.fs.s3a.connection.ssl.enabled",
        str(MINIO_ENDPOINT.startswith("https://")).lower(),
    )
    .getOrCreate()
)

# -------------------------------
# 2. Paths
# -------------------------------
# Both layers live in sibling buckets that share one prefix.
_BUCKET_PREFIX = "s3a://transfermarkt"
BRONZE = f"{_BUCKET_PREFIX}-bronze"  # raw CSV input
SILVER = f"{_BUCKET_PREFIX}-silver"  # cleaned Parquet output

# -------------------------------
# 3. Players
# -------------------------------
print("‚ñ∂ ƒêang x·ª≠ l√Ω: players.csv")
df_players = spark.read.csv(f"{BRONZE}/players.csv", header=True, inferSchema=True)

# Use `name` when present; otherwise rebuild it from first_name + last_name.
fallback_name = concat_ws(" ", trim(col("first_name")), trim(col("last_name")))
normalized_name = trim(
    lower(when(col("name").isNotNull(), col("name")).otherwise(fallback_name))
)

df_players_clean = (
    df_players
    .withColumn("name", normalized_name)
    # concat_ws skips NULL inputs and returns "" (not NULL) when every part is
    # NULL, so blank names must be turned into NULL for dropna to remove them.
    # `when` without `otherwise` yields NULL for non-matching rows.
    .withColumn("name", when(col("name") != "", col("name")))
    .dropna(subset=["name"])
)

if "date_of_birth" in df_players_clean.columns:
    # Age in completed years. A plain year(now) - year(dob) difference is one
    # too high for anyone whose birthday has not yet occurred this year.
    df_players_clean = df_players_clean.withColumn(
        "age",
        floor(months_between(current_date(), col("date_of_birth")) / 12).cast("int"),
    )

df_players_clean.write.mode("overwrite").parquet(f"{SILVER}/players")
print("‚úÖ Ghi Silver: players")

# -------------------------------
# 4. Clubs
# -------------------------------
print("‚ñ∂ ƒêang x·ª≠ l√Ω: clubs.csv")
df_clubs = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{BRONZE}/clubs.csv")
)

# Rename `name` to `club_name`, normalize it (lower-case, trimmed), then drop
# rows without a club name and keep a single row per distinct club name.
df_clubs_renamed = df_clubs.withColumnRenamed("name", "club_name")
normalized_club_name = trim(lower(col("club_name")))
df_clubs_clean = (
    df_clubs_renamed
    .withColumn("club_name", normalized_club_name)
    .dropna(subset=["club_name"])
    .dropDuplicates(subset=["club_name"])
)

df_clubs_clean.write.parquet(f"{SILVER}/clubs", mode="overwrite")
print("‚úÖ Ghi Silver: clubs")

# -------------------------------
# 5. Transfers
# -------------------------------
print("‚ñ∂ ƒêang x·ª≠ l√Ω: transfers.csv")
df_transfers = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{BRONZE}/transfers.csv")
)

# Normalize both club-name columns the same way: lower-case, then trim.
df_transfers_clean = df_transfers
for club_column in ("club_from", "club_to"):
    df_transfers_clean = df_transfers_clean.withColumn(
        club_column, trim(lower(col(club_column)))
    )

# A missing fee is stored as 0; season is kept as a string.
fee_or_zero = when(col("fee").isNotNull(), col("fee")).otherwise(0)
season_as_text = col("season").cast("string")
df_transfers_clean = (
    df_transfers_clean
    .withColumn("fee", fee_or_zero)
    .withColumn("season", season_as_text)
    # Rows without a player_id are discarded.
    .dropna(subset=["player_id"])
)

df_transfers_clean.write.parquet(f"{SILVER}/transfers", mode="overwrite")
print("‚úÖ Ghi Silver: transfers")

# -------------------------------
# 6. Performance (from appearances.csv or player_valuations.csv)
# -------------------------------
print("‚ñ∂ ƒêang x·ª≠ l√Ω: appearances.csv")
df_perf = spark.read.csv(f"{BRONZE}/appearances.csv", header=True, inferSchema=True)

# Keep the important columns (depends on your dataset); any of them that the
# input file does not contain is simply left out of the selection.
cols_perf = [c for c in [
    "player_id", "game_id", "goals", "assists", "minutes_played", "yellow_cards", "red_cards"
] if c in df_perf.columns]

# player_id is still assumed to be present: rows without it are dropped.
df_perf_clean = df_perf.select(cols_perf).dropna(subset=["player_id"])

# Cast only the metric columns that survived the selection above. Casting
# unconditionally would reference a column that was filtered out and fail,
# defeating the purpose of the presence check on cols_perf.
for metric in ("goals", "assists", "minutes_played"):
    if metric in cols_perf:
        df_perf_clean = df_perf_clean.withColumn(metric, col(metric).cast("int"))

df_perf_clean.write.mode("overwrite").parquet(f"{SILVER}/performance")
print("‚úÖ Ghi Silver: performance")

# -------------------------------
# 7. Quick check
# -------------------------------
# Read every Silver table back and report its row and column counts.
SILVER_TABLES = ("players", "clubs", "transfers", "performance")

print("\nüéØ Silver layer ƒë√£ s·∫µn s√†ng:")
for tbl in SILVER_TABLES:
    table_path = f"{SILVER}/{tbl}"
    df = spark.read.parquet(table_path)
    print(f"{tbl}: {df.count()} b·∫£n ghi | {len(df.columns)} c·ªôt")
