# Projet Final Apache Spark

**Nom Etudiant : SY**  

**Prenom Etudiant : Ibrahima**  

**Classe : Master1 IA**  


## Description
Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Dataset** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données cliquer sur ce [lien](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Travail à faire.
L'objectif de ce projet est de comprendre le **SF Fire Dataset** afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquates.

- Créer un repos git (public ou privé) et partager le repos avec mon mail (limahin10@gmail.com)
- Ecrire un code lisible et bien indenté 
- N'oublier pas de mettre en commentaire la justification de vos réponses sur les cellules Markdown. 


## Note:
- Le projet est personnel, c'est-à-dire chaque notebook ne concerne qu'un seul étudiant. 
- Deadline : **Jeudi 10 janvier 2021** (Aucune de dérogation ne sera acceptée)

### Chargement des données

Importation des packages Spark

In [2]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._


Nous allons jeter un coup d'oeil sur la structure des données avant de définir un schéma

In [3]:
!head -1 "datasets/sf-fire/sf-fire-calls.csv"

head : Le terme head n'est pas reconnu comme nom d'applet de commande, fonction, fichier de script ou programme 
excutable. Vrifiez l'orthographe du nom, ou si un chemin d'accs existe, vrifiez que le chemin d'accs est correct 
et ressayez.
Au caractre Ligne:1 : 1
+ head -1 "datasets/sf-fire/sf-fire-calls.csv"
+ ~~~~
    + CategoryInfo          : ObjectNotFound: (head:String) [], CommandNotFoundException
    + FullyQualifiedErrorId : CommandNotFoundException
 



Vu que la taille de ces données est énormes, inferer le schema pour un très grande volumes de données s'avère un peu couteux. Nous allons ainsi définir un schema pour le Dataset.

In [4]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [5]:
val sfFireFile = "datasets/sf-fire/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = datasets/sf-fire/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


Nous allons mettre en cache le Dataframe

In [6]:
fireDF.cache()

res1: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [7]:
fireDF.count()

res2: Long = 175296


In [8]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [9]:
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

Filtrage des d'appels de type "Medical Incident"

In [10]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)
//fewFireDF.count()

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


In [11]:
import org.apache.spark.sql.functions._

import org.apache.spark.sql.functions._


### Question 1
**Combien de types d'appels distincts ont été passés ?**  
Pour être sûr, il ne faut pas compter les valeurs «nulles» dans la colonne.

In [37]:
//Reponse 1
fireDF
  .select("CallType")
  .where(col("CallType").isNotNull) // isNotNull pour ne pas compter les valeurs «nulles»
  .agg(countDistinct('CallType) as 'TypeAppelDistinct)
  .show()

+-----------------+
|TypeAppelDistinct|
+-----------------+
|               30|
+-----------------+



### Question 2

**Quels types d'appels différents ont été passés au service d'incendie?**

In [41]:
//Reponse 2
val fFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType")
  .where($"CallType" ==="Structure Fire") // Structure Fire c'est le service d'incendie
  .show(false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003382       |01/11/2002 02:59:04 PM|Structure Fire|
|2003408       |01/11/2002 04:09:08 PM|Structure Fire|
|2003408       |01/11/2002 04:09:08 PM|Structure Fire|
|2003408       |01/11/2002 04:09:08 PM|Structure Fire|
|2003497       |01/11/2002 09:03:17 PM|Structure Fire|
|2003554       |01/12/2002 01:56:32 AM|Structure Fire|
|2003695       |01/12/2002 04:46:59 PM|Structure Fire|
|2003777       |01/12/2002 09:14:13 PM|Structure Fire|
|2003887       |01/13/2002 09:38:41 AM|Structure Fire|
|2004070       |01/13/2002 09:10:07 PM|Structure Fire|
|2004070       |01/13/2002 09:22:34 PM|Structure Fire|
|2004160       |01/14/2002 08:47:46 AM|Structure Fire|
|2004218       |01/14/2002 12:38:50 PM|Structure Fire|
|2004238  

fFireDF: Unit = ()


### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

In [42]:
//Reponse 3
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins") // Renommer la colonne Delay par ReponseDelayedinMins
newFireDF
  .select("ResponseDelayedinMins") 
  .where($"ResponseDelayedinMins" > 5)
  .show(5,false) // permet d'afficher 5 lignes

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


### Transformations des dates
Maintenant nous allons d'abord:
1. Transformer les dates de type String en Spark Timestamp afin que nous puissions effectuer des requêtes basées sur la date plus tard
2. Retourner le Dataframe transformée
3. Mettre en cache le nouveau DataFrame

In [15]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.cache()

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res9: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


### Question 4
**Quels sont les types d'appels les plus courants?**

In [25]:
//Reponse 4
fireTSDF
  .select("CallType")
  .where(col("CallType").isNotNull)
  .groupBy("CallType")
  .count()  // comptage
  .orderBy(desc("count")) // affichage décroissant; donc on a les plus grandes valeurs vers les plus petites
  .show(5,false)

+-----------------------------+------+
|CallType                     |count |
+-----------------------------+------+
|Medical Incident             |113794|
|Structure Fire               |23319 |
|Alarms                       |19406 |
|Traffic Collision            |7013  |
|Citizen Assist / Service Call|2524  |
+-----------------------------+------+
only showing top 5 rows



### Question 5-a
**Quels sont boites postaux rencontrés dans les appels les plus courants?**

In [26]:
//Reponse 5-a
fireTSDF
  .select("CallType", "Box")
  .where(col("CallType").isNotNull)
  .groupBy("CallType", "Box") //Box est la colonne boite postale
  .count()
  .orderBy(desc("count"))
  .show(5,false)

+----------------+----+-----+
|CallType        |Box |count|
+----------------+----+-----+
|Medical Incident|2251|1544 |
|Medical Incident|1453|1491 |
|Medical Incident|1461|1205 |
|Medical Incident|5236|1015 |
|Medical Incident|1545|933  |
+----------------+----+-----+
only showing top 5 rows



### Question 5-b
**Quels sont les quartiers de San Francisco dont les codes postaux sont 94102 et 94103?**

In [27]:
//Reponse 5-b
val fewFireDF = fireDF
  .select("City","Address","Zipcode") 
  .where($"City" === "San Francisco" ) // permet de choisir la ville San Francisco parmi les autres
  .filter($"Zipcode" === 94102 || $"Zipcode" === 94103) // filtrer les codes postaux 94102 et 94103 des quartiers de San Francisco
  .show(5,false)

+-------------+--------------------------+-------+
|City         |Address                   |Zipcode|
+-------------+--------------------------+-------+
|San Francisco|6TH ST/MISSION ST         |94103  |
|San Francisco|15TH ST/MISSION ST        |94103  |
|San Francisco|900 Block of MARKET ST    |94103  |
|San Francisco|1300 Block of HOWARD ST   |94103  |
|San Francisco|300 Block of CLEMENTINA ST|94103  |
+-------------+--------------------------+-------+
only showing top 5 rows



fewFireDF: Unit = ()


### Question 6
**Determiner le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?**

In [43]:
//Reponse 6
import org.apache.spark.sql.{functions => F} // Création de la fonction
fireTSDF
  .select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"), F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins")) // On applique la fonction à la somme,la moyenne, le minimum et le maximum
  .show()

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



