# Big Data avec Spark : Spark SQL

*`Nom & Prenom : Fatou CISSE

## Problematique

Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Datasets** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données [ici](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

**Download csv file [here](https://data.sfgov.org/api/views/nuek-vuh3/rows.csv?accessType=DOWNLOAD)**

## Travail à faire.
L'objectif de ce travail est de comprendre le Dataset SF Fire afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquats.

- Code lisible et bien indenté, 
- N'oublier pas de mettre en commentaire la justification de votre réponse sur les cellule Markdown. 


#### Note:
- Vous pouvez en groupe (au plus deux étudiants) . 

## Q1. Importez les modules Spark necessaires

In [None]:
import $ivy.`org.apache.spark::spark-sql:2.4.5` // Or use any other 2.x version here
import $ivy.`sh.almond::almond-spark:0.10.9` // Not required since almond 0.7.0 (will be automatically added when importing spark)

Intitializing Scala interpreter ...

In [None]:
import org.apache.log4j.{Level, Logger}

val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)

Intitializing Scala interpreter ...

## Q2. Creez la Spark Session

In [None]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

Intitializing Scala interpreter ...

In [None]:
import org.apache.spark.sql._

val spark = {
  SparkSession.builder()
    .master("local")
    .appName("BD-FS FIRE")
    .config("spark.some.option.config", "config-value")
    .getOrCreate()
}

Intitializing Scala interpreter ...

## Q3. Chargez les données

In [None]:
val path = "data/departement_call/sf-fire-calls.csv"
val df = spark.read
        .option("header","true")
        .schema(fireSchema)
        .csv(path)

Intitializing Scala interpreter ...

Utilisez le `fireSchema` definit dans la cellule suivante pour le chargement.

In [None]:
import org.apache.spark.sql.types._

val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

// your code here (hint spark session name is sparkSession Q2)
// val data = 


Intitializing Scala interpreter ...

## Q4. Mettez en cache les donnees chargees

In [None]:
//On utilise la mise en cache quand on effectue plusieurs actions sur le même DataFrame. 
df.cache
df.persist()

Intitializing Scala interpreter ...

## Q5. Supprimez tous les appels de type `Medical Incident`

Hint: appliquez la methode `.filter()` a la colonne `CallType` avec l'operateur `=!=`

In [None]:
//Avant la suppression des lignes contenant tous les appels de type "Medical Incident"
df.select("CallType").show()

Intitializing Scala interpreter ...

In [None]:
//La méthode col() permet de sélectionner la colonne à filtrer et l'opérateur =!= est utilisé pour filtrer les valeurs dont CallType est egale à "Medical Incident"
import org.apache.spark.sql.functions.col

val df_filtered = df.filter(col("CallType") =!= "Medical Incident")

Intitializing Scala interpreter ...

## Q6. Combien de types d'appels distincts ont été passés ?**  

In [None]:
//Pour obtenir les valeurs distinctes dans la colonne CaalType on utilise la méthode distinct() 
//et la méthode count() est utilisée pour compter le nombre de valeurs distinctes. 
//Ainsi on obtient 30 valeurs distinctes
import org.apache.spark.sql.functions.countDistinct
df_filtered.agg(countDistinct("CallType") as "Nombre de type d'appel")

Intitializing Scala interpreter ...

## Q7. Quels types d'appels  ont été passés au service d'incendie?

In [None]:
df_filtered.select("CallType").distinct.show(30, truncate=false)

Intitializing Scala interpreter ...

## Q8. Trouvez toutes les réponses ou les délais sont supérieurs à 5 minutes

Hint:
1. Renommez la colonne `Delay` -> `ReponseDelayedinMins`
2. Retournez un nouveau DataFrame
3. Affichez tous les appels où le temps de réponse au site d'incendie a eu un retard de plus de 5 minutes

In [None]:
//Pour renommez une colonne on utilise la fonction withColumnRenamed()
val newdf = df_filtered.withColumnRenamed("Delay", "ResponseDelayedinMins")

Intitializing Scala interpreter ...

In [None]:
//Verification
newdf.printSchema()

Intitializing Scala interpreter ...

In [None]:
//Affichons tous les appels où le temps de réponse au site d'incendie a eu un retard de plus de 5 minutes
newdf.filter(col("ResponseDelayedinMins")>5)

Intitializing Scala interpreter ...

## Q9. Convertissez les colonnes dates en timestamp

Hint:
* `CallDate` -> `IncidentDate`
* `WatchDate` -> `OnWatchDate`
* `AvailableDtTm` -> `AvailableDtTS`
exemple code pour le cas de `CallDate`:
`dataframe.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")`

