# Big Data avec Spark : Projet Final

*`Nom : Mouhamadou Bamba DIOP`*

## Problematique

Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Datasets** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données [ici](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Travail à faire.
L'objectif de ce projet est de comprendre le Dataset SF Fire afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquats.

- Code lisible et bien indenté, 
- N'oublier pas de mettre en commentaire la justification de votre réponse sur les cellule Markdown. 


#### Note:
- Le projet est personnel (un étudiant) . 
- Deadline : **Jeudi 22 janvier 2021**

## Q1. Importez les modules Spark necessaires

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.7` // Or use any other 2.x version here
import $ivy.`sh.almond::almond-spark:0.10.0` // Not required since almond 0.7.0 (will be automatically added when importing spark)

[32mimport [39m[36m$ivy.$                                   // Or use any other 2.x version here
[39m
[32mimport [39m[36m$ivy.$                                // Not required since almond 0.7.0 (will be automatically added when importing spark)[39m

On désactive le logging pour avoir des sorties claires et lisibles, car les loggings ont tendance a etre 
trop bavard en sortie.

In [2]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

## Q2. Creez la Spark Session

- Pour créer SparkSession, nous devons utiliser la méthode de générateur `.builder()` et appeler la méthode `getOrCreate()`. Si SparkSession existe déjà, il retourne sinon crée une nouvelle SparkSession.
- `master("local[*]"]) ` :  on exécute sur le cluster en mode standalone (autonome).
- `appName` : Le nom de l'application

In [3]:
// Your code goes here
import org.apache.spark.sql._

val spark = {
    NotebookSparkSession.builder()
    .master("local[*]")
    .appName("Spoid : Mon Projet Big Data")
    .config("spark.sql.shuffle.partitions", 6)
    .getOrCreate()

}


Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36morg.apache.spark.sql._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@74dcbb1d

## Q3. Chargez les données

Utilisez le `fireSchema` definit dans la cellule suivante pour le chargement.

In [4]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row

// Creer un fireSchema pour notre dataset

val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))


// Charger le dataset

val fireData = spark.read.option("header", "false").schema(fireSchema).csv("sf-fire-calls.csv")



[32mimport [39m[36morg.apache.spark.sql.types._ 
[39m
[32mimport [39m[36morg.apache.spark.sql.Row

// Creer un fireSchema pour notre dataset

[39m
[36mfireSchema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"CallNumber"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"UnitID"[39m, StringType, true, {}),
  [33mStructField[39m([32m"IncidentNumber"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"CallType"[39m, StringType, true, {}),
  [33mStructField[39m([32m"CallDate"[39m, StringType, true, {}),
  [33mStructField[39m([32m"WatchDate"[39m, StringType, true, {}),
  [33mStructField[39m([32m"CallFinalDisposition"[39m, StringType, true, {}),
  [33mStructField[39m([32m"AvailableDtTm"[39m, StringType, true, {}),
  [33mStructField[39m([32m"Address"[39m, StringType, true, {}),
  [33mStructField[39m([32m"City"[39m, StringType, true, {}),
  [33mStructField[39m([32m"Zipcode"[39m, IntegerType, true, {

 - Nombre de colonnes de notre Dataset

In [5]:
fireData.columns.length

[36mres4[39m: [32mInt[39m = [32m28[39m

- Afficher les schemas

In [6]:
fireData.printSchema

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

 - On selectionne quelques lignes a afficher, pour regarder la structure de nos variables
 - NB : Pour des raisons d'affichage, on ne peut pas afficher toutes les colonnes en meme temps

In [7]:
fireData.select(
    "CallNumber",
    "UnitID",
    "IncidentNumber",
    "CallType"
).show


+----------+------+--------------+----------------+
|CallNumber|UnitID|IncidentNumber|        CallType|
+----------+------+--------------+----------------+
|      null|  null|          null|            null|
|  20110016|   T13|       2003235|  Structure Fire|
|  20110022|   M17|       2003241|Medical Incident|
|  20110023|   M41|       2003242|Medical Incident|
|  20110032|   E11|       2003250|    Vehicle Fire|
|  20110043|   B04|       2003259|          Alarms|
|  20110072|   T08|       2003279|  Structure Fire|
|  20110125|   E33|       2003301|          Alarms|
|  20110130|   E36|       2003304|          Alarms|
|  20110197|   E05|       2003343|Medical Incident|
|  20110215|   E06|       2003348|Medical Incident|
|  20110274|   M07|       2003381|Medical Incident|
|  20110275|   T15|       2003382|  Structure Fire|
|  20110304|   E03|       2003399|Medical Incident|
|  20110308|   E14|       2003403|Medical Incident|
|  20110313|   B10|       2003408|  Structure Fire|
|  20110313|

## Q4. Mettez en cache les donnees chargees

Hint: `dataframe.cache().count()`

In [8]:
// Appel de la methode .cache()
val data=fireData.cache()

// Nombres de lignes
val lines=data.count()
println(lines)


175297


[36mdata[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 26 more fields]
[36mlines[39m: [32mLong[39m = [32m175297L[39m

## Q5. Supprimez tous les appels de type `Medical Incident`

Hint: appliquez la methode `.filter()` a la colonne `CallType` avec l'operateur `=!=`

In [9]:
// Importer le module necessaire
import org.apache.spark.sql.functions.col

/* Utilisation de la methode .filter pour filtrer par rapport a la colonne "CallType" et retirer toutes les lignes qui ont le
    d'appel  "Medical Incident"
*/

val filterDF = data.filter(col("CallType") =!= "Medical Incident")


[32mimport [39m[36morg.apache.spark.sql.functions.col

/* Utilisation de la methode .filter pour filtrer par rapport a la colonne "CallType" et retirer toutes les lignes qui ont le
    d'appel  "Medical Incident"
*/

[39m
[36mfilterDF[39m: [32mDataset[39m[[32mRow[39m] = [CallNumber: int, UnitID: string ... 26 more fields]

 - Affichons les resultats

In [10]:
filterDF.select(
    "CallNumber",
    "UnitID",
    "IncidentNumber",
    "CallType"
).show


+----------+------+--------------+--------------------+
|CallNumber|UnitID|IncidentNumber|            CallType|
+----------+------+--------------+--------------------+
|  20110016|   T13|       2003235|      Structure Fire|
|  20110032|   E11|       2003250|        Vehicle Fire|
|  20110043|   B04|       2003259|              Alarms|
|  20110072|   T08|       2003279|      Structure Fire|
|  20110125|   E33|       2003301|              Alarms|
|  20110130|   E36|       2003304|              Alarms|
|  20110275|   T15|       2003382|      Structure Fire|
|  20110313|   B10|       2003408|      Structure Fire|
|  20110313|    D3|       2003408|      Structure Fire|
|  20110313|   E32|       2003408|      Structure Fire|
|  20110344|   T06|       2003429|Odor (Strange / U...|
|  20110375|   B05|       2003453|              Alarms|
|  20110425|   B01|       2003497|      Structure Fire|
|  20120020|   E36|       2003554|      Structure Fire|
|  20120111|   E18|       2003618|Odor (Strange 

## Q6. Combien de types d'appels distincts ont été passés ?**  

In [11]:
// Importer le module "countDistinct"
import org.apache.spark.sql.functions.countDistinct

/* Cette methode ~agg(countDistinct)~ permet de recuperer les types distincts et d'aggreger de "1" (+1), 
lorsque le type rencontree est unique
*/
val distinctsCallType = data.agg(countDistinct("CallType"))
distinctsCallType.show()

+------------------------+
|count(DISTINCT CallType)|
+------------------------+
|                      30|
+------------------------+



[32mimport [39m[36morg.apache.spark.sql.functions.countDistinct

/* Cette methode ~agg(countDistinct)~ permet de recuperer les types distincts et d'aggreger de "1" (+1), 
lorsque le type rencontree est unique
*/
[39m
[36mdistinctsCallType[39m: [32mDataFrame[39m = [count(DISTINCT CallType): bigint]

## Q7. Quels types d'appels  ont été passés au service d'incendie?

In [13]:
// Cette methode permet de les afficher (Les types distincts de la colonne "CallType")

/* Ici on precise 30 dans la methode .show() pour afficher les 30 "CallType" */

data.select("CallType").distinct.show(30,false)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|null                                        |
|Electrical Hazard                           |
|Train / Rail Incident                       |
|High Angle Rescue                           |
|Assist Police                               |
|Medical Incident                            |
|Vehicle Fire                                |
|Industrial Accidents                        |
|Explosion                                   |
|Administrative                              |
|Confined Space / Structure Collapse         |
|Structure Fire                              |
|Alarms                                      |
|Smoke Investigation (Outside)               |
|Elevator / Escalator Rescue                 |
|Water Rescue                                |
|HazMat                                      |
|Marine Fire                                 |
|Odor (Strang

## Q8. Trouvez toutes les réponses ou les délais sont supérieurs à 5 minutes

Hint:
1. Renommez la colonne `Delay` -> `ReponseDelayedinMins`
2. Retournez un nouveau DataFrame
3. Affichez tous les appels où le temps de réponse au site d'incendie a eu un retard de plus de 5 minutes

In [16]:

// Renommons la colonne ~Delay~ en ~ReponseDelaydinmins~, puis retournons un nouveau df
val df = data.withColumnRenamed("Delay","ReponseDelayedinMins")

// Selectionner tous les appels où le temps de réponse au site d'incendie a eu un retard de plus de 5 minutes
val df1 = df.withColumn("Reponse_gt_5", col("ReponseDelayedinMins").gt(5))


[36mdf[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 26 more fields]
[36mdf1[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 27 more fields]

 - Affichons le resultat

In [17]:

df1.select(
    "ReponseDelayedinMins",
    "Reponse_gt_5"
).show


+--------------------+------------+
|ReponseDelayedinMins|Reponse_gt_5|
+--------------------+------------+
|                null|        null|
|                2.95|       false|
|                 4.7|       false|
|           2.4333334|       false|
|                 1.5|       false|
|           3.4833333|       false|
|                1.75|       false|
|           2.7166667|       false|
|           1.7833333|       false|
|           1.5166667|       false|
|           2.7666667|       false|
|           2.1833334|       false|
|                 2.5|       false|
|           2.4166667|       false|
|                4.95|       false|
|           1.4166666|       false|
|           2.5333333|       false|
|           1.8833333|       false|
|                5.35|        true|
|                 2.0|       false|
+--------------------+------------+
only showing top 20 rows



## Q9. Convertissez les colonnes dates en timestamp

Hint:
* `CallDate` -> `IncidentDate`
* `WatchDate` -> `OnWatchDate`
* `AvailableDtTm` -> `AvailableDtTS`
exemple code pour le cas de `CallDate`:
`dataframe.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")`

