## Tabela Silver para Fatos de Acidentes

### Tratamento do Dataframe para obter informações desejadas

Aqui vamos filtrar apenas as colunas desejadas e realizar o tratamentod e tipagem e limpeza de dados necessárias

Vamos realizar alguns tratamentos

    Seleção de colunas necessárias
    Remoção de duplicidades aproveitado já o ID_Veiculo que vem no arquivo original (Por ser o campo unico de cada veiculo já)
    Configurar colunas para o padrão
    Filtrando dados indesejados
    Alteração de tipagem de dados
    Removendo dados nulos
    Salvando arquivo de forma compactada

In [0]:
# Importando Bibliotecas para uso do Spark
from pyspark.sql import SparkSession # Import para inciar o spark
from pyspark.sql.types import * # Todos as configurações do SQL de tipagem no spark
from pyspark.sql.functions import * # Todas as funções de SQL do Spark
from pyspark.sql.window import Window

In [0]:
# Diretorio Bronze, com os dados brutos das causas dos acidentes
Bronze_Acidentes = '/Volumes/workspace/dw_acidentes/01_bronze/Acidentes/'
Bronze_Acidentes_Causas = '/Volumes/workspace/dw_acidentes/01_bronze/Acidentes_Causas/'

# Local para armazenar as tabelas Silvers
diretorio_Silver = '/Volumes/workspace/dw_acidentes/02_silver'

#Dimenssões necessárias para os relacionamentos
localizacao = fr'{diretorio_Silver}/Dim_Localizacao'
clima = fr'{diretorio_Silver}/Dim_Clima'
calenadario = fr'{diretorio_Silver}/Dim_Calendario'

In [0]:
# Lê os arquivos CSV descompactado
df_Acidentes = spark.read.parquet(Bronze_Acidentes)
df_Acidentes_causas = spark.read.parquet(Bronze_Acidentes_Causas)

In [0]:
# Realizado leitura dos arquivos dimenssionais
dim_calenadario = spark.read.parquet(calenadario)
dim_clima = spark.read.parquet(clima)
dim_localizacao = spark.read.parquet(localizacao)

In [0]:
# Reduzindo o volume de dados para perfomance em relacionamento
dim_calenadario = dim_calenadario.select(['id_calendario','data_acidente'])

In [0]:
# Renomeado e alterando o tipo da coluna para padrão desejado para relacionamento
df_Acidentes_causas = df_Acidentes_causas.withColumnRenamed('data_inversa', 'data_acidente')
df_Acidentes_causas = df_Acidentes_causas.withColumn('data_acidente', to_date(col('data_acidente'), 'yyyy-MM-dd'))

In [0]:
# Realizando o relacionamento para onbter o dado do ID_Calendario da Dimenssão para tabela fato
df_Acidentes_causas = df_Acidentes_causas.join(dim_calenadario, on='data_acidente', how="left")

In [0]:
# Realizando o relacionamento para onbter o dado do ID_CLIME da Dimenssão para tabela fato 
df_Acidentes_causas = df_Acidentes_causas.join(dim_clima, on=['fase_dia', 'condicao_metereologica'], how="left")

In [0]:
# Alterando a tipagem para padrões necesários - Realizado "regexp_replace" para remover dados com "."" E substituir as "," por ".", por fim alterar os tipos para decimal
df_Acidentes_causas = df_Acidentes_causas.withColumn("km",regexp_replace(regexp_replace(col("km"), "\\.", ""), ",", ".").try_cast(DecimalType(18, 8)))
df_Acidentes_causas = df_Acidentes_causas.withColumn("latitude",regexp_replace(regexp_replace(col("latitude"), "\\.", ""), ",", ".").try_cast(DecimalType(18, 8)))
df_Acidentes_causas = df_Acidentes_causas.withColumn("longitude",regexp_replace(regexp_replace(col("longitude"), "\\.", ""), ",", ".").try_cast(DecimalType(18, 8)))

In [0]:
ids_para_relacao = ['municipio', 'uf', 'br', 'tipo_pista', 'km', 'tracado_via', 'uso_solo', 'sentido_via', 'regional', 'delegacia', 'uop', 'latitude', 'longitude']
# Realizando o relacionamento para onbter o dado do ID_CLIME da Dimenssão para tabela fato 
df_Acidentes_causas = df_Acidentes_causas.join(dim_localizacao, on=ids_para_relacao, how="left")

In [0]:
df_ponte = df_Acidentes_causas.select(['id', 'id_veiculo','pesid', 'id_calendario', 'id_clima', 'id_localizacao'])
df_ponte = df_ponte.dropDuplicates(subset=["id"])
df_Acidentes = df_Acidentes.join(df_ponte, on="id", how="left")

In [0]:
# Selecionado apenas as colunas necessária para criar o modelo Dimenssional de Veiculos
df_Facts_Acidentes = df_Acidentes.select(['id', 'id_localizacao', 'id_veiculo', 'id_clima', 
                                          'pesid', 'id_calendario', 'data_inversa', 'horario',
                                          'causa_acidente', 'tipo_acidente', 'classificacao_acidente',
                                          'pessoas', 'veiculos', 'ilesos',
                                          'feridos', 'feridos_leves', 'feridos_graves',
                                          'mortos', 'ignorados']
                                        )

In [0]:
# Removendo Duplicadas df de veiculos no campo chave
df_Facts_Acidentes = df_Facts_Acidentes.dropDuplicates(subset=["id"])

