# Big Data with Spark: Spark SQL

*Full name:*
Abal Khassim TRAORE

## Problem statement

This project uses Apache Spark to analyze and process the **[San Francisco Fire Department Calls](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** data in order to produce a few KPIs (*Key Performance Indicators*). The **SF Fire dataset** contains the responses of all fire units to calls. Each record includes the call number, incident number, address, unit identifier, call type, and disposition. All relevant time intervals are also included. Because the dataset is response-based and most calls involve several units, there are multiple records for each call number. Addresses are associated with a block number, an intersection, or a call box, not with a specific address.

**More details on the data description are available [here](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

**Download the CSV file [here](https://data.sfgov.org/api/views/nuek-vuh3/rows.csv?accessType=DOWNLOAD)**

## Assignment
The goal of this assignment is to understand the SF Fire dataset well enough to answer the questions with appropriate Spark/Scala code.

- Keep the code readable and properly indented.
- Do not forget to justify each answer in a Markdown cell.


#### Note:
- You may work in groups of at most two students.

In [11]:
// Load the Spark SQL and almond-spark dependencies with Ammonite's $ivy imports
import $ivy.`org.apache.spark::spark-sql:2.4.5`
import $ivy.`sh.almond::almond-spark:0.10.9`

[32mimport [39m[36m$ivy.$                                  
[39m
[32mimport [39m[36m$ivy.$                               [39m

## Q1. Import the required Spark modules

In [12]:
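import org.apache.log4j.{Level, Logger}

// Only show errors from the root logger to keep the notebook output readable
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

// Let the Spark loggers report warnings and above
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)

// Column functions used throughout the notebook (col, desc, to_timestamp, ...)
import org.apache.spark.sql.functions._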

[32mimport [39m[36morg.apache.log4j.{Level, Logger}

[39m
[36mrootLogger[39m: [32mLogger[39m = org.apache.log4j.spi.RootLogger@328949f6
[32mimport [39m[36morg.apache.spark.sql.functions._[39m

## Q2. Create the Spark session

In [13]:
import org.apache.spark.sql._

val spark = {
  SparkSession.builder()
    .master("local")
    .appName("BD-FS FIRE")
    .getOrCreate()
}

23/04/25 19:04:26 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.


[32mimport [39m[36morg.apache.spark.sql._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@7d3274ad

## Q3. Load the data

Use the `fireSchema` defined in the next cell to load the data.

In [14]:
import org.apache.spark.sql.types._
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

// your code here (hint: the Spark session created in Q2 is named `spark`)


[32mimport [39m[36morg.apache.spark.sql.types._
[39m
[36mfireSchema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"CallNumber"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"UnitID"[39m, StringType, true, {}),
  [33mStructField[39m([32m"IncidentNumber"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"CallType"[39m, StringType, true, {}),
  [33mStructField[39m([32m"CallDate"[39m, StringType, true, {}),
  [33mStructField[39m([32m"WatchDate"[39m, StringType, true, {}),
  [33mStructField[39m([32m"CallFinalDisposition"[39m, StringType, true, {}),
  [33mStructField[39m([32m"AvailableDtTm"[39m, StringType, true, {}),
  [33mStructField[39m([32m"Address"[39m, StringType, true, {}),
  [33mStructField[39m([32m"City"[39m, StringType, true, {}),
  [33mStructField[39m([32m"Zipcode"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"Battalion"[39m, StringType, true, {}),
  [33mStructField[39m(

In [15]:
val path = "data/sf-fire-calls.csv"

// Note: fireSchema is not passed to the reader here, so every column is loaded as a string
val fireDF = spark
  .read
  .option("header", "true")
  .csv(path)

println("Number of rows: " + fireDF.count())
println("Number of columns: " + fireDF.columns.length)
fireDF.show()

Number of rows: 175296
Number of columns: 28
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+---

[36mpath[39m: [32mString[39m = [32m"data/sf-fire-calls.csv"[39m
[36mfireDF[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

## Q4. Cache the loaded data

In [16]:
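// cache() is shorthand for persist() with the default storage level,
// so the second call is redundant (see the warning below)
fireDF.cache
fireDF.persist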

23/04/25 19:04:30 WARN CacheManager: Asked to cache already cached data.


res15_0: DataFrame = [CallNumber: string, UnitID: string ... 26 more fields]
res15_1: DataFrame = [CallNumber: string, UnitID: string ... 26 more fields]

In [17]:
import org.apache.spark.sql.functions._

[32mimport [39m[36morg.apache.spark.sql.functions._[39m

Caching pays off when several actions run on the same DataFrame: the data is read once, and later actions reuse the cached copy instead of re-reading the CSV file.
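
A minimal sketch of that pattern, assuming Spark is on the classpath and using a small made-up DataFrame (`calls`) in place of the fire-calls data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// getOrCreate() reuses the notebook session if one already exists
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cache-sketch")
  .getOrCreate()
import spark.implicits._

// Made-up rows standing in for the fire-calls data
val calls = Seq(("Alarms", 10.5f), ("Medical Incident", 3.2f), ("Alarms", 9.98f))
  .toDF("CallType", "Delay")

calls.cache()                                 // lazy: only marks the DataFrame for caching
val total = calls.count()                     // the first action materializes the cache
val alarms = calls.filter(col("CallType") === "Alarms").count() // can reuse the cached rows
val isCached = calls.storageLevel.useMemory   // true once a memory-backed level is set
calls.unpersist()                             // release the cached data when done
```

Calling `unpersist()` at the end is optional in a short notebook, but it frees executor memory once the DataFrame is no longer needed.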

## Q5. Remove all calls of type `Medical Incident`

Hint: apply the `.filter()` method to the `CallType` column with the `=!=` operator

In [18]:
val df_without_mi = fireDF.filter(col("CallType") =!= "Medical Incident")
df_without_mi.show(10)

+----------+------+--------------+--------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|      CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+--------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+----

df_without_mi: Dataset[Row] = [CallNumber: string, UnitID: string ... 26 more fields]

## Q6. How many distinct call types were recorded?

In [19]:
fireDF.agg(countDistinct("CallType").as("Number of call types")).show()

+--------------------+
|Number of call types|
+--------------------+
|                  30|
+--------------------+



## Q7. Which types of calls were made to the fire department?

In [20]:
fireDF.select("CallType").distinct().show(30, truncate = false)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

## Q8. Find all responses where the delay exceeds 5 minutes

Hint:
1. Rename the `Delay` column to `ResponseDelayedinMins`
2. Return a new DataFrame
3. Display all calls where the response to the fire site was delayed by more than 5 minutes

In [21]:
val fireWithDelayRename = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")


fireWithDelayRename.filter(col("ResponseDelayedinMins") > 5).show(false)


+----------+------+--------------+-----------------------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+------------+--------------------------+----------------------+------------------+------------------------------+-------------------------------------+-------------+---------------------+
|CallNumber|UnitID|IncidentNumber|CallType                     |CallDate  |WatchDate |CallFinalDisposition|AvailableDtTm         |Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType    |UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood                  |Location                             |RowID        |ResponseDelayedinMins|
+----------+------+--------------+-----------------------------+----------+---------

fireWithDelayRename: DataFrame = [CallNumber: string, UnitID: string ... 26 more fields]

## Q9. Convert the date columns to timestamps

Hint:
* `CallDate` -> `IncidentDate`
* `WatchDate` -> `OnWatchDate`
* `AvailableDtTm` -> `AvailableDtTS`

Example code for `CallDate`:
`dataframe.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")`

In [22]:
val dfWithEncondedDate = (fireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
  // AvailableDtTm also carries a time of day, so its pattern includes the time and the AM/PM marker
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")
  .withColumnRenamed("Delay", "ResponseDelayedinMins")
)

                 
               

dfWithEncondedDate: DataFrame = [CallNumber: string, UnitID: string ... 26 more fields]

## Q10. What are the most common call types?

In [23]:
dfWithEncondedDate.groupBy("CallType")
  .count()
  .orderBy(col("count").desc)
  .show(5, truncate = false)

+-----------------------------+------+
|CallType                     |count |
+-----------------------------+------+
|Medical Incident             |113794|
|Structure Fire               |23319 |
|Alarms                       |19406 |
|Traffic Collision            |7013  |
|Citizen Assist / Service Call|2524  |
+-----------------------------+------+
only showing top 5 rows



## Q11. Which zip codes appear in the most common calls?

In [24]:
val topZipcodes = fireDF.groupBy("CallType", "Zipcode")
  .count()
  .orderBy(desc("count"))
  .limit(10)

topZipcodes.show(false)


+----------------+-------+-----+
|CallType        |Zipcode|count|
+----------------+-------+-----+
|Medical Incident|94102  |16130|
|Medical Incident|94103  |14775|
|Medical Incident|94110  |9995 |
|Medical Incident|94109  |9479 |
|Medical Incident|94124  |5885 |
|Medical Incident|94112  |5630 |
|Medical Incident|94115  |4785 |
|Medical Incident|94122  |4323 |
|Medical Incident|94107  |4284 |
|Medical Incident|94133  |3977 |
+----------------+-------+-----+



topZipcodes: Dataset[Row] = [CallType: string, Zipcode: string ... 1 more field]

## Q12. Which San Francisco neighborhoods are in zip codes `94102` and `94103`?

In [25]:
val filteredDF = fireDF.filter(col("Zipcode").isin("94102", "94103"))

val neighborhoods = filteredDF.select("Neighborhood","Zipcode").distinct()

neighborhoods.show(false)


+------------------------------+-------+
|Neighborhood                  |Zipcode|
+------------------------------+-------+
|Mission Bay                   |94103  |
|Financial District/South Beach|94103  |
|Castro/Upper Market           |94103  |
|Western Addition              |94102  |
|Nob Hill                      |94102  |
|South of Market               |94103  |
|Potrero Hill                  |94103  |
|Hayes Valley                  |94103  |
|South of Market               |94102  |
|Tenderloin                    |94102  |
|Tenderloin                    |94103  |
|Mission                       |94103  |
|Financial District/South Beach|94102  |
|Hayes Valley                  |94102  |
+------------------------------+-------+



filteredDF: Dataset[Row] = [CallNumber: string, UnitID: string ... 26 more fields]
neighborhoods: Dataset[Row] = [Neighborhood: string, Zipcode: string]

## Q13. Determine the total number of calls, as well as the mean, minimum, and maximum call response time

In [26]:
fireWithDelayRename.describe("ResponseDelayedinMins").show()
fireWithDelayRename.printSchema()



+-------+---------------------+
|summary|ResponseDelayedinMins|
+-------+---------------------+
|  count|               175296|
|   mean|   3.8923641541750413|
| stddev|    9.378286170882717|
|    min|          0.016666668|
|    max|                 99.9|
+-------+---------------------+

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = tr
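
Since `printSchema()` lists every column as a string, `describe` compares `ResponseDelayedinMins` values as text when computing `min` and `max`, so the reported maximum of 99.9 may not be the largest numeric delay. The sketch below shows one way to get numeric statistics by casting first. It assumes Spark is on the classpath and uses three made-up delay values (`raw`) rather than the real data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col, count, max, min}

// getOrCreate() reuses the notebook session if one already exists
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("stats-sketch")
  .getOrCreate()
import spark.implicits._

// Made-up delays stored as strings, as in a CSV read without a schema
val raw = Seq("99.9", "150.25", "3.5").toDF("ResponseDelayedinMins")

// Cast once, then aggregate on the numeric column
val delay = col("ResponseDelayedinMins").cast("float")
val stats = raw.agg(
  count("*").as("total_calls"),
  avg(delay).as("avg_delay"),
  min(delay).as("min_delay"),
  max(delay).as("max_delay")).first()

// For comparison: max on the uncast column uses string ordering
val stringMax = raw.agg(max(col("ResponseDelayedinMins"))).first().getString(0)

println(s"numeric max = ${stats.getFloat(3)}, string max = $stringMax")
```

On the made-up values, the string maximum is `"99.9"` while the numeric maximum is 150.25, which is the kind of discrepancy a cast (or loading with `fireSchema`) avoids.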

## Q14. How many distinct years are in this dataset?

Hint: apply the `year()` function to the `IncidentDate` column

In [27]:
import org.apache.spark.sql.functions.year

val numYears = dfWithEncondedDate.select(year(col("IncidentDate")).alias("year"))
                    .distinct()
                    .count()
println(s"Number of distinct years in the dataset: $numYears")


Number of distinct years in the dataset: 19


[32mimport [39m[36morg.apache.spark.sql.functions.year

[39m
[36mnumYears[39m: [32mLong[39m = [32m19L[39m

## Q15. Which week of 2018 had the most fire calls?

In [28]:
import org.apache.spark.sql.functions.weekofyear

// IncidentDate is already a timestamp (see Q9), so only the week number needs to be derived
val fireDFwithWeekOfYear = dfWithEncondedDate
  .withColumn("WeekOfYear", weekofyear(col("IncidentDate")))
import org.apache.spark.sql.functions.max

// Keep the 2018 calls, count them per week, and sort with the busiest week first
val callsByWeek = fireDFwithWeekOfYear.filter(year(col("IncidentDate")) === 2018)
  .groupBy("WeekOfYear")
  .agg(count("*").alias("TotalCalls"))
  .sort(desc("TotalCalls"))

callsByWeek.show(1)


+----------+----------+
|WeekOfYear|TotalCalls|
+----------+----------+
|        22|       259|
+----------+----------+
only showing top 1 row



[32mimport [39m[36morg.apache.spark.sql.functions.weekofyear

[39m
[36mfireDFwithWeekOfYear[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 27 more fields]
[32mimport [39m[36morg.apache.spark.sql.functions.max

[39m
[36mcallsByWeek[39m: [32mDataset[39m[[32mRow[39m] = [WeekOfYear: int, TotalCalls: bigint]

## Q16. Which San Francisco neighborhoods had the worst response times in 2018?

In [29]:
import org.apache.spark.sql.functions._

dfWithEncondedDate.filter(year(col("IncidentDate")) === 2018)
  .select("Neighborhood", "ResponseDelayedinMins")
  .orderBy(desc("ResponseDelayedinMins"))
  .show(10)
 


+--------------------+---------------------+
|        Neighborhood|ResponseDelayedinMins|
+--------------------+---------------------+
|     South of Market|             94.71667|
|Bayview Hunters P...|            92.816666|
|     South of Market|            91.666664|
|      Inner Richmond|            90.433334|
|        Russian Hill|             9.983334|
|             Mission|                 9.95|
|     South of Market|             9.933333|
|    Golden Gate Park|             9.933333|
|        Potrero Hill|             9.916667|
|           Chinatown|                  9.9|
+--------------------+---------------------+
only showing top 10 rows
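
In the output above, 9.983334 is listed right after 90.433334, which is what a descending sort on text produces. That is consistent with `ResponseDelayedinMins` being a string column (the CSV was read without `fireSchema`). The plain-Scala sketch below needs no Spark; it contrasts the two orderings using three values copied from the table plus one made-up value, `10.5`.

```scala
// Delay values as text: three taken from the table above, plus a made-up "10.5"
val delays = Seq("94.71667", "9.983334", "10.5", "90.433334")

// Descending sort on strings compares character by character,
// so "9.983334" ranks above "10.5"
val lexicographic = delays.sorted.reverse

// Descending sort on numbers, after converting each string to a Float
val numeric = delays.map(_.toFloat).sorted.reverse

println(lexicographic.mkString(", "))
println(numeric.mkString(", "))
```

In Spark, the numeric ordering can be obtained by casting before sorting, for example `orderBy(col("ResponseDelayedinMins").cast("float").desc)`, or by loading the CSV with `fireSchema` so that `Delay` is a `FloatType` column from the start.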



[32mimport [39m[36morg.apache.spark.sql.functions._

 [39m

## Q17. Store the data as Parquet files

In [32]:
dfWithEncondedDate.write.format("parquet").save("./data/parquet_data_1")

23/04/25 19:08:51 ERROR Executor: Exception in task 0.0 in stage 48.0 (TID 1423)
java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\KH\Desktop\Master 1\BigData\bigdata\bigdata\data\parquet_data_1\_temporary\0\_temporary\attempt_20230425190851_0048_m_000000_1423\part-00000-4560d31a-9d8e-40ee-8898-bf5b767531ad-c000.snappy.parquet
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:762)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:859)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:842)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:661)
	at org.apache.hadoop.fs.ChecksumFileSystem$1.apply(ChecksumFileSystem.java:501)
	at org.apache.hadoop.fs.ChecksumFileSystem$FsOperation.run(ChecksumFileSystem.java:482)
	at org.apache.hadoop.fs.ChecksumFileSystem.setPermission(ChecksumFileSystem.java:498)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:467)
	at org.apache.hado


## Q18. Reload the data stored in Parquet format

In [None]:
// The session created in Q2 is named `spark`; read back the directory written in Q17
val newdataDF = spark.read.parquet("./data/parquet_data_1")

In [None]:
dfWithEncondedDate.printSchema

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable =