# Projet Final Apache Spark

**Nom Etudiant :ASSINE**  

**Prenom Etudiant:Géraud**  

**Classe :M1 Big Data Analytics**  


## Description
Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Dataset** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données cliquer sur ce [lien](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Travail à faire.
L'objectif de ce projet est de comprendre le **SF Fire Dataset** afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquates.

- Créer un repos git (public) et partager le repos avec mon mail (limahin10@gmail.com)
- Ecrire un code lisible et bien indenté 
- N'oublier pas de mettre en commentaire la justification de vos réponses sur les cellules Markdown. 


## Note:
- Le projet est personnel, c'est-à-dire chaque notebook ne concerne qu'un seul étudiant. 
- Deadline : **Samedi 31 Juillet 2021  à 23h 59** (Aucune de dérogation ne sera acceptée)

### Chargement des données

Importation des packages Spark

In [1]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.functions.col
import spark.implicits._
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.functions.{current_date, current_timestamp, datediff}

Intitializing Scala interpreter ...

Spark Web UI available at http://10.0.2.15:4040
SparkContext available as 'sc' (version = 3.1.2, master = local[*], app id = local-1627851467470)
SparkSession available as 'spark'


import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.col
import spark.implicits._
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.functions.{current_date, current_timestamp, datediff}


Nous allons jeter un coup d'oeil sur la structure des données avant de définir un schéma

In [2]:
!head -1 "datasets/sf-fire/sf-fire-calls.csv"

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay




Vu que la taille de ces données est énormes, inferer le schema pour un très grande volumes de données s'avère un peu couteux. Nous allons ainsi définir un schema pour le Dataset.

In [3]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [4]:
val sfFireFile = "datasets/sf-fire/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = datasets/sf-fire/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


Nous allons mettre en cache le Dataframe

In [5]:
fireDF.cache()

res0: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [17]:
fireDF.count()

res1: Long = 175296


In [18]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [113]:
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

Filtrage des d'appels de type "Medical Incident"

In [6]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


### Question 1
**Combien de types d'appels distincts ont été passés ?**  
Pour être sûr, il ne faut pas compter les valeurs «nulles» dans la colonne.

In [31]:
// Reponse 1
/*
Ecrire ici votre code
*/
/*Vérifier s' il n y a pas de valeurs nulles dans CallType*/
/*val verif = fireDF.filter("CallType is null")
verif.count*/
/*pas de valeurs nulles dans CallType*/
val CallType = fireDF.select("CallType").distinct()
CallType.count()
/*Donc on a 30 appels distincts passés*/

CallType: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string]
res15: Long = 30


### Question 2

**Quels types d'appels différents ont été passés au service d'incendie?**

In [103]:
// Reponse 2
/*
Ecrire ici votre code
*/
fireDF.select("CallType").distinct.show

+--------------------+
|            CallType|
+--------------------+
|Elevator / Escala...|
|         Marine Fire|
|  Aircraft Emergency|
|Confined Space / ...|
|      Administrative|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|              HazMat|
|Watercraft in Dis...|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|Extrication / Ent...|
|               Other|
|        Outside Fire|
|   Traffic Collision|
|       Assist Police|
|Gas Leak (Natural...|
+--------------------+
only showing top 20 rows



### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

In [7]:
// Reponse 3
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")
/*
Completer le code
*/
/*2*/
newFireDF.show
/*3*/
newFireDF.select("CallType","ResponseDelayedinMins").
where($"ResponseDelayedinMins" > 5).show


+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+--

newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


### Transformations des dates  
Maintenant nous allons d'abord:  
1. Transformer les dates de type String en Spark Timestamp afin que nous puissions effectuer des requêtes basées sur la date plus tard    
2. Retourner le Dataframe transformée  
3. Mettre en cache le nouveau DataFrame  

In [8]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.show
fireTSDF.cache()

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+-----------

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res3: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


### Question 4
**Quels sont les types d'appels les plus courants?**

In [23]:
//Reponse 4
/*
Ecrire ici votre code
Pour connaître les appels les plus courants j'ai considéré les
appels reçus qui sont supérieurs à 1000.
*/
val appCourants = fireTSDF.groupBy($"CallType").count.
where($"count" > 1000)

appCourants.show

+--------------------+------+
|            CallType| count|
+--------------------+------+
|              Alarms| 19406|
|Citizen Assist / ...|  2524|
|               Other|  2166|
|        Outside Fire|  2094|
|   Traffic Collision|  7013|
|      Structure Fire| 23319|
|    Medical Incident|113794|
+--------------------+------+



appCourants: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string, count: bigint]


