In [None]:
# Projet Final Apache Spark

In [None]:
**Nom Etudiant :*TOGO*  

**Prenom Etudiant:*EMILE NARE*  

**Classe :*MASTERE 1*  


In [None]:
## Description
Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Dataset** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données cliquer sur ce [lien](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

In [None]:
## Travail à faire.
L'objectif de ce projet est de comprendre le **SF Fire Dataset** afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquates.

- Créer un repos git (public) et partager le repos avec mon mail (limahin10@gmail.com)
- Ecrire un code lisible et bien indenté 
- N'oublier pas de mettre en commentaire la justification de vos réponses sur les cellules Markdown. 


## Note:
- Le projet est personnel, c'est-à-dire chaque notebook ne concerne qu'un seul étudiant. 
- Deadline : **Lundi 31 Janvier 2022  à 23h 59** (Aucune de dérogation ne sera acceptée)

In [None]:
### Chargement des données

In [None]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._

In [None]:
Nous allons jeter un coup d'oeil sur la structure des données avant de définir un schéma

In [None]:
Vu que la taille de ces données est énormes, inferer le schema pour un très grande volumes de données s'avère un peu couteux. Nous allons ainsi définir un schema pour le Dataset.

In [None]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

In [None]:
val sfFireFile = "datasets/sf-fire/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

In [None]:
Nous allons mettre en cache le Dataframe

In [None]:
fireDF.cache()

In [None]:
fireDF.count(

In [None]:
fireDF.printSchema(

In [None]:
fireDF.show(5)

In [None]:
Filtrage des d'appels de type "Medical Incident"

In [None]:
### Question 1
**Combien de types d'appels distincts ont été passés ?**  
Pour être sûr, il ne faut pas compter les valeurs «nulles» dans la colonne.

In [None]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

In [None]:
# Use the .select() transformation to yank out just the 'Call Type' column
# Add the .distinct() transformation to keep only distinct rows
# Finally call the show action, to show the first 10 rows.
# The False below expands the ASCII column width to fit the full text in the output

// Reponse 1
/*

fireServiceCallsDF.select('CallType').distinct().show(35, False)
*/### Question 2

In [None]:
### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

In [None]:
### Question 2

In [None]:
**Quels types d'appels différents ont été passés au service d'incendie?**

In [None]:
#Note that .count() is actually a transformation here

// Reponse 2
/*
display(fireServiceCallsDF.select('CallType').groupBy('CallType').count().orderBy("count", ascending=False))
*/

In [None]:
### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

In [None]:
// Reponse 3
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")
/*
Completer le code
*/

In [None]:
### Transformations des dates  
Maintenant nous allons d'abord:  
1. Transformer les dates de type String en Spark Timestamp afin que nous puissions effectuer des requêtes basées sur la date plus tard    
2. Retourner le Dataframe transformée  
3. Mettre en cache le nouveau DataFrame  

In [None]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.cache()

import org.apache.spark.sql.functions._
import spark.sqlContext.implicits._

val dfFata=data2.toDF("input_timestamp")

dfDate.withColumn("datetype_timestamp",
to_timestamp(col("input_timestamp"),"MM-dd-yyyy HH mm ss SSS"))
    .show(false)
  

//String to timestamps
val data = Seq(("2021-07-01 12:01:19.000"),
    ("2021-06-24 12:01:19.000"),
    ("2021-14-16 16:44:55.406"),
    ("2021-14-16 16:50:59.406"))
val df=data.toDF("07-01-2021")
df.withColumn("MM/dd/yyyy_timestamp",
        to_timestamp(col("CallDate")))
  .printSchema()

In [None]:
### Question 4
**Quels sont les types d'appels les plus courants?**

In [None]:
### Question 5-a
**Quels sont boites postaux rencontrés dans les appels les plus courants?**

In [None]:
//Reponse 5-a
/*
Ecrire ici votre code
*/

In [None]:
### Question 5-a
**Quels sont les quartiers de San Francisco dont les codes postaux sont 94102 et 94103?**

In [None]:
//Reponse 5-b
/*
%sql 
SELECT NeighborhoodDistrict, count(NeighborhoodDistrict) AS Neighborhood_Count 
FROM firecalls 
between number 94102 AND 94103
GROUP BY NeighborhoodDistrict
ORDER BY Neighborhood_Count DESC 
LIMIT 15

*/

In [None]:
### Question 6
**Determiner le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?**

In [None]:
//Reponse 6
/*
Ecrire ici votre code
*/

In [None]:
### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp

In [None]:
//Reponse 7-a
/*
fireServiceCallsDF.printSchema()
from pyspark.sql.functions import *
from_pattern1 = 'MM/dd/yyyy'
to_pattern1 = 'yyyy-MM-dd'

from_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'
to_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'


fireServiceCallsTsDF = fireServiceCallsDF \
  .withColumn('CallDateTS', unix_timestamp(fireServiceCallsDF['CallDate'], from_pattern1).cast("timestamp")) \
  .drop('CallDate') \
  .withColumn('WatchDateTS', unix_timestamp(fireServiceCallsDF['WatchDate'], from_pattern1).cast("timestamp")) \
  .drop('WatchDate') \
  .withColumn('ReceivedDtTmTS', unix_timestamp(fireServiceCallsDF['ReceivedDtTm'], from_pattern2).cast("timestamp")) \
  .drop('ReceivedDtTm') \
  .withColumn('EntryDtTmTS', unix_timestamp(fireServiceCallsDF['EntryDtTm'], from_pattern2).cast("timestamp")) \
  .drop('EntryDtTm') \
  .withColumn('DispatchDtTmTS', unix_timestamp(fireServiceCallsDF['DispatchDtTm'], from_pattern2).cast("timestamp")) \
  .drop('DispatchDtTm') \
  .withColumn('ResponseDtTmTS', unix_timestamp(fireServiceCallsDF['ResponseDtTm'], from_pattern2).cast("timestamp")) \
  .drop('ResponseDtTm') \
  .withColumn('OnSceneDtTmTS', unix_timestamp(fireServiceCallsDF['OnSceneDtTm'], from_pattern2).cast("timestamp")) \
  .drop('OnSceneDtTm') \
  .withColumn('TransportDtTmTS', unix_timestamp(fireServiceCallsDF['TransportDtTm'], from_pattern2).cast("timestamp")) \
  .drop('TransportDtTm') \
  .withColumn('HospitalDtTmTS', unix_timestamp(fireServiceCallsDF['HospitalDtTm'], from_pattern2).cast("timestamp")) \
  .drop('HospitalDtTm') \
  .withColumn('AvailableDtTmTS', unix_timestamp(fireServiceCallsDF['AvailableDtTm'], from_pattern2).cast("timestamp")) \
  .drop('AvailableDtTm') 
*/

In [None]:
### Question 7-b
**Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?**

In [None]:
//Reponse 7-b
/*
fireServiceCallsDF.printSchema()
fireServiceCallsTsDF \
  .select(year('CallDateTS')).distinct() \
  .orderBy('week(CallDateTS)').show()

*/

In [None]:
### Question 8
**Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?**

In [None]:
//Reponse 8
/*
%sql 
SELECT NeighborhoodDistrict, count(NeighborhoodDistrict) AS Neighborhood_Count 
FROM firecalls 
WHERE year(CallDateTS) == 2018
GROUP BY NeighborhoodDistrict
ORDER BY Neighborhood_Count DESC 
LIMIT 18
*/

In [None]:
### Question 9

**Comment stocker les données du Dataframe sous format de fichiers Parquet?**

In [None]:
//Reponse 9
/*
%fs ls /tmp/fireServiceParquet/
fireServiceCallsTsDF.write.format('parquet').save('/tmp/fireServiceParquet/')
*/

In [None]:
### Question 10
**Comment relire les données stockée en format Parquet?**

In [None]:
//Reponse 10
/*
tempDF = spark.read.parquet('/tmp/fireServiceParquet/')

1) import pyarrow.parquet as pq

path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()

2)import pyarrow.parquet as pq

dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()

*/