In [0]:
# Configurado o nome das colunas para manter um padrão de minuscula e com underscore no lugar de espaçamentos
# Renomear todas as colunas convertendo para minúsculas
df_Facts_Acidentes = df_Facts_Acidentes.toDF(*[c.lower() for c in df_Facts_Acidentes.columns])
# Renomear e substituir espaços por underscore
df_Facts_Acidentes = df_Facts_Acidentes.toDF(*[c.replace(' ', '_') for c in df_Facts_Acidentes.columns])

In [0]:
# Alterando a tipagem dos campos desejados
df_Facts_Acidentes = (
    df_Facts_Acidentes
    .withColumn('id', col('id').try_cast(IntegerType()))
    .withColumn('id_localizacao', col('id_localizacao').try_cast(IntegerType()))
    .withColumn('id_veiculo', col('id_veiculo').try_cast(IntegerType()))
    .withColumn('id_clima', col('id_clima').try_cast(IntegerType()))
    .withColumn('pesid', col('pesid').try_cast(IntegerType()))
    .withColumn('id_calendario', col('id_calendario').try_cast(IntegerType()))
    .withColumn('data_inversa', to_date(col('data_inversa'), 'yyyy-MM-dd'))
    .withColumn('horario',col('horario').try_cast("string"))
    .withColumn('pessoas',col('pessoas').cast(IntegerType()))
    .withColumn('veiculos',col('veiculos').cast(IntegerType()))
    .withColumn('ilesos',col('ilesos').cast(IntegerType()))
    .withColumn('feridos',col('feridos').cast(IntegerType()))
    .withColumn('feridos_leves',col('feridos_leves').cast(IntegerType()))
    .withColumn('feridos_graves',col('feridos_graves').cast(IntegerType()))
    .withColumn('mortos',col('mortos').cast(IntegerType()))
    .withColumn('ignorados',col('ignorados').cast(IntegerType()))
)

In [0]:
# Renomeado a coluna para padrão desejado
df_Facts_Acidentes = df_Facts_Acidentes.withColumnRenamed('id', 'id_acidente')
df_Facts_Acidentes = df_Facts_Acidentes.withColumnRenamed('data_inversa', 'data_acidente')
df_Facts_Acidentes = df_Facts_Acidentes.withColumnRenamed('horario', 'hora_acidente')
df_Facts_Acidentes = df_Facts_Acidentes.withColumnRenamed('pesid', 'id_envolvido')

In [0]:
# Validando os campos para realizar os tratamentos
df_Facts_Acidentes.printSchema()

root
 |-- id_acidente: integer (nullable = true)
 |-- id_localizacao: integer (nullable = true)
 |-- id_veiculo: integer (nullable = true)
 |-- id_clima: integer (nullable = true)
 |-- id_envolvido: integer (nullable = true)
 |-- id_calendario: integer (nullable = true)
 |-- data_acidente: date (nullable = true)
 |-- hora_acidente: string (nullable = true)
 |-- causa_acidente: string (nullable = true)
 |-- tipo_acidente: string (nullable = true)
 |-- classificacao_acidente: string (nullable = true)
 |-- pessoas: integer (nullable = true)
 |-- veiculos: integer (nullable = true)
 |-- ilesos: integer (nullable = true)
 |-- feridos: integer (nullable = true)
 |-- feridos_leves: integer (nullable = true)
 |-- feridos_graves: integer (nullable = true)
 |-- mortos: integer (nullable = true)
 |-- ignorados: integer (nullable = true)



In [0]:
# Avaliando os primeiros registros analise rapida.
display(df_Facts_Acidentes.limit(5))

id_acidente,id_localizacao,id_veiculo,id_clima,id_envolvido,id_calendario,data_acidente,hora_acidente,causa_acidente,tipo_acidente,classificacao_acidente,pessoas,veiculos,ilesos,feridos,feridos_leves,feridos_graves,mortos,ignorados
2685,303740,4305,21,5612.0,10,2017-01-10,2026-02-05 18:05:00,Defeito Mecânico no Veículo,Tombamento,Sem Vítimas,2,2,1,0,0,0,0,1
5585,20865,9313,37,13613.0,21,2017-01-21,2026-02-05 14:50:00,Falta de Atenção à Condução,Colisão traseira,Com Vítimas Feridas,4,4,1,1,1,0,0,3
7150,45137,12221,31,15783.0,27,2017-01-27,2026-02-05 16:30:00,Falta de Atenção à Condução,Colisão lateral,Com Vítimas Feridas,2,2,1,1,1,0,0,0
8311,194981,14448,31,,32,2017-02-01,2026-02-05 07:00:00,Defeito Mecânico no Veículo,Incêndio,Sem Vítimas,2,2,1,0,0,0,0,1
9254,196132,16079,27,20215.0,35,2017-02-04,2026-02-05 18:35:00,Ultrapassagem Indevida,Colisão transversal,Com Vítimas Fatais,3,3,1,1,0,1,1,0


In [0]:
# Salvando o arquivo em parquet com compressão gzip 
df_Facts_Acidentes.write.parquet(
    f"{diretorio_Silver}/Facts_Acidentes", 
    mode="overwrite",         # Aqui defino para sempre sobrescrever o arquivo
    compression="gzip"        # Comando para salvar o arquivo com compactação
)