# Projet Final Apache Spark

**Nom Etudiant :**  Leye

**Prenom Etudiant:** Mame Penda

**Classe :**  Intélligence Artificielle

## Description
Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Dataset** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données cliquer sur ce [lien](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Travail à faire.
L'objectif de ce projet est de comprendre le **SF Fire Dataset** afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquates.

- Créer un repos git (public ou privé) et partager le repos avec mon mail (limahin10@gmail.com)
- Ecrire un code lisible et bien indenté 
- N'oublier pas de mettre en commentaire la justification de vos réponses sur les cellules Markdown. 


## Note:
- Le projet est personnel, c'est-à-dire chaque notebook ne concerne qu'un seul étudiant. 
- Deadline : **Jeudi 10 janvier 2021** (Aucune de dérogation ne sera acceptée)

### Chargement des données

Importation des packages Spark

In [2]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._


Nous allons jeter un coup d'oeil sur la structure des données avant de définir un schéma

In [3]:
!head -1 "datasets/sf-fire/sf-fire-calls.csv"

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay



Vu que la taille de ces données est énormes, inferer le schema pour un très grande volumes de données s'avère un peu couteux. Nous allons ainsi définir un schema pour le Dataset.

In [4]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [5]:
val sfFireFile = "datasets/sf-fire/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = datasets/sf-fire/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


Nous allons mettre en cache le Dataframe

In [6]:
fireDF.cache()

res1: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [7]:
fireDF.count()

res2: Long = 175296


In [8]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [24]:
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

Filtrage des d'appels de type "Medical Incident"

In [9]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


### Question 1
**Combien de types d'appels distincts ont été passés ?**  
Pour être sûr, il ne faut pas compter les valeurs «nulles» dans la colonne.

In [10]:
// Reponse 1
/*On élimine  les valeurs nulles de la colonne CallType
*/
val NewfireDF = fireDF.na.drop(cols = Seq("CallType"))
/*Vu qu'on a déja supprimé les valeurs nulles , Le code ci dessous permet d'afficher le nombre de types d'appels distincts
select ("CallType")permet de selectionner le type d"appel .distinct permet de supprimer les répétitions, chaque type est 
affiché une seule fois et .count() compte le nombre total de type d"appel present dans la base.
On trouve 30 types appel
*/
val distinctCall = NewfireDF.select("CallType").distinct.count()
distinctCall

NewfireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
distinctCall: Long = 30
res5: Long = 30


### Question 2

**Quels types d'appels différents ont été passés au service d'incendie?**

In [12]:
// Reponse 2
/*
NewfireDF.select("CallType") permet de selectionner la colonne CallType
.distinct permet d'afficher chaque type une seule fois
CallTypeDiff.show(30 ,false) permet d"afficher le nom de ces differents appelset de façon explicite
*/
val CallTypeDiff = NewfireDF.select("CallType").distinct
CallTypeDiff.show(30,false)
    



+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

CallTypeDiff: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string]


### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

In [13]:
// Reponse 3
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")
/* newFireDF.select("CallNumber","ResponseDelayedinMins") permet de selectionner les colonnes(Delay et ResponseDelayedinMins )
el la clause where($"ResponseDelayedinMins" > 5) permet d'apeller la condition et  filtre uniquement les appels ou le temps 
de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes.
Call.show(false) affiche le resultat
*/
val Call= newFireDF.select("CallNumber","ResponseDelayedinMins")
                   .where($"ResponseDelayedinMins" > 5)
Call.show(false)

+----------+---------------------+
|CallNumber|ResponseDelayedinMins|
+----------+---------------------+
|20110315  |5.35                 |
|20120147  |6.25                 |
|20130013  |5.2                  |
|20140067  |5.6                  |
|20140177  |7.25                 |
|20150056  |11.916667            |
|20150254  |5.116667             |
|20150265  |8.633333             |
|20150265  |95.28333             |
|20150380  |5.45                 |
|20150414  |7.6                  |
|20160059  |6.133333             |
|20160064  |5.1833334            |
|20170118  |6.9166665            |
|20170342  |5.2                  |
|20180129  |6.35                 |
|20180191  |7.983333             |
|20180382  |13.55                |
|20190062  |5.15                 |
|20190097  |13.583333            |
+----------+---------------------+
only showing top 20 rows



newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
Call: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallNumber: int, ResponseDelayedinMins: float]


### Transformations des dates
Maintenant nous allons d'abord:
1. Transformer les dates de type String en Spark Timestamp afin que nous puissions effectuer des requêtes basées sur la date plus tard
2. Retourner le Dataframe transformée
3. Mettre en cache le nouveau DataFrame

In [14]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.cache()

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res9: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


### Question 4
**Quels sont les types d'appels les plus courants?**

