# Analise de dados - Tabela Acidentes Fatais

In [0]:
# Leitura do arquivo no Blob Storage e mantendo arquivo em variavel
acidentes_fatais = "/mnt/landing/web_scraping/estadual/raw/xlsx/acidentes_fatais.xlsx"

In [0]:
# Criando Dataframe
df_fatais = spark.read.format("com.crealytics.spark.excel").option("header", "true").load(acidentes_fatais)

In [0]:
# Excluindo campos que não serão utilizados
df_fatais_selec = df_fatais.drop(*['Ano BO (RDO)', 'Ano/Mês do Acidente', 'Turno', 'Região Administrativa', 'Administração', 'Conservação', 'Tipo do Local do Acidente','Iluminação da via (SIOPM)',  'Condições Climáticas (SIOPM)', 'Sentido da Via (SIOPM)', 'Limite da velocidade da via (SIOPM)'])


In [0]:
from pyspark.sql.functions import col

df_colunas_fatais = df_fatais.select(col("ID").alias("id"), 
                                   col("Id Delegacia (RDO)").alias("id_delegacia"),
                                   col("Número BO (RDO)").alias("numero_bo"),
                                   col("Data do Acidente").alias("data_acidente"),
                                   col("Dia do Acidente").alias("dia_acidente"),
                                   col("Mês do Acidente").alias("mes_acidente"),
                                   col("Ano do Acidente").alias("ano_acidente"),
                                   col("Dia da semana").alias("dia_semana"),
                                   col("Hora do Acidente").alias("hora_acidente"),
                                   col("Município").alias("municipio"),
                                   col("Logradouro").alias("logradouro"),
                                   col("Numeral / KM").alias("km"),
                                   col("Jurisdição").alias("jurisdicao"),
                                   col("Lat (GEO)").alias("lat"),
                                   col("Long (GEO)").alias("long"),  
                                   col("Superfície da Via (SIOPM)").alias("superficie_via"),
                                   col("Tipo de pista (SIOPM)").alias("tipo_pista"),
                                   col("Outro Veículo Envolvido").alias("outro_veiculo_envolvido"),
                                   col("Tipo de via").alias("tipo_via"),
                                   col("Quantidade de vítimas").alias("quantidade_vitimas"),
                                   col("Tempo entre o Acidente e as Mortes").alias("tempo_entre_acidente_e_obito")).filter("data_acidente >= '2019-01-01'")

In [0]:
# Importando a biblioteca de funções disponíveis para DataFrame
import pyspark.sql.functions as sf

In [0]:
# Filtra dados nulos na coluna 'Município' e altera o campo para 'NAO INFORMADO'
df_ = df_colunas_fatais.withColumn('municipio', sf.when(col('municipio').isNull(), sf.lit('NAO INFORMADO')).otherwise(sf.col('municipio')))

In [0]:
# Filtra dados nulos na coluna 'outro_veiculo_envolvido' e altera o campo para 'NAO INFORMADO'
df_veiculo = df_.withColumn('outro_veiculo_envolvido', sf.when(col('outro_veiculo_envolvido').isNull(), sf.lit('NAO INFORMADO')).otherwise(sf.col('outro_veiculo_envolvido')))

In [0]:
# Para cada valor zerado, altera para 'Nao Informado'
df_lat = df_veiculo.withColumn('lat', sf.when(col('lat') =='0,0',sf.lit('NAO INFORMADO')).otherwise(sf.col('lat')))

# Para cada valor zerado, altera para 'Nao Informado'
df_long = df_lat.withColumn('long', sf.when(col('lat') =='0,0',sf.lit('NAO INFORMADO')).otherwise(sf.col('long')))

In [0]:
# Para valores com 'km' menores que 0 , é substituido por '0'
df_km = df_long.withColumn('km', sf.when(col('km') < '0', sf.lit('0')).otherwise(sf.col('km')))

In [0]:
# Padroniza a formatação para campos 'NAO DISPONIVEL' para 'NAO INFORMADO'
df2 = df_km.select([sf.when(col(c) == 'NAO DISPONIVEL', sf.lit('NAO INFORMADO')).otherwise(col(c)).alias(c) for c in df_km.columns])    

In [0]:
# Padronizando para minúsculo o dia da semana
df_semana = df2.withColumn('dia_semana',sf.lower(sf.col("dia_semana")))

In [0]:
#Alterando compos 'NAO INFORMADO' para valor '00:00:00' no campo hora_acidente
df_h = df_semana.withColumn('hora_acidente',sf.when(sf.col('hora_acidente') == 'NAO INFORMADO', sf.lit('00:00:00')).otherwise(col('hora_acidente')))

In [0]:
# Formatando o campo Hora no formato "HH:mm:ss" 
class DateUtils:
    def __init__(self, spark):
        self.spark = spark
        self.origin_date_format_list = ["yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ss'Z'"]
        self.converted_date_format = "HH:mm:ss"

def convert_date(col):
    return sf.coalesce(*[sf.from_unixtime(sf.unix_timestamp(col, f), self.converted_date_format) for f in self.origin_date_format_list])
  
# Chamada da função convert_date  
df_hora = df_h.withColumn("hora_acidente",df_h["hora_acidente"])

In [0]:
# Formatando o campo data_acidente no formato 'yyyy-MM-dd'
class DateUtils:
    def __init__(self, spark):
        self.spark = spark
        self.origin_date_format_list = ["yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ss'Z'"]
        self.converted_date_format = "yyyy-MM-dd"

def convert_date(col):
    return sf.coalesce(*[sf.from_unixtime(sf.unix_timestamp(col, f), self.converted_date_format) for f in self.origin_date_format_list])
      
# Chamada da função convert_date  
df_date = df_hora.withColumn("data_acidente",df_hora["data_acidente"])

In [0]:
#testar dia da semana
df3 = df_date.withColumn('dia_semana',sf.when(sf.col('dia_semana') == 'segunda', sf.lit('segunda-feira'))
                         .when(col('dia_semana') == 'terça', sf.lit('terça-feira'))
                         .when(col('dia_semana') == 'quarta', sf.lit('quarta-feira'))
                         .when(col('dia_semana') == 'quinta', sf.lit('quinta-feira'))
                         .when(col('dia_semana') == 'sexta', sf.lit('sexta-feira'))
                         .otherwise(col('dia_semana')))

In [0]:
df_acidentes_fatais_final= df3

In [0]:
jdbc_connection = "jdbc:sqlserver://projeto-grupob.database.windows.net:1433;database=projetointegradogrupob;user=grupob@projeto-grupob;password=Fwk$MOSSgztl;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

In [0]:
(
  df_acidentes_fatais_final
    .write
    .format('jdbc')
    .mode('overwrite')
    .option('url', jdbc_connection)
    .option('dbtable', 'stage.acidentes_fatais')
    .save()
)