# FLight usa dojo

#### Lire la donnée

On va utiliser `DataFrameReader` pour lire la donnée, ce dernier est lié à notre `SparkSession` et adapté pour lire de gros volume de donnée.

Construction du schema : Dans notre cas on veut faire deviner à spark notre schema de donnée. On va donc utiliser l'option : `schema inference`.

In [3]:
from pyspark.shell import spark

flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("./data/2015-summary.csv")

Ce dataframe possède un ensemble de colonnes avec un nombre non spécifié de lignes. La raison pour laquelle le nombre de lignes n'est pas spécifié est que la lecture des données est une transformation, et elle est donc une opération `lazy` (dite paraisseuse). Spark ne jette un coup d'œil qu'à quelques lignes de données pour essayer de deviner les types de chaque colonne. La figure 2-7 illustre la lecture du fichier CSV dans un DataFrame, puis sa conversion en un tableau local ou une liste de lignes.

![reader](static/reader.png)

Si nous effectuons l'action take sur le DataFrame, nous pourrons voir les 3 premières lignes du DataFrame:

In [7]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

> N'oubliez pas que le tri ne modifie pas le DataFrame.
> Nous utilisons sort comme une transformation qui renvoie un nouveau DataFrame en transformant le DataFrame précédent.
> Illustrons ce qui se passe lorsque nous appelons take sur ce DataFrame résultant.

![figure 2-8. Reading, sorting, and collecting a DataFrame](static/reading_sorting_collecting.png)

Rien ne se produit dans les données lorsque nous appelons sort parce qu'il s'agit simplement d'une transformation. Cependant, nous pouvons voir que Spark est en train de construire un plan pour savoir comment il va exécuter cette requête à travers le cluster en regardant le plan explain. Nous pouvons appeler explain sur n'importe quel objet DataFrame pour voir le lineage (lignée) du DataFrame (ou comment Spark va exécuter cette requête) :

#### Explains

In [8]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#42 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#42 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#52]
      +- FileScan csv [DEST_COUNTRY_NAME#40,ORIGIN_COUNTRY_NAME#41,count#42] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/loic.caminale/Workspace/formation/dataguru/labs/modules/ba..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




Félicitations, vous venez de lire votre premier plan d'explication ! Les plans d'explication sont un peu obscurs, mais avec un peu de pratique, ils deviennent une seconde nature. Vous pouvez lire les plans d'explication de haut en bas, le haut étant le résultat final, et le bas étant la ou les sources de données. Dans ce cas, jetez un coup d'œil aux premiers mots-clés. Vous verrez trier, échanger, et FileScan. C'est parce que le tri de nos données est en fait une transformation large, car les lignes devront être comparées les unes aux autres. Ne vous inquiétez pas trop de comprendre tout ce qui concerne les plans d'explication à ce stade, ils peuvent juste être des outils utiles pour déboguer et améliorer vos connaissances à mesure que vous progressez avec Spark.

Maintenant, comme nous l'avons fait auparavant, nous pouvons spécifier une action pour lancer ce plan. Cependant, avant de faire cela, nous allons définir une configuration. Par défaut, lorsque nous effectuons un shuffle, Spark produit 200 partitions de shuffle. Réglons cette valeur à 5 pour réduire le nombre de partitions de sortie du shuffle :

In [10]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [11]:
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

La photo ci-dessous illustre cette opération. Remarquez qu'en plus des transformations logiques, nous incluons également le nombre de partitions physiques.

![Figure 2-9. The process of logical and physical DataFrame manipulation](./static/logical_and_physical_partition.png)

Le plan logique des transformations que nous construisons définit un lineage (lignée) pour le DataFrame de sorte qu'à tout moment, Spark sait comment recalculer n'importe quelle partition en effectuant toutes les opérations qu'il avait auparavant sur les mêmes données d'entrée. Cela se trouve au cœur du modèle de programmation de Spark - la programmation fonctionnelle où les mêmes entrées donnent toujours les mêmes sorties lorsque les transformations sur ces données restent constantes.

Nous ne manipulons pas les données physiques ; au lieu de cela, nous configurons les caractéristiques d'exécution physique par le biais de choses comme le paramètre shuffle partitions que nous avons défini il y a quelques instants. Nous nous sommes retrouvés avec cinq partitions de sortie parce que c'est la valeur que nous avons spécifiée dans le paramètre shuffle partition. Vous pouvez modifier ce paramètre pour aider à contrôler les caractéristiques d'exécution physique de vos travaux Spark. Allez-y, expérimentez avec différentes valeurs et voyez vous-même le nombre de partitions. En expérimentant avec différentes valeurs, vous devriez voir des temps d'exécution radicalement différents. N'oubliez pas que vous pouvez surveiller la progression des travaux en naviguant vers l'interface utilisateur Spark sur le port 4040 pour voir les caractéristiques d'exécution physique et logique de vos travaux.

#### DataFrames and SQL

Nous avons travaillé sur une transformation simple dans l'exemple précédent, travaillons maintenant sur une transformation plus complexe et suivons-la à la fois dans DataFrames et SQL. Spark peut exécuter les mêmes transformations, quel que soit le langage, exactement de la même manière. Vous pouvez exprimer votre logique d'entreprise en SQL ou DataFrames (soit en R, Python, Scala ou Java) et Spark compilera cette logique vers un plan sous-jacent (que vous pouvez voir dans le plan d'explication) avant d'exécuter réellement votre code. Avec Spark SQL, vous pouvez enregistrer n'importe quel DataFrame comme une table ou une vue (une table temporaire) et l'interroger en utilisant du pur SQL. Il n'y a pas de différence de performance entre l'écriture de requêtes SQL ou l'écriture de code DataFrame, ils sont tous deux "compilés" au même plan sous-jacent que nous spécifions dans le code DataFrame.

You can make any DataFrame into a table or view with one simple method call:

In [12]:
flightData2015.createOrReplaceTempView("flight_data_2015")

Maintenant, nous pouvons interroger nos données en SQL. Pour ce faire, nous allons utiliser la fonction spark.sql (rappelez-vous, spark est notre variable SparkSession) qui renvoie commodément un nouveau DataFrame. Bien que cela puisse sembler un peu circulaire dans la logique - qu'une requête SQL contre un DataFrame renvoie un autre DataFrame - c'est en fait assez puissant. Cela vous permet de spécifier les transformations de la manière qui vous convient le mieux à un moment donné et de ne pas sacrifier l'efficacité pour le faire ! Pour comprendre ce qui se passe, examinons deux plans d'explication :

In [13]:
# in Python
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .count()

sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#40], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#40, 5), ENSURE_REQUIREMENTS, [id=#74]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#40], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/loic.caminale/Workspace/formation/dataguru/labs/modules/ba..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#40], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#40, 5), ENSURE_REQUIREMENTS, [id=#87]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#40], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIn