**Nom Etudiant : NDIAYE**  

**Prenom Etudiant:Djibril**  

**Classe : Master 1 IA**  


## Description
Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Dataset** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données cliquer sur ce [lien](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Travail à faire.
L'objectif de ce projet est de comprendre le **SF Fire Dataset** afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquates.

- Créer un repos git (public) et partager le repos avec mon mail (limahin10@gmail.com)
- Ecrire un code lisible et bien indenté 
- N'oublier pas de mettre en commentaire la justification de vos réponses sur les cellules Markdown. 


## Note:
- Le projet est personnel, c'est-à-dire chaque notebook ne concerne qu'un seul étudiant. 
- Deadline : **Samedi 31 Juillet 2021  à 23h 59** (Aucune de dérogation ne sera acceptée)

### Chargement des données

Importation des packages Spark

In [None]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.17:4040
SparkContext available as 'sc' (version = 3.1.2, master = local[*], app id = local-1627836042385)
SparkSession available as 'spark'


import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._


Nous allons jeter un coup d'oeil sur la structure des données avant de définir un schéma

In [None]:
!head -1 "datasets/sf-fire-calls.csv"

﻿Call Number,Unit ID,Incident Number,Call Type,Call Date,Watch Date,Received DtTm,Entry DtTm,Dispatch DtTm,Response DtTm,On Scene DtTm,Transport DtTm,Hospital DtTm,Call Final Disposition,Available DtTm,Address,City,Zipcode of Incident,Battalion,Station Area,Box,Original Priority,Priority,Final Priority,ALS Unit,Call Type Group,Number of Alarms,Unit Type,Unit sequence in call dispatch,Fire Prevention District,Supervisor District,Neighborhooods - Analysis Boundaries,RowID,case_location,Analysis Neighborhoods



Vu que la taille de ces données est énormes, inferer le schema pour un très grande volumes de données s'avère un peu couteux. Nous allons ainsi définir un schema pour le Dataset.

In [None]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [None]:
val sfFireFile = "datasets/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = datasets/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


Nous allons mettre en cache le Dataframe

In [None]:
fireDF.cache()

res0: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [None]:
fireDF.count()

res1: Long = 5616299


In [None]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [None]:
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+-----+--------------------+--------------------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+------------+--------+-----+-----+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|                City|Zipcode|           Battalion|         StationArea|  Box|    OriginalPriority|            Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood|Location|RowID|Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+--------------------+-------+-------

Filtrage des d'appels de type "Medical Incident"

In [None]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|21030278      |03/10/2021 12:18:36 AM|Alarms        |
|21030264      |03/09/2021 11:08:23 PM|Structure Fire|
|9050618       |06/20/2009 02:02:42 PM|Other         |
|5006338       |01/24/2005 07:04:51 AM|Alarms        |
|10025262      |03/22/2010 08:16:27 PM|Structure Fire|
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


### Question 1
**Combien de types d'appels distincts ont été passés ?**  
Pour être sûr, il ne faut pas compter les valeurs «nulles» dans la colonne.

In [None]:
val typA = fewFireDF.select($"CallType").distinct().count()

typA: Long = 31


### Question 2

**Quels types d'appels différents ont été passés au service d'incendie?**

In [None]:
val result = fewFireDF.
             select($"CallType").
             distinct().
             show(31)

+--------------------+
|            CallType|
+--------------------+
|         Marine Fire|
|Elevator / Escala...|
|  Aircraft Emergency|
|Confined Space / ...|
|      Administrative|
|              Alarms|
|Odor (Strange / U...|
|Lightning Strike ...|
|Citizen Assist / ...|
|              HazMat|
|Watercraft in Dis...|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|   Train / Rail Fire|
|Extrication / Ent...|
|               Other|
|        Outside Fire|
|   Traffic Collision|
|       Assist Police|
|Gas Leak (Natural...|
|        Water Rescue|
|   Electrical Hazard|
|   High Angle Rescue|
|      Structure Fire|
|Industrial Accidents|
|Mutual Aid / Assi...|
|          Fuel Spill|
|Smoke Investigati...|
|Train / Rail Inci...|
+--------------------+



result: Unit = ()


### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

In [None]:
//reponse 3
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")
val typA = newFireDF
         .select("ResponseDelayedinMins", "AvailableDtTm","CallType","Neighborhood")
         .filter($"ResponseDelayedinMins" > 5)
         .show()


+---------------------+-------------+--------+------------+
|ResponseDelayedinMins|AvailableDtTm|CallType|Neighborhood|
+---------------------+-------------+--------+------------+
+---------------------+-------------+--------+------------+



newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
typA: Unit = ()


### Transformations des dates  
Maintenant nous allons d'abord:  
1. Transformer les dates de type String en Spark Timestamp afin que nous puissions effectuer des requêtes basées sur la date plus tard    
2. Retourner le Dataframe transformée  
3. Mettre en cache le nouveau DataFrame  

In [None]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.cache()

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res5: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


### Question 4
**Quels sont les types d'appels les plus courants?**

In [None]:
//Reponse 4
val appelCourant = fireTSDF.select("CallType").
                  groupBy("CallType").count().
                  orderBy(desc("count"))
//nous allons afficher les quinze types d'appel les plus courants
appelCourant.show(15)

+--------------------+-------+
|            CallType|  count|
+--------------------+-------+
|    Medical Incident|3675857|
|      Structure Fire| 689143|
|              Alarms| 612721|
|   Traffic Collision| 229746|
|               Other|  89850|
|Citizen Assist / ...|  83835|
|        Outside Fire|  70407|
|        Water Rescue|  29380|
|        Vehicle Fire|  25947|
|Gas Leak (Natural...|  23674|
|   Electrical Hazard|  17568|
|Elevator / Escala...|  15227|
|Odor (Strange / U...|  13058|
|Smoke Investigati...|  12784|
|          Fuel Spill|   6326|
+--------------------+-------+
only showing top 15 rows



appelCourant: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string, count: bigint]


### Question 5-a
**Quels sont boites postaux rencontrés dans les appels les plus courants?**

In [None]:
//Reponse 5-a
val appelCourant = fireTSDF.
                  groupBy($"CallType",$"Box").count().
                  orderBy(desc("count"))
//nous allons afficher les quinzes Box les plus courants
appelCourant.show(15)


+-----------------+--------------------+-------+
|         CallType|                 Box|  count|
+-----------------+--------------------+-------+
| Medical Incident|    Code 2 Transport|1519805|
| Medical Incident|               Other|1475199|
|   Structure Fire|               Other| 455111|
|           Alarms|               Other| 320188|
|           Alarms|                Fire| 287035|
|   Structure Fire|                Fire| 220168|
| Medical Incident|Patient Declined ...| 169444|
| Medical Incident|            No Merit| 148419|
| Medical Incident|    Code 3 Transport| 121967|
|Traffic Collision|               Other| 100151|
|Traffic Collision|    Code 2 Transport|  63609|
| Medical Incident|Against Medical A...|  46814|
| Medical Incident|    Unable to Locate|  45398|
| Medical Incident|           Cancelled|  45017|
|            Other|                Fire|  39389|
+-----------------+--------------------+-------+
only showing top 15 rows



appelCourant: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string, Box: string ... 1 more field]


### Question 5-a
**Quels sont les quartiers de San Francisco dont les codes postaux sont 94102 et 94103?**

In [None]:
//Reponse 5-b
val fewFireDF = fireDF.select("Neighborhood", "ZipCode").
                where($"ZipCode" === 94102 || $"ZipCode" === 94103 && $"City" === "San Francisco").
                distinct()
fewFireDF.show()

+------------+-------+
|Neighborhood|ZipCode|
+------------+-------+
+------------+-------+



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Neighborhood: string, ZipCode: int]


### Question 6
**Determiner le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?**

In [None]:
//Reponse 6
// nous allonsla fonction describe pour déterminer le nombre total d'appel ainsi que la moyenne sur les temps de reponses
fireDF.select("Delay").describe().show()

+-------+-----+
|summary|Delay|
+-------+-----+
|  count|    0|
|   mean| null|
| stddev| null|
|    min| null|
|    max| null|
+-------+-----+



### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp

In [None]:
//Reponse 7-a
val annees = fireTSDF.select(year($"IncidentDate")).distinct
annees.show()

+------------------+
|year(IncidentDate)|
+------------------+
|              2003|
|              2007|
|              2018|
|              2015|
|              2006|
|              2013|
|              2014|
|              2019|
|              2004|
|              2020|
|              2012|
|              2009|
|              2016|
|              2001|
|              2005|
|              2000|
|              2010|
|              2011|
|              2008|
|              2017|
+------------------+
only showing top 20 rows



annees: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [year(IncidentDate): int]


In [None]:
// on calcul le nombre d'anne distinc avec count
val nbrAnneDistinct = annees.count()
print(s"le nombre d'années distinctes est: $nbrAnneDistinct")

le nombre d'années distinctes est: 22

nbrAnneDistinct: Long = 22


### Question 7-b
**Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?**

In [None]:
// 7-b
val week2018Enreg = fireTSDF.select(year($"IncidentDate").as("date"),
                weekofyear(col("IncidentDate")).as("weekofyear")).
                filter($"date"==="2018")
week2018Enreg.show()


+----+----------+
|date|weekofyear|
+----+----------+
|2018|        32|
|2018|         3|
|2018|         2|
|2018|         6|
|2018|        16|
|2018|        18|
|2018|        15|
|2018|        31|
|2018|        25|
|2018|        43|
|2018|        44|
|2018|        37|
|2018|        18|
|2018|        30|
|2018|        23|
|2018|        48|
|2018|        30|
|2018|        16|
|2018|        37|
|2018|         3|
+----+----------+
only showing top 20 rows



week2018Enreg: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [date: int, weekofyear: int]


In [None]:
/*Nous allons les enregistrement par les differentes semaine avec groupby-> count-> orderby

afin de recuperer les enregistrements des semaines les plus courant*/
val groupeByWeek  = week2018Enreg.groupBy($"weekofyear").
            count().orderBy(desc("count"))
groupeByWeek.show(10)


+----------+-----+
|weekofyear|count|
+----------+-----+
|         1| 7545|
|        25| 6425|
|        49| 6354|
|        22| 6328|
|        13| 6321|
|        27| 6289|
|        40| 6252|
|        44| 6250|
|        16| 6217|
|        46| 6209|
+----------+-----+
only showing top 10 rows



groupeByWeek: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [weekofyear: int, count: bigint]


In [None]:
/*on voit clairement que la semaine 1 à plus d'appels 
donc on recupere cette premiere enregistrement avec la fonction take et map*/
groupeByWeek.take(1)
.map(a => println(s"la semaine ${a(0)} a plus d'appel avec: ${a(1)}   enregistrements"))


la semaine 1 a plus d'appel avec: 7545   enregistrements


res16: Array[Unit] = Array(())


### Question 8
**Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?**

In [None]:
//Nous allons tout dabord recupérer les enregistrements de cette année les quartiers et les delai

val enreg2018 = fireTSDF.select(year($"IncidentDate").
                as("date"),$"ResponseDelayedinMins",$"Neighborhood").
                filter($"date"==="2018")
enreg2018.show(10)

+----+---------------------+------------+
|date|ResponseDelayedinMins|Neighborhood|
+----+---------------------+------------+
|2018|                 null|       false|
|2018|                 null|        true|
|2018|                 null|       false|
|2018|                 null|        true|
|2018|                 null|        true|
|2018|                 null|       false|
|2018|                 null|        true|
|2018|                 null|        true|
|2018|                 null|       false|
|2018|                 null|       false|
+----+---------------------+------------+
only showing top 10 rows



enreg2018: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [date: int, ResponseDelayedinMins: float ... 1 more field]


In [None]:
/*on regroupe les enrégistrements par quartier puis on fait la sommes totales de leurs délai qui représente le temps de réponse grace aux fonction groupby agg sum et orderBy(pour trier par ordre decroissant) et on recupere les 10 quartiers
ou il y a plus de temps de réponse*/

In [None]:
// suite Reponse 8
val neighborhood = enreg2018
         .groupBy($"Neighborhood")
         .agg(sum($"ResponseDelayedinMins").as("delai_total"))
         .orderBy(desc("delai_total"))

neighborhood.show(10)

+------------+-----------+
|Neighborhood|delai_total|
+------------+-----------+
|       false|       null|
|        true|       null|
+------------+-----------+



neighborhood: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Neighborhood: string, delai_total: double]


### Question 9

**Comment stocker les données du Dataframe sous format de fichiers Parquet?**

In [None]:
//Reponse 9
fewFireDF.write.parquet("fewFireDF.parquet")

### Question 10
**Comment relire les données stockée en format Parquet?**

In [None]:
//Reponse 10
spark.read.parquet("fewFireDF.parquet").show()

+------------+-------+
|Neighborhood|ZipCode|
+------------+-------+
+------------+-------+



## FIN