In [15]:
//Reponse 4
/*
val CurrentCall = ... permet de nommer  une nouvelle variable
.select("CallType") séléctiionne la colonne Calltype
.groupBy("CallType").count() les groupe en affichant le nombre de fois qu'ils ont apparu dans la colonne
.orderBy(desc("count")) les ordonnent par ordre decroissant 
.limit(5) prend seulement les  10 types d"appel les plus frequants 
*/
val CurrentCall = fireTSDF
                         .select("CallType")
                         .groupBy("CallType")
                         .count()
                         .orderBy(desc("count"))
                         .limit(10)
 CurrentCall.show(false)


+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+



CurrentCall: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string, count: bigint]


### Question 5-a
**Quels sont boites postaux rencontrés dans les appels les plus courants?**

In [16]:
//Reponse 5-a
/*
 on a deja les types d'appels les plus courants grace à la question précédente donc on filtre les
boites postaux avec le code .where($"CallType" === "Medical Incident" || $"CallType" === "Structure Fire" ) cad  
 on specifie le type d"appel 
*/
val PObox =fireTSDF
                  .select("ZipCode")
                  .where($"CallType" === "Medical Incident" || $"CallType" === "Structure Fire" || $"CallType" === "Alarms ") 
                  .distinct()
 PObox.show(false)

+-------+
|ZipCode|
+-------+
|94109  |
|94115  |
|94112  |
|94127  |
|94108  |
|94121  |
|94105  |
|null   |
|94131  |
|94116  |
|94134  |
|94124  |
|94102  |
|94114  |
|94107  |
|94111  |
|94103  |
|94117  |
|94122  |
|94110  |
+-------+
only showing top 20 rows



PObox: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ZipCode: int]


### Question 5-a
**Quels sont les quartiers de San Francisco dont les codes postaux sont 94102 et 94103?**

In [17]:
//Reponse 5-b
/*fireTSDF.select("neighborhood","Zipcode")permet de selectionner les quartiers et la colonne et la clause where 
permet de filtrer les quartiers dont leur codes postaux sont 94102 et 94103
*/ 
val neighborhoodsSF = fireTSDF
                             .select("neighborhood","Zipcode")
                             .where(($"Zipcode" === 94102 || $"Zipcode" === 94103) && ($"City" === "San Francisco" )) 
                             .distinct
neighborhoodsSF.show(false)

+------------------------------+-------+
|neighborhood                  |Zipcode|
+------------------------------+-------+
|Potrero Hill                  |94103  |
|Western Addition              |94102  |
|Tenderloin                    |94102  |
|Nob Hill                      |94102  |
|Castro/Upper Market           |94103  |
|South of Market               |94102  |
|South of Market               |94103  |
|Hayes Valley                  |94103  |
|Financial District/South Beach|94102  |
|Mission Bay                   |94103  |
|Tenderloin                    |94103  |
|Financial District/South Beach|94103  |
|Hayes Valley                  |94102  |
|Mission                       |94103  |
+------------------------------+-------+



neighborhoodsSF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [neighborhood: string, Zipcode: int]


### Question 6
**Determiner le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?**

In [18]:
//Reponse 6

val ToCallMaMiMean = fireTSDF.select("ResponseDelayedinMins").describe().show(false)
ToCallMaMiMean
/*
.select("ResponseDelayedinMins") selectionne la colonne temps de réponse des appels
.describe() permet d'avoir nombre total d'appels,la moyenne, le minimum et le maximum...
*/

+-------+---------------------+
|summary|ResponseDelayedinMins|
+-------+---------------------+
|count  |175296               |
|mean   |3.892364154521585    |
|stddev |9.378286226254206    |
|min    |0.016666668          |
|max    |1844.55              |
+-------+---------------------+



ToCallMaMiMean: Unit = ()


### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp

### Question 7-b
**Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?**

In [19]:
//Reponse 
/*
Ecrire ici votre code
*/

### Question 8
**Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?**

In [20]:
//Reponse 8
/*
Ecrire ici votre code
*/

### Question 9

**Comment stocker les données du Dataframe sous format de fichiers Parquet?**

In [1]:
//Reponse 9

// En utilisant write.parquet
orders.
    write.
    mode("overwrite").
    option("compression", "none").
    parquet("SF Fire Dataset/training/retail_db/orders")
/*
orders.
    write.
    mode("overwrite").
    option("compression", "none").
    parquet("SF Fire Dataset/training/retail_db/orders")
    permet stocker les données du Dataframe sous format de fichiers Parquet en utilisant  write.parquet

*/

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.189.102:4042
SparkContext available as 'sc' (version = 3.0.1, master = local[*], app id = local-1610317825269)
SparkSession available as 'spark'


<console>: 29: error: not found: value orders

### Question 10
**Comment relire les données stockée en format Parquet?**

In [19]:
//Reponse 10
spark.read.parquet("SF Fire Dataset/training/retail_db/orders").show
/*
spark.read.parquet("SF Fire Dataset/training/retail_db/orders").show permet de relire les données stockée en format Parquet */

org.apache.spark.sql.AnalysisException:  Path does not exist: file:/home/aissatou/Documents/projet spark/SF Fire Dataset/training/retail_db/orders;

## FIN