In [None]:
# Import necessary libraries
import yaml
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.dbutils import DBUtils
from pyspark.sql.utils import AnalysisException

# Obtain the SparkSession (getOrCreate reuses an active session if one exists)
# and wrap it so the notebook can reach dbutils (secrets, etc.).
session_builder = SparkSession.builder.appName("Raw to STG Transformation")
spark = session_builder.getOrCreate()
dbutils = DBUtils(spark)

In [None]:
# Read the SQL Server connection details from the Databricks secret scope.
secret_scope = "gustavo_lima_adw"
mssql_host, mssql_port, mssql_database, username, password = (
    dbutils.secrets.get(secret_scope, secret_key)
    for secret_key in ("mssql_host", "mssql_port", "mssql_database", "username", "password")
)
schema_source = "Sales"

# JDBC properties for the Microsoft SQL Server driver (TLS on, server cert trusted).
connection_properties = dict(
    user=username,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    encrypt="true",
    trustServerCertificate="true",
)
jdbc_url = (
    f"jdbc:sqlserver://{mssql_host}:{mssql_port};"
    f"databaseName={mssql_database};"
    "encrypt=true;trustServerCertificate=true"
)

In [None]:
# Load the transformation metadata (table names and their column specs) from YAML.
yaml_path = "/Workspace/Users/gustavo.lima@indicium.tech/.bundle/desafio_lh/prod/files/src/transformations.yml"
try:
    with open(yaml_path, "r") as file:
        # safe_load parses plain YAML data only (no arbitrary Python object construction).
        transformations = yaml.safe_load(file)
except FileNotFoundError as err:
    # Raise the specific error type (still an Exception subclass, so broader handlers
    # keep working) and chain the original OS error as the explicit cause.
    raise FileNotFoundError(f"YAML file not found at path: {yaml_path}") from err

# Only the first entry under "sources" is used: table name -> list of column specs.
tables_metadata = {table["name"]: table["columns"] for table in transformations["sources"][0]["tables"]}
print("Successfully loaded transformation metadata.")

In [None]:
def to_snake_case(column_name):
    """Convert a CamelCase/PascalCase identifier to snake_case.

    Underscores are inserted before a capitalised word, between a lowercase
    letter or digit and a following uppercase letter, and between a lowercase
    letter and a following digit (``AddressLine1`` -> ``address_line_1``).
    Runs of capitals stay together (``SalesYTD`` -> ``sales_ytd``).
    Spaces and hyphens become underscores, repeated underscores are collapsed,
    and the result is lowercased.
    """
    converted = column_name
    # Boundary rules, applied in this exact order.
    for boundary in ('(.)([A-Z][a-z]+)', '([a-z0-9])([A-Z])', '([a-z])([0-9])'):
        converted = re.sub(boundary, r'\1_\2', converted)
    for separator in (' ', '-'):
        converted = converted.replace(separator, '_')
    return re.sub('_+', '_', converted).lower()

# Ordered keyword rules mapping a column description to a Spark cast type.
# Keywords must stand alone (no adjacent letters), so words such as "point",
# "printed" or "constraint" no longer trigger an int cast and "updated" no longer
# triggers a date cast. "timestamp"/"datetime" is checked before "date" so a
# timestamp column is not truncated to a date.
# NOTE(review): "bigint" still maps to int as before; values outside int range may not survive the cast.
_CAST_RULES = (
    ("int", re.compile(r"(?<![a-z])(?:tiny|small|big)?int(?:eger)?s?(?![a-z])")),
    ("string", re.compile(r"(?<![a-z])strings?(?![a-z])")),
    ("timestamp", re.compile(r"(?<![a-z])(?:timestamp|datetime)s?(?![a-z])")),
    ("date", re.compile(r"(?<![a-z])dates?(?![a-z])")),
)


def _infer_cast_type(description):
    """Return the Spark type name implied by a column description, or None if no rule matches."""
    text = str(description or "").lower()  # YAML may yield None for an empty description
    for spark_type, pattern in _CAST_RULES:
        if pattern.search(text):
            return spark_type
    return None


