# Big Data with Spark: Spark SQL

*`Full name: ABABACAR SAGNA`*

## Problem Statement

This project uses Apache Spark to analyze and process the **[San Francisco Fire Department Calls](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** data in order to produce a few KPIs (*Key Performance Indicators*). The **SF Fire dataset** contains the responses to calls from all fire units. Each record includes the call number, the incident number, the address, the unit identifier, the call type, and the disposition. All relevant time intervals are also included. Because the dataset is response-based and most calls involve several units, there are multiple records for each call number. Addresses are associated with a block number, an intersection, or a call box, not with a specific address.

**More details on the data description are available [here](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

**Download the CSV file [here](https://data.sfgov.org/api/views/nuek-vuh3/rows.csv?accessType=DOWNLOAD)**

## Assignment
The goal of this assignment is to understand the SF Fire dataset well enough to answer the questions below with appropriate Spark/Scala code.

- Write readable, properly indented code.
- Remember to justify each answer in a Markdown cell.


#### Note:
- You may work in groups of at most two students.

## Q1. Import the required Spark modules

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.5`
import $ivy.`sh.almond::almond-spark:0.10.9`
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._


In [4]:
import org.apache.log4j.{Level, Logger}

val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)

rootLogger: Logger = org.apache.log4j.spi.RootLogger@5065bd94

In [3]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)


## Q2. Create the Spark session

In [6]:
val spark = SparkSession.builder
  .master("local")
  .appName("Mon-Projet-spark")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()

23/04/12 16:55:20 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.


spark: SparkSession = org.apache.spark.sql.SparkSession@573f5e2f

## Q3. Load the data

Use the `fireSchema` defined in the next cell to load the data.

In [7]:
import org.apache.spark.sql.types._

val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

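// Load the CSV with the explicit schema (the session created in Q2 is named `spark`).
// If the file starts with a header row, header=false reads that row as data
// (it then shows up as a null row in later outputs); use "true" to skip it.
val sfFireData = spark.read.option("header", "false").schema(fireSchema).csv("sf-fire-calls.csv")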


fireSchema: StructType = StructType(
  StructField("CallNumber", IntegerType, true, {}),
  StructField("UnitID", StringType, true, {}),
  StructField("IncidentNumber", IntegerType, true, {}),
  StructField("CallType", StringType, true, {}),
  StructField("CallDate", StringType, true, {}),
  StructField("WatchDate", StringType, true, {}),
  StructField("CallFinalDisposition", StringType, true, {}),
  StructField("AvailableDtTm", StringType, true, {}),
  StructField("Address", StringType, true, {}),
  StructField("City", StringType, true, {}),
  StructField("Zipcode", IntegerType, true, {}),
  StructField("Battalion", StringType, true, {}),
  StructField(

## Q4. Cache the loaded data

In [8]:
// dataframe_name.cache
val data = sfFireData.cache()

data: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]

Caching is worthwhile when several actions run on the same DataFrame: the file is then read and parsed once instead of once per action.
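The sketch below illustrates when caching takes effect. It assumes Spark SQL is on the classpath as in Q1. The toy DataFrame and the name `calls` are illustrative and not part of the project data. `cache()` only marks the DataFrame; the first action materializes it and later actions reuse the stored data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Local session for the sketch; in the notebook the session from Q2 can be reused.
val spark = SparkSession.builder.master("local[1]").appName("cache-sketch").getOrCreate()
import spark.implicits._

// Hypothetical toy data standing in for the fire calls.
val calls = Seq(("Alarms", 2.5f), ("Structure Fire", 6.1f), ("Alarms", 7.0f))
  .toDF("CallType", "Delay")

// Before cache(): no storage level is assigned yet.
val levelBefore = calls.storageLevel

// cache() is lazy: it registers the DataFrame for caching and returns it.
val cached = calls.cache()
val levelAfter = cached.storageLevel

// The first action fills the cache; the second one reads from it.
val total = cached.count()
val distinctTypes = cached.select("CallType").distinct().count()

// Release the cached data once it is no longer needed.
cached.unpersist()
```

Calling `unpersist()` at the end of an analysis frees the memory held by the cache.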

## Q5. Remove all calls of type `Medical Incident`

Hint: apply the `.filter()` method to the `CallType` column with the `=!=` operator

In [9]:
// Import the required function
import org.apache.spark.sql.functions.col

/* Use .filter on the "CallType" column to drop every row whose
    call type is "Medical Incident"
*/

val filterSFDF = data.filter(col("CallType") =!= "Medical Incident")


filterSFDF: Dataset[Row] = [CallNumber: int, UnitID: string ... 26 more fields]

## Q6. How many distinct call types were made?

In [12]:
import org.apache.spark.sql.functions.countDistinct
/* agg(countDistinct(...)) counts the distinct non-null values of "CallType":
each call type is counted once, however many rows carry it
*/
val distinctsCallType = data.agg(countDistinct("CallType"))
distinctsCallType.show()

+------------------------+
|count(DISTINCT CallType)|
+------------------------+
|                      30|
+------------------------+



distinctsCallType: DataFrame = [count(DISTINCT CallType): bigint]

## Q7. Which types of calls were made to the fire department?

In [13]:
// Display the distinct values of the "CallType" column

/* show(false) disables truncation of long values; it still prints only the first 20 rows,
   so pass a row count (e.g. show(31, false)) to list every call type */

data.select("CallType").distinct.show(false)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|null                                        |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Coll

## Q8. Find all responses where the delay exceeds 5 minutes

Hint:
1. Rename the column `Delay` -> `ReponseDelayedinMins`
2. Return a new DataFrame
3. Display all calls where the response to the fire site was delayed by more than 5 minutes

In [31]:
// Rename the column ~Delay~ to ~ReponseDelayedinMins~; this returns a new DataFrame
val df = data.withColumnRenamed("Delay","ReponseDelayedinMins")

// Add a boolean flag that is true when the response delay exceeds 5 minutes
val dfa = df.withColumn("Reponse_gt_5", col("ReponseDelayedinMins").gt(5))

// Keep only the calls delayed by more than 5 minutes and display them
val delays_gt_5 = dfa.filter(col("ReponseDelayedinMins").gt(5))
delays_gt_5.select(
    "ReponseDelayedinMins",
    "Reponse_gt_5"
).show()


+--------------------+------------+
|ReponseDelayedinMins|Reponse_gt_5|
+--------------------+------------+
|                5.35|        true|
|                6.25|        true|
|                 5.2|        true|
|                 5.6|        true|
|                7.25|        true|
|           11.916667|        true|
|            5.116667|        true|
|            8.633333|        true|
|            95.28333|        true|
|                5.45|        true|
|                 7.6|        true|
|            6.133333|        true|
|           5.1833334|        true|
|           6.9166665|        true|
|                 5.2|        true|
|                6.35|        true|
|            7.983333|        true|
|               13.55|        true|
|                5.15|        true|
|           13.583333|        true|
+--------------------+------------+
only showing top 20 rows



df: DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
dfa: DataFrame = [CallNumber: int, UnitID: string ... 27 more fields]
delays_gt_5: Dataset[Row] = [CallNumber: int, UnitID: string ... 27 more fields]

## Q9. Convert the date columns to timestamps

Hint:
* `CallDate` -> `IncidentDate`
* `WatchDate` -> `OnWatchDate`
* `AvailableDtTm` -> `AvailableDtTS`

Example code for `CallDate`:
`dataframe.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")`

In [15]:
// Import the function needed for this conversion
import org.apache.spark.sql.functions.to_timestamp

// CallDate
val dfb = dfa.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")

// WatchDate
val dfc = dfb.withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")

// AvailableDtTm
val dfd = dfc.withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy")).drop("AvailableDtTm")

// Display the result
dfd.select(
    "IncidentDate",
    "OnWatchDate",
    "AvailableDtTS"
).show(2,false)


+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|null               |null               |null               |
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 00:00:00|
+-------------------+-------------------+-------------------+
only showing top 2 rows



dfb: DataFrame = [CallNumber: int, UnitID: string ... 27 more fields]
dfc: DataFrame = [CallNumber: int, UnitID: string ... 27 more fields]
dfd: DataFrame = [CallNumber: int, UnitID: string ... 27 more fields]
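The `AvailableDtTS` value above has a midnight time, which is what a date-only pattern produces when the raw string also carries a time of day. To my understanding, Spark 2.4 (the version imported in Q1) interprets `to_timestamp` patterns with `java.text.SimpleDateFormat` rules, so the effect can be reproduced in plain Scala. The sample value below is hypothetical, since the raw contents of `AvailableDtTm` are not shown here, and `formatter` is a helper introduced only for this sketch.

```scala
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

// Build a formatter pinned to UTC so the result does not depend on the machine.
def formatter(pattern: String): SimpleDateFormat = {
  val f = new SimpleDateFormat(pattern, Locale.US)
  f.setTimeZone(TimeZone.getTimeZone("UTC"))
  f
}

// Hypothetical raw value with a 12-hour clock time and an AM/PM marker.
val raw = "01/11/2002 01:51:44 AM"
val printer = formatter("yyyy-MM-dd HH:mm:ss")

// Date-only pattern: parsing stops after the date, so the time is dropped.
val dateOnly = printer.format(formatter("MM/dd/yyyy").parse(raw))

// Pattern that also covers the time of day and the AM/PM marker.
val withTime = printer.format(formatter("MM/dd/yyyy hh:mm:ss a").parse(raw))

println(s"date-only pattern: $dateOnly")
println(s"full pattern:      $withTime")
```

If the column does hold values of that shape, the corresponding Spark call would be `to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")`.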

## Q10. What are the most common call types?

In [32]:
// Count the calls for each call type
val countDistinctCallType = dfd.groupBy("CallType").count()

// Flag the types with more than 5000 calls and list the most frequent first
countDistinctCallType
  .withColumn("num_gt_5000", col("count").gt(5000))
  .orderBy(col("count").desc)
  .show(false)


+-------------------------------+------+-----------+
|CallType                       |count |num_gt_5000|
+-------------------------------+------+-----------+
|Medical Incident               |113794|true       |
|Structure Fire                 |23319 |true       |
|Alarms                         |19406 |true       |
|Traffic Collision              |7013  |true       |
|Citizen Assist / Service Call  |2524  |false      |
|Other                          |2166  |false      |
|Outside Fire                   |2094  |false      |
|Vehicle Fire                   |854   |false      |
|Gas Leak (Natural and LP Gases)|764   |false      |
|Water Rescue                   |755   |false      |
|Odor (Strange / Unknown)       |490   |false      |
|Electrical Hazard              |482   |false      |
|Elevator / Escalator Rescue    |453   |false      |
|Smoke Investigation (Outside)  |391   |false      |
|Fuel Spill                     |193   |false      |
|HazMat                         |124   |false 

countDistinctCallType: DataFrame = [CallType: string, count: bigint]

## Q11. Which zip codes appear in the most common calls?

In [33]:
// Needed for the $"..." column syntax
import spark.implicits._

// Count the calls per zip code
val dfZipcode = dfd.groupBy("Zipcode").count()

// Show the 5 most frequent zip codes
dfZipcode.orderBy($"count".desc).show(5)

// Keep only the most frequent call types (taken from Q10)
val frequentCallTypes = Seq("Medical Incident", "Structure Fire", "Alarms", "Traffic Collision")
val filteredData = dfd.filter($"CallType".isin(frequentCallTypes:_*))

// Show the distinct zip codes of the filtered calls
filteredData.select("Zipcode").distinct().show(false)


+-------+-----+
|Zipcode|count|
+-------+-----+
|  94102|21840|
|  94103|20897|
|  94110|14801|
|  94109|14686|
|  94124| 9236|
+-------+-----+
only showing top 5 rows

+-------+
|Zipcode|
+-------+
|94109  |
|94115  |
|94112  |
|94127  |
|94108  |
|94121  |
|94105  |
|null   |
|94131  |
|94116  |
|94134  |
|94124  |
|94102  |
|94114  |
|94107  |
|94111  |
|94103  |
|94117  |
|94122  |
|94110  |
+-------+
only showing top 20 rows



dfZipcode: DataFrame = [Zipcode: int, count: bigint]
frequentCallTypes: Seq[String] = List(
  "Medical Incident",
  "Structure Fire",
  "Alarms",
  "Traffic Collision"
)
filteredData: Dataset[Row] = [CallNumber: int, UnitID: string ... 27 more fields]

## Q12. Which San Francisco neighborhoods have the zip codes `94102` and `94103`?

In [19]:
// Filter on these two zip codes specifically
dfd.filter($"Zipcode" === 94102 || $"Zipcode" === 94103).select("Address").show()

+--------------------+
|             Address|
+--------------------+
|MARKET ST/MCALLIS...|
|600 Block of POLK ST|
|    9TH ST/HOWARD ST|
|400 Block of VALE...|
|  16TH ST/MISSION ST|
|   4TH ST/MISSION ST|
|400 Block of TURK ST|
|   OAK ST/WEBSTER ST|
| 0 Block of JONES ST|
|400 Block of EDDY ST|
|300 Block of CLEM...|
| 500 Block of OAK ST|
|700 Block of MARK...|
|HAIGHT ST/OCTAVIA ST|
|100 Block of JULI...|
|0 Block of LARKIN ST|
|100 Block of TURK ST|
|CALL BOX: BUCHANA...|
|    5TH ST/MARKET ST|
| 100 Block of 7TH ST|
+--------------------+
only showing top 20 rows
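The output above lists addresses, while the question asks for neighborhoods. A possible variant, sketched on hypothetical rows (only the column names `Zipcode` and `Neighborhood` come from `fireSchema`), selects the `Neighborhood` column and removes duplicates:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for the sketch; in the notebook the session from Q2 can be reused.
val spark = SparkSession.builder.master("local[1]").appName("neighborhood-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample rows, not taken from the real dataset.
val sample = Seq(
  (94102, "Tenderloin"),
  (94102, "Hayes Valley"),
  (94103, "South of Market"),
  (94103, "South of Market"),
  (94110, "Mission")
).toDF("Zipcode", "Neighborhood")

// Keep the two zip codes, then list each neighborhood once per zip code.
val neighborhoods = sample
  .filter(col("Zipcode").isin(94102, 94103))
  .select("Zipcode", "Neighborhood")
  .distinct()
  .orderBy("Zipcode", "Neighborhood")

neighborhoods.show(false)
```

On the project data the same chain would start from `dfd` instead of `sample`.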



## Q13. Determine the total number of calls, as well as the mean, minimum, and maximum call response time

In [35]:
// Number of calls per call type
dfd.groupBy("CallType").count().show(31,false)

import org.apache.spark.sql.functions._

// Mean, minimum and maximum of the response delay
val responseStats = dfd.agg(avg("ReponseDelayedinMins"), min("ReponseDelayedinMins"), max("ReponseDelayedinMins"))
responseStats.show()


+--------------------------------------------+------+
|CallType                                    |count |
+--------------------------------------------+------+
|Elevator / Escalator Rescue                 |453   |
|Marine Fire                                 |14    |
|Aircraft Emergency                          |36    |
|Confined Space / Structure Collapse         |13    |
|Administrative                              |3     |
|Alarms                                      |19406 |
|Odor (Strange / Unknown)                    |490   |
|null                                        |1     |
|Citizen Assist / Service Call               |2524  |
|HazMat                                      |124   |
|Watercraft in Distress                      |28    |
|Explosion                                   |89    |
|Oil Spill                                   |21    |
|Vehicle Fire                                |854   |
|Suspicious Package                          |15    |
|Extrication / Entrapped (Ma

responseStats: DataFrame = [avg(ReponseDelayedinMins): double, min(ReponseDelayedinMins): float ... 1 more field]
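The cell above counts calls per type and never prints one overall total. A minimal sketch of getting the total and the three statistics in a single aggregation follows, on hypothetical delay values. Since the dataset has one row per unit response, `count` gives the number of records; `countDistinct("CallNumber")` would be the alternative if "calls" means distinct call numbers.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, lit, max, min}

// Local session for the sketch; in the notebook the session from Q2 can be reused.
val spark = SparkSession.builder.master("local[1]").appName("stats-sketch").getOrCreate()
import spark.implicits._

// Hypothetical delays in minutes, standing in for the real column.
val sample = Seq(2.0f, 4.0f, 9.0f).toDF("ReponseDelayedinMins")

// One pass over the data yields all four figures in a single row.
val stats = sample.agg(
  count(lit(1)).as("total_calls"),
  avg("ReponseDelayedinMins").as("avg_delay"),
  min("ReponseDelayedinMins").as("min_delay"),
  max("ReponseDelayedinMins").as("max_delay")
)

stats.show()
val row = stats.first()
```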

## Q14. How many distinct years does this dataset contain?

Hint: apply the `year()` function to the `IncidentDate` column

In [21]:
// Create a new "year" column (IncidentDate is already a timestamp, see Q9)
val dfe = dfd.withColumn("year", year($"IncidentDate"))

// Count the distinct years with an aggregation
dfe.agg(countDistinct("year")).show()

+--------------------+
|count(DISTINCT year)|
+--------------------+
|                  19|
+--------------------+



dfe: DataFrame = [CallNumber: int, UnitID: string ... 28 more fields]

## Q15. Which week of 2018 had the most fire calls?

In [40]:


// Add a "week_of_year" column holding the week number
val dff = dfe.withColumn("week_of_year", date_format(col("IncidentDate"), "w"))

// Count the incidents per week in 2018
val dfIncidentByWeek = dff.filter(year($"IncidentDate") === 2018)
  .groupBy($"week_of_year")
  .agg(count($"IncidentDate").as("count"))
  .orderBy(desc("count"))

// Show the week with the most calls
dfIncidentByWeek.show(1, false)


+------------+-----+
|week_of_year|count|
+------------+-----+
|22          |272  |
+------------+-----+
only showing top 1 row



dff: DataFrame = [CallNumber: int, UnitID: string ... 29 more fields]
dfIncidentByWeek: Dataset[Row] = [week_of_year: string, count: bigint]
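Week numbers depend on the convention used. Spark's `weekofyear` function is documented as following ISO 8601 (weeks start on Monday), whereas the `"w"` pattern letter in Spark 2.4 follows, as far as I can tell, the `SimpleDateFormat` week-in-year rules, which in a US-style calendar start weeks on Sunday. The plain Scala sketch below only illustrates how the two conventions can disagree for one date; it does not call Spark.

```scala
import java.time.LocalDate
import java.time.temporal.WeekFields

// A Sunday in 2018, a day on which the two conventions disagree.
val day = LocalDate.of(2018, 1, 7)

// ISO-8601: weeks start on Monday, week 1 has at least 4 days in the year.
val isoWeek = day.get(WeekFields.ISO.weekOfWeekBasedYear())

// US-style: weeks start on Sunday, week 1 is the week containing January 1.
val sundayStartWeek = day.get(WeekFields.SUNDAY_START.weekOfWeekBasedYear())

println(s"ISO week: $isoWeek, Sunday-start week: $sundayStartWeek")
```

For an ISO week number in the notebook, `weekofyear(col("IncidentDate"))` could replace the `date_format` call; the counts per week may then shift slightly.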

## Q16. Which San Francisco neighborhoods had the worst response time in 2018?

In [44]:

// Keep only the data from 2018
val df2018 = dfe.filter(year(col("IncidentDate")) === 2018)

// Compute a response time per call as the gap, in seconds, between AvailableDtTS and IncidentDate
val dfResponseTime = df2018.withColumn("ResponseTime", unix_timestamp(col("AvailableDtTS")) - unix_timestamp(col("IncidentDate")))

// Average the response times per neighborhood
val dfResponseByNeighborhood = dfResponseTime.groupBy(col("Neighborhood")).agg(avg(col("ResponseTime")).as("AvgResponseTime"))

// Sort the neighborhoods by average response time in ascending order and show the first 5
dfResponseByNeighborhood.orderBy(col("AvgResponseTime").asc).show(5, false)



+------------+---------------+
|Neighborhood|AvgResponseTime|
+------------+---------------+
|McLaren Park|0.0            |
|Seacliff    |0.0            |
|Excelsior   |0.0            |
|Lincoln Park|0.0            |
|None        |0.0            |
+------------+---------------+
only showing top 5 rows



df2018: Dataset[Row] = [CallNumber: int, UnitID: string ... 28 more fields]
dfResponseTime: DataFrame = [CallNumber: int, UnitID: string ... 29 more fields]
dfResponseByNeighborhood: DataFrame = [Neighborhood: string, AvgResponseTime: double]
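The averages above are all `0.0` because both timestamps were parsed with a date-only pattern in Q9, and the ascending sort surfaces the smallest values rather than the worst ones. One alternative, sketched here on hypothetical rows, is to average the existing `ReponseDelayedinMins` column per neighborhood and sort in descending order. The column names match those built in Q8 and Q14; the values are invented.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col, desc}

// Local session for the sketch; in the notebook the session from Q2 can be reused.
val spark = SparkSession.builder.master("local[1]").appName("worst-delay-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample rows, not taken from the real dataset.
val sample = Seq(
  ("Tenderloin", 2018, 3.0f),
  ("Tenderloin", 2018, 5.0f),
  ("Mission", 2018, 10.0f),
  ("Mission", 2017, 50.0f),
  ("Seacliff", 2018, 6.0f)
).toDF("Neighborhood", "year", "ReponseDelayedinMins")

// Average delay per neighborhood in 2018, worst (largest) first.
val worst = sample
  .filter(col("year") === 2018)
  .groupBy("Neighborhood")
  .agg(avg("ReponseDelayedinMins").as("AvgDelayMins"))
  .orderBy(desc("AvgDelayMins"))

worst.show(false)
```

On the project data the chain would start from `dfe`, followed by `.show(5, false)` to list the five worst neighborhoods.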

## Q17. Store the data as Parquet files

In [55]:
df.write.format("parquet").save("/tmmpp/fiiredataService_parquet/fiiles/")


23/04/12 19:11:31 ERROR Executor: Exception in task 0.0 in stage 124.0 (TID 4122)
java.io.IOException: (null) entry in command string: null chmod 0644 C:\tmmpp\fiiredataService_parquet\fiiles\_temporary\0\_temporary\attempt_20230412191131_0124_m_000000_4122\part-00000-d7bd89b9-48b7-4721-b40e-94b27ac9e4d8-c000.snappy.parquet
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:762)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:859)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:842)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:661)
	at org.apache.hadoop.fs.ChecksumFileSystem$1.apply(ChecksumFileSystem.java:501)
	at org.apache.hadoop.fs.ChecksumFileSystem$FsOperation.run(ChecksumFileSystem.java:482)
	at org.apache.hadoop.fs.ChecksumFileSystem.setPermission(ChecksumFileSystem.java:498)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:467)
	at org.apache.hadoop.fs.ChecksumFileSystem.create

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceiv

	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:177)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	... 3 more



## Q18. Reload the data stored in Parquet format

In [56]:
val newdataDF = spark.read.parquet("/tmmpp/fiiredataService_parquet/fiiles/")


In [56]:
newdataDF.printSchema

cmd56.sc:1: not found: value newdataDF
val res56 = newdataDF.printSchema
            ^Compilation Failed
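The `null chmod 0644` message on a `C:\` path in the Q17 trace is, in my experience, the usual symptom of Hadoop file output on Windows without `HADOOP_HOME` and `winutils.exe`. Treat that diagnosis as an assumption, since the source does not confirm it. Once the write succeeds, the round trip can be sketched as below, on hypothetical rows and with a temporary directory instead of the path used above.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for the sketch; in the notebook the session from Q2 can be reused.
val spark = SparkSession.builder.master("local[1]").appName("parquet-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample rows, not taken from the real dataset.
val sample = Seq((1, "Alarms"), (2, "Structure Fire")).toDF("CallNumber", "CallType")

// Temporary directory so the sketch does not depend on a fixed path.
val target = Files.createTempDirectory("fire-parquet-sketch").resolve("calls").toString

// mode("overwrite") lets the cell be re-run without a "path already exists" error.
sample.write.mode("overwrite").parquet(target)

// Parquet stores the schema with the data, so no schema is passed when reading.
val reloaded = spark.read.parquet(target)
reloaded.printSchema()
val reloadedCount = reloaded.count()
```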
