# Big Data avec Spark : Spark SQL

*`Nom & Prenom : `*

## Problematique

Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Datasets** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données [ici](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

**Download csv file [here](https://data.sfgov.org/api/views/nuek-vuh3/rows.csv?accessType=DOWNLOAD)**

## Travail à faire.
L'objectif de ce travail est de comprendre le Dataset SF Fire afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquats.

- Code lisible et bien indenté, 
- N'oublier pas de mettre en commentaire la justification de votre réponse sur les cellule Markdown. 


#### Note:
- Vous pouvez en groupe (au plus deux étudiants) . 

## Q1. Importez les modules Spark necessaires

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.5` // Or use any other 2.x version here
import $ivy.`sh.almond::almond-spark:0.10.9` // Not required since almond 0.7.0 (will be automatically added when importing spark)

[32mimport [39m[36m$ivy.$                                   // Or use any other 2.x version here
[39m
[32mimport [39m[36m$ivy.$                                // Not required since almond 0.7.0 (will be automatically added when importing spark)[39m

In [2]:
import org.apache.log4j.{Level, Logger}

val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}

[39m
[36mrootLogger[39m: [32mLogger[39m = org.apache.log4j.spi.RootLogger@1616ef80

## Q2. Creez la Spark Session

In [3]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

In [4]:
import org.apache.spark.sql._

val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36morg.apache.spark.sql._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@26f9a2e7

## Q3. Chargez les données

In [5]:
val path = "data/departement_call/sf-fire-calls.csv"
val df = spark
        .read
        .option("header","true")
        .csv(path)

[36mpath[39m: [32mString[39m = [32m"data/departement_call/sf-fire-calls.csv"[39m
[36mdf[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

Utilisez le `fireSchema` definit dans la cellule suivante pour le chargement.

In [6]:
import org.apache.spark.sql.types._

val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

// your code here (hint spark session name is sparkSession Q2)
// val data = 


[32mimport [39m[36morg.apache.spark.sql.types._

[39m
[36mfireSchema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"CallNumber"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"UnitID"[39m, StringType, true, {}),
  [33mStructField[39m([32m"IncidentNumber"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"CallType"[39m, StringType, true, {}),
  [33mStructField[39m([32m"CallDate"[39m, StringType, true, {}),
  [33mStructField[39m([32m"WatchDate"[39m, StringType, true, {}),
  [33mStructField[39m([32m"CallFinalDisposition"[39m, StringType, true, {}),
  [33mStructField[39m([32m"AvailableDtTm"[39m, StringType, true, {}),
  [33mStructField[39m([32m"Address"[39m, StringType, true, {}),
  [33mStructField[39m([32m"City"[39m, StringType, true, {}),
  [33mStructField[39m([32m"Zipcode"[39m, IntegerType, true, {}),
  [33mStructField[39m([32m"Battalion"[39m, StringType, true, {}),
  [33mStructField[39m(

## Q4. Mettez en cache les donnees chargees

In [7]:
//On utilise la mise en cache quand on effectue plusieurs actions sur le même DataFrame. 
df.cache
df.persist()

23/04/11 11:51:33 WARN CacheManager: Asked to cache already cached data.


[36mres6_0[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]
[36mres6_1[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

## Q5. Supprimez tous les appels de type `Medical Incident`

Hint: appliquez la methode `.filter()` a la colonne `CallType` avec l'operateur `=!=`

In [9]:
//Avant la suppression des lignes contenant tous les appels de type "Medical Incident"
df.select("CallType").show()

+----------------+
|        CallType|
+----------------+
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|    Vehicle Fire|
|          Alarms|
|  Structure Fire|
|          Alarms|
|          Alarms|
|Medical Incident|
|Medical Incident|
|Medical Incident|
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|  Structure Fire|
|  Structure Fire|
|  Structure Fire|
|Medical Incident|
|Medical Incident|
|Medical Incident|
+----------------+
only showing top 20 rows



In [10]:
//La méthode col() permet de sélectionner la colonne à filtrer et l'opérateur =!= est utilisé pour filtrer les valeurs dont CallType est egale à "Medical Incident"
import org.apache.spark.sql.functions.col

val df_filtered = df.filter(col("CallType") =!= "Medical Incident")
df_filtered.select("CallType").show()

+--------------------+
|            CallType|
+--------------------+
|      Structure Fire|
|        Vehicle Fire|
|              Alarms|
|      Structure Fire|
|              Alarms|
|              Alarms|
|      Structure Fire|
|      Structure Fire|
|      Structure Fire|
|      Structure Fire|
|Odor (Strange / U...|
|              Alarms|
|      Structure Fire|
|      Structure Fire|
|Odor (Strange / U...|
|Odor (Strange / U...|
|      Structure Fire|
|              Alarms|
|Smoke Investigati...|
|      Structure Fire|
+--------------------+
only showing top 20 rows



[32mimport [39m[36morg.apache.spark.sql.functions.col

[39m
[36mdf_filtered[39m: [32mDataset[39m[[32mRow[39m] = [CallNumber: string, UnitID: string ... 26 more fields]

## Q6. Combien de types d'appels distincts ont été passés ?**  

In [11]:
//Pour obtenir les valeurs distinctes dans la colonne CaalType on utilise la méthode distinct() 
//et la méthode count() est utilisée pour compter le nombre de valeurs distinctes. 
//Ainsi on obtient 30 valeurs distinctes
import org.apache.spark.sql.functions.countDistinct
val numDistinctCallTypes = df.select(col("CallType")).distinct().count()

[32mimport [39m[36morg.apache.spark.sql.functions.countDistinct
[39m
[36mnumDistinctCallTypes[39m: [32mLong[39m = [32m30L[39m

## Q7. Quels types d'appels  ont été passés au service d'incendie?

In [12]:
val fireCalls = df.select(col("CallType")).distinct().filter(col("CallType").contains("Fire")).show()

+--------------+
|      CallType|
+--------------+
|   Marine Fire|
|  Vehicle Fire|
|  Outside Fire|
|Structure Fire|
+--------------+



## Q8. Trouvez toutes les réponses ou les délais sont supérieurs à 5 minutes

Hint:
1. Renommez la colonne `Delay` -> `ReponseDelayedinMins`
2. Retournez un nouveau DataFrame
3. Affichez tous les appels où le temps de réponse au site d'incendie a eu un retard de plus de 5 minutes

In [13]:
//Pour renommez une colonne on utilise la fonction withColumnRenamed()
val newdf = df.withColumnRenamed("Delay", "ResponseDelayedinMins")

[36mnewdf[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

In [14]:
//Verification
newdf.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- Sup

In [20]:
//Affichons tous les appels où le temps de réponse au site d'incendie a eu un retard de plus de 5 minutes
//Pour ce faire je vais utiliser la methode createOrReplaceTempView() pour creer une table et ainsi je pourai faire des requettes sql

df.createOrReplaceTempView("FireCaal")
val dataframeSQL = spark.sql("select delay from FireCaal where delay > 5 ")
dataframeSQL.show()

+---------+
|    delay|
+---------+
|     6.25|
|     7.25|
|11.916667|
| 8.633333|
| 95.28333|
|      7.6|
| 6.133333|
|6.9166665|
|     6.35|
| 7.983333|
|    13.55|
|13.583333|
|6.5333333|
|     8.15|
|      6.6|
|7.0666666|
|     13.4|
| 8.716666|
|7.9333334|
|6.4666667|
+---------+
only showing top 20 rows



[36mdataframeSQL[39m: [32mDataFrame[39m = [delay: string]

## Q9. Convertissez les colonnes dates en timestamp

Hint:
* `CallDate` -> `IncidentDate`
* `WatchDate` -> `OnWatchDate`
* `AvailableDtTm` -> `AvailableDtTS`
exemple code pour le cas de `CallDate`:
`dataframe.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")`

In [15]:
import org.apache.spark.sql.functions.{col, to_timestamp}
val newdf2= newdf.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")

[32mimport [39m[36morg.apache.spark.sql.functions.{col, to_timestamp}
[39m
[36mnewdf2[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

In [16]:
//Verification
newdf2.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true

## Q10. Quels sont les types d'appels les plus courants?

In [17]:
//La méthode groupBy() permet de regrouper les lignes de la base de données en fonction des valeurs de la colonne 
//"CallType". La méthode count() est ensuite utilisée pour compter le nombre de lignes pour chaque groupe et 
//la méthode orderBy() est utilisée pour trier les résultats par ordre décroissant de nombre d'appels.
import org.apache.spark.sql.functions.{col, desc}
val topCalls = newdf2.groupBy("CallType").count().orderBy(desc("count")).show()

+--------------------+------+
|            CallType| count|
+--------------------+------+
|    Medical Incident|113794|
|      Structure Fire| 23319|
|              Alarms| 19406|
|   Traffic Collision|  7013|
|Citizen Assist / ...|  2524|
|               Other|  2166|
|        Outside Fire|  2094|
|        Vehicle Fire|   854|
|Gas Leak (Natural...|   764|
|        Water Rescue|   755|
|Odor (Strange / U...|   490|
|   Electrical Hazard|   482|
|Elevator / Escala...|   453|
|Smoke Investigati...|   391|
|          Fuel Spill|   193|
|              HazMat|   124|
|Industrial Accidents|    94|
|           Explosion|    89|
|Train / Rail Inci...|    57|
|  Aircraft Emergency|    36|
+--------------------+------+
only showing top 20 rows



[32mimport [39m[36morg.apache.spark.sql.functions.{col, desc}
[39m

## Q11. Quels sont les boites postales rencontrées dans les appels les plus courants?

In [47]:
// On Sélectionne les colonnes Zipcode et CallType, grouper par Zipcode et CallType, 
// puis compter le nombre d'appels pour chaque BoitePostale et TypeAppel
val countByPostalCode = newdf.groupBy("Zipcode", "CallType")
                          .count()
                          .orderBy(desc("count"))

// Affichons le résultats
countByPostalCode.show()

+-------+----------------+-----+
|Zipcode|        CallType|count|
+-------+----------------+-----+
|  94102|Medical Incident|16130|
|  94103|Medical Incident|14775|
|  94110|Medical Incident| 9995|
|  94109|Medical Incident| 9479|
|  94124|Medical Incident| 5885|
|  94112|Medical Incident| 5630|
|  94115|Medical Incident| 4785|
|  94122|Medical Incident| 4323|
|  94107|Medical Incident| 4284|
|  94133|Medical Incident| 3977|
|  94117|Medical Incident| 3522|
|  94134|Medical Incident| 3437|
|  94114|Medical Incident| 3225|
|  94118|Medical Incident| 3104|
|  94121|Medical Incident| 2953|
|  94116|Medical Incident| 2738|
|  94132|Medical Incident| 2594|
|  94110|  Structure Fire| 2267|
|  94105|Medical Incident| 2258|
|  94102|  Structure Fire| 2229|
+-------+----------------+-----+
only showing top 20 rows



[36mcountByPostalCode[39m: [32mDataset[39m[[32mRow[39m] = [Zipcode: string, CallType: string ... 1 more field]

## Q12. Quels sont les quartiers de San Francisco dont les codes postaux sont `94102` et `94103`?**

In [22]:
spark.sql("select neighborhood, Zipcode from FireCaal where Zipcode = 94102 or Zipcode = 94103").show()

+--------------------+-------+
|        neighborhood|Zipcode|
+--------------------+-------+
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|     South of Market|  94103|
|             Mission|  94103|
|             Mission|  94103|
|     South of Market|  94103|
|          Tenderloin|  94102|
|        Hayes Valley|  94102|
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|     South of Market|  94103|
|        Hayes Valley|  94102|
|Financial Distric...|  94103|
|        Hayes Valley|  94102|
|             Mission|  94103|
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|        Hayes Valley|  94102|
|     South of Market|  94103|
|     South of Market|  94103|
+--------------------+-------+
only showing top 20 rows



## Q13. Determinez le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?

In [23]:
//la méthode count() permet de calculer le nombre total d'appels dans le DataFrame df2.
//Les méthodes avg(), min() et max() permet de calculer respectivement la moyenne, le minimum et le maximum du temps
//de réponse des appels dans notre nouveaux dataframe newdf3.

import org.apache.spark.sql.functions.{avg, min, max}
// Convertir la colonne "ResponseDelayedinMins" en type numérique
val newdf3 = newdf2.withColumn("ResponseDelayedinMins", col("ResponseDelayedinMins").cast("double"))

// Calculer le nombre total d'appels
val totalCount = newdf3.count()

// Calculer la moyenne, le minimum et le maximum du temps de réponse des appels
val moyenne = newdf3.agg(avg("ResponseDelayedinMins")).first()(0)
val minimum = newdf3.agg(min("ResponseDelayedinMins")).first()(0)
val maximum = newdf3.agg(max("ResponseDelayedinMins")).first()(0)

[32mimport [39m[36morg.apache.spark.sql.functions.{avg, min, max}
// Convertir la colonne "ResponseDelayedinMins" en type numérique
[39m
[36mnewdf3[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]
[36mtotalCount[39m: [32mLong[39m = [32m175296L[39m
[36mmoyenne[39m: [32mAny[39m = [32m3.8923641541750134[39m
[36mminimum[39m: [32mAny[39m = [32m0.016666668[39m
[36mmaximum[39m: [32mAny[39m = [32m1844.55[39m

## Q14. Combien d'années distinctes trouve t-on dans ce Dataset? 

Hint: Appliquer la fonction `year()` a la colonne `IncidentDate`

In [25]:
//Pour déterminer le nombre d'années distinctes dans ce Dataset, nous pouvons sélectionner la colonne "IncidentDate", 
//extraire l'année de chaque date avec la fonction year() et utiliser la fonction distinct() pour obtenir les années 
//uniques. Ensuite, nous pouvons utiliser la méthode count() pour connaître le nombre d'années distinctes.

import org.apache.spark.sql.functions._
import spark.implicits._

val distinctYears = newdf3.select(year($"IncidentDate")).distinct()
val countYears = distinctYears.count()

[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36mspark.implicits._

[39m
[36mdistinctYears[39m: [32mDataset[39m[[32mRow[39m] = [year(IncidentDate): int]
[36mcountYears[39m: [32mLong[39m = [32m19L[39m

## Q15. Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?

In [29]:
//Filtrer les données pour ne conserver que les appels d'incendie de l'année 2018
val calls2018 = newdf2.filter(year($"IncidentDate") === 2018 && $"CallType".like("%Fire%"))

//Calcul du nombre d'appels d'incendie pour chaque semaine de l'année
val callsByWeek = calls2018.groupBy(weekofyear($"IncidentDate").alias("Week")).count()

//Trouver la semaine avec le plus grand nombre d'appels d'incendie
val maxWeek = callsByWeek.orderBy(desc("count")).first().getInt(0)


[36mcalls2018[39m: [32mDataset[39m[[32mRow[39m] = [CallNumber: string, UnitID: string ... 26 more fields]
[36mcallsByWeek[39m: [32mDataFrame[39m = [Week: int, count: bigint]
[36mmaxWeek[39m: [32mInt[39m = [32m1[39m

Le résultat ci dessus montre que la 1er semaine de l’année 2018 a eu le plus d'appels d'incendie

## Q16. Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?

In [30]:
// Filtrer les appels d'incendie de l'année 2018
val calls2018 = newdf2.filter(year($"IncidentDate") === 2018)

// Grouper par quartier et calculer la moyenne du temps de réponse
val avgResponseByNeighborhood = calls2018.groupBy($"Neighborhood").agg(avg($"ResponseDelayedinMins").alias("AvgResponseTime"))

// Trier les résultats par ordre décroissant du temps de réponse
val worstResponseNeighborhoods = avgResponseByNeighborhood.sort($"AvgResponseTime".desc)

// Afficher les quartiers avec le pire temps de réponse
worstResponseNeighborhoods.show()

+--------------------+------------------+
|        Neighborhood|   AvgResponseTime|
+--------------------+------------------+
|           Chinatown| 6.190314097905762|
|            Presidio|5.8292270414492755|
|     Treasure Island|      5.4537037125|
|        McLaren Park| 4.744047642857142|
|Bayview Hunters P...|4.6205619568773955|
|    Presidio Heights| 4.594131472394366|
|        Inner Sunset| 4.438095199935065|
|      Inner Richmond| 4.364728682713178|
|Financial Distric...| 4.344084618290156|
|      Haight Ashbury| 4.266428599285714|
|            Seacliff| 4.261111146666666|
|  West of Twin Peaks| 4.190952390857142|
|        Potrero Hill| 4.190555557428572|
|     Pacific Heights| 4.180453718900524|
|          Tenderloin| 4.101519516597271|
|Oceanview/Merced/...| 3.947242180719425|
|           Excelsior|3.9363993797169807|
|         North Beach|  3.88924964034632|
|           Lakeshore|3.8815513650943387|
|         Mission Bay| 3.854868952191012|
+--------------------+------------

[36mcalls2018[39m: [32mDataset[39m[[32mRow[39m] = [CallNumber: string, UnitID: string ... 26 more fields]
[36mavgResponseByNeighborhood[39m: [32mDataFrame[39m = [Neighborhood: string, AvgResponseTime: double]
[36mworstResponseNeighborhoods[39m: [32mDataset[39m[[32mRow[39m] = [Neighborhood: string, AvgResponseTime: double]

## Q17. Stocker les données sous format de fichiers Parquet

In [43]:
newdf2.write.format("parquet").save("tmp/parket/files/")

## Q18. Rechargez  les données stockées en format Parquet

In [44]:
val newdataDF = spark.read.format("parquet").load("tmp/parket/files/")
              

[36mnewdataDF[39m: [32mDataFrame[39m = [CallNumber: string, UnitID: string ... 26 more fields]

In [45]:
newdataDF.printSchema

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true