# Projet Final Apache Spark

**Nom Etudiant :**  

**Prenom Etudiant:**  

**Classe :**  


## Description
Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Dataset** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données cliquer sur ce [lien](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Travail à faire.
L'objectif de ce projet est de comprendre le **SF Fire Dataset** afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquates.

- Créer un repos git (public ou privé) et partager le repos avec mon mail (limahin10@gmail.com)
- Ecrire un code lisible et bien indenté 
- N'oublier pas de mettre en commentaire la justification de vos réponses sur les cellules Markdown. 


## Note:
- Le projet est personnel, c'est-à-dire chaque notebook ne concerne qu'un seul étudiant. 
- Deadline : **Jeudi 10 janvier 2021** (Aucune de dérogation ne sera acceptée)

### Chargement des données

Importation des packages Spark 

In [5]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._


Nous allons jeter un coup d'oeil sur la structure des données avant de définir un schéma

In [2]:
!head -1 "datasets/sf-fire/sf-fire-calls.csv"

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay



Vu que la taille de ces données est énormes, inferer le schema pour un très grande volumes de données s'avère un peu couteux. Nous allons ainsi définir un schema pour le Dataset.

In [6]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [8]:
val sfFireFile = "datasets/sf-fire/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = datasets/sf-fire/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


Nous allons mettre en cache le Dataframe

In [9]:
fireDF.cache()

res4: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [10]:
fireDF.count()

res5: Long = 175296


In [11]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [12]:
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

Filtrage des d'appels de type "Medical Incident"

In [13]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


### Question 1
**Combien de types d'appels distincts ont été passés ?**  
Pour être sûr, il ne faut pas compter les valeurs «nulles» dans la colonne.

In [18]:
// Reponse 1
/*Sélectionner d'abord tous les types d'appels non nuls avec select et where,
puis éliminer les doublons avec distinct() et pour finir appliquer count pour dénombrer*/

val nombre_type_appels_distinct = fireDF
    .select("CallType")
    .where(col("CallType") =!= "")
    .distinct().count

//affichage du nombre de types d'appels
print(nombre_type_appels_distinct)

30

nombre_type_appels_distinct: Long = 30


### Question 2

**Quels types d'appels différents ont été passés au service d'incendie?**

In [19]:
// Reponse 2
/*Sélectionner d'abord tous les types d'appels non nuls avec select et where, 
puis éliminer les doublons avec distinct() et afficher le resultat*/
val type_apels_differents = fireDF
    .select("CallType")
    .where(col("CallType") =!= "")
    .distinct()

type_apels_differents.show(false)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

type_apels_differents: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string]


### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

In [83]:
// Reponse 3
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")

//sélectionner tous les délais, puis filtrer les délais supérieurs à 5 en utilisant where
newFireDF.select("ResponseDelayedinMins").where($"ResponseDelayedinMins" > 5).show()

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|                 5.35|
|                 6.25|
|                  5.2|
|                  5.6|
|                 7.25|
|            11.916667|
|             5.116667|
|             8.633333|
|             95.28333|
|                 5.45|
|                  7.6|
|             6.133333|
|            5.1833334|
|            6.9166665|
|                  5.2|
|                 6.35|
|             7.983333|
|                13.55|
|                 5.15|
|            13.583333|
+---------------------+
only showing top 20 rows



newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


### Transformations des dates  
Maintenant nous allons d'abord:  
1. Transformer les dates de type String en Spark Timestamp afin que nous puissions effectuer des requêtes basées sur la date plus tard    
2. Retourner le Dataframe transformée  
3. Mettre en cache le nouveau DataFrame  

In [21]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.cache()

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res16: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


### Question 4
**Quels sont les types d'appels les plus courants?**

In [24]:
//Reponse 4

/*regrouper les données par type d'appels avec grouBy ensuite compter le nombre de chaque type d'appel,
puis appliquer au résultat la fonction sort(desc) pour effectuer un tri décroissant  
et afficher les 10 premières lignes avec show(10)*/

val type_appel_plus_courant = fireTSDF.groupBy("CallType").count.sort(desc("count"))

type_appel_plus_courant.show(10, false)

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



type_appel_plus_courant: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string, count: bigint]


### Question 5-a
**Quels sont boites postaux rencontrés dans les appels les plus courants?**

In [25]:
//Reponse 5-a
//transformer type_appel_plus_courant en List de string et récupérer les 10 premières lignes
val l = type_appel_plus_courant.select("CallType").map(f=>f.getString(0)).collect.toList
val li = l.take(10)

/*filtrer les types d'appels contenus dans la liste li en utilisant filter et isin, 
puis afficher les types d'appels correspondants et leurs codes postaux  */
val codes_postaux = fireTSDF
    .filter($"CallType".isin(li:_*))
    .select("CallType", "Zipcode")
    
codes_postaux.show()

+----------------+-------+
|        CallType|Zipcode|
+----------------+-------+
|  Structure Fire|  94109|
|Medical Incident|  94124|
|Medical Incident|  94102|
|    Vehicle Fire|  94110|
|          Alarms|  94109|
|  Structure Fire|  94105|
|          Alarms|  94112|
|          Alarms|  94102|
|Medical Incident|  94115|
|Medical Incident|  94114|
|Medical Incident|  94110|
|  Structure Fire|  94112|
|Medical Incident|  94109|
|Medical Incident|  94121|
|  Structure Fire|  94110|
|  Structure Fire|  94110|
|  Structure Fire|  94110|
|Medical Incident|  94116|
|Medical Incident|  94118|
|Medical Incident|  94118|
+----------------+-------+
only showing top 20 rows