import org.apache.spark.sql.{functions=>F}


### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp

In [29]:
//Reponse 7-a
fireTSDF
  .select(year($"IncidentDate")) // IncidentDate est le nouveau nom de la colonne CallDate après conversion
  .distinct() // choisir les dates distinctement
  .orderBy(year($"IncidentDate"))
  .show()

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



### Question 7-b
**Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?**

In [47]:
//Reponse 7-b
fireTSDF
  .select(($"IncidentDate"))
  .where(year($"IncidentDate") === 2018)
  .groupBy(($"IncidentDate"))
  .count()
  .orderBy(desc("count"))
  .show(10,false)
//Ici j'ai le jour de l'année 2018 qui a eu le plus d'appels d'incendie
// j'ai utilisé weekofyear pour la semaine mais ca ne passe pas
// Merci de vos suggestions

+-------------------+-----+
|IncidentDate       |count|
+-------------------+-----+
|2018-01-01 00:00:00|58   |
|2018-06-20 00:00:00|50   |
|2018-04-22 00:00:00|47   |
|2018-03-16 00:00:00|47   |
|2018-10-02 00:00:00|47   |
|2018-03-07 00:00:00|46   |
|2018-02-15 00:00:00|45   |
|2018-09-18 00:00:00|44   |
|2018-05-07 00:00:00|44   |
|2018-05-11 00:00:00|44   |
+-------------------+-----+
only showing top 10 rows



### Question 8
**Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?**

In [33]:
//Reponse 8
// On considère que si le délais dépasse 5; donc le temps de réponse est pire
fireDF
  .select("City","Address","Delay") 
  .where($"City" === "San Francisco" )
  .filter($"Delay" > 5)
  .orderBy(desc("Delay"))
  .show(5,false)

+-------------+-------------------------+---------+
|City         |Address                  |Delay    |
+-------------+-------------------------+---------+
|San Francisco|0 Block of 6TH ST        |1844.55  |
|San Francisco|1100 Block of STOCKTON ST|931.45   |
|San Francisco|600 Block of UNION ST    |491.26666|
|San Francisco|200 Block of MARKET ST   |406.63333|
|San Francisco|500 Block of CAMPBELL AVE|405.7    |
+-------------+-------------------------+---------+
only showing top 5 rows



### Question 9

**Comment stocker les données du Dataframe sous format de fichiers Parquet?**

In [None]:
//Reponse 9
fireDF.write.format("parquet").save("datasets/sf-fire")

### Question 10
**Comment relire les données stockée en format Parquet?**

In [None]:
//Reponse 10
spark.read.parquet("datasets/sf-fire")

## FIN