### Imports

In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel


# Filesystem locations used by the notebook.
# NOTE(review): POST_JDBC and FILE_TABELAS_DIR are not referenced by any cell
# visible in this notebook; presumably a local JDBC driver jar and a parquet
# output directory inside a container - confirm or remove.
POST_JDBC = "/app/postgresql-42.7.8.jar"
FILE_TABELAS_DIR = "/app/parquet_tables"
# Passed to `spark.local.dir` when the Spark session is built.
SCRATCH = "/tmp/spark_temp"

### Spark Session Configuration

In [None]:
# Build (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("DataCrash-ETL")
    .config("spark.sql.legacy.timeParserPolicy", "CORRECTED")
    # Spark scratch files go to the SCRATCH directory.
    .config("spark.local.dir", SCRATCH)
    # Run locally, using all available cores.
    .config("spark.master", "local[*]")
    .config("spark.driver.memory", "2g")
    .config("spark.worker.cleanup.enabled", "true")
    .config("spark.ui.showConsoleProgress", "false")
    # PostgreSQL JDBC driver, resolved as a package when the session starts.
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.8")
    .getOrCreate()
)

# From here on only ERROR-level log messages are emitted.
spark.sparkContext.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/04 01:52:57 WARN Utils: Your hostname, DESKTOP-COU095G, resolves to a loopback address: 127.0.1.1; using 172.30.167.94 instead (on interface eth0)
25/11/04 01:52:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/italo/sbd2/dw-medallion/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/italo/.ivy2.5.2/cache
The jars for the packages stored in: /home/italo/.ivy2.5.2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bc3ad866-f361-49d0-bf89-a74114b385f9;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.8 in central
	found org.checkerframework#checker-qual;3.49.5 in central
:: resolution report :: resolve 192ms :: artifacts dl 6ms
	:: modules in use:
	org.checkerframework#c

### Column Definitions

In [3]:
def get_accident_columns():
    """Return the names of the accident columns the pipeline keeps.

    A new list is built on every call, in the order used for selection.
    """
    identity_and_location = ["Accident_Index", "Longitude", "Latitude"]
    outcome = ["Accident_Severity", "Number_of_Vehicles", "Number_of_Casualties"]
    moment = ["Date", "Day_of_Week", "Time"]
    road = [
        "Road_Type",
        "Speed_limit",
        "Junction_Detail",
        "Junction_Control",
        "Pedestrian_Crossing-Physical_Facilities",
    ]
    surroundings = [
        "Light_Conditions",
        "Weather_Conditions",
        "Road_Surface_Conditions",
        "Special_Conditions_at_Site",
        "Carriageway_Hazards",
        "Urban_or_Rural_Area",
    ]
    return [*identity_and_location, *outcome, *moment, *road, *surroundings]

def get_vehicle_columns():
    """Return the names of the vehicle columns the pipeline keeps.

    A new list is built on every call, in the order used for selection.
    """
    join_keys = ["Accident_Index", "Vehicle_Reference"]
    vehicle_fields = [
        "Vehicle_Type",
        "Vehicle_Manoeuvre",
        "Vehicle_Location-Restricted_Lane",
        "Was_Vehicle_Left_Hand_Drive?",
    ]
    driver_fields = ["Sex_of_Driver", "Age_of_Driver", "Age_Band_of_Driver"]
    trailing_fields = ["Propulsion_Code", "Age_of_Vehicle"]
    return join_keys + vehicle_fields + driver_fields + trailing_fields

def get_casualty_columns():
    """Return the names of the casualty columns the pipeline keeps.

    A new list is built on every call, in the order used for selection.
    """
    # None of the names contains whitespace, so splitting the block on
    # whitespace yields exactly one entry per column, in written order.
    kept = """
        Accident_Index Vehicle_Reference Casualty_Reference
        Casualty_Class Sex_of_Casualty Age_of_Casualty
        Age_Band_of_Casualty Casualty_Severity Pedestrian_Movement
        Car_Passenger Bus_or_Coach_Passenger Casualty_Type
    """
    return kept.split()