l: List[String] = List(Medical Incident, Structure Fire, Alarms, Traffic Collision, Citizen Assist / Service Call, Other, Outside Fire, Vehicle Fire, Gas Leak (Natural and LP Gases), Water Rescue, Odor (Strange / Unknown), Electrical Hazard, Elevator / Escalator Rescue, Smoke Investigation (Outside), Fuel Spill, HazMat, Industrial Accidents, Explosion, Train / Rail Incident, Aircraft Emergency, Assist Police, High Angle Rescue, Watercraft in Distress, Extrication / Entrapped (Machinery, Vehicle), Oil Spill, Suspicious Package, Marine Fire, Confined Space / Structure Collapse, Mutual Aid / Assist Outside Agency, Administrative)
li: List[String] = List(Medical Incident, Structure Fire, Alarms, Traffic Collision, Citizen Assist / Service Call, Other, Outside Fire, Vehicle Fire, Gas Leak (N...


### Question 5-a
**Quels sont les quartiers de San Francisco dont les codes postaux sont 94102 et 94103?**

In [43]:
//Reponse 5-b
/*sélectionner les quartiers et leurs boites postales puis choisir ceux dont les boites postales sont
égales à 94102 ou 94103 avec la fonction filter et les opérations de comparaison*/
val quartier = fireTSDF
    .select("Neighborhood", "Zipcode")
    .filter($"Zipcode" >= 94102)
    .filter($"Zipcode" <= 94103)

quartier.show()

+--------------------+-------+
|        Neighborhood|Zipcode|
+--------------------+-------+
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|     South of Market|  94103|
|             Mission|  94103|
|             Mission|  94103|
|     South of Market|  94103|
|          Tenderloin|  94102|
|        Hayes Valley|  94102|
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|     South of Market|  94103|
|        Hayes Valley|  94102|
|Financial Distric...|  94103|
|        Hayes Valley|  94102|
|             Mission|  94103|
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|        Hayes Valley|  94102|
|     South of Market|  94103|
|     South of Market|  94103|
+--------------------+-------+
only showing top 20 rows



quartier: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Neighborhood: string, Zipcode: int]


### Question 6
**Determiner le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?**

In [49]:
//Reponse 6
/*appliquer la fonction describe() sur la sélection des délais pour obtenir le nombre,
la moyenne, le minimum et le maximum des délais de réponses*/
val min_max_moy = fireTSDF.select("ResponseDelayedinMins").describe()

min_max_moy.show()

+-------+---------------------+
|summary|ResponseDelayedinMins|
+-------+---------------------+
|  count|               175296|
|   mean|    3.892364154521585|
| stddev|    9.378286226254197|
|    min|          0.016666668|
|    max|              1844.55|
+-------+---------------------+



min_max_moy: org.apache.spark.sql.DataFrame = [summary: string, ResponseDelayedinMins: string]


### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp

In [50]:
//Reponse 7-a
/*recupérer l'année de chaque IncidentDate avec year(), supprimer les doublons via distinct()
puis compter le nombre d'année*/
val nombre_annee = fireTSDF.select(year($"IncidentDate")).distinct().count

println(nombre_annee)

19


nombre_annee: Long = 19


### Question 7-b
**Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?**

In [137]:
//Reponse 7-b
/* recupérer les IncidentDate et les CallNumber de l'année 2018 puis grouper par semaine
de IncidentDate avec groupBy(window) et compter le nombre d'appels dans chaque semaine avec agg(count)*/
val df = fireTSDF.select("IncidentDate", "CallNumber").where(year($"IncidentDate")===2018)
    .groupBy(window($"IncidentDate", "1 week").alias("semaine"))
    .agg(count("CallNumber").alias("nombre appel")) 

 //retourne la sémaine contenant le plus d'appels
val maximum = df.select(max("nombre appel")).first()
df.select("semaine", "nombre appel").where($"nombre appel" === maximum(0)).show(false)

+------------------------------------------+------------+
|semaine                                   |nombre appel|
+------------------------------------------+------------+
|{2018-10-25 00:00:00, 2018-11-01 00:00:00}|250         |
+------------------------------------------+------------+



df: org.apache.spark.sql.DataFrame = [semaine: struct<start: timestamp, end: timestamp>, nombre appel: bigint]
maximum: org.apache.spark.sql.Row = [250]


### Question 8
**Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?**

In [104]:
//Reponse 8
//recupérer le pire délais de réponse
val valeur_max = fireTSDF.select(max("ResponseDelayedinMins")).first()

//afficher les quartiers ayant le pire délais
fireTSDF
    .select("Neighborhood", "ResponseDelayedinMins")
    .where($"ResponseDelayedinMins" === valeur_max(0))
    .show()

1844.55
+---------------+---------------------+
|   Neighborhood|ResponseDelayedinMins|
+---------------+---------------------+
|South of Market|              1844.55|
+---------------+---------------------+



valeur_max: org.apache.spark.sql.Row = [1844.55]


### Question 9

**Comment stocker les données du Dataframe sous format de fichiers Parquet?**

In [139]:
//Reponse 9
fireDF.
    coalesce(1).
    write.
    mode("overwrite").
    option("compression", "none").
    format("parquet").
    save("dataSetFire")

### Question 10
**Comment relire les données stockée en format Parquet?**

In [140]:
//Reponse 10
spark.read.parquet("dataSetFire").show

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

## FIN