### Question 5-a
**Quels sont boites postaux rencontrés dans les appels les plus courants?**

In [26]:
//Reponse 5-a
/*
Ecrire ici votre code
*/
val bpAppCourants = fireTSDF.groupBy($"CallType",$"Zipcode")
.count.where($"count" > 1000)

bpAppCourants.show

+----------------+-------+-----+
|        CallType|Zipcode|count|
+----------------+-------+-----+
|Medical Incident|  94110| 9995|
|  Structure Fire|  94103| 2221|
|  Structure Fire|  94115| 1023|
|  Structure Fire|  94110| 2267|
|Medical Incident|  94109| 9479|
|  Structure Fire|  94124| 1590|
|Medical Incident|  94114| 3225|
|  Structure Fire|  94109| 2160|
|Medical Incident|  94102|16130|
|  Structure Fire|  94112| 1432|
|Medical Incident|  94116| 2738|
|          Alarms|  94105| 1015|
|Medical Incident|  94108| 2162|
|Medical Incident|  94112| 5630|
|Medical Incident|  94127| 1206|
|Medical Incident|  94118| 3104|
|Medical Incident|  94132| 2594|
|Medical Incident|  94107| 4284|
|Medical Incident|  94124| 5885|
|Medical Incident|  94103|14775|
+----------------+-------+-----+
only showing top 20 rows



bpAppCourants: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string, Zipcode: int ... 1 more field]


### Question 5-b
**Quels sont les quartiers de San Francisco dont les codes postaux sont 94102 et 94103?**

In [41]:
//Reponse 5-b
/*
Ecrire ici votre code
*/
fireTSDF.select("City","Zipcode").
filter($"Zipcode" === 94102 or $"Zipcode" === 94103).
where($"City" === "San Francisco").show()


+-------------+-------+
|         City|Zipcode|
+-------------+-------+
|San Francisco|  94103|
|San Francisco|  94103|
|San Francisco|  94103|
|San Francisco|  94103|
|San Francisco|  94103|
|San Francisco|  94102|
|San Francisco|  94103|
|San Francisco|  94103|
|San Francisco|  94103|
|San Francisco|  94103|
|San Francisco|  94103|
|San Francisco|  94103|
|San Francisco|  94102|
|San Francisco|  94102|
|San Francisco|  94103|
|San Francisco|  94102|
|San Francisco|  94102|
|San Francisco|  94103|
|San Francisco|  94102|
|San Francisco|  94103|
+-------------+-------+
only showing top 20 rows



### Question 6
**Determiner le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?**

In [251]:
//Reponse 6
/*
Ecrire ici votre code
*/
/*Nombre total d'appels
On a calculé le nombre d'appels dans chaque service*/

fireTSDF.groupBy("CallType").count.show

/*Moyenne du temps de réponse*/
 fireTSDF.select(avg($"ResponseDelayedinMins")).show
/*Minimum du temps de réponse*/
fireTSDF.select(min($"ResponseDelayedinMins")).show
/*Maximum du temps de réponse*/
fireTSDF.select(max($"ResponseDelayedinMins")).show


