# Big Data avec Spark : Spark SQL

*`Nom & Prenom : GUEYE DIE

## Problematique

Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Datasets** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données [ici](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

**Download csv file [here](https://data.sfgov.org/api/views/nuek-vuh3/rows.csv?accessType=DOWNLOAD)**

## Travail à faire.
L'objectif de ce travail est de comprendre le Dataset SF Fire afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquats.

- Code lisible et bien indenté, 
- N'oublier pas de mettre en commentaire la justification de votre réponse sur les cellule Markdown. 


#### Note:
- Vous pouvez en groupe (au plus deux étudiants) . 

## Q1. Importez les modules Spark necessaires

In [2]:
import $ivy.`org.apache.spark::spark-sql:2.4.5` // Or use any other 2.x version here
import $ivy.`sh.almond::almond-spark:0.10.9` // Not required since almond 0.7.0 (will be automatically added when importing spark)

[32mimport [39m[36m$ivy.$                                   // Or use any other 2.x version here
[39m
[32mimport [39m[36m$ivy.$                                // Not required since almond 0.7.0 (will be automatically added when importing spark)[39m

In [3]:
import org.apache.log4j.{Level, Logger}

val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}

[39m
[36mrootLogger[39m: [32mLogger[39m = org.apache.log4j.spi.RootLogger@409e56a0

## Q2. Creez la Spark Session

In [4]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

In [5]:
import org.apache.spark.sql._

val spark = {
  SparkSession.builder()
    .master("local")
    .appName("BD-FS FIRE")
    .config("spark.some.option.config", "config-value")
    .getOrCreate()
}

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36morg.apache.spark.sql._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@207ccb20

## Q3. Chargez les données

In [6]:
val path = "data/departement_call/sf-fire-calls.csv"
val df = spark.read
        .option("header","true")
        .csv(path)

[36mpath[39m: [32mString[39m = [32m"data/departement_call/sf-fire-calls.csv"[39m
[36mdf[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

Utilisez le `fireSchema` definit dans la cellule suivante pour le chargement.

In [7]:
import org.apache.spark.sql.types._

val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

// your code here (hint spark session name is sparkSession Q2)
// val data = 


[32mimport [39m[36morg.apache.spark.sql.types._

[39m
[36mfireSchema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"CallNumber"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"UnitID"[39m, StringType, true, {}),
  [33mStructField[39m([32m"IncidentNumber"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"CallType"[39m, StringType, true, {}),
  [33mStructField[39m([32m"CallDate"[39m, StringType, true, {}),
  [33mStructField[39m([32m"WatchDate"[39m, StringType, true, {}),
  [33mStructField[39m([32m"CallFinalDisposition"[39m, StringType, true, {}),
  [33mStructField[39m([32m"AvailableDtTm"[39m, StringType, true, {}),
  [33mStructField[39m([32m"Address"[39m, StringType, true, {}),
  [33mStructField[39m([32m"City"[39m, StringType, true, {}),
  [33mStructField[39m([32m"Zipcode"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"Battalion"[39m, StringType, true, {}),
  [33mStructField[39m(

## Q4. Mettez en cache les donnees chargees

In [8]:
//On utilise la mise en cache quand on effectue plusieurs actions sur le même DataFrame. 
df.cache
df.persist()

23/04/17 22:28:10 WARN CacheManager: Asked to cache already cached data.


[36mres7_0[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]
[36mres7_1[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

## Q5. Supprimez tous les appels de type `Medical Incident`

Hint: appliquez la methode `.filter()` a la colonne `CallType` avec l'operateur `=!=`

In [9]:
//Avant la suppression des lignes contenant tous les appels de type "Medical Incident"
df.select("CallType").show()

23/04/17 22:28:14 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.


+----------------+
|        CallType|
+----------------+
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|    Vehicle Fire|
|          Alarms|
|  Structure Fire|
|          Alarms|
|          Alarms|
|Medical Incident|
|Medical Incident|
|Medical Incident|
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|  Structure Fire|
|  Structure Fire|
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|Medical Incident|
+----------------+
only showing top 20 rows



In [10]:
//La méthode col() permet de sélectionner la colonne à filtrer et l'opérateur =!= est utilisé pour filtrer les valeurs dont CallType est egale à "Medical Incident"
import org.apache.spark.sql.functions.col

val df_filtered = df.filter(col("CallType") =!= "Medical Incident")

[32mimport [39m[36morg.apache.spark.sql.functions.col

[39m
[36mdf_filtered[39m: [32mDataset[39m[[32mRow[39m] = [CallNumber: string, UnitID: string ... 26 more fields]

## Q6. Combien de types d'appels distincts ont été passés ?**  

In [11]:
//Pour obtenir les valeurs distinctes dans la colonne CaalType on utilise la méthode distinct() 
//et la méthode count() est utilisée pour compter le nombre de valeurs distinctes. 
//Ainsi on obtient 30 valeurs distinctes
import org.apache.spark.sql.functions.countDistinct
df_filtered.agg(countDistinct("CallType") as "Nombre de type d'appel")

[32mimport [39m[36morg.apache.spark.sql.functions.countDistinct
[39m
[36mres10_1[39m: [32mDataFrame[39m = [Nombre de type d'appel: bigint]

## Q7. Quels types d'appels  ont été passés au service d'incendie?

In [12]:
df_filtered.select("CallType").distinct.show(30, truncate=false)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

## Q8. Trouvez toutes les réponses ou les délais sont supérieurs à 5 minutes

Hint:
1. Renommez la colonne `Delay` -> `ReponseDelayedinMins`
2. Retournez un nouveau DataFrame
3. Affichez tous les appels où le temps de réponse au site d'incendie a eu un retard de plus de 5 minutes

In [13]:
//Pour renommez une colonne on utilise la fonction withColumnRenamed()
val newdf = df_filtered.withColumnRenamed("Delay", "ResponseDelayedinMins")

[36mnewdf[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

In [14]:
//Verification
newdf.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- Sup

In [39]:
//Affichons tous les appels où le temps de réponse au site d'incendie a eu un retard de plus de 5 minutes
newdf.filter(col("ResponseDelayedinMins")>5)

[36mres38[39m: [32mDataset[39m[[32mRow[39m] = [CallNumber: string, UnitID: string ... 26 more fields]

## Q9. Convertissez les colonnes dates en timestamp

Hint:
* `CallDate` -> `IncidentDate`
* `WatchDate` -> `OnWatchDate`
* `AvailableDtTm` -> `AvailableDtTS`
exemple code pour le cas de `CallDate`:
`dataframe.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")`