In [None]:
import org.apache.spark.sql.functions.{col, to_timestamp}
newdf.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")
newdf.withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
newdf.withColumn("AvailableDtTs", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy")).drop("AvailableDtTm")

Intitializing Scala interpreter ...

In [None]:
//Verification
newdf.printSchema()

Intitializing Scala interpreter ...

## Q10. Quels sont les types d'appels les plus courants?

In [None]:
//La méthode groupBy() permet de regrouper les lignes de la base de données en fonction des valeurs de la colonne 
//"CallType". La méthode count() est ensuite utilisée pour compter le nombre de lignes pour chaque groupe et 
//la méthode orderBy() est utilisée pour trier les résultats par ordre décroissant de nombre d'appels.
import org.apache.spark.sql.functions.{col, desc}
val topCalls = newdf.groupBy("CallType").count().orderBy(desc("count")).show(5)

Intitializing Scala interpreter ...

## Q11. Quels sont les boites postales rencontrées dans les appels les plus courants?

In [None]:
newdf
    .filter(col("CallType") === "Medical Incident")
    .select("Zipcode")
    .show()

Intitializing Scala interpreter ...

## Q12. Quels sont les quartiers de San Francisco dont les codes postaux sont `94102` et `94103`?**

In [None]:
newdf
    .select("Neighborhood","Zipcode")
    .where col("Zipcode") === "94102" || 
           col("Zipcode") === "94103"
    .distinct()
    .show()

Intitializing Scala interpreter ...

## Q13. Determinez le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?

In [None]:
newdf.select("Delay").describe.show()

Intitializing Scala interpreter ...

## Q14. Combien d'années distinctes trouve t-on dans ce Dataset? 

Hint: Appliquer la fonction `year()` a la colonne `IncidentDate`

In [None]:
//Pour déterminer le nombre d'années distinctes dans ce Dataset, nous pouvons sélectionner la colonne "IncidentDate", 
//extraire l'année de chaque date avec la fonction year() et utiliser la fonction distinct() pour obtenir les années 
//uniques. Ensuite, nous pouvons utiliser la méthode count() pour connaître le nombre d'années distinctes.

import org.apache.spark.sql.functions._
import spark.implicits._
newdf3.select(year(col("IncidentDate")).distinct()


Intitializing Scala interpreter ...

## Q15. Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?

In [None]:
newdf
    .filter(year(col("IncidentDate")) === 2018 && col("calllType") === "Structure Fire")
    .groupBy(weekofyear(col("IncidentDate")))
    .count()
    .orderBy(col("count").desc)
    .show(1)

Intitializing Scala interpreter ...

Le résultat ci dessus montre que la 1er semaine de l’année 2018 a eu le plus d'appels d'incendie

## Q16. Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?

In [None]:
// Filtrer les appels d'incendie de l'année 2018
val calls2018 = newdf2.filter(year($"IncidentDate") === 2018)

// Grouper par quartier et calculer la moyenne du temps de réponse
val avgResponseByNeighborhood = calls2018.groupBy($"Neighborhood").agg(avg($"ResponseDelayedinMins").alias("AvgResponseTime"))

// Trier les résultats par ordre décroissant du temps de réponse
val worstResponseNeighborhoods = avgResponseByNeighborhood.sort($"AvgResponseTime".desc)

// Afficher les quartiers avec le pire temps de réponse
worstResponseNeighborhoods.show()

Intitializing Scala interpreter ...

## Q17. Stocker les données sous format de fichiers Parquet

In [None]:
newdf2.write.format("parquet").save("tmp/parket/files/")

Intitializing Scala interpreter ...

## Q18. Rechargez  les données stockées en format Parquet

In [None]:
val newdataDF = spark.read.format("parquet").load("tmp/parket/files/")
              

Intitializing Scala interpreter ...

In [None]:
newdataDF.printSchema

Intitializing Scala interpreter ...