+--------------------+-----+
|            CallType|count|
+--------------------+-----+
|Elevator / Escala...|  453|
|         Marine Fire|   14|
|  Aircraft Emergency|   36|
|Confined Space / ...|   13|
|      Administrative|    3|
|              Alarms|19406|
|Odor (Strange / U...|  490|
|Citizen Assist / ...| 2524|
|              HazMat|  124|
|Watercraft in Dis...|   28|
|           Explosion|   89|
|           Oil Spill|   21|
|        Vehicle Fire|  854|
|  Suspicious Package|   15|
|Extrication / Ent...|   28|
|               Other| 2166|
|        Outside Fire| 2094|
|   Traffic Collision| 7013|
|       Assist Police|   35|
|Gas Leak (Natural...|  764|
+--------------------+-----+
only showing top 20 rows

+--------------------------+
|avg(ResponseDelayedinMins)|
+--------------------------+
|         3.892364154521585|
+--------------------------+

+--------------------------+
|min(ResponseDelayedinMins)|
+--------------------------+
|               0.016666668|
+---------------

### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp

In [42]:
//Reponse 7-a
/*
Ecrire ici votre code
*/
val anneeDist= fireTSDF.select(year(col("IncidentDate"))).distinct
/*anneeDist.show*/
anneeDist.count

anneeDist: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [year(IncidentDate): int]
res26: Long = 19


### Question 7-b
**Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?**

In [56]:
//Reponse 7-b
/*
Ecrire ici votre code
Ici donc la variable maximale qui a enregistré le plus 
d'appels d'incendie est Unable to Locate "Non localisé"
*/
fireTSDF.select(weekofyear(col("IncidentDate")).as("2018")).
filter($"CallFinalDisposition" === "Fire" and 
       $"CallFinalDisposition" =!= "Unable to Locate").show


+----+
|2018|
+----+
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  19|
|  19|
|  19|
|  19|
|  19|
|  19|
|  19|
|  19|
|  19|
+----+
only showing top 20 rows



### Question 8
**Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?**

In [44]:
//Reponse 8
/*
Ecrire ici votre code
Le pire temps de réponse est celui qui est inférieur 
à celui de la moyenne 
*/
fireTSDF.select("City","IncidentDate","ResponseDelayedinMins").
filter($"City" === "San Francisco" and $"ResponseDelayedinMins"
      < 3.892364154521585 and $"IncidentDate" === "2018").show


+-------------+-------------------+---------------------+
|         City|       IncidentDate|ResponseDelayedinMins|
+-------------+-------------------+---------------------+
|San Francisco|2018-01-01 00:00:00|            1.7166667|
|San Francisco|2018-01-01 00:00:00|                 2.95|
|San Francisco|2018-01-01 00:00:00|                  1.1|
|San Francisco|2018-01-01 00:00:00|            1.7833333|
|San Francisco|2018-01-01 00:00:00|            1.5333333|
|San Francisco|2018-01-01 00:00:00|            3.3166666|
|San Francisco|2018-01-01 00:00:00|            2.3666666|
|San Francisco|2018-01-01 00:00:00|                 3.65|
|San Francisco|2018-01-01 00:00:00|            2.0166667|
|San Francisco|2018-01-01 00:00:00|                  2.3|
|San Francisco|2018-01-01 00:00:00|                  1.2|
|San Francisco|2018-01-01 00:00:00|                 2.15|
|San Francisco|2018-01-01 00:00:00|            3.0166667|
|San Francisco|2018-01-01 00:00:00|            1.2333333|
|San Francisco

### Question 9

**Comment stocker les données du Dataframe sous format de fichiers Parquet?**

In [15]:
//Reponse 9
/*
Ecrire ici votre code
les données du dataframe fireTSDF sont réécrites en format parquet
avec toujours le même shéma compilé précédement.
*/
fireTSDF.write.parquet("sf-fire-calls.parquet")



org.apache.spark.sql.AnalysisException:  path file:/home/assine/advanced_functional_programming/sf-fire-calls.parquet already exists.

### Question 10
**Comment relire les données stockée en format Parquet?**

In [16]:
//Reponse 10
/*
Ecrire ici votre code
Nous allons maintenant chargé le dataframe en format parquet avec
les commandes ci-dessous.
*/

val parquetDFsf_firecalls = spark.read.parquet("sf-fire-calls.parquet")
parquetDFsf_firecalls.show(10)

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+------

parquetDFsf_firecalls: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


## FIN