# Big Data with Spark: Spark SQL

*`Full name: Djiadji DIAW`*

## Problem statement

This project uses Apache Spark to analyze and process the **[San Francisco Fire Department Calls](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** data in order to compute a few KPIs (*Key Performance Indicators*). The **SF Fire Dataset** contains the responses of all fire units to calls. Each record includes the call number, the incident number, the address, the unit identifier, the call type and the disposition, together with all relevant time intervals. Because the dataset is organized by response and most calls involve several units, there are several records for each call number. Addresses are associated with a block number, an intersection or a call box, not with a specific address.

**More details on the data description [here](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

**Download the CSV file [here](https://data.sfgov.org/api/views/nuek-vuh3/rows.csv?accessType=DOWNLOAD)**

## Tasks
The goal of this work is to understand the SF Fire Dataset in order to answer the questions with the appropriate Spark/Scala code.

- Readable, well-indented code.
- Don't forget to justify each answer in a comment in the Markdown cells.


#### Note:
- You may work in groups (at most two students).

## Q1. Import the required Spark modules

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.5`
import $ivy.`sh.almond::almond-spark:0.10.9`

import $ivy.$
import $ivy.$

In [2]:
import org.apache.log4j.{Level, Logger}

val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}

[39m
[36mrootLogger[39m: [32mLogger[39m = org.apache.log4j.spi.RootLogger@da1825a

## Q2. Create the Spark Session

In [3]:
import org.apache.spark.sql._

val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
23/04/25 20:10:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/25 20:10:54 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
	at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:378)
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:393)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:386)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:116)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:93)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:73)
	at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:293)
	at org.apache.hadoop.security.UserGroupInform

[32mimport [39m[36morg.apache.spark.sql._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@41cef3eb

## Q3. Load the data

Use the `fireSchema` defined in the next cell to load the data.

In [4]:
import org.apache.spark.sql.types._

val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))



[32mimport [39m[36morg.apache.spark.sql.types._

[39m
[36mfireSchema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"CallNumber"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"UnitID"[39m, StringType, true, {}),
  [33mStructField[39m([32m"IncidentNumber"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"CallType"[39m, StringType, true, {}),
  [33mStructField[39m([32m"CallDate"[39m, StringType, true, {}),
  [33mStructField[39m([32m"WatchDate"[39m, StringType, true, {}),
  [33mStructField[39m([32m"CallFinalDisposition"[39m, StringType, true, {}),
  [33mStructField[39m([32m"AvailableDtTm"[39m, StringType, true, {}),
  [33mStructField[39m([32m"Address"[39m, StringType, true, {}),
  [33mStructField[39m([32m"City"[39m, StringType, true, {}),
  [33mStructField[39m([32m"Zipcode"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"Battalion"[39m, StringType, true, {}),
  [33mStructField[39m(

In [5]:
val path = "data/sf-fire-calls.csv"
val df = spark
        .read
        .format("csv")
        .option("delimiter", ",")
        .schema(fireSchema)
        .load(path)

path: String = "data/sf-fire-calls.csv"
df: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]

In [6]:
println("Number of rows: " + df.count())

23/04/25 20:11:06 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.


Number of rows: 175297
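The count above (175297) is one more than the 175296 non-null delays that `describe()` reports in Q13, and the `null` call type and `null` year that show up in Q7 and Q14 point the same way: the CSV header line seems to be read as a data row. In Spark 2.4, `PERMISSIVE` mode (the default) turns a line that does not match the schema into an all-null row. A minimal sketch of the fix, assuming the file starts with a header line, is to add `.option("header", "true")`. To run on its own, the example writes a small made-up CSV to a temporary file instead of reading `data/sf-fire-calls.csv`.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("header-sketch").getOrCreate()

// Made-up sample file: one header line followed by two data lines.
val tmpCsv = Files.createTempFile("fire-sample", ".csv")
Files.write(tmpCsv,
  "CallNumber,CallType\n20110016,Medical Incident\n20190097,Structure Fire\n"
    .getBytes(StandardCharsets.UTF_8))

val sampleSchema = StructType(Array(
  StructField("CallNumber", IntegerType, true),
  StructField("CallType", StringType, true)))

// header=true makes Spark skip the first line instead of parsing it as data.
val withHeader = spark.read
  .format("csv")
  .option("header", "true")
  .schema(sampleSchema)
  .load(tmpCsv.toString)

val rowCount = withHeader.count()
val nullCallNumbers = withHeader.filter(withHeader("CallNumber").isNull).count()
```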

## Q4. Cache the loaded data

In [7]:
import org.apache.spark.storage.StorageLevel
df.cache
df.persist(StorageLevel.DISK_ONLY)

23/04/25 20:11:12 WARN CacheManager: Asked to cache already cached data.


[32mimport [39m[36morg.apache.spark.storage.StorageLevel
[39m
[36mres6_1[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 26 more fields]
[36mres6_2[39m: [32mDataFrame[39m = [CallNumber: int, UnitID: string ... 26 more fields]

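Caching pays off when several actions are run on the same DataFrame, as in the questions that follow. `df.cache` already persists the data with the default `MEMORY_AND_DISK` level, so the following `persist(StorageLevel.DISK_ONLY)` has no effect: this is what the `CacheManager` warning reports. Caching is also lazy; the data is only materialized by the first action that runs afterwards.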
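A minimal sketch of the same pattern on a few made-up rows: persist once, let the first action fill the cache, reuse it for several actions, then release it with `unpersist`. `Dataset.storageLevel` shows which level is actually in effect.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").appName("cache-sketch").getOrCreate()
import spark.implicits._

// Made-up rows standing in for the fire-calls DataFrame.
val calls = Seq(("Medical Incident", 3.5), ("Structure Fire", 13.6), ("Alarms", 2.0))
  .toDF("CallType", "Delay")

calls.cache()                      // same as persist(StorageLevel.MEMORY_AND_DISK); lazy
val levelAfterCache = calls.storageLevel
calls.count()                      // first action materializes the cached data

// Later actions read the cached data instead of recomputing the source.
val slowCalls = calls.filter($"Delay" > 5).count()
val callTypes = calls.select("CallType").distinct().count()

calls.unpersist(blocking = true)   // release the cache once it is no longer needed
val levelAfterUnpersist = calls.storageLevel
```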

## Q5. Remove all calls of type `Medical Incident`

Hint: apply the `.filter()` method to the `CallType` column with the `=!=` operator

In [8]:
val medIncidentDf = df.filter(df("CallType") =!= "Medical Incident")
medIncidentDf.show()

+----------+------+--------------+--------------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|            CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+--------------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

+----------+------+--------------+--------------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
only showing top 20 rows



medIncidentDf: Dataset[Row] = [CallNumber: int, UnitID: string ... 26 more fields]
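One subtlety of `=!=`: comparing a `null` `CallType` with a string yields `null`, which `filter` treats as false, so rows with a missing call type are dropped too. A minimal sketch on made-up rows, assuming those rows should be kept, adds an explicit `isNull` test:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("filter-sketch").getOrCreate()
import spark.implicits._

// Made-up calls, one of them with a missing call type.
val calls = Seq((1, Some("Medical Incident")), (2, Some("Structure Fire")), (3, None))
  .toDF("CallNumber", "CallType")

// null =!= "Medical Incident" evaluates to null, so the null row is filtered out.
val strict = calls.filter(col("CallType") =!= "Medical Incident").count()

// Keeping rows with an unknown call type requires an explicit null check.
val keepNulls = calls
  .filter(col("CallType").isNull || col("CallType") =!= "Medical Incident")
  .count()
```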

## Q6. How many distinct call types were made?

In [9]:
import org.apache.spark.sql.functions.countDistinct

val count = df.select(countDistinct("CallType") as "Nombre d'appel")
count.show()

+--------------+
|Nombre d'appel|
+--------------+
|            30|
+--------------+



[32mimport [39m[36morg.apache.spark.sql.functions.countDistinct

[39m
[36mcount[39m: [32mDataFrame[39m = [Nombre d'appel: bigint]
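`countDistinct` ignores `null`, which is why 30 is reported here even though the listing in Q7 also contains a `null` row. A small sketch on made-up values contrasts the two ways of counting:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

val spark = SparkSession.builder().master("local[*]").appName("distinct-sketch").getOrCreate()
import spark.implicits._

// Made-up call types: two distinct non-null values plus one missing value.
val calls = Seq((1, Some("Alarms")), (2, Some("Alarms")), (3, Some("HazMat")), (4, None))
  .toDF("CallNumber", "CallType")

// countDistinct skips nulls: only "Alarms" and "HazMat" are counted.
val nonNullTypes = calls.select(countDistinct("CallType")).as[Long].head()

// distinct() keeps null as a value of its own.
val allTypes = calls.select("CallType").distinct().count()
```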

## Q7. Which call types were made to the fire department?

In [10]:
import org.apache.spark.sql.functions._
import spark.sqlContext.implicits._

df.select("CallType").distinct.show(40, truncate=false)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|null                                        |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Coll

[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36mspark.sqlContext.implicits._

[39m
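To reuse the list outside of `show()` (for example to quote it in a Markdown cell), a sketch on made-up rows can drop the `null` value, sort the types and collect them into a Scala array:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("collect-sketch").getOrCreate()
import spark.implicits._

// Made-up rows with a repeated type and a missing value.
val calls = Seq((1, Some("Vehicle Fire")), (2, Some("Alarms")), (3, None), (4, Some("Alarms")))
  .toDF("CallNumber", "CallType")

val callTypes: Array[String] = calls
  .select("CallType")
  .where(col("CallType").isNotNull)  // leave out the null value
  .distinct()
  .orderBy("CallType")               // alphabetical order
  .as[String]
  .collect()
```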

## Q8. Find all responses where the delay exceeds 5 minutes

Hint:
1. Rename the column `Delay` -> `ResponseDelayedinMins`
2. Return a new DataFrame
3. Show all calls where the response time to the fire site was delayed by more than 5 minutes

In [11]:
val newDf = df.withColumnRenamed("Delay", "ResponseDelayedinMins")
newDf
    .where(col("ResponseDelayedinMins") > 5)
    .show()

+----------+------+--------------+--------------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+
|CallNumber|UnitID|IncidentNumber|            CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|
+----------+------+--------------+--------------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-------

|  20190097|   RS1|       2005623|      Structure Fire|01/19/2002|01/19/2002|               Other|01/19/2002 10:50:...| 100 Block of 2ND ST|  SF|  94105|      B03|         01|2146|               3|       3|            3|   true|         null|        1|  RESCUE SQUAD|                         2|                     3|                 6|Financial Distric...|(37.787352867216,...|020190097-RS1|            13.583333|
+----------+------+--------------+--------------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+
only showing top 20 rows



newDf: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
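`show()` only prints the first 20 matching rows. A sketch on made-up delays that also counts the delayed responses and computes their share among the non-null delays:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("delay-sketch").getOrCreate()
import spark.implicits._

// Made-up delays in minutes (Float, like the Delay column of fireSchema).
val calls = Seq((1, 2.0f), (2, 13.583333f), (3, 6.0f), (4, 0.5f)).toDF("CallNumber", "Delay")

val renamed = calls.withColumnRenamed("Delay", "ResponseDelayedinMins")
val delayed = renamed.where(col("ResponseDelayedinMins") > 5)

val delayedCount = delayed.count()
val share = delayedCount.toDouble /
  renamed.where(col("ResponseDelayedinMins").isNotNull).count()
```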

## Q9. Convert the date columns to timestamps

Hint:
* `CallDate` -> `IncidentDate`
* `WatchDate` -> `OnWatchDate`
* `AvailableDtTm` -> `AvailableDtTS`

Example code for `CallDate`:
`dataframe.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")`

In [12]:
val dfDateConvert = newDf
    // CallDate and WatchDate hold a date only
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
    // AvailableDtTm also holds a time of day; assumed 12-hour clock with an AM/PM marker
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

dfDateConvert.show(10)

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+------

dfDateConvert: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
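In the Q8 output, `AvailableDtTm` holds a date followed by a time of day (`01/19/2002 10:50:...`), so parsing it with the date-only pattern `MM/dd/yyyy` loses the time. A sketch on one made-up row, assuming the values follow `MM/dd/yyyy hh:mm:ss a` (12-hour clock with an AM/PM marker):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, hour, to_timestamp, year}

val spark = SparkSession.builder().master("local[*]").appName("timestamp-sketch").getOrCreate()
import spark.implicits._

// One made-up row in the assumed formats.
val calls = Seq(("01/19/2002", "01/19/2002 10:50:00 AM")).toDF("CallDate", "AvailableDtTm")

val converted = calls
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
  .drop("CallDate", "AvailableDtTm")

// Tuple columns are mapped by position, so the generated column names do not matter.
val (incidentYear, availableHour) = converted
  .select(year(col("IncidentDate")), hour(col("AvailableDtTS")))
  .as[(Int, Int)]
  .head()
```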

## Q10. What are the most common call types?

In [13]:
df.groupBy("CallType").count().orderBy(col("count").desc).show(40)

+--------------------+------+
|            CallType| count|
+--------------------+------+
|    Medical Incident|113794|
|      Structure Fire| 23319|
|              Alarms| 19406|
|   Traffic Collision|  7013|
|Citizen Assist / ...|  2524|
|               Other|  2166|
|        Outside Fire|  2094|
|        Vehicle Fire|   854|
|Gas Leak (Natural...|   764|
|        Water Rescue|   755|
|Odor (Strange / U...|   490|
|   Electrical Hazard|   482|
|Elevator / Escala...|   453|
|Smoke Investigati...|   391|
|          Fuel Spill|   193|
|              HazMat|   124|
|Industrial Accidents|    94|
|           Explosion|    89|
|Train / Rail Inci...|    57|
|  Aircraft Emergency|    36|
|       Assist Police|    35|
|   High Angle Rescue|    32|
|Watercraft in Dis...|    28|
|Extrication / Ent...|    28|
|           Oil Spill|    21|
|  Suspicious Package|    15|
|         Marine Fire|    14|
|Confined Space / ...|    13|
|Mutual Aid / Assi...|     9|
|      Administrative|     3|
|         

## Q11. Which zip codes appear in the most common calls?

In [14]:
df.filter($"CallType" === "Medical Incident").select("ZipCode").show()

+-------+
|ZipCode|
+-------+
|  94124|
|  94102|
|  94115|
|  94114|
|  94110|
|  94109|
|  94121|
|  94116|
|  94118|
|  94118|
|  94133|
|  94111|
|  94132|
|  94134|
|  94118|
|  94117|
|  94103|
|  94115|
|  94109|
|  94110|
+-------+
only showing top 20 rows
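The cell above prints zip codes row by row, with repetitions, so it does not show which ones dominate. A sketch on made-up rows that counts calls per zip code for the most common call type (`Medical Incident`, according to Q10) and sorts the counts:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc}

val spark = SparkSession.builder().master("local[*]").appName("zip-sketch").getOrCreate()
import spark.implicits._

// Made-up (call type, zip code) pairs.
val calls = Seq(
  ("Medical Incident", 94102), ("Medical Incident", 94102), ("Medical Incident", 94103),
  ("Structure Fire", 94103), ("Medical Incident", 94110)
).toDF("CallType", "Zipcode")

val zipRanking = calls
  .filter(col("CallType") === "Medical Incident")
  .groupBy("Zipcode")
  .count()                 // adds a "count" column per zip code
  .orderBy(desc("count"))

val topZip = zipRanking.select("Zipcode").as[Int].head()
```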



## Q12. Which San Francisco neighborhoods have the zip codes `94102` and `94103`?

In [15]:
df
    .select("Neighborhood", "Zipcode")
    .filter($"Zipcode" === 94102 || $"Zipcode" === 94103)
    .show()

+--------------------+-------+
|        Neighborhood|Zipcode|
+--------------------+-------+
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|     South of Market|  94103|
|             Mission|  94103|
|             Mission|  94103|
|     South of Market|  94103|
|          Tenderloin|  94102|
|        Hayes Valley|  94102|
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|     South of Market|  94103|
|        Hayes Valley|  94102|
|Financial Distric...|  94103|
|        Hayes Valley|  94102|
|             Mission|  94103|
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|        Hayes Valley|  94102|
|     South of Market|  94103|
|     South of Market|  94103|
+--------------------+-------+
only showing top 20 rows
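The query above returns one row per call, so the same neighborhood is repeated many times. A sketch on made-up rows that keeps each (neighborhood, zip code) pair once, using `isin` for the two zip codes:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("neighborhood-sketch").getOrCreate()
import spark.implicits._

// Made-up rows; the last one falls outside the two zip codes.
val calls = Seq(
  ("Tenderloin", 94102), ("Tenderloin", 94102), ("Hayes Valley", 94102),
  ("Mission", 94103), ("South of Market", 94103), ("Sunset", 94122)
).toDF("Neighborhood", "Zipcode")

val neighborhoods = calls
  .filter(col("Zipcode").isin(94102, 94103))
  .select("Neighborhood", "Zipcode")
  .distinct()                       // one row per (neighborhood, zip code) pair
  .orderBy("Zipcode", "Neighborhood")

val names = neighborhoods.select("Neighborhood").as[String].collect().toSeq
```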



## Q13. Determine the total number of calls, as well as the average, minimum and maximum call response time

In [20]:
newDf.select("ResponseDelayedinMins").describe().show()

+-------+---------------------+
|summary|ResponseDelayedinMins|
+-------+---------------------+
|  count|               175296|
|   mean|    3.892364154521585|
| stddev|    9.378286226254204|
|    min|          0.016666668|
|    max|              1844.55|
+-------+---------------------+
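`describe()` counts only the non-null values of the selected column (175296, against 175297 rows in Q3) and adds a standard deviation that was not asked for. Since a call can have several rows (one per responding unit), the number of calls is better measured with distinct `CallNumber` values. A sketch on made-up rows computing all the requested figures in a single `agg`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, countDistinct, max, min}

val spark = SparkSession.builder().master("local[*]").appName("stats-sketch").getOrCreate()
import spark.implicits._

// Made-up responses: call 1 was answered by two units.
val calls = Seq((1, 2.0f), (1, 4.0f), (2, 6.0f), (3, 0.5f))
  .toDF("CallNumber", "ResponseDelayedinMins")

val stats = calls.agg(
  count("*").as("NumResponses"),              // one row per unit response
  countDistinct("CallNumber").as("NumCalls"), // several units can answer the same call
  avg("ResponseDelayedinMins").as("AvgDelay"),
  min("ResponseDelayedinMins").as("MinDelay"),
  max("ResponseDelayedinMins").as("MaxDelay"))

val row = stats.head()
```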



## Q14. How many distinct years does this Dataset contain?

Hint: apply the `year()` function to the `IncidentDate` column

In [21]:
dfDateConvert
    .select(year($"IncidentDate"))
    .distinct()
    .show()

+------------------+
|year(IncidentDate)|
+------------------+
|              2003|
|              2007|
|              2018|
|              2015|
|              2006|
|              2013|
|              null|
|              2014|
|              2004|
|              2012|
|              2009|
|              2016|
|              2001|
|              2005|
|              2000|
|              2010|
|              2011|
|              2008|
|              2017|
|              2002|
+------------------+
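The listing above contains 19 non-null years (2000 to 2018) plus a `null`. A sketch on made-up dates that returns the number directly; `countDistinct` leaves the `null` year out:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, countDistinct, to_timestamp, year}

val spark = SparkSession.builder().master("local[*]").appName("years-sketch").getOrCreate()
import spark.implicits._

// Made-up call dates, including a missing one.
val calls = Seq((1, Some("01/19/2002")), (2, Some("03/02/2002")), (3, Some("07/04/2018")), (4, None))
  .toDF("CallNumber", "CallDate")

val numYears = calls
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
  .select(countDistinct(year(col("IncidentDate"))))
  .as[Long]
  .head()
```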



## Q15. Which week of 2018 had the most fire calls?

In [22]:
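val fireCallsByWeekCount = dfDateConvert
  .filter(year($"IncidentDate") === 2018)                 // keep only the 2018 calls
  .groupBy(weekofyear($"IncidentDate").as("WeekOfYear"))  // week number derived from the timestamp
  .count()                                                // one row per week with a "count" column
  .withColumnRenamed("count", "NumFireCalls")
  .orderBy(desc("NumFireCalls"))

fireCallsByWeekCount.show(5)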

## Q16. Which San Francisco neighborhoods had the worst response times in 2018?
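A sketch on made-up rows, assuming "worst response time" means the highest average `ResponseDelayedinMins` per neighborhood among 2018 calls (ranking by the single longest delay, kept here as `MaxDelayMins`, is another reading):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col, desc, max, to_timestamp, year}

val spark = SparkSession.builder().master("local[*]").appName("worst-sketch").getOrCreate()
import spark.implicits._

// Made-up responses; the 2017 row must not influence the 2018 ranking.
val calls = Seq(
  ("Tenderloin", "02/10/2018", 3.0f),
  ("Tenderloin", "05/21/2018", 5.0f),
  ("Mission", "03/15/2018", 9.0f),
  ("Mission", "11/30/2017", 40.0f),
  ("Presidio", "08/08/2018", 1.0f)
).toDF("Neighborhood", "CallDate", "ResponseDelayedinMins")
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))

val worst2018 = calls
  .filter(year(col("IncidentDate")) === 2018)
  .groupBy("Neighborhood")
  .agg(
    avg("ResponseDelayedinMins").as("AvgDelayMins"),
    max("ResponseDelayedinMins").as("MaxDelayMins"))
  .orderBy(desc("AvgDelayMins"))

val worstNeighborhood = worst2018.select("Neighborhood").as[String].head()
```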

## Q17. Store the data as Parquet files

In [None]:
df.write.format("parquet").save("/tmp/firedataService_parquet/files/")

## Q18. Reload the data stored in Parquet format

In [None]:
// The session created in Q2 is named `spark`; `sparkSession` is not defined in this notebook
val newdataDF = spark.read.parquet("/tmp/firedataService_parquet/files/")

In [None]:
newdataDF.printSchema
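A sketch of the Parquet round trip on made-up rows, writing to a hypothetical temporary directory. `mode("overwrite")` lets the cell be re-run, since the default save mode fails when the path already exists. Parquet stores the schema with the data, so column names and types come back unchanged; Spark reads every column back as nullable, which is why the check compares names and types only.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("parquet-sketch").getOrCreate()
import spark.implicits._

// Made-up rows standing in for the fire-calls data.
val calls = Seq((20190097, "Structure Fire", 13.583333f), (20110016, "Medical Incident", 2.0f))
  .toDF("CallNumber", "CallType", "ResponseDelayedinMins")

// Hypothetical output location in the system temp directory.
val parquetPath = Files.createTempDirectory("fire-parquet").resolve("files").toString

calls.write.mode("overwrite").parquet(parquetPath)

val reloaded = spark.read.parquet(parquetPath)
reloaded.printSchema()

// Compare names and types; nullability may differ after the round trip.
val sameColumns =
  reloaded.schema.map(f => (f.name, f.dataType)) == calls.schema.map(f => (f.name, f.dataType))
val sameCount = reloaded.count() == calls.count()
```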