# Progress marker: confirms this cell ran and the column-list helpers exist.
print("Column definitions loaded")

Column definitions loaded


### Data Loading Function

In [4]:
def load_and_filter_data(file_path, columns_to_keep):
    """Read a parquet dataset and keep only the requested columns.

    Prints the row/column count and schema of the full dataset, a warning for
    any requested column that is absent, and then the schema and a five-row
    sample of the filtered result.

    Args:
        file_path: Path of the parquet dataset, read through the module-level
            ``spark`` session.
        columns_to_keep: Iterable of column names to retain. Names that are
            not present in the dataset are skipped (and reported).

    Returns:
        A DataFrame holding the requested columns that exist in the dataset,
        in the order they appear in ``columns_to_keep``.
    """
    # Parquet files carry their own header and schema, so the CSV-only
    # "header" / "inferSchema" reader options are not passed here.
    df_complete = spark.read.parquet(file_path)

    print(f"{file_path}: {df_complete.count():,} rows, {len(df_complete.columns)} columns")
    df_complete.printSchema()

    # Split the request into present / absent columns in a single pass so a
    # typo or a schema change is visible instead of silently shrinking the
    # output.
    available = set(df_complete.columns)
    existing_columns = []
    missing_columns = []
    for name in columns_to_keep:
        if name in available:
            existing_columns.append(name)
        else:
            missing_columns.append(name)

    if missing_columns:
        print(f"WARNING: columns not found in {file_path} and skipped: {missing_columns}")

    df_filtered = df_complete.select(existing_columns)

    print(f"Filtered: {len(df_filtered.columns)} columns kept")
    df_filtered.printSchema()
    print(f"Sample data from {file_path}:")
    df_filtered.show(5, truncate=False)

    return df_filtered

### Load Datasets


In [5]:
# Load the three raw parquet datasets, each reduced to the columns returned by
# its get_*_columns() helper.
df_accidents = load_and_filter_data(
    file_path="../Data_Layer/raw/parquet/Accidents0515",
    columns_to_keep=get_accident_columns(),
)

df_vehicles = load_and_filter_data(
    file_path="../Data_Layer/raw/parquet/Vehicles0515",
    columns_to_keep=get_vehicle_columns(),
)

df_casualties = load_and_filter_data(
    file_path="../Data_Layer/raw/parquet/Casualties0515",
    columns_to_keep=get_casualty_columns(),
)

