## Carregando dados no spark

In [1]:
# carregando arquivos do hdfs

pessoasAcidentesTransito = spark.read.csv("hdfs://node1:8020//user/vagrant/dados/pessoasAcidentesTransito/*", sep=';', header=True, encoding = 'latin1')

ocorrenciasTransito = spark.read.csv("hdfs://node1:8020//user/vagrant/dados/ocorrenciasTransito/*", sep=';', header=True, encoding = 'latin1')

logradourosAcidentesTransito = spark.read.csv("hdfs://node1:8020//user/vagrant/dados/logradourosAcidentesTransito/*", sep=';', header=True, encoding = 'latin1')

pessoasAcidentesTransito.show()

ocorrenciasTransito.show()

logradourosAcidentesTransito.show()

+------------------+-----------------+-----------+--------+--------------+---------------+----+---------------+-------------+-----+----------+---------------------+---------------------+----------------+---------------------+--------------------+--------+----------+
|       num_boletim|data_hora_boletim|n_envolvido|condutor|cod_severidade|desc_severidade|sexo|cinto_seguranca|   Embreagues|Idade|nascimento|categoria_habilitacao|descricao_habilitacao|declaracao_obito|cod_severidade_antiga|     especie_veiculo|pedestre|passageiro|
+------------------+-----------------+-----------+--------+--------------+---------------+----+---------------+-------------+-----+----------+---------------------+---------------------+----------------+---------------------+--------------------+--------+----------+
|2019-007782522-001| 18/02/2019 19:24|          2|       S|             1|NAO FATAL      |   M|            SIM|          NÃO|   32|02/05/1986|                  AB | HABILITADO NAS CA...|             

+------------------+----------------+------------+--------------------+---------------+-------------+---------------+--------------------+------------------------+------------------------+---------+--------------------+-----------+---------------------+---------+-----------------+
|       num_boletim|    data_boletim|N°_municipio|      nome_municipio|seq_logradouros|N°_logradouro|tipo_logradouro|     nome_logradouro|tipo_logradouro_anterior|nome_logradouro_anterior|N°_bairro|         nome_bairro|tipo_bairro|descricao_tipo_bairro|N°_imovel|N°_imovel_proximo|
+------------------+----------------+------------+--------------------+---------------+-------------+---------------+--------------------+------------------------+------------------------+---------+--------------------+-----------+---------------------+---------+-----------------+
|2019-049379035-001|09/10/2019 06:46|           1|BELO HORIZONTE   ...|              1|       117712|            RUA|JOAO ARANTES     ...|                

## Join das tabelas

In [2]:
tabela = pessoasAcidentesTransito.join(ocorrenciasTransito, on='num_boletim', how='left')

tabela = tabela.join(logradourosAcidentesTransito, on='num_boletim', how='left')

tabela.show()

+------------------+-----------------+-----------+--------+--------------+---------------+----+---------------+-------------+-----+----------+---------------------+---------------------+----------------+---------------------+--------------------+--------+----------+-----------------+----------------+-------------+--------------------+---------+---------------+-------------+---------------+------------+---------------+---------------+----------------+--------------------+-------------+-------------+--------------+--------------------+---------+-------------+-------------------+----------------+--------------------+----------------+------------+--------------------+---------------+-------------+---------------+--------------------+------------------------+------------------------+---------+--------------------+-----------+---------------------+---------+-----------------+
|       num_boletim|data_hora_boletim|n_envolvido|condutor|cod_severidade|desc_severidade|sexo|cinto_seguranca|   Emb

## Total de acidentes com vítima por bairro em acidentes com embriaguez

In [34]:
import pyspark.sql.functions as F

ComVitima = tabela.filter(tabela.DESC_TIPO_ACIDENTE.like('%VITIMA%'))
Embriaguez = ComVitima.filter(ComVitima.Embreagues == 'SIM')
TotalEmbriaguez = Embriaguez.select('nome_bairro').groupby('nome_bairro').agg(F.count('*').alias('Total de acidentes por embriaguez com vítimas'))

TotalEmbriaguez.show()

+--------------------+---------------------------------------------+
|         nome_bairro|Total de acidentes por embriaguez com vítimas|
+--------------------+---------------------------------------------+
|PAQUETA          ...|                                            1|
|SANTO ANTONIO    ...|                                            2|
|LOURDES          ...|                                           11|
|SION             ...|                                            1|
|HELIOPOLIS       ...|                                            8|
|SILVEIRA         ...|                                            5|
|DOM SILVERIO     ...|                                            1|
|CAICARAS         ...|                                            2|
|SANTO ANDRE      ...|                                            2|
|IPIRANGA         ...|                                            3|
|SANTA HELENA     ...|                                            2|
|JARDIM LEBLON    ...|            

## Total de acidentes por tipo de pavimento e condição do tempo

