In [None]:
# Databricks notebook source
# MAGIC %md
# MAGIC ## Bronze-to-Silver: Extract SQLite Tables

# COMMAND ----------

import re
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

# Source: the SQLite database file in the bronze mount, plus the JDBC URL
# built from that path.
db_path = "/dbfs/mnt/bronze/database.sqlite"
jdbc_url = f"jdbc:sqlite:{db_path}"

# Destination: base path for the silver output.
silver_base_path = "/mnt/silver"

# Names of the SQLite tables to extract.
tables_to_extract = [
    "Country",
    "League",
    "Match",
    "Player",
    "Player_Attributes",
    "Team",
    "Team_Attributes",
]

print(f"Bronze DB path: {db_path}")
print(f"Silver path: {silver_base_path}")

# COMMAND ----------

def clean_col_name(name):
    """Return a lowercase, underscore-separated version of a column name.

    The name is lowercased, every character other than an ASCII letter,
    digit or underscore is replaced with an underscore, and runs of
    underscores are collapsed to one. Leading and trailing underscores
    are not stripped.

    Args:
        name: The original column name.

    Returns:
        The cleaned column name.
    """
    lowered = name.lower()
    underscored = re.sub(r'[^a-zA-Z0-9_]', '_', lowered)
    return re.sub(r'_+', '_', underscored)

# COMMAND ----------

print("Starting extraction...")

# Names of tables whose extraction raised, so the final message reflects the
# real outcome instead of always claiming success.
failed_tables = []

for table_name in tables_to_extract:
    print(f"Processing table: {table_name} ...")

    try:
        # Read the full table from the SQLite file through Spark's JDBC source.
        df = spark.read \
            .format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", table_name) \
            .load()

        # Rename all columns positionally in a single step.
        cleaned_df = df.toDF(*[clean_col_name(c) for c in df.columns])

        # Re-cast top-level string columns to StringType. The check uses
        # isinstance so that only plain string columns are touched; a
        # substring match on the type's repr would also hit nested types
        # such as arrays of strings.
        # NOTE(review): casting a string column to StringType does not change
        # its data type; kept from the original -- confirm whether it is needed.
        for field in cleaned_df.schema.fields:
            if isinstance(field.dataType, StringType):
                cleaned_df = cleaned_df.withColumn(
                    field.name, col(field.name).cast(StringType())
                )

        output_path = f"{silver_base_path}/{table_name.lower()}"

        # Replace any existing Delta data (and its schema) at the output path.
        cleaned_df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .save(output_path)

        print(f"Successfully extracted '{table_name}' to '{output_path}'")

    except Exception as e:
        # Best-effort: one failing table must not stop the remaining tables,
        # but the failure is recorded for the final summary.
        failed_tables.append(table_name)
        print(f"Error processing table {table_name}: {e}")

if failed_tables:
    print(
        "\nBronze-to-Silver extraction finished with errors. "
        f"Failed tables: {', '.join(failed_tables)}"
    )
else:
    print("\nBronze-to-Silver extraction complete.")


