# Big Data avec Spark : Spark SQL

*`Nom & Prenom : Yansane Mohamed `*

## Problematique

Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Datasets** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données [ici](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

**Download csv file [here](https://data.sfgov.org/api/views/nuek-vuh3/rows.csv?accessType=DOWNLOAD)**

## Travail à faire.
L'objectif de ce travail est de comprendre le Dataset SF Fire afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquats.

- Code lisible et bien indenté, 
- N'oublier pas de mettre en commentaire la justification de votre réponse sur les cellule Markdown. 


#### Note:
- Vous pouvez en groupe (au plus deux étudiants) . 

## Q1. Importez les modules Spark necessaires

In [None]:
import $ivy.`org.apache.spark::spark-sql:2.4.5` // Or use any other 2.x version here
import $ivy.`sh.almond::almond-spark:0.10.9` // Not required since almond 0.7.0 (will be automatically added when importing spark)

In [2]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m
[36mrootLogger[39m: [32mLogger[39m = org.apache.log4j.spi.RootLogger@564011e9

## Q2. Creez la Spark Session

In [None]:
import org.apache.spark.sql._

val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

## Q3. Chargez les données

Utilisez le `fireSchema` definit dans la cellule suivante pour le chargement.

In [None]:
val d = spark.read.option("header", true).csv("Fire_Department_Calls_for_Service.csv")

In [None]:
d.columns

In [None]:
import org.apache.spark.sql.types._

val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField(" ReceivedDtTm", StringType, true),
  StructField(" EntryDtTm", StringType, true),
  StructField(" DispatchDtTm", StringType, true),
  StructField(" ResponseDtTm", StringType, true),
  StructField(" OnSceneDtTm", StringType, true),
  StructField(" TransportDtTm", StringType, true),
  StructField(" HospitalDtTm", StringType, true),                                
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

// your code here (hint spark session name is sparkSession Q2)
val data = spark.read.option("header", false).schema(fireSchema).csv("Fire_Department_Calls_for_Service.csv")
//remarque: data.show() renvoyez des valeurs null
// pour resoudre le probleme j'ai rajouté les colonnes manquantes dans fireSchema
// car le nombre de colonnes dans la donnée etaient supérieures à celles de fireSchema 

In [None]:
data.columns.length

In [None]:
data.select("CallType").show(5)

## Q4. Mettez en cache les donnees chargees

In [None]:
data.cache
//data.persist(StorageLevel.DISK_ONLY)

On utilise la mise en cache quand on effectue plusieurs actions sur le même DataFrame. 

## Q5. Supprimez tous les appels de type `Medical Incident`

Hint: appliquez la methode `.filter()` a la colonne `CallType` avec l'operateur `=!=`

In [11]:
import org.apache.spark.sql.functions.col

[32mimport [39m[36morg.apache.spark.sql.functions.col[39m

In [None]:
val filter_df = data.filter(col("CallType") =!= "Medical Incident")

In [None]:
filter_df.select("CallType").show(5) 

## Q6. Combien de types d'appels distincts ont été passés ?**  

In [6]:
import org.apache.spark.sql.functions.countDistinct

[32mimport [39m[36morg.apache.spark.sql.functions.countDistinct[39m

In [22]:
val nb_distinct = data.select("CallType").where(col("CallType").isNotNull).distinct().count()
nb_distinct

[36mnb_distinct[39m: [32mLong[39m = [32m34L[39m
[36mres21_1[39m: [32mLong[39m = [32m34L[39m

## Q7. Quels types d'appels  ont été passés au service d'incendie?

In [15]:
val type_appel = d.select("CallType").where(col("CallType").isNotNull).distinct()
type_appel.show(3 , false)

+---------------------------+
|Call Type                  |
+---------------------------+
|Elevator / Escalator Rescue|
|Marine Fire                |
|Aircraft Emergency         |
+---------------------------+
only showing top 3 rows



[36mtype_appel[39m: [32mDataset[39m[[32mRow[39m] = [Call Type: string]

## Q8. Trouvez toutes les réponses ou les délais sont supérieurs à 5 minutes

Hint:
1. Renommez la colonne `Delay` -> `ReponseDelayedinMins`
2. Retournez un nouveau DataFrame
3. Affichez tous les appels où le temps de réponse au site d'incendie a eu un retard de plus de 5 minutes

In [None]:
val rename_data = data.withColumnRenamed("Delay", "ReponseDelayedinMins")
rename_data.select("ReponseDelayedinMins").where(col("ReponseDelayedinMins") > 5).show(3)

## Q9. Convertissez les colonnes dates en timestamp

Hint:
* `CallDate` -> `IncidentDate`
* `WatchDate` -> `OnWatchDate`
* `AvailableDtTm` -> `AvailableDtTS`
exemple code pour le cas de `CallDate`:
`dataframe.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")`

In [None]:
///importation to_timestamp
import org.apache.spark.sql.functions.to_timestamp

In [None]:
val convert_ts_data = (data
                  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")
                  .withColumn("WatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
                  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy")).drop("AvailableDtTm"))

## Q10. Quels sont les types d'appels les plus courants?

In [None]:
data.groupBy("CallType")
.count()
.orderBy(col("count").desc)
.show(5, truncate =false)

## Q11. Quels sont les boites postales rencontrées dans les appels les plus courants?

In [None]:
val df2= data.select("CallType", "ZipCode")

 .where(col("CallType").isNotNull())

 .groupBy("CallType", "Zipcode")

 .count()

 .orderBy("count", ascending=false))

## Q12. Quels sont les quartiers de San Francisco dont les codes postaux sont `94102` et `94103`?**

In [None]:
data.select("Neighborhood", "Zipcode")
.where(col("Zipcode")=== 94102 || col("Zipcode")=== 94103)
.distinct()
.show

## Q13. Determinez le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?

In [None]:
data.select("ReponseDelayedinMins").describe().show()

## Q14. Combien d'années distinctes trouve t-on dans ce Dataset? 

Hint: Appliquer la fonction `year()` a la colonne `IncidentDate`

In [None]:
val df = data.select(year('IncidentDate')).distinct().orderBy(year('IncidentDate')).show()

## Q15. Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?

In [None]:
data.filter(year('IncidentDate') == 2018).groupBy(weekofyear('IncidentDate')).count().orderBy('count', ascending=False).show()


## Q16. Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?

In [None]:
data.select("Neighborhood", "ResponseDelayedinMins").filter(year("IncidentDate") == 2018).show(10, False)

## Q17. Stocker les données sous format de fichiers Parquet

In [None]:
df.write.format("parquet").save("/tmp/firedataService_parquet/files/")

## Q18. Rechargez  les données stockées en format Parquet

In [None]:
val newdataDF = sparkSession.read.parquet("/tmp/firedataService_parquet/files/")

In [None]:
newdataDF.printSchema