# Projet Final Apache Spark

**Nom Etudiant :**  SYLL

**Prenom Etudiant:**  ABDOULAYE

**Classe :**  MASTER 1 BIG DATA


## Description
Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Dataset** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données cliquer sur ce [lien](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Travail à faire.
L'objectif de ce projet est de comprendre le **SF Fire Dataset** afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquates.

- Créer un repos git (public ou privé) et partager le repos avec mon mail (limahin10@gmail.com)
- Ecrire un code lisible et bien indenté 
- N'oublier pas de mettre en commentaire la justification de vos réponses sur les cellules Markdown. 


## Note:
- Le projet est personnel, c'est-à-dire chaque notebook ne concerne qu'un seul étudiant. 
- Deadline : **Jeudi 10 janvier 2021** (Aucune de dérogation ne sera acceptée)

### Chargement des données

Importation des packages Spark

In [2]:
1+1

res1: Int = 2


In [3]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._
import org.apache.spark.sql.{functions => F}

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.{functions=>F}


Nous allons jeter un coup d'oeil sur la structure des données avant de définir un schéma

In [4]:
!head -1 "datasets/sf-fire/sf-fire-calls.csv"

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay



Vu que la taille de ces données est énormes, inferer le schema pour un très grande volumes de données s'avère un peu couteux. Nous allons ainsi définir un schema pour le Dataset.

In [5]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...

In [6]:
val sfFireFile = "datasets/sf-fire/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = datasets/sf-fire/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


Nous allons mettre en cache le Dataframe

In [7]:
fireDF.cache()

res2: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [8]:
fireDF.count()

java.lang.IllegalArgumentException:  Unsupported class file major version 55

In [9]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [10]:
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

Filtrage des d'appels de type "Medical Incident"

In [11]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


### Question 1
**Combien de types d'appels distincts ont été passés ?**  
Pour être sûr, il ne faut pas compter les valeurs «nulles» dans la colonne.

In [15]:
// dans cette question j'ai pas compter les valeurs null c'est pourquoi j'ai enlever les na et aussi 
//mettre l'aggregat l'ensemble des données entier
fireDF.na.drop.agg(countDistinct(col("CallType")).alias("count")).show()

+-----+
|count|
+-----+
|   28|
+-----+



### Question 2

**Quels types d'appels différents ont été passés au service d'incendie?**

In [16]:
// dans cette question j'ai selectionner CallType qui permet de sortir l'ensemble des types d'appels differents
fireDF.select("CallType").show()

+----------------+
|        CallType|
+----------------+
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|    Vehicle Fire|
|          Alarms|
|  Structure Fire|
|          Alarms|
|          Alarms|
|Medical Incident|
|Medical Incident|
|Medical Incident|
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|  Structure Fire|
|  Structure Fire|
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|Medical Incident|
+----------------+
only showing top 20 rows



### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

In [18]:
// dans cette question après avoir renommé, j'ai fait un select sur les temps de réponses 
//et utiliser un where pour un delais supérieur à 5mn
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")
newFireDF
.select("ResponseDelayedinMins")
.where($"ResponseDelayedinMins">5)
.show()


+---------------------+
|ResponseDelayedinMins|
+---------------------+
|                 5.35|
|                 6.25|
|                  5.2|
|                  5.6|
|                 7.25|
|            11.916667|
|             5.116667|
|             8.633333|
|             95.28333|
|                 5.45|
|                  7.6|
|             6.133333|
|            5.1833334|
|            6.9166665|
|                  5.2|
|                 6.35|
|             7.983333|
|                13.55|
|                 5.15|
|            13.583333|
+---------------------+
only showing top 20 rows



newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


### Transformations des dates
Maintenant nous allons d'abord:
1. Transformer les dates de type String en Spark Timestamp afin que nous puissions effectuer des requêtes basées sur la date plus tard
2. Retourner le Dataframe transformée
3. Mettre en cache le nouveau DataFrame

In [19]:
//dans cette question nous avons un nouveau dataframe et j'ai utilisé un select sur 3 colonnes pour convertir 
//les dates
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")
fireTSDF
.select("IncidentDate", "OnWatchDate", "AvailableDtTS")
.show()

fireTSDF.cache()

+-------------------+-------------------+-------------------+
|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 08:03:26|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 09:46:44|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 09:58:53|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 12:06:57|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 13:08:40|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 15:31:02|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 14:59:04|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 16:22:49|
|2002-01

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res13: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


### Question 4
**Quels sont les types d'appels les plus courants?**

In [21]:
//dans cette question j'ai fait un select sur les types appels et enlever les valeurs nulls puis faire 
//un regroupement par type d'appel en les comptants grace à compte et les agencer par ordre decroissant
fireTSDF
    .select("CallType")
    .where(col("CallType").isNotNull)
    .groupBy("CallType")
    .count()
    .orderBy(desc("count"))
    .show()

+--------------------+------+
|            CallType| count|
+--------------------+------+
|    Medical Incident|113794|
|      Structure Fire| 23319|
|              Alarms| 19406|
|   Traffic Collision|  7013|
|Citizen Assist / ...|  2524|
|               Other|  2166|
|        Outside Fire|  2094|
|        Vehicle Fire|   854|
|Gas Leak (Natural...|   764|
|        Water Rescue|   755|
|Odor (Strange / U...|   490|
|   Electrical Hazard|   482|
|Elevator / Escala...|   453|
|Smoke Investigati...|   391|
|          Fuel Spill|   193|
|              HazMat|   124|
|Industrial Accidents|    94|
|           Explosion|    89|
|Train / Rail Inci...|    57|
|  Aircraft Emergency|    36|
+--------------------+------+
only showing top 20 rows



### Question 5-a
**Quels sont boites postaux rencontrés dans les appels les plus courants?**

In [25]:
//dans cette question j'ai fait un select sur les types d'appels et les boites postaux
// et j'ai aussi utiliser un where pour enlever les boites postaux non null
// et les regrouper par types d'appel et les compters par ordre décroissant
fireTSDF
    .select("CallType", "ZipCode")
    .where(col("Zipcode").isNotNull)
    .groupBy("Zipcode", "CallType")
    .count()
    .orderBy(desc("count"))
    .show()

+-------+----------------+-----+
|Zipcode|        CallType|count|
+-------+----------------+-----+
|  94102|Medical Incident|16130|
|  94103|Medical Incident|14775|
|  94110|Medical Incident| 9995|
|  94109|Medical Incident| 9479|
|  94124|Medical Incident| 5885|
|  94112|Medical Incident| 5630|
|  94115|Medical Incident| 4785|
|  94122|Medical Incident| 4323|
|  94107|Medical Incident| 4284|
|  94133|Medical Incident| 3977|
|  94117|Medical Incident| 3522|
|  94134|Medical Incident| 3437|
|  94114|Medical Incident| 3225|
|  94118|Medical Incident| 3104|
|  94121|Medical Incident| 2953|
|  94116|Medical Incident| 2738|
|  94132|Medical Incident| 2594|
|  94110|  Structure Fire| 2267|
|  94105|Medical Incident| 2258|
|  94102|  Structure Fire| 2229|
+-------+----------------+-----+
only showing top 20 rows



### Question 5-a
**Quels sont les quartiers de San Francisco dont les codes postaux sont 94102 et 94103?**

In [37]:
//dans cette question j'ai fait un select sur les boites postaux et les quartiers de san francisco(neighborhood)
//j'ai utilisé un where pour selectionner seulement les quartier de san francisco
//et j'ai utiliser un filter pour recupérer les boites postaux demandés


fireTSDF
    .select( "Zipcode", "neighborhood")
    .where($"City"==="San Francisco")
    .filter($"Zipcode"===94102 or $"Zipcode"===94103)
    .show()


+-------+----------------+
|Zipcode|    neighborhood|
+-------+----------------+
|  94103| South of Market|
|  94103|         Mission|
|  94103| South of Market|
|  94103| South of Market|
|  94103| South of Market|
|  94102|      Tenderloin|
|  94103| South of Market|
|  94103| South of Market|
|  94103| South of Market|
|  94103| South of Market|
|  94103|         Mission|
|  94103|         Mission|
|  94102|      Tenderloin|
|  94102|      Tenderloin|
|  94103| South of Market|
|  94102|      Tenderloin|
|  94102|      Tenderloin|
|  94103| South of Market|
|  94102|Western Addition|
|  94103| South of Market|
+-------+----------------+
only showing top 20 rows



### Question 6
**Determiner le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?**

In [30]:
//dans cette question j'ai utiliser une fonctions puis un select qui me permet de compter les types d'appels
//de calculer la moyenne, le minimum et le maximum
fireTSDF
.select(count("CallType"), F.avg("ResponseDelayedinMins"),
F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))
.show()

+---------------+--------------------------+--------------------------+--------------------------+
|count(CallType)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+---------------+--------------------------+--------------------------+--------------------------+
|         175296|         3.892364154521585|               0.016666668|                   1844.55|
+---------------+--------------------------+--------------------------+--------------------------+



### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp

In [53]:
//dans cette question j'ai utilisé une fonction et j'ai fait un select sur la colonne incidentdate et distinct()
//pour distinguer les date et les regrouper par ordre croissant
fireTSDF
.select(year($"IncidentDate"))
.distinct()
.orderBy(year($"IncidentDate"))
.show()

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



### Question 7-b
**Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?**

In [39]:
//Reponse 7-b
fireTSDF
    .where(year(col("IncidentDate")) === 2018)
    .where($"CallType").like("%Fire%")
    .select(weekofyear(col("IncidentDate")).as("week"))
    .groupBy("week")
    .count()
    .orderBy(desc("count"))
    .limit(1)
    .show()

<console>: 41: error: value like is not a member of org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

### Question 8
**Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?**

In [42]:
//dans cette question j'ai utiliser un where pour selectionner les qurtier de san francisco
// une fontion qui me permet de recuperer la date 
//un select sur les quartiers et les delais de reponses et les regrouper par quartier
//j'ai utiliser l'aggregat pour les maximum temps de réponses et les regrouper par ordre decroissant
fireTSDF
    .where($"City" === "San Francisco")
    .where(year(col("IncidentDate")) === 2018)
    .select("Neighborhood", "ResponseDelayedinMins")
    .groupBy("Neighborhood")
    .agg(max("ResponseDelayedinMins"))
    .orderBy(max("ResponseDelayedinMins").desc).show()

+--------------------+--------------------------+
|        Neighborhood|max(ResponseDelayedinMins)|
+--------------------+--------------------------+
|           Chinatown|                 491.26666|
|Financial Distric...|                 406.63333|
|          Tenderloin|                 340.48334|
|      Haight Ashbury|                 175.86667|
|Bayview Hunters P...|                     155.8|
|     Pacific Heights|                 129.01666|
|        Potrero Hill|                     109.8|
|        Inner Sunset|                 106.13333|
|     South of Market|                  94.71667|
|      Inner Richmond|                 90.433334|
|           Excelsior|                  83.76667|
| Castro/Upper Market|                  74.13333|
|    Western Addition|                 67.916664|
|            Nob Hill|                     67.45|
|             Mission|                 54.666668|
|    Presidio Heights|                 52.883335|
|       Outer Mission|                 43.383335|


### Question 9

**Comment stocker les données du Dataframe sous format de fichiers Parquet?**

In [45]:
//dans cette question j'ai  un write.parquet pour le stockage
newFireDF.write.parquet("tableau.parquet")

### Question 10
**Comment relire les données stockée en format Parquet?**

In [46]:
//ici j'ai utiliser un read pour pouvoir le lire
val parquetFileDF = spark.read.parquet("tableau.parquet")

java.lang.IllegalArgumentException:  Unsupported class file major version 55

## FIN