In [33]:
TotalTempo = ocorrenciasTransito.select('PAVIMENTO', 'DESC_TEMPO').groupby('PAVIMENTO', 'DESC_TEMPO').agg(F.count('*').alias('Total de acidentes por pavimento e tempo'))

TotalTempo.show()

+---------------+---------------+----------------------------------------+
|      PAVIMENTO|     DESC_TEMPO|Total de acidentes por pavimento e tempo|
+---------------+---------------+----------------------------------------+
|CALCAMENTO     |CHUVA          |                                       4|
|CALCAMENTO     |NAO INFORMADO  |                                       1|
|NAO INFORMADO  |NAO INFORMADO  |                                    6942|
|NAO INFORMADO  |NUBLADO        |                                       2|
|CONCRETO       |NUBLADO        |                                       6|
|ASFALTO        |BOM            |                                    5356|
|NAO INFORMADO  |CHUVA          |                                       6|
|TERRA          |BOM            |                                       1|
|ASFALTO        |NEBLINA        |                                       7|
|CONCRETO       |BOM            |                                      58|
|CALCAMENTO     |NUBLADO 

## Total de pessoas acidentadas por tipo de veiculo e tipo de pavimentação

In [36]:
TotalTipo = tabela.select('PAVIMENTO', 'especie_veiculo', 'n_envolvido').groupby('PAVIMENTO', 'especie_veiculo').agg({'n_envolvido':'sum'}).alias('Total de acidentes por veículo e pavimento')

TotalTipo.show()

+---------------+--------------------+----------------+
|      PAVIMENTO|     especie_veiculo|sum(n_envolvido)|
+---------------+--------------------+----------------+
|CALCAMENTO     |CAMINHAO         ...|             3.0|
|CONCRETO       |MOTONETA         ...|             4.0|
|NAO INFORMADO  |TRACAO           ...|             2.0|
|CALCAMENTO     |                 ...|            15.0|
|NAO INFORMADO  |REBOQUE E SEMI-RE...|             6.0|
|NAO INFORMADO  |AUTOMOVEL        ...|         13659.0|
|CALCAMENTO     |CAMINHONETE      ...|             2.0|
|NAO INFORMADO  |CICLOMOTOR       ...|             4.0|
|NAO INFORMADO  |MOTONETA         ...|           146.0|
|ASFALTO        |MOTOCICLETA      ...|         11479.0|
|ASFALTO        |MOTONETA         ...|           147.0|
|ASFALTO        |TRATOR DE RODAS  ...|             6.0|
|CONCRETO       |MOTOCICLETA      ...|           130.0|
|NAO INFORMADO  |CAMINHAO-TRATOR  ...|            11.0|
|NAO INFORMADO  |CAMIONETA        ...|          

## Média de idade dos condutores por tipo de veículo e tipo de acidente

In [31]:
Condutor = tabela.filter(tabela.condutor == 'S')
MediaTipo = Condutor.select('DESC_TIPO_ACIDENTE', 'especie_veiculo', 'Idade').groupby('DESC_TIPO_ACIDENTE', 'especie_veiculo').agg({'Idade':'mean'}).alias('Total de acidentes por veículo e pavimento')

MediaTipo.show()

+--------------------+--------------------+------------------+
|  DESC_TIPO_ACIDENTE|     especie_veiculo|        avg(Idade)|
+--------------------+--------------------+------------------+
|ABALROAMENTO COM ...|TRATOR DE RODAS  ...|              33.0|
|ATROPELAMENTO DE ...|AUTOMOVEL        ...|              25.0|
|CHOQUE MECANICO C...|KOMBI            ...|45.714285714285715|
|COLISAO DE VEICUL...|PATINETE         ...|              16.5|
|ATROPELAMENTO DE ...|AUTOMOVEL        ...| 35.83870967741935|
|QUEDA DE PESSOA D...|CAMINHONETE      ...|              19.0|
|ABALROAMENTO COM ...|CICLOMOTOR       ...|              34.5|
|COLISAO DE VEICUL...|MOTOCICLETA      ...|29.900095602294456|
|COLISAO DE VEICUL...|CARRO DE MAO     ...|35.166666666666664|
|CHOQUE MECANICO C...|CAMINHAO-TRATOR  ...|              41.9|
|ATROPELAMENTO DE ...|NAO INFORMADO    ...| 9.736842105263158|
|CAPOTAMENTO/TOMBA...|ONIBUS           ...|39.714285714285715|
|CAPOTAMENTO/TOMBA...|BICICLETA        ...| 41.57142857

## Média de idade dos condutores por indicativo de embriaguez

In [23]:
Embriagado = tabela.filter(tabela.Embreagues == 'SIM')
MediaTipo = Embriagado.agg({'Idade':'mean'}).alias('Total de acidentes por veículo e pavimento')

MediaTipo.show()

+-----------------+
|       avg(Idade)|
+-----------------+
|37.80203045685279|
+-----------------+

