## Goals of this project

We're going to use our beloved *Transport for London (TfL) data set*, specifically the *Accident Statistics data that has already been ingested*. The *business questions* we want to answer are:
1. "***Which are the top 5 boroughs with the highest number of fatal accidents?***"
2. "***Which age group is involved in the most accidents?***"
3. "***For casualties in road accidents, what are the 3 most common modes of transportation?***"

By answering these questions, we hope to gain insight into the accidents that happened in London in 2019.

## 1. Setting up the SparkSession

#### 1.2 Search for Spark Installation 

In [3]:
import findspark
findspark.init()

In [4]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

#### 1.3 Create SparkSession

In [5]:
from pyspark.sql.session import SparkSession

spark = SparkSession.builder\
                    .appName("Transport for London - Accidents analysis - Group Assignment")\
                    .getOrCreate()

print(f"This cluster relies on Spark '{spark.version}'")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


This cluster relies on Spark '3.2.1'


## 2. Creating the dataframe from data stored in Hadoop

In [7]:
from pyspark.sql.functions import input_file_name

# Create the DataFrame from the raw JSON files stored in HDFS
Accidents2019InfoRawDF = spark.read.json("hdfs://localhost:9000/datalake/raw/tfl/accidentstats/2022/12/*/*/*")

# Add a column holding the path of the source file that each row was read from
Accidents2019InfoRawDF = Accidents2019InfoRawDF.withColumn("filename_path", input_file_name())

# Show the inferred schema
Accidents2019InfoRawDF.printSchema()

                                                                                

root
 |-- $type: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- casualties: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- $type: string (nullable = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- ageBand: string (nullable = true)
 |    |    |-- class: string (nullable = true)
 |    |    |-- mode: string (nullable = true)
 |    |    |-- severity: string (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lat: double (nullable = true)
 |-- location: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- severity: string (nullable = true)
 |-- vehicles: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- $type: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- filename_path: string (nullable = false)



In [8]:
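# Import the PySpark functions used in the following cells
# (min, max and avg were unused and would shadow Python's built-ins)
from pyspark.sql.functions import col, explode

# Preview the first 10 rows, truncating each value to 10 characters
Accidents2019InfoRawDF.show(10, truncate=10, vertical=False)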

                                                                                

+----------+----------+----------+----------+------+---------+----------+---------+--------+----------+-------------+
|     $type|   borough|casualties|      date|    id|      lat|  location|      lon|severity|  vehicles|filename_path|
+----------+----------+----------+----------+------+---------+----------+---------+--------+----------+-------------+
|Tfl.Api...|   Croydon|[{Tfl.A...|2019-04...|353158|51.329752|On Limp...|-0.074149|  Slight|[{Tfl.A...|   hdfs://...|
|Tfl.Api...|    Newham|[{Tfl.A...|2019-04...|353160|51.531924|On Bark...|    0.048| Serious|[{Tfl.A...|   hdfs://...|
|Tfl.Api...|Barking...|[{Tfl.A...|2019-04...|353162|51.523658|On Tham...| 0.153556|  Slight|[{Tfl.A...|   hdfs://...|
|Tfl.Api...|Hammers...|[{Tfl.A...|2019-04...|353164|51.475006|On Harb...|-0.182532| Serious|[{Tfl.A...|   hdfs://...|
|Tfl.Api...| Islington|[{Tfl.A...|2019-04...|353166|51.529224|On City...|-0.095414|  Slight|[{Tfl.A...|   hdfs://...|
|Tfl.Api...|   Lambeth|[{Tfl.A...|2019-04...|353168|51.4

## 3. Getting Insights from the Data

### 3.1 Q1: Which are the top 5 boroughs with the highest number of fatal accidents?

In [9]:
# Top 5 boroughs by number of fatal accidents (a count, not a rate)
Accidents2019InfoRawDF.where(col("severity") == "Fatal")\
                      .select("borough", "severity")\
                      .groupBy("borough", "severity")\
                      .count()\
                      .orderBy(col("count").desc())\
                      .limit(5)\
                      .toPandas()

                                                                                

| | borough | severity | count |
|---|---|---|---|
| 0 | Merton | Fatal | 48 |
| 1 | Croydon | Fatal | 42 |
| 2 | Wandsworth | Fatal | 42 |
| 3 | Bromley | Fatal | 36 |
| 4 | Brent | Fatal | 36 |
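
One thing worth checking before relying on these counts: the read path in section 2 uses wildcards over several folders, so the same accident `id` may have been loaded more than once, and every copy is counted here. All five counts above are multiples of 6, which would be consistent with that, although it does not prove it. The sketch below is plain Python over made-up records (the ids and rows are illustrative, not taken from the data set) and shows how counting distinct ids differs from counting rows. In PySpark the analogous step would be `dropDuplicates(["id"])` before the `groupBy`.

```python
from collections import Counter


def fatal_counts_by_borough(records, dedupe=True):
    """Count fatal accidents per borough, optionally keeping one row per accident id."""
    seen_ids = set()
    counts = Counter()
    for rec in records:
        if rec["severity"] != "Fatal":
            continue
        if dedupe:
            if rec["id"] in seen_ids:
                continue  # same accident read again from another file
            seen_ids.add(rec["id"])
        counts[rec["borough"]] += 1
    return counts


# Hypothetical records: accident 1 appears three times, as if ingested three times.
sample = [
    {"id": 1, "borough": "Merton", "severity": "Fatal"},
    {"id": 1, "borough": "Merton", "severity": "Fatal"},
    {"id": 1, "borough": "Merton", "severity": "Fatal"},
    {"id": 2, "borough": "Croydon", "severity": "Fatal"},
    {"id": 3, "borough": "Croydon", "severity": "Slight"},
]

raw = fatal_counts_by_borough(sample, dedupe=False)
unique = fatal_counts_by_borough(sample, dedupe=True)
print("rows counted:     ", dict(raw))
print("accidents counted:", dict(unique))
```

If the two results differ on the real data, the deduplicated figure is the one that answers "number of fatal accidents".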


### 3.2 Q2: Which age group is involved in the most accidents?

#### 3.2.1 Flattening the casualties array into an age_info column

In [10]:
# Flatten the casualties array: one row per casualty, holding that casualty's age
Accidents2019Casualties = Accidents2019InfoRawDF.withColumn("age_info", explode("casualties.age"))\
                                            .select("id", "date", "borough", "severity", "age_info")
Accidents2019Casualties.show(5)  # Show the first 5 rows to check the result

+------+--------------------+--------------------+--------+--------+
|    id|                date|             borough|severity|age_info|
+------+--------------------+--------------------+--------+--------+
|353158|2019-04-28T18:30:00Z|             Croydon|  Slight|      59|
|353160|2019-04-28T22:55:00Z|              Newham| Serious|      26|
|353160|2019-04-28T22:55:00Z|              Newham| Serious|      53|
|353162|2019-04-29T06:42:00Z|Barking and Dagenham|  Slight|      23|
|353164|2019-04-29T09:24:00Z|Hammersmith and F...| Serious|      43|
+------+--------------------+--------------------+--------+--------+
only showing top 5 rows
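
To make the flattening step concrete, here is a plain-Python sketch of what `explode` does to one nested record: an accident with two casualties becomes two rows that share the accident-level fields. The record mirrors accident 353160 from the output above. The helper name `explode_ages` is made up for illustration and is not part of PySpark.

```python
def explode_ages(record):
    """Return one flat row per casualty, repeating the accident-level fields."""
    return [
        {
            "id": record["id"],
            "borough": record["borough"],
            "severity": record["severity"],
            "age_info": casualty["age"],
        }
        for casualty in record["casualties"]
    ]


# One accident with two casualties, as in the rows for id 353160 above.
accident = {
    "id": 353160,
    "borough": "Newham",
    "severity": "Serious",
    "casualties": [{"age": 26}, {"age": 53}],
}

rows = explode_ages(accident)
for row in rows:
    print(row)
```

Like `explode`, the sketch produces no row for an accident whose casualties list is empty, so such accidents drop out of any count made after this step.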



#### 3.2.2 Creating a new column age category

In [11]:
from pyspark.sql.functions import when

# Add an "age_category" column. The comparisons are strict, so casualties aged
# exactly 30 or 60 (and those with no recorded age) match no branch and get null.
Accidents2019CasualtiesbyAge = Accidents2019Casualties\
.withColumn("age_category",
            when(col("age_info") < 30, "Young Person")  # under 30
            .when((col("age_info") > 30) & (col("age_info") < 60), "Mid-Aged Person")  # between 30 and 60, exclusive
            .when(col("age_info") > 60, "Old Person"))  # over 60
Accidents2019CasualtiesbyAge.show(5)  # Show the first 5 rows to check the new column

+------+--------------------+--------------------+--------+--------+---------------+
|    id|                date|             borough|severity|age_info|   age_category|
+------+--------------------+--------------------+--------+--------+---------------+
|353158|2019-04-28T18:30:00Z|             Croydon|  Slight|      59|Mid-Aged Person|
|353160|2019-04-28T22:55:00Z|              Newham| Serious|      26|   Young Person|
|353160|2019-04-28T22:55:00Z|              Newham| Serious|      53|Mid-Aged Person|
|353162|2019-04-29T06:42:00Z|Barking and Dagenham|  Slight|      23|   Young Person|
|353164|2019-04-29T09:24:00Z|Hammersmith and F...| Serious|      43|Mid-Aged Person|
+------+--------------------+--------------------+--------+--------+---------------+
only showing top 5 rows
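
The three conditions above use strict comparisons, so an age of exactly 30 or 60 matches none of them and ends up without a category. The sketch below reproduces that rule in plain Python next to one possible gap-free variant. The function names, and the choice to put 30 in the middle band and 60 in the oldest band, are assumptions for illustration, not the notebook's definition.

```python
def age_category_strict(age):
    """Mirror the notebook's conditions: age < 30, 30 < age < 60, age > 60."""
    if age is None:
        return None
    if age < 30:
        return "Young Person"
    if 30 < age < 60:
        return "Mid-Aged Person"
    if age > 60:
        return "Old Person"
    return None  # exactly 30 or 60 falls through every branch


def age_category_no_gaps(age):
    """Assumed alternative: half-open bands [0, 30), [30, 60) and 60 or older."""
    if age is None:
        return None
    if age < 30:
        return "Young Person"
    if age < 60:
        return "Mid-Aged Person"
    return "Old Person"


# Compare both rules at and around the band edges.
for age in (29, 30, 59, 60, 61):
    print(age, age_category_strict(age), age_category_no_gaps(age))
```

Switching the notebook to a gap-free rule would change the counts in the next section, so the results shown there only apply to the strict version.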



#### 3.2.3 Answering the business question

In [12]:
# Count casualties per age category, skipping rows without a category,
# and list the largest category first
Accidents2019CasualtiesbyAge.where(col("age_category").isNotNull())\
                            .select("age_category")\
                            .groupBy("age_category")\
                            .count()\
                            .orderBy(col("count").desc())\
                            .toPandas()

                                                                                

| | age_category | count |
|---|---|---|
| 0 | Mid-Aged Person | 86094 |
| 1 | Young Person | 65508 |
| 2 | Old Person | 13620 |
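
Raw counts are easier to compare as shares of the categorised total. The sketch below takes the three counts from the result above and converts them to percentages. `category_shares` is a hypothetical helper, not something defined elsewhere in this notebook.

```python
def category_shares(counts):
    """Convert a {category: count} mapping to percentages of the total."""
    total = sum(counts.values())
    return {name: 100 * n / total for name, n in counts.items()}


# Counts copied from the result above.
counts = {
    "Mid-Aged Person": 86094,
    "Young Person": 65508,
    "Old Person": 13620,
}

shares = category_shares(counts)
for name, pct in sorted(shares.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{name}: {pct:.1f}%")
```

With these counts the mid-aged band accounts for roughly 52% of categorised casualties. The bands differ in width and in population size, so a larger share does not by itself mean a higher risk per person.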


### 3.3 Q3: For casualties in road accidents, what are the 3 most common modes of transportation?

In [13]:
# Find the 3 most common modes of transportation among all casualties.
# The result is a small pandas DataFrame, so it is named accordingly.
topModesPandasDF = Accidents2019InfoRawDF.withColumn("mode_info", explode("casualties.mode"))\
    .select("mode_info")\
    .groupBy("mode_info")\
    .count()\
    .orderBy(col("count").desc())\
    .limit(3)\
    .toPandas()
topModesPandasDF

                                                                                

| | mode_info | count |
|---|---|---|
| 0 | Car | 68742 |
| 1 | Pedestrian | 33972 |
| 2 | PoweredTwoWheeler | 32346 |
