<a href="https://colab.research.google.com/github/gisleinemoreno/data_science/blob/main/PySpark_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**RISK ANALYSIS OF LONDON PUBLIC TRANSPORT**
*   This notebook analyzes a dataset listing incidents that occurred in the city of London.

*   The goal is to analyze the data and build charts that answer the 10 questions below:

1. How many incidents were there per gender?

2. Which age group was most involved in incidents?

3. What is the percentage of incidents per event type (Incident Event Type)?

4. How did the number of incidents per month evolve over time?

5. For incidents of type “Collision Incident”, which month had the highest number of incidents involving females?

6. What was the average number of incidents per month involving children (Child)?

7. Considering the incident description “Injuries treated on scene” (Injury Result Description column), what is the total number of incidents for males and for females?

8. In 2017, which month had the most incidents involving the elderly (Elderly)?

9. Considering the Operator, what is the distribution of incidents over time?

10. What is the most common type of incident involving cyclists?

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Python language version
from platform import python_version
print('Python version used in this Jupyter Notebook:', python_version())

Python version used in this Jupyter Notebook: 3.10.12


In [None]:
# Linux version of Google Colab
!cat /etc/*release

DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=22.04
DISTRIB_CODENAME=jammy
DISTRIB_DESCRIPTION="Ubuntu 22.04.2 LTS"
PRETTY_NAME="Ubuntu 22.04.2 LTS"
NAME="Ubuntu"
VERSION_ID="22.04"
VERSION="22.04.2 LTS (Jammy Jellyfish)"
VERSION_CODENAME=jammy
ID=ubuntu
ID_LIKE=debian
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
UBUNTU_CODENAME=jammy


In [None]:
# Java version
!java -version

openjdk version "11.0.20" 2023-07-18
OpenJDK Runtime Environment (build 11.0.20+8-post-Ubuntu-1ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.20+8-post-Ubuntu-1ubuntu122.04, mixed mode, sharing)


https://archive.apache.org/dist/spark/

In [None]:
# Download the Spark binaries
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

In [None]:
# Extract the archive
!tar xf spark-3.4.1-bin-hadoop3.tgz

In [None]:
# List the contents of the current folder
!ls

drive	     spark-3.4.1-bin-hadoop3	  spark-3.4.1-bin-hadoop3.tgz.1
sample_data  spark-3.4.1-bin-hadoop3.tgz  TFL_Bus_Safety.csv


In [None]:
# List the contents of /content
!ls /content

drive	     spark-3.4.1-bin-hadoop3	  spark-3.4.1-bin-hadoop3.tgz.1
sample_data  spark-3.4.1-bin-hadoop3.tgz  TFL_Bus_Safety.csv


In [None]:
# Set the SPARK_HOME environment variable
import os
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

In [None]:
# Install findspark
!pip install -q findspark

In [None]:
# Import the packages and create the Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Property that renders output tables more nicely
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

Upload the data file using the button on the left side.

In [None]:
# Load the dataset
df_dsa = spark.read.csv('TFL_Bus_Safety.csv', header = True, sep = ",", inferSchema = True)

In [None]:
# Show the first rows
df_dsa.show(5)

+----+----------------+-----+--------------+----------+--------------------+-------------------+-------------------------+-------------------+---------------+-----------+-----------+
|Year|Date Of Incident|Route|      Operator|Group Name|          Bus Garage|            Borough|Injury Result Description|Incident Event Type|Victim Category|Victims Sex|Victims Age|
+----+----------------+-----+--------------+----------+--------------------+-------------------+-------------------------+-------------------+---------------+-----------+-----------+
|2015|      2015-01-01|    1|London General|  Go-Ahead|Garage Not Available|          Southwark|     Injuries treated ...|   Onboard Injuries|      Passenger|       Male|      Child|
|2015|      2015-01-01|    4|     Metroline| Metroline|Garage Not Available|          Islington|     Injuries treated ...|   Onboard Injuries|      Passenger|       Male|    Unknown|
|2015|      2015-01-01|    5|   East London|Stagecoach|Garage Not Available|         

In [None]:
# Show the first rows without truncation
df_dsa.show(5, truncate = False)

+----+----------------+-----+--------------+----------+--------------------+-------------------+---------------------------------------------------------------+-------------------+---------------+-----------+-----------+
|Year|Date Of Incident|Route|Operator      |Group Name|Bus Garage          |Borough            |Injury Result Description                                      |Incident Event Type|Victim Category|Victims Sex|Victims Age|
+----+----------------+-----+--------------+----------+--------------------+-------------------+---------------------------------------------------------------+-------------------+---------------+-----------+-----------+
|2015|2015-01-01      |1    |London General|Go-Ahead  |Garage Not Available|Southwark          |Injuries treated on scene                                      |Onboard Injuries   |Passenger      |Male       |Child      |
|2015|2015-01-01      |4    |Metroline     |Metroline |Garage Not Available|Islington          |Injuries treated on 

In [None]:
# Dataset columns
df_dsa.columns

['Year',
 'Date Of Incident',
 'Route',
 'Operator',
 'Group Name',
 'Bus Garage',
 'Borough',
 'Injury Result Description',
 'Incident Event Type',
 'Victim Category',
 'Victims Sex',
 'Victims Age']

In [None]:
# Data types
df_dsa.dtypes

[('Year', 'int'),
 ('Date Of Incident', 'date'),
 ('Route', 'string'),
 ('Operator', 'string'),
 ('Group Name', 'string'),
 ('Bus Garage', 'string'),
 ('Borough', 'string'),
 ('Injury Result Description', 'string'),
 ('Incident Event Type', 'string'),
 ('Victim Category', 'string'),
 ('Victims Sex', 'string'),
 ('Victims Age', 'string')]

In [None]:
# Schema
df_dsa.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Date Of Incident: date (nullable = true)
 |-- Route: string (nullable = true)
 |-- Operator: string (nullable = true)
 |-- Group Name: string (nullable = true)
 |-- Bus Garage: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Injury Result Description: string (nullable = true)
 |-- Incident Event Type: string (nullable = true)
 |-- Victim Category: string (nullable = true)
 |-- Victims Sex: string (nullable = true)
 |-- Victims Age: string (nullable = true)



In [None]:
# Infer the schema while loading the dataset
df_dsa = spark.read.csv('TFL_Bus_Safety.csv', header = True, sep = ",", inferSchema = True)
df_dsa.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Date Of Incident: date (nullable = true)
 |-- Route: string (nullable = true)
 |-- Operator: string (nullable = true)
 |-- Group Name: string (nullable = true)
 |-- Bus Garage: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Injury Result Description: string (nullable = true)
 |-- Incident Event Type: string (nullable = true)
 |-- Victim Category: string (nullable = true)
 |-- Victims Sex: string (nullable = true)
 |-- Victims Age: string (nullable = true)



In [None]:
# Import the Spark SQL types and list the columns
from pyspark.sql.types import *
df_dsa.columns

['Year',
 'Date Of Incident',
 'Route',
 'Operator',
 'Group Name',
 'Bus Garage',
 'Borough',
 'Injury Result Description',
 'Incident Event Type',
 'Victim Category',
 'Victims Sex',
 'Victims Age']

In [None]:
# Define the list of columns and their types
labels = [
    ('Year', IntegerType()),
    ('Date Of Incident', DateType()),
    ('Route', StringType()),
    ('Operator', StringType()),
    ('Group Name', StringType()),
    ('Bus Garage', StringType()),
    ('Borough', StringType()),
    ('Injury Result Description', StringType()),
    ('Incident Event Type', StringType()),
    ('Victim Category', StringType()),
    ('Victims Sex', StringType()),
    ('Victims Age', StringType())
]


In [None]:
# Build the schema that can be passed when reading the file
schema = StructType([StructField(x[0], x[1], True) for x in labels])
schema

StructType([StructField('Year', IntegerType(), True), StructField('Date Of Incident', DateType(), True), StructField('Route', StringType(), True), StructField('Operator', StringType(), True), StructField('Group Name', StringType(), True), StructField('Bus Garage', StringType(), True), StructField('Borough', StringType(), True), StructField('Injury Result Description', StringType(), True), StructField('Incident Event Type', StringType(), True), StructField('Victim Category', StringType(), True), StructField('Victims Sex', StringType(), True), StructField('Victims Age', StringType(), True)])

In [None]:
# Show the data
df_dsa.show(truncate = False)

+----+----------------+-----+--------------+-------------+--------------------+--------------------+---------------------------------------------------------------+-------------------+---------------+-----------+-----------+
|Year|Date Of Incident|Route|Operator      |Group Name   |Bus Garage          |Borough             |Injury Result Description                                      |Incident Event Type|Victim Category|Victims Sex|Victims Age|
+----+----------------+-----+--------------+-------------+--------------------+--------------------+---------------------------------------------------------------+-------------------+---------------+-----------+-----------+
|2015|2015-01-01      |1    |London General|Go-Ahead     |Garage Not Available|Southwark           |Injuries treated on scene                                      |Onboard Injuries   |Passenger      |Male       |Child      |
|2015|2015-01-01      |4    |Metroline     |Metroline    |Garage Not Available|Islington           |

In [None]:
# Rename columns: replace spaces with underscores and shorten some names
df_dsa = df_dsa.withColumnRenamed('Group Name', 'Group_Name') \
               .withColumnRenamed('Bus Garage', 'Bus_Garage') \
               .withColumnRenamed('Victim Category', 'Victim_Category') \
               .withColumnRenamed('Victims Sex', 'Genero') \
               .withColumnRenamed('Victims Age', 'Victims_Age') \
               .withColumnRenamed('Injury Result Description', 'Injury_Result_Description') \
               .withColumnRenamed('Incident Event Type', 'Incident_Event_Type') \
               .withColumnRenamed('Date Of Incident', 'Date')

df_dsa.show(truncate = False)

+----+----------+-----+--------------+-------------+--------------------+--------------------+---------------------------------------------------------------+-------------------+---------------+-------+-----------+
|Year|Date      |Route|Operator      |Group_Name   |Bus_Garage          |Borough             |Injury_Result_Description                                      |Incident_Event_Type|Victim_Category|Genero |Victims_Age|
+----+----------+-----+--------------+-------------+--------------------+--------------------+---------------------------------------------------------------+-------------------+---------------+-------+-----------+
|2015|2015-01-01|1    |London General|Go-Ahead     |Garage Not Available|Southwark           |Injuries treated on scene                                      |Onboard Injuries   |Passenger      |Male   |Child      |
|2015|2015-01-01|4    |Metroline     |Metroline    |Garage Not Available|Islington           |Injuries treated on scene                     

**1. Number of incidents per gender**

In [None]:
# Group by a column with PySpark
df_dsa.groupBy('Genero').count().show()

+-------+-----+
| Genero|count|
+-------+-----+
| Female|11847|
|Unknown| 3602|
|   Male| 7709|
+-------+-----+



**2. Age group most involved in incidents**

In [None]:
# Group by a column with PySpark
df_dsa.groupBy('Victims_Age').count().show()

+-----------+-----+
|Victims_Age|count|
+-----------+-----+
|    Unknown| 7135|
|      Youth|  319|
|      Adult|10754|
|      Child| 2181|
|    Elderly| 2769|
+-----------+-----+



**3. What is the percentage of incidents per event type (Incident Event Type)?**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,avg


In [None]:
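# Count incidents per event type
Event_Type = df_dsa.groupBy('Incident_Event_Type').count()
# Total number of incidents, collected to the driver as a plain number
total_incidents = Event_Type.agg({"count": "sum"}).collect()[0][0]
# Share of each event type, as a percentage of the total
event_type_percentages = Event_Type.withColumn("Percentage", (col("count") / total_incidents) * 100)
event_type_percentages.show()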

+--------------------+-----+--------------------+
| Incident_Event_Type|count|          Percentage|
+--------------------+-----+--------------------+
|Vandalism Hooliga...|   73|   0.315225839882546|
|     Personal Injury| 4596|   19.84627342602988|
|             Assault|  590|   2.547715692201399|
|             Robbery|    3|0.012954486570515587|
|    Onboard Injuries| 6563|  28.340098454097934|
|  Collision Incident| 4166|   17.98946368425598|
|Activity Incident...|  114|  0.4922704896795924|
|                Fire|    6|0.025908973141031175|
|      Slip Trip Fall| 6981|  30.145090249589774|
|Safety Critical F...|   66| 0.28499870455134296|
+--------------------+-----+--------------------+
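As a quick sanity check on the table above, the percentages can be recomputed in plain Python from the printed counts. This is only a sketch of the arithmetic: the dictionary copies the counts as displayed, including the truncated labels exactly as Spark printed them, and `event_counts` and `percentages` are illustrative names.

```python
# Counts copied from the printed Incident_Event_Type table (labels kept as displayed)
event_counts = {
    "Vandalism Hooliga...": 73,
    "Personal Injury": 4596,
    "Assault": 590,
    "Robbery": 3,
    "Onboard Injuries": 6563,
    "Collision Incident": 4166,
    "Activity Incident...": 114,
    "Fire": 6,
    "Slip Trip Fall": 6981,
    "Safety Critical F...": 66,
}

# Same formula as the Spark cell: count / total * 100
total_incidents = sum(event_counts.values())
percentages = {name: count / total_incidents * 100
               for name, count in event_counts.items()}

print(total_incidents)  # 23158
for name, pct in sorted(percentages.items(), key=lambda item: item[1], reverse=True):
    print(f"{name:<22} {pct:6.2f}%")
```

The total of 23158 matches the sum of the gender counts in question 1 (11847 + 3602 + 7709).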



**4. Evolution of incidents per month over time**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
# Truncate "Date" to year-month (yyyy-MM) and create a new "Month" column from it
MonthYear = df_dsa.withColumn("Date", substring("Date", 1, 7))

MonthYear = MonthYear.withColumn("Month", substring("Date", 6, 2))

MonthYear.show()

+----+-------+-----+--------------+-------------+--------------------+--------------------+-------------------------+-------------------+---------------+-------+-----------+-----+
|Year|   Date|Route|      Operator|   Group_Name|          Bus_Garage|             Borough|Injury_Result_Description|Incident_Event_Type|Victim_Category| Genero|Victims_Age|Month|
+----+-------+-----+--------------+-------------+--------------------+--------------------+-------------------------+-------------------+---------------+-------+-----------+-----+
|2015|2015-01|    1|London General|     Go-Ahead|Garage Not Available|           Southwark|     Injuries treated ...|   Onboard Injuries|      Passenger|   Male|      Child|   01|
|2015|2015-01|    4|     Metroline|    Metroline|Garage Not Available|           Islington|     Injuries treated ...|   Onboard Injuries|      Passenger|   Male|    Unknown|   01|
|2015|2015-01|    5|   East London|   Stagecoach|Garage Not Available|            Havering|     Take

In [None]:
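# Keep the DataFrame in Event_Date; show() returns None, so it must not be chained into the assignment
Event_Date = MonthYear.groupBy('Incident_Event_Type', 'Month', 'Year').count().orderBy('Year', 'Month', ascending = False)
Event_Date.show()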


+--------------------+-----+----+-----+
| Incident_Event_Type|Month|Year|count|
+--------------------+-----+----+-----+
|  Collision Incident|   09|2018|   70|
|             Assault|   09|2018|    6|
|     Personal Injury|   09|2018|  169|
|      Slip Trip Fall|   09|2018|  239|
|Activity Incident...|   09|2018|    4|
|Safety Critical F...|   09|2018|    2|
|Vandalism Hooliga...|   09|2018|    1|
|Safety Critical F...|   08|2018|    1|
|  Collision Incident|   08|2018|   57|
|      Slip Trip Fall|   08|2018|  224|
|Activity Incident...|   08|2018|    1|
|             Assault|   08|2018|   11|
|     Personal Injury|   08|2018|  195|
|Vandalism Hooliga...|   08|2018|    2|
|      Slip Trip Fall|   07|2018|  275|
|             Assault|   07|2018|    7|
|Activity Incident...|   07|2018|    6|
|  Collision Incident|   07|2018|   73|
|     Personal Injury|   07|2018|  191|
|Vandalism Hooliga...|   07|2018|    1|
+--------------------+-----+----+-----+
only showing top 20 rows



**5. For incidents of type “Collision Incident”, which month had the highest number of incidents involving females?**

In [None]:
# Count incidents per event type, month, year and gender
Collision_Incident = MonthYear.groupBy('Incident_Event_Type', 'Month', 'Year', 'Genero').count().orderBy('Month', 'Genero', ascending = False)

# Keep only "Collision Incident" events
Collision_Incident_filter = Collision_Incident.filter(col('Incident_Event_Type') == 'Collision Incident')

# Keep only incidents involving females
Collision_Incident_female = Collision_Incident_filter.filter(col('Genero') == 'Female')

# Find the month with the highest number of incidents
max_incidents_month = Collision_Incident_female.groupBy('Month', 'Year').agg({'count': 'max'}).orderBy('max(count)', ascending=False).first()

# Print the result
print("Month with the highest number of incidents involving women: ")
print("Month: " , (max_incidents_month['Month']))
print("Year: " , (max_incidents_month['Year']))
print("Number of incidents: " + str(max_incidents_month['max(count)']))


Month with the highest number of incidents involving women: 
Month:  11
Year:  2016
Number of incidents: 63


**6. What was the average number of incidents per month involving children (Child)?**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,avg
from pyspark.sql.window import Window


In [None]:
# Count incidents per age group, month and year
Victims_Age = MonthYear.groupBy('Victims_Age', 'Month', 'Year').count().orderBy('Month', 'Victims_Age', ascending = False)
# Keep only incidents involving children
Victims_Age_filter = Victims_Age.filter(col('Victims_Age') == 'Child')
# Each (Month, Year) group holds a single row here, so avg(count) equals that month's count;
# sorting in descending order therefore returns the peak month, not the overall monthly average
media_Victims_Age_month = Victims_Age_filter.groupBy('Month', 'Year').agg({'count': 'avg'}).orderBy('avg(count)', ascending=False).first()
print("Month with the highest number of incidents involving children: ")
print("Month: " , (media_Victims_Age_month['Month']))
print("Year: " , (media_Victims_Age_month['Year']))
print("Number of incidents: " + str(media_Victims_Age_month['avg(count)']))


Month with the highest number of incidents involving children: 
Month:  10
Year:  2017
Number of incidents: 70.0


**7. Considering the incident description “Injuries treated on scene” (Injury Result Description column), what is the total number of incidents for males and for females?**



In [None]:
# List the functions available in pyspark.sql.functions
from pyspark.sql import functions
print(dir(functions))



In [None]:
# Register a temporary view

MonthYear.createOrReplaceTempView("temp")

In [None]:
# Select 15 records from the view (ANSI SQL)
spark.sql("select * from temp limit 15").show(truncate = False)

+----+-------+-----+--------------+-------------+--------------------+--------------------+---------------------------------------------------------------+-------------------+---------------+-------+-----------+-----+
|Year|Date   |Route|Operator      |Group_Name   |Bus_Garage          |Borough             |Injury_Result_Description                                      |Incident_Event_Type|Victim_Category|Genero |Victims_Age|Month|
+----+-------+-----+--------------+-------------+--------------------+--------------------+---------------------------------------------------------------+-------------------+---------------+-------+-----------+-----+
|2015|2015-01|1    |London General|Go-Ahead     |Garage Not Available|Southwark           |Injuries treated on scene                                      |Onboard Injuries   |Passenger      |Male   |Child      |01   |
|2015|2015-01|4    |Metroline     |Metroline    |Garage Not Available|Islington           |Injuries treated on scene            

In [None]:
# Where, Group by and Order by
RESULT = spark.sql("""select Injury_Result_Description, Genero, COUNT(*)
         FROM temp
         WHERE Injury_Result_Description = 'Injuries treated on scene'
         GROUP BY Injury_Result_Description, Genero
         ORDER BY Injury_Result_Description, Genero desc
""")

RESULT.show()

+-------------------------+-------+--------+
|Injury_Result_Description| Genero|count(1)|
+-------------------------+-------+--------+
|     Injuries treated ...|Unknown|    2888|
|     Injuries treated ...|   Male|    5632|
|     Injuries treated ...| Female|    8816|
+-------------------------+-------+--------+



**8. In 2017, which month had the most incidents involving the elderly (Elderly)?**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [None]:
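# Note: this ORDER BY sorts by Month descending, so show(1) returns the last
# calendar month of 2017 rather than the month with the most incidents
Elderly = spark.sql("""select Victims_Age, Year, Month, COUNT(*)
         FROM temp
         WHERE Victims_Age = 'Elderly' and Year = 2017
         GROUP BY Victims_Age, Year, Month
         ORDER BY Victims_Age, Year, Month desc
""")

Elderly.show(1)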

+-----------+----+-----+--------+
|Victims_Age|Year|Month|count(1)|
+-----------+----+-----+--------+
|    Elderly|2017|   12|      67|
+-----------+----+-----+--------+
only showing top 1 row





**9. Considering the Operator, what is the distribution of incidents over time?**


In [None]:
Operator = spark.sql("""select Operator,Year, COUNT(*) as DIST
         FROM temp
         GROUP BY Operator, Year
         ORDER BY Operator, Year desc
""")

Operator.show()

+--------------------+----+----+
|            Operator|Year|DIST|
+--------------------+----+----+
|      Abellio London|2018| 284|
|      Abellio London|2017| 329|
|      Abellio London|2016| 233|
|      Abellio London|2015| 117|
|        Abellio West|2018|  15|
|        Abellio West|2017|  50|
|        Abellio West|2016|  34|
|        Abellio West|2015|  27|
|Arriva Kent Thame...|2016|  34|
|Arriva Kent Thame...|2015|  73|
| Arriva London North|2018| 737|
| Arriva London North|2017|1026|
| Arriva London North|2016| 656|
| Arriva London North|2015| 789|
| Arriva London South|2018| 362|
| Arriva London South|2017| 470|
| Arriva London South|2016| 429|
| Arriva London South|2015| 482|
|   Arriva The Shires|2017|  72|
|   Arriva The Shires|2016| 107|
+--------------------+----+----+
only showing top 20 rows



**10. What is the most common type of incident involving cyclists?**

In [None]:
# Sort by count so the most common incident type comes first
Victim_Category = spark.sql("""select Incident_Event_Type, COUNT(*) as DIST
         FROM temp
         WHERE Victim_Category = 'Cyclist'
         GROUP BY Incident_Event_Type
         ORDER BY DIST DESC
""")

Victim_Category.show()

+-------------------+----+
|Incident_Event_Type|DIST|
+-------------------+----+
| Collision Incident| 256|
|    Personal Injury|   8|
|     Slip Trip Fall|   7|
|   Onboard Injuries|   4|
+-------------------+----+



# End