../Data_Layer/raw/parquet/Accidents0515: 1,780,653 rows, 32 columns
root
 |-- Accident_Index: string (nullable = true)
 |-- Location_Easting_OSGR: integer (nullable = true)
 |-- Location_Northing_OSGR: integer (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Police_Force: integer (nullable = true)
 |-- Accident_Severity: integer (nullable = true)
 |-- Number_of_Vehicles: integer (nullable = true)
 |-- Number_of_Casualties: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Day_of_Week: integer (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Local_Authority_(District): integer (nullable = true)
 |-- Local_Authority_(Highway): string (nullable = true)
 |-- 1st_Road_Class: integer (nullable = true)
 |-- 1st_Road_Number: integer (nullable = true)
 |-- Road_Type: integer (nullable = true)
 |-- Speed_limit: integer (nullable = true)
 |-- Junction_Detail: integer (nullable = true)
 |-- Junction_Control: integ

### Data Quality

In [6]:
def _null_counts_single_pass(df):
    """Count NULLs for every column of ``df`` with one aggregation job.

    Returns a dict mapping column name to its NULL count, or an empty dict
    when the combined aggregation cannot be built or executed; callers then
    fall back to counting column by column.
    """
    try:
        # count() ignores NULLs, and when() without otherwise() yields NULL
        # for non-matching rows, so each aggregate counts the NULL rows of one
        # column. Positional aliases avoid depending on the characters used in
        # the original column names.
        aggregated = df.agg(*[
            F.count(F.when(F.col(name).isNull(), 1)).alias(f"nulls_{position}")
            for position, name in enumerate(df.columns)
        ]).first()
        return dict(zip(df.columns, aggregated))
    except Exception:
        return {}


def safe_quality_analysis(df, df_name):
    """Print a per-column quality report and return the problems found.

    For every column the report shows the NULL count, the Spark data type and
    two sample values. A failure while analysing one column is recorded as a
    problem and does not stop the analysis of the remaining columns.

    Args:
        df: DataFrame to analyse.
        df_name: Label used in the report header.

    Returns:
        A list of strings, one per column that has NULL values or that could
        not be analysed.
    """
    print(f"\n{df_name} - QUALITY ANALYSIS")
    print("=" * 40)

    # Basic counts
    total_rows = df.count()
    print(f"Total rows: {total_rows:,}")
    print(f"Total columns: {len(df.columns)}")

    # Problems collected across all columns
    problems = []

    # One scan for all NULL counts instead of one full-scan job per column.
    null_counts = _null_counts_single_pass(df)

    # Check each column
    for col_name in df.columns:
        print(f"\n{col_name}:")

        try:
            # NULL values (per-column count only if the single pass failed)
            if col_name in null_counts:
                null_count = null_counts[col_name]
            else:
                null_count = df.filter(F.col(col_name).isNull()).count()

            if null_count > 0:
                print(f"   NULL: {null_count}")
                problems.append(f"{col_name}: {null_count} NULL")
            else:
                print(f"   NULL: 0")

            # Data type
            dtype = df.schema[col_name].dataType
            print(f"   Type: {dtype}")

            # Sample values
            samples = df.select(col_name).limit(2).collect()
            sample_values = [str(row[col_name]) for row in samples]
            print(f"   Samples: {sample_values}")

        except Exception as e:
            # Best effort: record the failure and continue with the next column.
            print(f"  Erro ao analisar: {e}")
            problems.append(f"{col_name}: ERRO - {e}")

    print(f"\nRESUMO:")
    print("-" * 20)
    if problems:
        print(f"Problemas encontrados: {len(problems)}")
        for problem in problems:
            print(f"   • {problem}")
    else:
        print("Nenhum problema encontrado!")

    return problems

# Run the quality analysis on every dataset; each call returns the list of
# problem descriptions found for that dataset.
print("ANALISANDO ACCIDENTS...")
problems_accidents = safe_quality_analysis(df_accidents, "Accidents")

print("\nANALISANDO VEHICLES...")
problems_vehicles = safe_quality_analysis(df_vehicles, "Vehicles")

print("\nANALISANDO CASUALTIES...")
problems_casualties = safe_quality_analysis(df_casualties, "Casualties")

print("\nANÁLISE DE QUALIDADE CONCLUÍDA PARA TODOS OS DATASETS!")

ANALISANDO ACCIDENTS...

Accidents - QUALITY ANALYSIS
Total rows: 1,780,653
Total columns: 20

Accident_Index:
   NULL: 0
   Type: StringType()
   Samples: ['201001CW11044', '201001CW11047']

Longitude:
   NULL: 138
   Type: DoubleType()
   Samples: ['-0.126838', '-0.173004']

Latitude:
   NULL: 138
   Type: DoubleType()
   Samples: ['51.49474', '51.521907']

Accident_Severity:
   NULL: 0
   Type: IntegerType()
   Samples: ['3', '3']

Number_of_Vehicles:
   NULL: 0
   Type: IntegerType()
   Samples: ['2', '2']

Number_of_Casualties:
   NULL: 0
   Type: IntegerType()
   Samples: ['1', '1']

Date:
   NULL: 0
   Type: StringType()
   Samples: ['25/05/2010', '04/06/2010']

Day_of_Week:
   NULL: 0
   Type: IntegerType()
   Samples: ['3', '6']

Time:
   NULL: 151
   Type: TimestampType()
   Samples: ['2025-10-31 09:20:00', '2025-10-31 23:30:00']

Road_Type:
   NULL: 0
   Type: IntegerType()
   Samples: ['6', '6']

Speed_limit:
   NULL: 0
   Type: IntegerType()
   Samples: ['30', '30']

Junct

### Data Type Conversion

In [7]:
def convert_data_types_simple(df):
    """Convert the ``Date`` and ``Time`` columns, when present.

    ``Date`` is parsed with the pattern ``dd/MM/yyyy`` into a date column and
    ``Time`` is formatted as an ``HH:mm`` string. A line is printed for each
    conversion that is applied. Columns that are absent are left untouched.

    Args:
        df: DataFrame to convert.

    Returns:
        The DataFrame with the applicable columns replaced.
    """
    # (column, replacement expression, message printed when applied)
    conversions = (
        (
            "Date",
            F.to_date(F.col("Date"), "dd/MM/yyyy"),
            "Date: String → DateType",
        ),
        (
            "Time",
            F.date_format(F.col("Time"), "HH:mm"),
            "Time: Timestamp → String (HH:mm)",
        ),
    )

    converted = df
    for column, expression, message in conversions:
        if column in df.columns:
            converted = converted.withColumn(column, expression)
            print(message)

    return converted

# Only the accidents dataset goes through the Date/Time conversion.
df_accidents_converted = convert_data_types_simple(df_accidents)

Date: String → DateType
Time: Timestamp → String (HH:mm)


### Missing Value Treatment

In [8]:
def treat_missing_values(df):
    """Drop rows that have NULL in any critical column.

    The critical columns are ``Longitude``, ``Latitude`` and ``Time``; only
    those present in ``df`` are checked. A summary of the removal is printed.

    Args:
        df: DataFrame to clean.

    Returns:
        The filtered DataFrame, or ``df`` itself when none of the critical
        columns is present.
    """
    original_count = df.count()

    # Critical columns: a row with a missing value in any of them is removed
    critical_columns = ['Longitude', 'Latitude', 'Time']
    present_columns = [name for name in critical_columns if name in df.columns]

    if not present_columns:
        # Nothing was checked, so do not claim that no missing values exist.
        print("Nenhuma coluna crítica encontrada no DataFrame; nenhuma linha removida")
        return df

    # Keep a row only when every critical column that is present is non-NULL.
    condition = F.col(present_columns[0]).isNotNull()
    for col_name in present_columns[1:]:
        condition = condition & F.col(col_name).isNotNull()

    df_clean = df.filter(condition)

    # Count once and reuse the value; each count() is a separate Spark job.
    clean_count = df_clean.count()
    removed_count = original_count - clean_count
    # Guard against an empty input, which would otherwise divide by zero.
    removed_pct = (removed_count / original_count) * 100 if original_count else 0.0

    print(f"Removidas {removed_count:,} linhas ({removed_pct:.4f}%) com missing em colunas críticas")
    print(f"Antes: {original_count:,} linhas")
    print(f"Depois: {clean_count:,} linhas")

    return df_clean

# Run the missing-value treatment on the converted accidents data
df_accidents_clean = treat_missing_values(df_accidents_converted)

Removidas 289 linhas (0.0162%) com missing em colunas críticas
Antes: 1,780,653 linhas
Depois: 1,780,364 linhas


### Referential Integrity Cleanup

In [9]:
def ensure_referential_integrity(accidents_df, vehicles_df, casualties_df):
    """Drop vehicle and casualty rows whose accident is not in ``accidents_df``.

    Both child datasets are inner-joined on ``Accident_Index`` against the
    distinct accident keys, and the row counts before and after are printed.

    Args:
        accidents_df: DataFrame providing the valid ``Accident_Index`` values.
        vehicles_df: Vehicles DataFrame to filter.
        casualties_df: Casualties DataFrame to filter.

    Returns:
        A tuple ``(vehicles_clean, casualties_clean)``.
    """
    print("Sincronizando datasets...")

    # Distinct keys of the accidents that are still present
    surviving_keys = accidents_df.select("Accident_Index").distinct()

    def keep_matching(child_df):
        # An inner join keeps only the rows whose accident key survives.
        return child_df.join(surviving_keys, "Accident_Index", "inner")

    # Row counts before filtering
    vehicles_before = vehicles_df.count()
    casualties_before = casualties_df.count()

    vehicles_clean = keep_matching(vehicles_df)
    casualties_clean = keep_matching(casualties_df)

    # Row counts after filtering
    vehicles_after = vehicles_clean.count()
    casualties_after = casualties_clean.count()

    print(
        f"Veículos: {vehicles_before:,} → {vehicles_after:,} "
        f"(removidos {vehicles_before - vehicles_after:,})"
    )
    print(
        f"Vítimas: {casualties_before:,} → {casualties_after:,} "
        f"(removidos {casualties_before - casualties_after:,})"
    )

    return vehicles_clean, casualties_clean

# Run the referential-integrity cleanup: the raw vehicles and casualties are
# filtered against the cleaned accidents.
print("REMOVENDO DADOS ÓRFÃOS...")
df_vehicles_clean, df_casualties_clean = ensure_referential_integrity(
    df_accidents_clean, df_vehicles, df_casualties
)

REMOVENDO DADOS ÓRFÃOS...
Sincronizando datasets...
Veículos: 3,262,270 → 3,261,764 (removidos 506)
Vítimas: 2,402,909 → 2,402,551 (removidos 358)


### Domain Validation 

In [10]:
def validate_all_domains(df, df_name):
    """Check every coded column of ``df`` against its list of allowed values.

    Only the rule columns that exist in ``df`` are checked. For each one the
    number of out-of-domain rows is printed, with up to three distinct
    offending values, followed by a summary.

    Rows where the column is NULL are not counted as invalid: the negated
    ``isin`` predicate evaluates to NULL for them, and ``filter`` keeps only
    the rows where the predicate is true.

    Args:
        df: DataFrame to validate.
        df_name: Label used in the report header.

    Returns:
        A list of strings, one per column that has out-of-domain values.
    """
    print(f"\n {df_name} - COMPREHENSIVE DOMAIN VALIDATION")
    print("=" * 50)

    # Allowed codes per column, for all three datasets.
    validation_rules = {
        # Accidents
        'Accident_Severity': [1, 2, 3],
        'Day_of_Week': [1, 2, 3, 4, 5, 6, 7],
        'Road_Type': [1, 2, 3, 6, 7, 9, 12],
        'Speed_limit': [0, 10, 15, 20, 30, 40, 50, 60, 70],
        'Junction_Detail': [0, 1, 2, 3, 5, 6, 7, 8, 9, -1],
        'Junction_Control': [0, 1, 2, 3, 4, -1],
        'Pedestrian_Crossing-Physical_Facilities': [0, 1, 4, 5, 7, 8, -1],
        'Light_Conditions': [1, 4, 5, 6, 7, -1],
        'Weather_Conditions': [1, 2, 3, 4, 5, 6, 7, 8, 9, -1],
        'Road_Surface_Conditions': [1, 2, 3, 4, 5, 6, 7, -1],
        'Special_Conditions_at_Site': [0, 1, 2, 3, 4, 5, 6, 7, -1],
        'Carriageway_Hazards': [0, 1, 2, 3, 4, 5, 6, 7, -1],
        'Urban_or_Rural_Area': [1, 2, 3],

        # Vehicles
        'Vehicle_Type': list(range(1, 99)) + [-1],  # 1-98 plus -1
        'Vehicle_Manoeuvre': list(range(1, 19)) + [-1],
        'Vehicle_Location-Restricted_Lane': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, -1],
        'Sex_of_Driver': [1, 2, 3, -1],
        'Age_Band_of_Driver': list(range(1, 12)) + [-1],
        'Propulsion_Code': list(range(1, 13)) + [-1],

        # Casualties
        'Casualty_Class': [1, 2, 3],
        'Sex_of_Casualty': [1, 2, 3, -1],
        'Age_Band_of_Casualty': list(range(1, 12)) + [-1],
        'Casualty_Severity': [1, 2, 3],
        'Pedestrian_Movement': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, -1],
        'Car_Passenger': [0, 1, 2, 9, -1],
        'Bus_or_Coach_Passenger': [0, 1, 2, 3, 4, 9, -1],
        'Casualty_Type': [0, 1, 2, 3, 4, 5, 8, 9, 10, 11, 16, 17, 18, 19, 20, 21, 22, 23, 90, 97, 98, -1]
    }

    issues = []

    for column, allowed in validation_rules.items():
        if column not in df.columns:
            continue

        # Rows whose value lies outside the allowed codes
        out_of_domain = df.filter(~F.col(column).isin(allowed))
        invalid_count = out_of_domain.count()

        if invalid_count <= 0:
            distinct_total = df.select(column).distinct().count()
            print(f"   {column}: Todos os {distinct_total} valores estão no domínio")
            continue

        # Up to three distinct offending values, as examples
        example_rows = out_of_domain.select(column).distinct().limit(3).collect()
        samples = [row[column] for row in example_rows]

        issues.append(f"{column}: {invalid_count} inválidos - Ex: {samples}")
        print(f"   {column}: {invalid_count} valores inválidos")
        print(f"      Exemplos: {samples}")

    print(f"\nRESUMO DE VALIDAÇÃO:")
    print("-" * 30)
    if not issues:
        print("Todos os domínios estão válidos!")
    else:
        print(f"⚠  Problemas encontrados: {len(issues)}")
        for issue in issues:
            print(f"   • {issue}")

    return issues