def transform_table(df, table_name):
    """Rename columns to snake_case and apply type casts driven by the YAML metadata.

    Args:
        df (DataFrame): table read from the RAW layer.
        table_name (str): key into the module-level ``tables_metadata`` mapping.

    Returns:
        DataFrame: ``df`` with each configured column renamed via ``to_snake_case``
        and, for column specs that contain ``data_tests``, cast to the type inferred
        from the spec's description. Returned unchanged when ``table_name`` has
        no metadata.
    """
    if table_name not in tables_metadata:
        print(f"Warning: No metadata for {table_name}. Skipping.")
        return df

    column_specs = tables_metadata[table_name]
    # Compared case-insensitively to mirror Spark's default column resolution.
    existing_columns = {name.lower() for name in df.columns}

    for column in column_specs:
        original_name = column["name"]
        new_name = to_snake_case(original_name)

        if original_name.lower() not in existing_columns:
            # withColumnRenamed would silently do nothing and the cast below would then fail.
            print(f"Warning: Column {original_name} not found in {table_name}. Skipping.")
            continue

        print(f"Renomeando coluna: {original_name} -> {new_name}")
        df = df.withColumnRenamed(original_name, new_name)

        # NOTE(review): casting is gated on the presence of data_tests, not on a type field — confirm intended.
        if "data_tests" in column:
            cast_type = _infer_cast_type(column.get("description"))
            if cast_type is not None:
                df = df.withColumn(new_name, col(new_name).cast(cast_type))

    return df

In [None]:
# Function to process XML columns in the dataset
def process_xml_column(df, xml_column, extracted_columns):
    """
    Extract fields from an XML column into separate string columns.

    Args:
        df (DataFrame): DataFrame containing the XML column.
        xml_column (str): Name of the column containing XML data; it is inserted
            into a SQL expression as-is.
        extracted_columns (list): (output column name, XPath) tuples.

    Returns:
        DataFrame: ``df`` with one new column per tuple, holding the result of
        Spark SQL ``xpath_string`` for that XPath.
    """
    for field_name, xpath in extracted_columns:
        # Escape backslashes and single quotes so XPaths containing predicates such as
        # [@type='x'] still form a valid SQL string literal.
        xpath_literal = xpath.replace("\\", "\\\\").replace("'", "\\'")
        df = df.withColumn(field_name, expr(f"xpath_string({xml_column}, '{xpath_literal}')"))
    return df



In [None]:
# XML columns to expand per table: table name -> (XML column, [(output column, XPath), ...]).
# The XML column name is the snake_case name produced by transform_table.
XML_EXTRACTIONS = {
    "store": (
        "demographics",
        [
            ("annual_sales", "/StoreSurvey/AnnualSales/text()"),
            ("annual_revenue", "/StoreSurvey/AnnualRevenue/text()"),
            ("bank_name", "/StoreSurvey/BankName/text()"),
            ("business_type", "/StoreSurvey/BusinessType/text()"),
            ("year_opened", "/StoreSurvey/YearOpened/text()"),
            ("square_feet", "/StoreSurvey/SquareFeet/text()"),
        ],
    ),
}

for table_name in tables_metadata:
    print(f"\nProcessando tabela: {table_name}")
    source_table = f"gustavo_lima_raw.schema.{table_name}"
    target_table = f"gustavo_lima_stg.schema.{table_name}"

    # Build the whole pipeline before touching STG, so any error raised while reading
    # RAW or resolving columns happens before the existing STG table is dropped.
    df = spark.sql(f"SELECT * FROM {source_table}")
    transformed_df = transform_table(df, table_name)

    if table_name in XML_EXTRACTIONS:
        xml_column, xml_fields = XML_EXTRACTIONS[table_name]
        transformed_df = process_xml_column(transformed_df, xml_column, xml_fields)

    # Force a full recreation of the STG table, then write it as Delta.
    spark.sql(f"DROP TABLE IF EXISTS {target_table}")
    (
        transformed_df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(target_table)
    )

    print(f"Tabela {table_name} salva no STG.")

In [None]:
# Sanity check: list the STG tables, then show the schema of salesorderdetail.
print("\nValidação das Tabelas STG:")
stg_tables = spark.sql("SHOW TABLES IN gustavo_lima_stg.schema")
stg_tables.show()
spark.sql("DESCRIBE gustavo_lima_stg.schema.salesorderdetail").show(truncate=False)