In [19]:
// Importons le module necessaire pour cette transformation
import org.apache.spark.sql.functions.to_timestamp

// CallDate
val df2 = df1.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")

// WatchDate
val df3 = df2.withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")

// AvailableDtTm
val df4 = df3.withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy")).drop("AvailableDtTm")

[32mimport [39m[36morg.apache.spark.sql.functions.to_timestamp

// CallDate
[39m
[36mdf2[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 27 more fields]
[36mdf3[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 27 more fields]
[36mdf4[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 27 more fields]

- Affichons le resultat

In [20]:
df4.select(
    "IncidentDate",
    "OnWatchDate",
    "AvailableDtTS"
).show

+-------------------+-------------------+-------------------+
|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+-------------------+-------------------+-------------------+
|               null|               null|               null|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 00:00:00|
|2002-01-11 00:00:00|2002-01-11 00:00:00|2002-01-11 00:00:00|
|2002-01

## Q10. Quels sont les types d'appels les plus courants?

`Pour cela`:
- 1) On recupere le nombre de type d'appel, pour chaque type d'appel;
- 2) On regarde la statistique de ces nombres d'appels (moy, med, max, min, mean) avec la methode `describe`;
- 3) On recupre les types d'appels `superieure a la moyenne`.


In [22]:

// 1) On recupere le nombre de type d'appel, pour chaque type d'appel

val countDistinctCallType =  df4.groupBy("CallType").count()
countDistinctCallType.orderBy("count").show(35,false)

+--------------------------------------------+------+
|CallType                                    |count |
+--------------------------------------------+------+
|null                                        |1     |
|Administrative                              |3     |
|Mutual Aid / Assist Outside Agency          |9     |
|Confined Space / Structure Collapse         |13    |
|Marine Fire                                 |14    |
|Suspicious Package                          |15    |
|Oil Spill                                   |21    |
|Watercraft in Distress                      |28    |
|Extrication / Entrapped (Machinery, Vehicle)|28    |
|High Angle Rescue                           |32    |
|Assist Police                               |35    |
|Aircraft Emergency                          |36    |
|Train / Rail Incident                       |57    |
|Explosion                                   |89    |
|Industrial Accidents                        |94    |
|HazMat                     

[36mcountDistinctCallType[39m: [32mDataFrame[39m = [CallType: string, count: bigint]

In [24]:
// 2) On regarde la statistique de ces nombres d'appels (moy, med, max, min, mean) avec la methode describe;
countDistinctCallType.describe().show()

+-------+--------------------+------------------+
|summary|            CallType|             count|
+-------+--------------------+------------------+
|  count|                  30|                31|
|   mean|                null| 5654.741935483871|
| stddev|                null|20773.063720385173|
|    min|      Administrative|                 1|
|    max|Watercraft in Dis...|            113794|
+-------+--------------------+------------------+



- Le nombre minimal pour un `CallType` est `1` 
- Le nombre Maximal pour un `CallType` est `113794`
- La moyenne est de `5654.74`
Alors, les `CallType`, les plus courants sont ceux superieure a la moyenne `mean`

In [25]:
// 3) Enfin,  On recupre les types d'appels superieure a la moyenne

// show(35,false) : pour afficher toutes les valeurs, sans restriction

countDistinctCallType
  .withColumn("num_gt_5000", col("count").gt(5000))
  .show(35,false)

+--------------------------------------------+------+-----------+
|CallType                                    |count |num_gt_5000|
+--------------------------------------------+------+-----------+
|Electrical Hazard                           |482   |false      |
|Train / Rail Incident                       |57    |false      |
|High Angle Rescue                           |32    |false      |
|Assist Police                               |35    |false      |
|null                                        |1     |false      |
|Medical Incident                            |113794|true       |
|Vehicle Fire                                |854   |false      |
|Industrial Accidents                        |94    |false      |
|Explosion                                   |89    |false      |
|Administrative                              |3     |false      |
|Confined Space / Structure Collapse         |13    |false      |
|Structure Fire                              |23319 |true       |
|Alarms   

- Conclusion : Les types d'appels, les plus frequents sont `Medical Incident`, `Structure Fire`, `Alarms` et `Traffic Collision`

## Q11. Quels sont les boites postales rencontrées dans les appels les plus courants?

In [26]:
// D'abord, Le nombre Zipcode
val dfZipcode =  df4.groupBy("Zipcode").count()
dfZipcode.orderBy("count").show(40,false)


+-------+-----+
|Zipcode|count|
+-------+-----+
|null   |143  |
|94129  |512  |
|94158  |882  |
|94130  |1100 |
|94104  |1341 |
|94127  |1881 |
|94111  |2974 |
|94131  |3236 |
|94123  |3719 |
|94116  |3933 |
|94108  |4084 |
|94105  |4236 |
|94132  |4321 |
|94121  |4555 |
|94134  |5009 |
|94118  |5157 |
|94114  |5175 |
|94117  |5804 |
|94133  |6246 |
|94122  |6355 |
|94107  |6941 |
|94115  |7812 |
|94112  |8421 |
|94124  |9236 |
|94109  |14686|
|94110  |14801|
|94103  |20897|
|94102  |21840|
+-------+-----+



[36mdfZipcode[39m: [32mDataFrame[39m = [Zipcode: int, count: bigint]

- On recupere les plus frequents `Zipcode` en faisiant un filtre sur les types d'appels les plus frequents.

In [27]:
// les boites postales rencontrées dans les appels les plus courants

import org.apache.spark.sql.functions._
import spark.implicits._

df4.filter($"CallType" === "Medical Incident" || $"CallType" === "Structure Fire" || $"CallType" ===  "Alarms" || $"CallType" === "Traffic Collision")
    .select("Zipcode")
    .show()

+-------+
|Zipcode|
+-------+
|  94109|
|  94124|
|  94102|
|  94109|
|  94105|
|  94112|
|  94102|
|  94115|
|  94114|
|  94110|
|  94112|
|  94109|
|  94121|
|  94110|
|  94110|
|  94110|
|  94116|
|  94118|
|  94118|
|  94133|
+-------+
only showing top 20 rows



[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36mspark.implicits._

[39m

## Q12. Quels sont les quartiers de San Francisco dont les codes postaux sont `94102` et `94103`?**

In [29]:
// On fait un filtre avec ces deux "Zipcode" specifiquements 
df4.filter($"Zipcode" === 94102 || $"Zipcode" === 94103).select("Address").show()

+--------------------+
|             Address|
+--------------------+
|MARKET ST/MCALLIS...|
|600 Block of POLK ST|
|    9TH ST/HOWARD ST|
|400 Block of VALE...|
|  16TH ST/MISSION ST|
|   4TH ST/MISSION ST|
|400 Block of TURK ST|
|   OAK ST/WEBSTER ST|
| 0 Block of JONES ST|
|400 Block of EDDY ST|
|300 Block of CLEM...|
| 500 Block of OAK ST|
|700 Block of MARK...|
|HAIGHT ST/OCTAVIA ST|
|100 Block of JULI...|
|0 Block of LARKIN ST|
|100 Block of TURK ST|
|CALL BOX: BUCHANA...|
|    5TH ST/MARKET ST|
| 100 Block of 7TH ST|
+--------------------+
only showing top 20 rows



## Q13. Determinez le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?

 - le nombre total d'appels


In [30]:
df4.groupBy("CallType").count().show(31,false)


+--------------------------------------------+------+
|CallType                                    |count |
+--------------------------------------------+------+
|Electrical Hazard                           |482   |
|Train / Rail Incident                       |57    |
|High Angle Rescue                           |32    |
|Assist Police                               |35    |
|null                                        |1     |
|Medical Incident                            |113794|
|Vehicle Fire                                |854   |
|Industrial Accidents                        |94    |
|Explosion                                   |89    |
|Administrative                              |3     |
|Confined Space / Structure Collapse         |13    |
|Structure Fire                              |23319 |
|Alarms                                      |19406 |
|Smoke Investigation (Outside)               |391   |
|Elevator / Escalator Rescue                 |453   |
|Water Rescue               

 - la moyenne, le minimum et le maximum du temps de réponse des appels

In [21]:
df3.select("ReponseDelayedinMins").describe().show()

+-------+--------------------+
|summary|ReponseDelayedinMins|
+-------+--------------------+
|  count|              175296|
|   mean|   3.892364154521585|
| stddev|   9.378286226254206|
|    min|         0.016666668|
|    max|             1844.55|
+-------+--------------------+



## Q14. Combien d'années distinctes trouve t-on dans ce Dataset? 

Hint: Appliquer la fonction `year()` a la colonne `IncidentDate`

In [31]:
// On cree une nouvelle colonne "year"
val df5 = df4.withColumn("year", year(to_timestamp($"IncidentDate", "MM/dd/yyyy")))

// On recupere tous les annees distincts par aggregation
df5.agg(countDistinct("year")).show()

+--------------------+
|count(DISTINCT year)|
+--------------------+
|                  19|
+--------------------+



[36mdf5[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 28 more fields]

 - On compte `19 années distinctes` dans ce dataset

## Q15. Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?

- 1) Creer une nouvelle colonne `week_of_year` qui recupere le numero de la semaine dans `IncidentDate`;

- 2) Calculer le nombre d'incident par semaine `agg(count(col).as("count"))`;

- 3) Filtrer pour l'annee 2018 `filter(col.between(1_janvier_2018,31_decembre_2018))`;

- 4) Trier par ordre descroissant `orderBy(desc("count"))`

In [95]:
// Creer une colonne avec le format semaine "week_of_year"
val df6 = df5.withColumn("week_of_year", date_format(col("IncidentDate"), "w"))

// Calculer le nombre d'incident par semaine en 2018
val dfIncidentByWeek = df6.groupBy($"week_of_year",$"IncidentDate")
.agg(count($"IncidentDate").as("count"))
.filter($"IncidentDate"
.between("2018-01-01 00:00:00","2018-12-31 00:00:00"))
.orderBy(desc("count"))
.show(10)

+------------+-------------------+-----+
|week_of_year|       IncidentDate|count|
+------------+-------------------+-----+
|           1|2018-01-01 00:00:00|   58|
|          25|2018-06-20 00:00:00|   50|
|          11|2018-03-16 00:00:00|   47|
|          40|2018-10-02 00:00:00|   47|
|          17|2018-04-22 00:00:00|   47|
|          10|2018-03-07 00:00:00|   46|
|           7|2018-02-15 00:00:00|   45|
|          38|2018-09-18 00:00:00|   44|
|          22|2018-06-02 00:00:00|   44|
|          19|2018-05-11 00:00:00|   44|
+------------+-------------------+-----+
only showing top 10 rows



[36mdf6[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 29 more fields]

 - Le premiere semaine de l'anne `2018` (`week_of_year = 1`) a enregistre le plus d'appel d'incendie : `58`

## Q16. Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?

 - Creer une nouvelle colonne `worst_reply_time` qui retourne `True` or `False`
 - Renvoi `True`, si le temps de reponse est inferieur a la moyenne `( < 2)`
 - Renvoi `False`, si le temps de reponse est superieur a la moyenne `( > 2)`

In [96]:

df6.withColumn("worst_reply_time", col("ReponseDelayedinMins") > 2 )
  .select(
    "Address",
      "ReponseDelayedinMins",
    "worst_reply_time",
).show()

+--------------------+--------------------+----------------+
|             Address|ReponseDelayedinMins|worst_reply_time|
+--------------------+--------------------+----------------+
|                null|                null|            null|
|2000 Block of CAL...|                2.95|            true|
|0 Block of SILVER...|                 4.7|            true|
|MARKET ST/MCALLIS...|           2.4333334|            true|
|APPLETON AV/MISSI...|                 1.5|           false|
|1400 Block of SUT...|           3.4833333|            true|
|  BEALE ST/FOLSOM ST|                1.75|           false|
|0 Block of FARALL...|           2.7166667|            true|
|600 Block of POLK ST|           1.7833333|           false|
|1500 Block of WEB...|           1.5166667|           false|
|DIAMOND ST/MARKET ST|           2.7666667|            true|
|2700 Block of MIS...|           2.1833334|            true|
|BRUNSWICK ST/GUTT...|                 2.5|            true|
|1000 Block of SUT...|  

## Q17. Stocker les données sous format de fichiers Parquet

In [99]:

df6.write.parquet("MySparkproject.parquet")


## Q18. Rechargez  les données stockées en format Parquet

In [100]:

val parquetFileDF = spark.read.parquet("MySparkproject.parquet")


[36mparquetFileDF[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 29 more fields]