# Big Data with Spark: Final Project

*`Name: Mouhamadane MBOUP`*

## Problem Statement

This project uses Apache Spark to analyze and process the **[San Francisco Fire Department Calls](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** data in order to produce a few KPIs (*Key Performance Indicators*). The **SF Fire dataset** contains the responses of every fire unit to calls. Each record includes the call number, incident number, address, unit identifier, call type, and disposition. All relevant time intervals are also included. Because the dataset is response-based and most calls involve several units, there are multiple records for each call number. Addresses are associated with a block number, an intersection, or a call box, not with a specific address.

**More details on the data description are available [here](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3).**

## Assignment

The goal of this project is to understand the SF Fire dataset well enough to answer the questions below with appropriate Spark/Scala code.

- Code must be readable and properly indented.
- Do not forget to justify each answer in a Markdown cell.


#### Note:
- The project is individual (one student).
- Deadline: **Thursday, January 22, 2021**

## Q1. Import the required Spark modules

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.5`
import $ivy.`sh.almond::almond-spark:0.10.9`
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._

In [2]:
import org.apache.log4j.{Level, Logger}

// Restrict the root logger to errors to keep the notebook output readable.
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

// Let Spark's own loggers report warnings as well.
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)

rootLogger: Logger = org.apache.log4j.spi.RootLogger@5b83ec57

## Q2. Create the Spark session

In [3]:
val sparkSession = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/01/17 20:12:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

sparkSession: SparkSession = org.apache.spark.sql.SparkSession@d7f1834

## Q3. Load the data

Use the `fireSchema` defined in the following cell to load the data.

In [4]:
import org.apache.spark.sql.types._

val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

val data = sparkSession.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", ",")
  .schema(fireSchema)
  .load("data/sf-fire-calls.csv")

fireSchema: StructType = StructType(
  StructField("CallNumber", IntegerType, true, {}),
  StructField("UnitID", StringType, true, {}),
  StructField("IncidentNumber", IntegerType, true, {}),
  StructField("CallType", StringType, true, {}),
  StructField("CallDate", StringType, true, {}),
  StructField("WatchDate", StringType, true, {}),
  StructField("CallFinalDisposition", StringType, true, {}),
  StructField("AvailableDtTm", StringType, true, {}),
  StructField("Address", StringType, true, {}),
  StructField("City", StringType, true, {}),
  StructField("Zipcode", IntegerType, true, {}),
  StructField("Battalion", StringType, true, {}),
  StructField(

## Q4. Cache the loaded data

Hint: `dataframe.cache().count()`

In [47]:
import org.apache.spark.storage.StorageLevel
// cache() is lazy: the data is only materialized by the first action.
data.cache
// persist() on an already cached DataFrame is a no-op in Spark 2.4, so
// DISK_ONLY is not applied here; Spark only logs the warning shown below.
data.persist(StorageLevel.DISK_ONLY)

21/01/16 14:16:26 WARN CacheManager: Asked to cache already cached data.
21/01/16 14:16:26 WARN CacheManager: Asked to cache already cached data.

res46_1: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res46_2: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]

Caching is worthwhile when several actions run on the same DataFrame: the data is read and parsed once, and later actions reuse the cached copy.
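
As a minimal sketch of this behavior, the snippet below caches a small stand-in DataFrame, forces materialization with `count()` as the hint suggests, and then inspects the storage level. The session name `spark`, the toy rows, and the variable names are assumptions made for illustration; Spark is assumed to be on the classpath as in Q1.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Hypothetical local session; getOrCreate() reuses an existing one if present.
val spark = SparkSession.builder.master("local[1]").appName("cache-sketch").getOrCreate()
import spark.implicits._

// Toy stand-in for the fire-calls DataFrame (made-up values).
val calls = Seq(
  ("Structure Fire", 2.95),
  ("Vehicle Fire", 1.5),
  ("Medical Incident", 4.2)
).toDF("CallType", "Delay")

calls.cache()                         // lazy: only marks the plan for caching
val n = calls.count()                 // the first action materializes the cache
val cachedLevel = calls.storageLevel  // default level for a cached DataFrame

// To switch to another level, release the cache first, then persist again.
calls.unpersist(blocking = true)
calls.persist(StorageLevel.DISK_ONLY)
calls.count()
val diskLevel = calls.storageLevel
```

On the real data the same pattern would be `data.cache().count()`. In Spark 2.4, asking for `DISK_ONLY` only takes effect once the earlier cache has been released with `unpersist()`.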

## Q5. Remove all calls of type `Medical Incident`

Hint: apply the `.filter()` method to the `CallType` column with the `=!=` operator

In [48]:
val data_new = data.filter(data("CallType") =!= "Medical Incident")
data_new.select("CallType", "Delay").show(2)

+--------------+-----+
|      CallType|Delay|
+--------------+-----+
|Structure Fire| 2.95|
|  Vehicle Fire|  1.5|
+--------------+-----+
only showing top 2 rows

data_new: Dataset[Row] = [CallNumber: int, UnitID: string ... 26 more fields]

I created a new dataset, **data_new**, that contains no calls of type **Medical Incident**.

## Q6. How many distinct call types were made?

In [117]:
import org.apache.spark.sql.functions.countDistinct
data.agg(countDistinct("CallType")).show()

+------------------------+
|count(DISTINCT CallType)|
+------------------------+
|                      30|
+------------------------+

The dataset contains **30 distinct call types**.

## Q7. Which types of calls were made to the fire department?

In [121]:
data.select("CallType").distinct().show(30,truncate = false)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

## Q8. Find all responses with delays longer than 5 minutes

Hint:
1. Rename the column `Delay` -> `ReponseDelayedinMins`
2. Return a new DataFrame
3. Show all calls where the response time to the fire site was delayed by more than 5 minutes

In [9]:
import org.apache.spark.sql.functions.col
val data2 = data.withColumnRenamed("Delay","ReponseDelayedinMins")
val callMostOf5min = data2.filter(col("ReponseDelayedinMins") > 5)
callMostOf5min.show(2)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+------------------+--------------------+-------------+--------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|      Neighborhood|            Location|        RowID|ReponseDelayedinMins|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+--------

data2: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
callMostOf5min: Dataset[Row] = [CallNumber: int, UnitID: string ... 26 more fields]

## Q9. Convert the date columns to timestamps

Hint:
* `CallDate` -> `IncidentDate`
* `WatchDate` -> `OnWatchDate`
* `AvailableDtTm` -> `AvailableDtTS`

Example code for `CallDate`:
`dataframe.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")`

In [113]:
val pattern = "MM/dd/yyyy"
val pattern1 = "MM/dd/yyyy hh:mm:ss aa"
val df1 = data
       .withColumn("IncidentDate", to_timestamp(col("CallDate"), pattern))
       .drop("CallDate")
       .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), pattern))
       .drop("WatchDate")
       .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), pattern1))
       .drop("AvailableDtTm")
df1.select("IncidentDate","OnWatchDate","AvailableDtTS").show(2)

+-------------------+-------------------+-------------------+
|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
+-------------------+-------------------+-------------------+
only showing top 2 rows

pattern: String = "MM/dd/yyyy"
pattern1: String = "MM/dd/yyyy hh:mm:ss aa"
df1: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]

## Q10. What are the most common call types?

In [6]:
// Needed for the $"column" syntax used below.
import sparkSession.implicits._
data
    .groupBy("CallType")
    .count()
    .orderBy($"count".desc)
    .show(5)

+--------------------+------+
|            CallType| count|
+--------------------+------+
|    Medical Incident|113794|
|      Structure Fire| 23319|
|              Alarms| 19406|
|   Traffic Collision|  7013|
|Citizen Assist / ...|  2524|
+--------------------+------+
only showing top 5 rows

Calls of type **Medical Incident** are by far the most frequent (113,794), followed by **Structure Fire** (23,319).

## Q11. Which boxes (`Box` column) appear in the most common calls?

In [41]:
data
    .groupBy("CallType","Box")
    .count()
    .orderBy($"count".desc)
    .show(5)

+----------------+----+-----+
|        CallType| Box|count|
+----------------+----+-----+
|Medical Incident|2251| 1544|
|Medical Incident|1453| 1491|
|Medical Incident|1461| 1205|
|Medical Incident|5236| 1015|
|Medical Incident|1545|  933|
+----------------+----+-----+
only showing top 5 rows



## Q12. Which San Francisco neighborhoods have the zip codes `94102` and `94103`?

In [8]:
data
    .select("City")
    .distinct()
    .show(50)

+---------------+
|           City|
+---------------+
|     FORT MASON|
|     Fort Mason|
|            OAK|
|             DC|
|             TI|
|TREASURE ISLAND|
|           null|
|  San Francisco|
|             HP|
|             YB|
|             BN|
|       Brisbane|
|      Daly City|
|    Yerba Buena|
|Treasure Island|
|             SF|
|       PRESIDIO|
|       Presidio|
|             FM|
|            SFO|
|  Hunters Point|
|             PR|
|  SAN FRANCISCO|
|  Treasure Isla|
+---------------+



Exploring the **City** column revealed data-entry inconsistencies. Besides several spellings of **San Francisco** (e.g. **SAN FRANCISCO**, **SF**, **San Francisco**), the column also contains names of other places and null values.

This column therefore needs to be normalized before answering the question.

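In [114]:
import org.apache.spark.sql.functions.col
import sparkSession.implicits._
// Map the long spellings of San Francisco to "SF" in a new column.
// Note: there is no otherwise() branch, so every other value becomes null,
// including rows whose City is already "SF".
val df2 = df1
            .withColumn("City_New", when(data("City") === "San Francisco","SF")
            .when(data("City") === "SAN FRANCISCO","SF"))
            .drop(data("City"))
df2.select("City_New").distinct().show() 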

+--------+
|City_New|
+--------+
|    null|
|      SF|
+--------+

df2: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]

In [23]:
print(data.select("City").where($"City" =!= "SF" && $"City" =!= "SAN FRANCISCO" && $"City" =!= "San Francisco").count())

1602
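
The `when` chain above maps only two spellings and leaves rows whose `City` is already `SF` as null. One possible way to treat all three spellings alike is sketched below: trim and upper-case the value, then test it against the known variants with `isin`. The session name `spark`, the toy rows, and the variable names are hypothetical, and this is an illustration rather than the method used to produce the outputs in this notebook.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, trim, upper, when}

// Hypothetical local session; getOrCreate() reuses an existing one if present.
val spark = SparkSession.builder.master("local[1]").appName("city-sketch").getOrCreate()
import spark.implicits._

// Toy sample covering the spellings seen in the City column, plus a null.
val cities = Seq("San Francisco", "SAN FRANCISCO", "SF", "Daly City", null).toDF("City")

// Every San Francisco variant becomes "SF"; anything else (or null) stays null.
val normalized = cities.withColumn(
  "City_New",
  when(upper(trim(col("City"))).isin("SAN FRANCISCO", "SF"), "SF")
)

val sfRows = normalized.filter(col("City_New") === "SF").count()
val otherRows = normalized.filter(col("City_New").isNull).count()
```

With this variant, a later filter such as `$"City_New" === "SF"` would also keep the rows originally entered as `SF`.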

In [64]:
// City_New is defined on df2, not df1.
df2
    .select("neighborhood")
    .where($"City_New" === "SF" && ($"Zipcode" === "94102" || $"Zipcode" ==="94103"))
    .distinct()
    .show()

+--------------------+
|        neighborhood|
+--------------------+
|    Western Addition|
|         Mission Bay|
|        Hayes Valley|
|Financial Distric...|
|            Nob Hill|
|             Mission|
|          Tenderloin|
|        Potrero Hill|
| Castro/Upper Market|
|     South of Market|
+--------------------+



## Q13. Determine the total number of calls, as well as the mean, minimum, and maximum call response time

In [6]:
data.select("Delay").describe().show()

+-------+-----------------+
|summary|            Delay|
+-------+-----------------+
|  count|           175296|
|   mean|3.892364154521585|
| stddev|9.378286226254257|
|    min|      0.016666668|
|    max|          1844.55|
+-------+-----------------+
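
The figures reported by `describe()` can also be computed directly with `agg`, which avoids copying numbers by hand into a result table. The sketch below runs on made-up rows; the session name `spark`, the sample values, and the output column names are assumptions. Because the dataset holds one record per unit response, `count` gives the number of responses, while `countDistinct("CallNumber")` gives the number of distinct calls.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, countDistinct, max, min, round}

// Hypothetical local session; getOrCreate() reuses an existing one if present.
val spark = SparkSession.builder.master("local[1]").appName("stats-sketch").getOrCreate()
import spark.implicits._

// Toy rows: call 1 was answered by two units, so it appears twice.
val responses = Seq(
  (1, 1.0),
  (1, 2.5),
  (2, 4.0),
  (3, 8.5)
).toDF("CallNumber", "Delay")

// One pass over the data computes all requested statistics.
val stats = responses.agg(
  count("Delay").as("Responses"),
  countDistinct("CallNumber").as("DistinctCalls"),
  round(avg("Delay"), 2).as("Mean"),
  min("Delay").as("Min"),
  max("Delay").as("Max")
)

val statsRow = stats.head()
```

Calling `stats.show()` would print the single-row summary in the same tabular form as the other cells.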



In [61]:
val result = Seq(
    (2281,3.89,0.02,1844.55)
).toDF("Nombre d'Appels", "Moyenne", "Minimun", "Maximum")
result.show()

+---------------+-------+-------+-------+
|Nombre d'Appels|Moyenne|Minimun|Maximum|
+---------------+-------+-------+-------+
|           2281|   3.89|   0.02|1844.55|
+---------------+-------+-------+-------+

result: DataFrame = [Nombre d'Appels: int, Moyenne: double ... 2 more fields]

## Q14. How many distinct years are in this dataset?

Hint: apply the `year()` function to the `IncidentDate` column

In [67]:
// No sort is needed before counting the distinct years.
val nbreYear = df1
    .select(year($"IncidentDate"))
    .distinct()
    .count()
println("Nous avons "+nbreYear+" années dans le jeu de données.")

Nous avons 19 années dans le jeu de données.

nbreYear: Long = 19L

## Q15. Which week of 2018 had the most fire calls?

In [75]:
df1
        .select(weekofyear($"IncidentDate"), $"CallType")
        .where(year($"IncidentDate") === 2018 && $"CallType" === "Structure Fire")
        .groupBy("weekofyear(IncidentDate)")
        .count()
        .orderBy($"count".desc)
        .show(1)


+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|                      25|   31|
+------------------------+-----+
only showing top 1 row



## Q16. Which San Francisco neighborhoods had the worst response time in 2018?

In [115]:
df2
    .select("Neighborhood", "Delay")
    .where(year($"IncidentDate") === 2018 && $"City_New" === "SF")
    .groupBy($"Neighborhood")
    .sum("Delay")
    .orderBy($"sum(Delay)")
    .show()


+--------------------+------------------+
|        Neighborhood|        sum(Delay)|
+--------------------+------------------+
|                None|12.633333206176758|
|        Lincoln Park|29.800000071525574|
|            Seacliff|63.916667103767395|
|        McLaren Park|  66.4166669845581|
|     Treasure Island| 90.56666600704193|
|            Presidio|112.46666532754898|
|           Glen Park|157.33333325386047|
|          Twin Peaks| 227.4833328127861|
|    Golden Gate Park| 242.0666659027338|
|             Portola| 277.1000000536442|
|          Noe Valley| 278.5500009059906|
|           Japantown| 314.6833351254463|
|    Presidio Heights| 326.1833352446556|
|   Lone Mountain/USF| 350.7000018954277|
|   Visitacion Valley| 365.7999996840954|
|       Outer Mission|497.55000269412994|
|Oceanview/Merced/...| 543.0333331376314|
|      Inner Richmond|  551.683336943388|
|        Russian Hill| 556.6333324313164|
|      Haight Ashbury| 597.3000011146069|
+--------------------+------------
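
The cell above sums `Delay` per neighborhood and sorts in ascending order, so the first rows are the neighborhoods with the smallest totals. If "worst response time" is read as the highest average delay (an interpretation, not something the assignment states), a ranking could be sketched as follows. The session name `spark`, the toy rows, and the column aliases are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col, count, to_timestamp, year}

// Hypothetical local session; getOrCreate() reuses an existing one if present.
val spark = SparkSession.builder.master("local[1]").appName("delay-sketch").getOrCreate()
import spark.implicits._

// Toy rows (made-up values); one Presidio row falls outside 2018.
val calls = Seq(
  ("Tenderloin", "01/05/2018", 3.0),
  ("Tenderloin", "02/05/2018", 5.0),
  ("Presidio", "03/01/2018", 10.0),
  ("Presidio", "03/01/2017", 50.0)
).toDF("Neighborhood", "CallDate", "Delay")

val worst2018 = calls
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
  .filter(year(col("IncidentDate")) === 2018)
  .groupBy("Neighborhood")
  // The average is not inflated by call volume the way a sum is.
  .agg(avg("Delay").as("AvgDelay"), count("Delay").as("Responses"))
  .orderBy(col("AvgDelay").desc)

val worst = worst2018.head()
```

Keeping the `Responses` count next to the average helps spot neighborhoods whose average rests on very few records.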

## Q17. Store the data as Parquet files

In [122]:
df2.write.format("parquet").save("/tmp/firedataService_parquet/files/")
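
By default `save` uses the `ErrorIfExists` mode, so rerunning the cell above fails once the target directory exists. The sketch below writes a small stand-in DataFrame with `SaveMode.Overwrite` and reads it back as a check. The session name `spark`, the toy rows, and the temporary directory are assumptions chosen to keep the example self-contained.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical local session; getOrCreate() reuses an existing one if present.
val spark = SparkSession.builder.master("local[1]").appName("parquet-sketch").getOrCreate()
import spark.implicits._

// Toy stand-in for the cleaned fire-calls DataFrame (made-up values).
val calls = Seq(
  (1, "Structure Fire", 2.95),
  (2, "Vehicle Fire", 1.5)
).toDF("CallNumber", "CallType", "Delay")

// Temporary output directory, used here instead of a fixed /tmp path.
val outDir = Files.createTempDirectory("fire-parquet-sketch").toString

// Overwrite replaces existing files, so the write can be repeated safely.
calls.write.mode(SaveMode.Overwrite).parquet(outDir)
calls.write.mode(SaveMode.Overwrite).parquet(outDir)

// Parquet stores the schema, so no schema is passed when reading back.
val reloaded = spark.read.parquet(outDir)
val reloadedCount = reloaded.count()
```

Applied to the notebook, the equivalent call would be `df2.write.mode("overwrite").parquet(...)` with the path used above.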

## Q18. Reload the data stored in Parquet format

In [123]:
val newdataDF = sparkSession.read.parquet("/tmp/firedataService_parquet/files/")

newdataDF: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]

In [124]:
newdataDF.show(2)

+----------+------+--------------+----------------+--------------------+--------------------+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+-----+-------------------+-------------------+-------------------+--------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|Delay|       IncidentDate|        OnWatchDate|      AvailableDtTS|City_New|
+----------+------+--------------+----------------+--------------------+--------------------+-------+---------+-----------+----+----------------+--------+-------------+-------+----