# Run the full domain validation on each cleaned dataset; each call returns
# the list of issues found for that dataset.
print("VALIDANDO ACCIDENTS...")
issues_accidents = validate_all_domains(df_accidents_clean, "Accidents")

print("\nVALIDANDO VEHICLES...")
issues_vehicles = validate_all_domains(df_vehicles_clean, "Vehicles")

print("\nVALIDANDO CASUALTIES...")
issues_casualties = validate_all_domains(df_casualties_clean, "Casualties")

print("\n VALIDAÇÃO COMPLETA DE DOMÍNIOS CONCLUÍDA!")

VALIDANDO ACCIDENTS...

 Accidents - COMPREHENSIVE DOMAIN VALIDATION
   Accident_Severity: Todos os 3 valores estão no domínio
   Day_of_Week: Todos os 7 valores estão no domínio
   Road_Type: Todos os 6 valores estão no domínio
   Speed_limit: Todos os 9 valores estão no domínio
   Junction_Detail: Todos os 10 valores estão no domínio
   Junction_Control: Todos os 6 valores estão no domínio
   Pedestrian_Crossing-Physical_Facilities: Todos os 7 valores estão no domínio
   Light_Conditions: Todos os 5 valores estão no domínio
   Weather_Conditions: Todos os 10 valores estão no domínio
   Road_Surface_Conditions: Todos os 6 valores estão no domínio
   Special_Conditions_at_Site: Todos os 9 valores estão no domínio
   Carriageway_Hazards: Todos os 7 valores estão no domínio
   Urban_or_Rural_Area: Todos os 3 valores estão no domínio

RESUMO DE VALIDAÇÃO:
------------------------------
Todos os domínios estão válidos!

VALIDANDO VEHICLES...

 Vehicles - COMPREHENSIVE DOMAIN VALIDATION
   

### Renaming Columns to snake_case Pattern

In [11]:
# Rename columns towards snake_case
def rename_columns(df, rename_dict):
    """Lower-case every column name, then apply the explicit renames.

    Only lower-casing is automatic; any other change (for example removing
    ``-`` or ``?``) happens only for names listed in ``rename_dict``.

    Args:
        df: Object exposing ``columns`` and ``withColumnRenamed(old, new)``.
        rename_dict: Mapping from an already lower-cased column name to its
            final name. Entries whose key is not a current column are ignored.

    Returns:
        The renamed DataFrame.
    """
    lowered = df
    for original_name in df.columns:
        lowered = lowered.withColumnRenamed(original_name, original_name.lower())

    result = lowered
    for source_name in rename_dict:
        # Checked against the current columns, so earlier renames are seen.
        if source_name not in result.columns:
            continue
        result = result.withColumnRenamed(source_name, rename_dict[source_name])

    return result

# Explicit renames applied after lower-casing: they rename the date/time
# columns and remove the "-" and "?" characters from three column names.
renomear = {
    "date": "accident_timestamp",
    "time": "accident_time",
    "pedestrian_crossing-physical_facilities": "pedestrian_crossing_physical_facilities",
    "was_vehicle_left_hand_drive?": "was_vehicle_left_hand_drive",
    "vehicle_location-restricted_lane": "vehicle_location_restricted_lane",
}