In [72]:
import org.apache.spark.sql.functions.{col, to_timestamp}

val newdf2 = newdf
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
  .withColumn("AvailableDt", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy"))
  .drop("CallDate", "WatchDate", "AvailableDtTm")

[32mimport [39m[36morg.apache.spark.sql.functions.{col, to_timestamp}

[39m
[36mnewdf2[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

In [77]:
//Verification
newdf2.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable = true)


## Q10. Quels sont les types d'appels les plus courants?

In [18]:
//La méthode groupBy() permet de regrouper les lignes de la base de données en fonction des valeurs de la colonne 
//"CallType". La méthode count() est ensuite utilisée pour compter le nombre de lignes pour chaque groupe et 
//la méthode orderBy() est utilisée pour trier les résultats par ordre décroissant de nombre d'appels.
import org.apache.spark.sql.functions.{col, desc}
val topCalls = newdf.groupBy("CallType").count().orderBy(desc("count")).show(5)

+--------------------+-----+
|            CallType|count|
+--------------------+-----+
|      Structure Fire|23319|
|              Alarms|19406|
|   Traffic Collision| 7013|
|Citizen Assist / ...| 2524|
|               Other| 2166|
+--------------------+-----+
only showing top 5 rows



[32mimport [39m[36morg.apache.spark.sql.functions.{col, desc}
[39m

## Q11. Quels sont les boites postales rencontrées dans les appels les plus courants?

In [96]:
newdf
    .filter(col("CallType") === "Medical Incident")
    .select("Zipcode")
    .show()

+-------+
|Zipcode|
+-------+
+-------+



## Q12. Quels sont les quartiers de San Francisco dont les codes postaux sont `94102` et `94103`?**

In [27]:
newdf
    .select("Neighborhood","Zipcode")
    .where (col("Zipcode") === 94102 || 
           col("Zipcode") === 94103)
    .distinct()
    .show()

+--------------------+-------+
|        Neighborhood|Zipcode|
+--------------------+-------+
|         Mission Bay|  94103|
|Financial Distric...|  94103|
| Castro/Upper Market|  94103|
|    Western Addition|  94102|
|            Nob Hill|  94102|
|     South of Market|  94103|
|        Potrero Hill|  94103|
|        Hayes Valley|  94103|
|     South of Market|  94102|
|          Tenderloin|  94102|
|          Tenderloin|  94103|
|             Mission|  94103|
|Financial Distric...|  94102|
|        Hayes Valley|  94102|
+--------------------+-------+



## Q13. Determinez le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?

In [32]:
newdf.select("ResponseDelayedinMins").describe().show()

+-------+---------------------+
|summary|ResponseDelayedinMins|
+-------+---------------------+
|  count|                61502|
|   mean|   3.9247360518539818|
| stddev|   14.446973866076661|
|    min|          0.016666668|
|    max|             99.63333|
+-------+---------------------+



## Q14. Combien d'années distinctes trouve t-on dans ce Dataset? 

Hint: Appliquer la fonction `year()` a la colonne `IncidentDate`

In [50]:
//Pour déterminer le nombre d'années distinctes dans ce Dataset, nous pouvons sélectionner la colonne "IncidentDate", 
//extraire l'année de chaque date avec la fonction year() et utiliser la fonction distinct() pour obtenir les années 
//uniques. Ensuite, nous pouvons utiliser la méthode count() pour connaître le nombre d'années distinctes.

import org.apache.spark.sql.functions._
import spark.implicits._
newdf
    .select(year(col("CallDate")))
    .distinct()
    .count()


[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36mspark.implicits._
[39m
[36mres49_2[39m: [32mLong[39m = [32m1L[39m

## Q15. Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?

In [87]:
newdf2
    .filter(year(col("IncidentDate")) === "2018" && col("callType") === "Structure Fire")
    .groupBy(weekofyear(col("IncidentDate")).as("Nombre de semaine"))
    .count()
    .orderBy(col("count").desc)
    .show(1)

+-----------------+-----+
|Nombre de semaine|count|
+-----------------+-----+
|               25|   31|
+-----------------+-----+
only showing top 1 row



Le résultat ci dessus montre que la 1er semaine de l’année 2018 a eu le plus d'appels d'incendie

## Q16. Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?

In [89]:
// Filtrer les appels d'incendie de l'année 2018
val calls2018 = newdf2.filter(year($"IncidentDate") === 2018)

// Grouper par quartier et calculer la moyenne du temps de réponse
val avgResponseByNeighborhood = calls2018.groupBy($"Neighborhood").agg(avg($"ResponseDelayedinMins").alias("AvgResponseTime"))

// Trier les résultats par ordre décroissant du temps de réponse
val worstResponseNeighborhoods = avgResponseByNeighborhood.sort($"AvgResponseTime".asc)

// Afficher les quartiers avec le pire temps de réponse
worstResponseNeighborhoods.show()

+-------------------+------------------+
|       Neighborhood|   AvgResponseTime|
+-------------------+------------------+
|           Seacliff|2.6428571285714284|
|       Hayes Valley|2.6988888820000003|
|   Western Addition|2.8509615380769233|
|          Excelsior|2.9350340020408163|
|          Japantown|2.9442029130434784|
|       Lincoln Park| 2.944444466666667|
|  Lone Mountain/USF|3.0205555449999997|
|   Presidio Heights|      3.1759999776|
|           Nob Hill|3.1841121410280384|
|             Marina|3.3417670865060245|
|            Mission|3.4078787935909083|
|        North Beach|3.5173611426388884|
|    Sunset/Parkside|3.5372641693396236|
|    South of Market|3.5448660579017854|
|     Outer Richmond| 3.551704540227272|
|Castro/Upper Market|3.5757777834666666|
|       Inner Sunset|        3.61604935|
|     Inner Richmond| 3.619858159361702|
|            Portola|3.7143677944827598|
|          Glen Park|3.8864583062499993|
+-------------------+------------------+
only showing top

[36mcalls2018[39m: [32mDataset[39m[[32mRow[39m] = [CallNumber: string, UnitID: string ... 26 more fields]
[36mavgResponseByNeighborhood[39m: [32mDataFrame[39m = [Neighborhood: string, AvgResponseTime: double]
[36mworstResponseNeighborhoods[39m: [32mDataset[39m[[32mRow[39m] = [Neighborhood: string, AvgResponseTime: double]

## Q17. Stocker les données sous format de fichiers Parquet

In [90]:
newdf2.write.format("parquet").save("tmp/parket/files/")

: 

## Q18. Rechargez  les données stockées en format Parquet

In [91]:
val newdataDF = spark.read.format("parquet").load("tmp/parket/files/")
              

[36mnewdataDF[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

In [92]:
newdataDF.printSchema

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true