Student name: ISMAIL ABDILLAHI AHMED
Class: M1RSI online

Description
This project uses Apache Spark to analyze and process the **[San Francisco Fire Department Calls](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** data in order to produce a few KPIs (*Key Performance Indicators*). The **SF Fire Dataset** contains the responses to calls from all fire units. Each record includes the call number, the incident number, the address, the unit identifier, the call type, and the disposition. All relevant time intervals are also included. Because this dataset is response-based and most calls involve several units, there are multiple records for each call number. Addresses are associated with a block number, an intersection, or a call box, not with a specific address.

**For more details on the data description, follow this [link](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Work to do
The goal of this project is to understand the **SF Fire Dataset** well enough to answer the questions with appropriate Spark/Scala code.

- Create a (public) git repository and share it with my email address (limahin10@gmail.com)
- Write readable, well-indented code
- Do not forget to justify your answers in the Markdown cells.


## Note:
- The project is individual: each notebook belongs to a single student.
- Deadline: **Monday 31 January 2022 at 23:59** (no extension will be granted)

### Loading the data

In [None]:
Importing the Spark packages

In [2]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._



In [3]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [4]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row




In [5]:
// Load the dataset
val sfFireFile = "datasets/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = datasets/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [6]:
fireDF.cache()

res0: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [7]:
fireDF.count()

res1: Long = 175296


In [8]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [9]:
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

In [10]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


### Question 1
**How many distinct call types were made?**  
To be sure, do not count the "null" values in the column.

In [11]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]
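
The cell above repeats the filter from In [10] and does not count anything yet. Below is a minimal sketch of one way to count the distinct non-null call types. It runs on a small hypothetical sample instead of `fireDF`; the `sample` rows and the `distinctCallTypes` name are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, countDistinct}

// In a notebook, getOrCreate() reuses the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("q1-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample standing in for fireDF; the last row has a null CallType
val sample = Seq[(Int, String)](
  (1, "Alarms"),
  (2, "Structure Fire"),
  (3, "Alarms"),
  (4, null)
).toDF("IncidentNumber", "CallType")

// Drop the nulls explicitly, then count the distinct remaining values
val distinctCallTypes = sample
  .where(col("CallType").isNotNull)
  .agg(countDistinct("CallType").as("DistinctCallTypes"))
  .first()
  .getLong(0)

println(distinctCallTypes)
```

`countDistinct` already skips nulls, so the `isNotNull` filter is only there to make the intent explicit. On the real data the same chain would start from `fireDF` rather than `fewFireDF`, since `fewFireDF` leaves out "Medical Incident".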


### Question 2
**Which call types were made to the fire department?**

In [12]:
// Display the distinct values of the "CallType" column.
// Note: fewFireDF excludes the "Medical Incident" rows (see its where clause).

/* Pass 30 to .show() to display up to 30 "CallType" values */

fewFireDF.select("CallType").distinct.show(30,false)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

### Question 3

**Find all responses where the delay exceeded 5 minutes.**

Hints
1. Rename the column Delay -> ResponseDelayedinMins
2. Return a new DataFrame
3. Display all calls where the response to a fire site came after a delay of more than 5 minutes

In [13]:
// Rename the Delay column to ResponseDelayedinMins; this returns a new DataFrame
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")

newFireDF.select("ResponseDelayedinMins").show()

// Flag the responses whose delay exceeds 5 minutes
val newFireDF1 = newFireDF.withColumn("Reponse_gt_5", col("ResponseDelayedinMins").gt(5))

newFireDF1.select(
    "ResponseDelayedinMins",
    "Reponse_gt_5"
).show


+---------------------+
|ResponseDelayedinMins|
+---------------------+
|                 2.95|
|                  4.7|
|            2.4333334|
|                  1.5|
|            3.4833333|
|                 1.75|
|            2.7166667|
|            1.7833333|
|            1.5166667|
|            2.7666667|
|            2.1833334|
|                  2.5|
|            2.4166667|
|                 4.95|
|            1.4166666|
|            2.5333333|
|            1.8833333|
|                 5.35|
|                  2.0|
|            1.8166667|
+---------------------+
only showing top 20 rows



newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
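
Hint 3 asks for the delayed calls themselves, not just the column. Below is a minimal sketch of the rename followed by a filter. It runs on a small hypothetical sample; the `sample` rows and the `delayed` name are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// In a notebook, getOrCreate() reuses the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("q3-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample standing in for fireDF (Delay is in minutes)
val sample = Seq(
  ("Alarms", 2.95f),
  ("Structure Fire", 5.35f),
  ("Vehicle Fire", 6.1f)
).toDF("CallType", "Delay")

// Rename the column, then keep only the rows with a delay above 5 minutes
val delayed = sample
  .withColumnRenamed("Delay", "ResponseDelayedinMins")
  .where(col("ResponseDelayedinMins") > 5)

delayed.show(false)
```

Both `withColumnRenamed` and `where` return new DataFrames, so `sample` itself is left unchanged.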


In [14]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.cache()

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res8: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [15]:
fireTSDF.select(
    "IncidentDate",
    "OnWatchDate",
    "AvailableDtTS"
).show

+-------------------+-------------------+-------------------+
|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 08:03:26|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 09:46:44|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 09:58:53|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 12:06:57|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 13:08:40|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 15:31:02|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 14:59:04|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 16:22:49|
|2002-01

### Question 4
**What are the most common call types?**

In [16]:
// 1) Count the number of calls for each call type
val countDistinctCallType =  fireTSDF.groupBy("CallType").count()
countDistinctCallType.orderBy("count").show(35,false)

+--------------------------------------------+------+
|CallType                                    |count |
+--------------------------------------------+------+
|Administrative                              |3     |
|Mutual Aid / Assist Outside Agency          |9     |
|Confined Space / Structure Collapse         |13    |
|Marine Fire                                 |14    |
|Suspicious Package                          |15    |
|Oil Spill                                   |21    |
|Watercraft in Distress                      |28    |
|Extrication / Entrapped (Machinery, Vehicle)|28    |
|High Angle Rescue                           |32    |
|Assist Police                               |35    |
|Aircraft Emergency                          |36    |
|Train / Rail Incident                       |57    |
|Explosion                                   |89    |
|Industrial Accidents                        |94    |
|HazMat                                      |124   |
|Fuel Spill                 

countDistinctCallType: org.apache.spark.sql.DataFrame = [CallType: string, count: bigint]
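
The listing above is sorted in ascending order, so the most common call types end up at the bottom. Below is a minimal sketch of the same aggregation sorted in descending order. It runs on a small hypothetical sample; the `sample` rows and the `topCallTypes` name are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc}

// In a notebook, getOrCreate() reuses the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("q4-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample standing in for fireTSDF; the last row has a null CallType
val sample = Seq[(Int, String)](
  (1, "Alarms"),
  (2, "Medical Incident"),
  (3, "Medical Incident"),
  (4, "Structure Fire"),
  (5, null)
).toDF("IncidentNumber", "CallType")

// Count the rows per call type and put the largest counts first
val topCallTypes = sample
  .where(col("CallType").isNotNull)
  .groupBy("CallType")
  .count()
  .orderBy(desc("count"))

topCallTypes.show(10, false)
```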


# Question 5-a
**Which zip codes appear in the most common calls?**

In [17]:
// First, the number of calls per Zipcode
val dfZipcode =  fireTSDF.groupBy("Zipcode").count()
dfZipcode.orderBy("count").show(40,false)

+-------+-----+
|Zipcode|count|
+-------+-----+
|null   |142  |
|94129  |512  |
|94158  |882  |
|94130  |1100 |
|94104  |1341 |
|94127  |1881 |
|94111  |2974 |
|94131  |3236 |
|94123  |3719 |
|94116  |3933 |
|94108  |4084 |
|94105  |4236 |
|94132  |4321 |
|94121  |4555 |
|94134  |5009 |
|94118  |5157 |
|94114  |5175 |
|94117  |5804 |
|94133  |6246 |
|94122  |6355 |
|94107  |6941 |
|94115  |7812 |
|94112  |8421 |
|94124  |9236 |
|94109  |14686|
|94110  |14801|
|94103  |20897|
|94102  |21840|
+-------+-----+



dfZipcode: org.apache.spark.sql.DataFrame = [Zipcode: int, count: bigint]
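
The count above covers every call type. One possible reading of the question is to restrict the zip code counts to the most frequent call types first. Below is a minimal sketch of that reading on a small hypothetical sample. The `sample` rows, the cut-off of 2 call types, and the names `topCallTypes` and `zipByTopCallType` are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc}

// In a notebook, getOrCreate() reuses the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("q5-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample standing in for fireTSDF
val sample = Seq(
  ("Medical Incident", 94102),
  ("Medical Incident", 94102),
  ("Medical Incident", 94103),
  ("Alarms", 94109),
  ("Alarms", 94109),
  ("Vehicle Fire", 94110)
).toDF("CallType", "Zipcode")

// Keep the 2 most frequent call types (the cut-off is arbitrary)
val topCallTypes = sample
  .groupBy("CallType")
  .count()
  .orderBy(desc("count"))
  .limit(2)
  .select("CallType")

// Count the calls per (CallType, Zipcode) pair among those call types only
val zipByTopCallType = sample
  .join(topCallTypes, Seq("CallType"))
  .groupBy("CallType", "Zipcode")
  .count()
  .orderBy(desc("count"), col("CallType"), col("Zipcode"))

zipByTopCallType.show(false)
```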


# Question 6
**Determine the total number of calls, as well as the mean, minimum, and maximum call response time.**

 - the total number of calls

In [18]:

fireTSDF.groupBy("CallType").count().show(31,false)

+--------------------------------------------+------+
|CallType                                    |count |
+--------------------------------------------+------+
|Elevator / Escalator Rescue                 |453   |
|Marine Fire                                 |14    |
|Aircraft Emergency                          |36    |
|Confined Space / Structure Collapse         |13    |
|Administrative                              |3     |
|Alarms                                      |19406 |
|Odor (Strange / Unknown)                    |490   |
|Citizen Assist / Service Call               |2524  |
|HazMat                                      |124   |
|Watercraft in Distress                      |28    |
|Explosion                                   |89    |
|Oil Spill                                   |21    |
|Vehicle Fire                                |854   |
|Suspicious Package                          |15    |
|Extrication / Entrapped (Machinery, Vehicle)|28    |
|Other                      

 - the mean, minimum, and maximum call response time

In [21]:
fireTSDF.select("ResponseDelayedinMins").describe().show()
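
In [18] counts rows per call type and does not produce a single total. Below is a minimal sketch that computes the total and the delay statistics in one `agg` call. It runs on a small hypothetical sample; the `sample` rows and the output column names are assumptions made for illustration. Because the dataset holds one row per responding unit, the sketch reports both the row count and the number of distinct `CallNumber` values.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, countDistinct, lit, max, min}

// In a notebook, getOrCreate() reuses the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("q6-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample standing in for fireTSDF: call 100 has two responding units
val sample = Seq(
  (100, 1.0),
  (100, 2.0),
  (101, 6.0)
).toDF("CallNumber", "ResponseDelayedinMins")

// One pass over the data: row count, distinct calls, then mean, min and max delay
val stats = sample.agg(
  count(lit(1)).as("TotalResponses"),
  countDistinct("CallNumber").as("TotalCalls"),
  avg("ResponseDelayedinMins").as("AvgDelay"),
  min("ResponseDelayedinMins").as("MinDelay"),
  max("ResponseDelayedinMins").as("MaxDelay")
)

stats.show()
val row = stats.first()
```

`describe()` reports count, mean, stddev, min, and max as well; the explicit `agg` only makes it easier to pick the statistics and name the columns.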

# Question 7-a
**How many distinct years are in this dataset?**  
This dataset contains data from 2000 to 2018. You can use the Spark function `year()` on Timestamp dates.

In [22]:
// Create a new "year" column (IncidentDate is already a timestamp, so no parsing is needed)
val fireTSDF1 = fireTSDF.withColumn("year", year($"IncidentDate"))

// Count the distinct years with an aggregation
fireTSDF1.agg(countDistinct("year")).show()

+-----------+
|count(year)|
+-----------+
|         19|
+-----------+



fireTSDF1: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 27 more fields]


# Question 7-b
**Which week of 2018 had the most fire calls?**

In [None]:
// Create a "week_of_year" column; weekofyear() returns the week number directly
val fireTSDF2 = fireTSDF1.withColumn("week_of_year", weekofyear(col("IncidentDate")))

// Count the calls per week in 2018, grouping by week only so that
// all the days of a week are added together
fireTSDF2
  .filter(year($"IncidentDate") === 2018)
  .groupBy($"week_of_year")
  .count()
  .orderBy(desc("count"))
  .show(10)

### Question 8
**Which San Francisco neighborhoods had the worst response time in 2018?**

 - Create a new column `worst_reply_time` that returns `True` or `False`
 - Return `True` if the response time is below the mean `( < 2)`
 - Return `False` if the response time is above the mean `( > 2)`

In [None]:

fireTSDF1
  .withColumn("worst_reply_time", col("ResponseDelayedinMins") > 2)
  .select(
    "Address",
    "ResponseDelayedinMins",
    "worst_reply_time"
  ).show()
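
The cell above flags individual rows for all years and shows addresses rather than neighborhoods. Below is a minimal sketch of one way to rank neighborhoods by their mean delay in 2018. It runs on a small hypothetical sample; the `sample` rows and the names `worst2018` and `AvgDelay` are assumptions made for illustration.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col, desc, year}

// In a notebook, getOrCreate() reuses the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("q8-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample standing in for fireTSDF1; the 2017 row must be ignored
val sample = Seq(
  ("Mission", Timestamp.valueOf("2018-03-01 00:00:00"), 3.0),
  ("Mission", Timestamp.valueOf("2018-07-15 00:00:00"), 5.0),
  ("Tenderloin", Timestamp.valueOf("2018-05-02 00:00:00"), 2.0),
  ("Tenderloin", Timestamp.valueOf("2017-05-02 00:00:00"), 30.0)
).toDF("Neighborhood", "IncidentDate", "ResponseDelayedinMins")

// Keep 2018 only, average the delay per neighborhood, worst first
val worst2018 = sample
  .where(year(col("IncidentDate")) === 2018)
  .groupBy("Neighborhood")
  .agg(avg("ResponseDelayedinMins").as("AvgDelay"))
  .orderBy(desc("AvgDelay"))

worst2018.show(10, false)
```

Ranking by the mean is only one choice; the maximum or a percentile of the delay would give a different notion of "worst".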

### Question 9

**How can the DataFrame data be stored as Parquet files?**

In [None]:
fireTSDF1.write.parquet("MySparkproject.parquet")

### Question 10
**How can the data stored in Parquet format be read back?**

In [25]:
val parquetFileDF = spark.read.parquet("MySparkproject.parquet")

parquetFileDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 27 more fields]
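
With the default save mode, `write.parquet` fails if the target path already exists, which matters when a notebook cell is re-run. Below is a minimal sketch of a write followed by a read, using `mode("overwrite")` and a temporary directory. The `sample` rows, the `outPath` location, and the `reloaded` name are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In a notebook, getOrCreate() reuses the session that already exists
val spark = SparkSession.builder().master("local[*]").appName("parquet-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample standing in for fireTSDF1
val sample = Seq(
  (2003235, "Structure Fire", 2.95),
  (2003250, "Vehicle Fire", 4.7)
).toDF("IncidentNumber", "CallType", "ResponseDelayedinMins")

// Hypothetical output location inside a fresh temporary directory
val outPath = Files.createTempDirectory("parquet-sketch").resolve("sample.parquet").toString

// "overwrite" replaces any existing output instead of failing
sample.write.mode("overwrite").parquet(outPath)

// Parquet files carry their own schema, so none has to be supplied when reading
val reloaded = spark.read.parquet(outPath)
reloaded.printSchema()
```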


# END