# The same mapping is applied to all three cleaned datasets.
df_accidents_clean, df_vehicles_clean, df_casualties_clean = (
    rename_columns(frame, renomear)
    for frame in (df_accidents_clean, df_vehicles_clean, df_casualties_clean)
)



### Data Unification

In [12]:
# Join accidents to vehicles on accident_index, then to casualties on
# (accident_index, vehicle_reference). Both joins are inner joins, so only
# combinations present in all three datasets remain.
df_joined_base = (
    df_accidents_clean
    .join(df_vehicles_clean, on="accident_index", how="inner")
    .join(df_casualties_clean, on=["accident_index", "vehicle_reference"], how="inner")
)

# Persisted because the result is counted here and written out later.
df_joined = df_joined_base.persist(StorageLevel.MEMORY_AND_DISK)

print(f"Dataframe final: {df_joined.count():,} linhas, {len(df_joined.columns)} colunas")

df_joined.printSchema()

Dataframe final: 2,402,551 linhas, 40 colunas
root
 |-- accident_index: string (nullable = true)
 |-- vehicle_reference: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- accident_severity: integer (nullable = true)
 |-- number_of_vehicles: integer (nullable = true)
 |-- number_of_casualties: integer (nullable = true)
 |-- accident_timestamp: date (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- accident_time: string (nullable = true)
 |-- road_type: integer (nullable = true)
 |-- speed_limit: integer (nullable = true)
 |-- junction_detail: integer (nullable = true)
 |-- junction_control: integer (nullable = true)
 |-- pedestrian_crossing_physical_facilities: integer (nullable = true)
 |-- light_conditions: integer (nullable = true)
 |-- weather_conditions: integer (nullable = true)
 |-- road_surface_conditions: integer (nullable = true)
 |-- special_conditions_at_site: integer (nullable = true)
 |-- carria

### Saving Joined Table in Parquet

In [13]:
# Destination of the silver layer.
silver_path = "../Data_Layer/silver/parquet"

# Replace any previous output with snappy-compressed parquet files.
silver_writer = df_joined.write.mode("overwrite").option("compression", "snappy")
silver_writer.parquet(silver_path)

# The persisted join is released once it has been written.
df_joined.unpersist()

print(f"Camada Silver salva com sucesso em: {silver_path}")

Camada Silver salva com sucesso em: ../Data_Layer/silver/parquet


In [14]:
# Read the silver layer back from disk and load it into PostgreSQL.
df_silver = spark.read.parquet(silver_path)

print(f"Iniciando a escrita de {df_silver.count():,} linhas no PostgreSQL...")

# Connection settings come from the environment so that real credentials never
# have to be written into the notebook. The fallbacks are the values this
# notebook used before, so it still runs unchanged when nothing is set.
db_url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://localhost:5432/gis")
db_table = "joined"
db_properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "postgres"),
    "driver": "org.postgresql.Driver"
}

# Database name for the final message: the last path segment of the JDBC URL,
# without any "?key=value" connection parameters.
db_name = db_url.rsplit("/", 1)[-1].split("?", 1)[0]

# mode="overwrite" replaces the existing contents of the target table.
df_silver.write.jdbc(
    url=db_url,
    table=db_table,
    mode="overwrite",
    properties=db_properties
)

print(f"Dados salvos na tabela '{db_table}' do banco '{db_name}'!")

Iniciando a escrita de 2,402,551 linhas no PostgreSQL...
Dados salvos na tabela 'joined' do banco 'gis'!
