<a href="https://colab.research.google.com/github/Yunpei24/BigDataBase/blob/main/TP3_Programmation_avec_Spark_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction à <i>Apache Spark</i>
Le but principal de ce notebook est de prendre en main <b>Spark SQL</b> qui est un framework open source de calcul distribué. Il est un cadre applicatif de traitements Big Data pour effectuer des analyses complexes à grande échelle. L'objet de son développement était d'avoir une solution pour accélérer les traitements sous Hadoop. 

Aujourd'hui, les développeurs mettent en avant la rapidité de Spark en termes d'exécution des tâches par rapport à MapReduce.

<mark>En 2014, Spark a gagné le <b>Daytona GraySort Contest</b> dont l'objectif est de trier 100 To de données le plus rapidement possible. Ce record était préalablement détenu par Hadoop.</mark> Pour ce faire, Spark a utilisé 206 machines pour un temps d'exécution final de 23 minutes alors que Hadoop avait utilisé 2 100 machines pour un temps d'exécution final de 72 minutes. <b>La puissance de Spark fut ainsi démontrée en étant 3 fois plus rapide et en nécessitant approximativement 10 fois moins de machines</b>.

Les contributeurs qui participent au développement de Spark sont nombreux et proviennent d'environ 200 sociétés différentes telles que Intel, Facebook, IBM et Netflix ...  <a href="https://fr.wikipedia.org/wiki/Apache_Spark">Wikipédia</a>


# Initialisation de l'environnement d'exécution

Installation du JDK

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Téléchargement de l'archive du framework Apache Spark

In [2]:
# Download Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

Extraction de l'archive dans le dossier courant <mark>/content</mark>

In [3]:
# Unzip the file
!tar xf spark-3.3.1-bin-hadoop3.tgz

Installation des modules Python <b>pyspark</b> et <b>findspark</b>

In [4]:
!pip install -q pyspark
!pip install -q findspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m23.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


Test de l'installation de pyspark

In [5]:
!find /content -name "pyspark"

/content/spark-3.3.1-bin-hadoop3/bin/pyspark
/content/spark-3.3.1-bin-hadoop3/python/pyspark
/content/spark-3.3.1-bin-hadoop3/python/pyspark/python/pyspark


Création des variables d'environnement <mark>SPARK_HOME</mark> et <mark>JAVA_HOME</mark> pour situer respectivement les emplacements d'installation de Spark et Java 

In [6]:
import os
os.environ["SPARK_HOME"] =  "/content/spark-3.3.1-bin-hadoop3" 
os.environ["JAVA_HOME"] ="/usr/lib/jvm/java-8-openjdk-amd64"

Importation des bibliothèques Spark SQL

In [7]:
import findspark 
print("findspark.init() initialise les variables d'environnement pour spark") 
findspark.init() 

# Pyspark session objects
from pyspark.sql import SparkSession 
# Pyspark session configuration
from pyspark import SparkConf  

# Pyspark functions
import pyspark.sql.functions as f
from pyspark.sql import * 

# Pyspark SQL data types
from pyspark.sql.types import *

findspark.init() initialise les variables d'environnement pour spark


# Définition de fonctions utiles pour la suite

La fonction <mark>demarrer_spark</mark> permet d'initialiser une session <i>client</i> avec Spark

In [8]:
def demarrer_spark():
  local = "local[*]"
  appName = "TP3"
  configLocale = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "100G").\
  set("spark.driver.memory","50G").\
  set("spark.sql.catalogImplementation","in-memory").\
  set("spark.driver.maxResultSize", "10G")
  # .\ c'est pour effectuer les instructions en meme temps
  
  spark = SparkSession.builder.config(conf = configLocale).getOrCreate()
  sc = spark.sparkContext
  sc.setLogLevel("ERROR")
  
  # spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")
  # On ajuste l'environnement d'exécution des requêtes à la taille du cluster (4 coeurs)
  # spark.conf.set("spark.sql.shuffle.partitions","200")    

  print("session démarrée, son id est ", sc.applicationId)
  return spark

Démarrage de la session

In [9]:
spark = demarrer_spark()

session démarrée, son id est  local-1674292465113


En vue de simplifier l'exécution des requêtes SQL, nous définissons la commande magique &#128526; <b><font color="blue">%%sql</font></b> pour exécuter les requêtes plus facilement

In [10]:
from IPython.core.magic import (register_line_magic, register_cell_magic, register_line_cell_magic)

def removeComments(query):
  result = ""
  for line in query.split('\n'):
    if not(line.strip().startswith("--")):
      result += line + "\n"
  return result

@register_line_cell_magic
def sql(line, cell=None):
    "To run a sql query. Use:  %%sql"
    val = cell if cell is not None else line
    tabRequetes = removeComments(val).split(";")
    resultat = None
    est_une_requete = False
    for r in tabRequetes:
        r = r.strip()
        if len(r) > 2:
          resultat = spark.sql(r)
          est_une_requete = r.lower().startswith('select') or r.lower().startswith('with')  
    if(est_une_requete):
      resultat.explain()
      return display(resultat)
    else:
      return print('ok')

De même, nous redéfinissons la fonction <b>display</b> pour un meilleur affichage des données manipulées.

In [11]:
import pandas as pd

def display(df, n=10):
  pd.set_option('max_columns', None)
  pd.set_option('max_colwidth', None)
  return df.limit(n).toPandas()

print("display redéfini")

display redéfini


# Programmation avec <i>Spark SQL</i>
Le travail consiste à utiliser <i>Spark SQL</i> avec le langage framework <i>pyspark</i> et d'effectuer des traitements distribués via des requêtes SQL. 

Vous devrez concevoir et écrire vous-même les requêtes à partir de la deuxième activité.

## Activité 1
Cette activité illustre le fonctionnement et l'utilisation de Spark avec SQL.

Téléchargement du jeu de données de notre dernier TP, en l'occurence le fichier <u>purchases.txt</u>

In [12]:
!curl -L -o 'purchases.txt' 'https://drive.google.com/u/0/uc?id=1NS-PSXW8bSNpzFH4XRbtmMnMGhXBdYy6&export=download&confirm=t'

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  201M  100  201M    0     0  58.0M      0  0:00:03  0:00:03 --:--:-- 64.5M


Vérification du bon déroulement du téléchargement

In [13]:
!ls .

purchases.txt  spark-3.3.1-bin-hadoop3
sample_data    spark-3.3.1-bin-hadoop3.tgz


Affichage des premiers lignes du fichier. Le format des enregistrement est le suivant:
<table border='1'><tr>
<td>Date</td><td>Heure</td><td>Magasin</td><td>Produit</td><td>Montant</td><td>Moyen_de_paiement</td>
</tr></table>
La tabulation <b>\t</b> est utilisée comme séparateur de colonne ✅

In [14]:
!head -10  ./purchases.txt

2012-01-01	09:00	San Jose	Men's Clothing	214.05	Amex
2012-01-01	09:00	Fort Worth	Women's Clothing	153.57	Visa
2012-01-01	09:00	San Diego	Music	66.08	Cash
2012-01-01	09:00	Pittsburgh	Pet Supplies	493.51	Discover
2012-01-01	09:00	Omaha	Children's Clothing	235.63	MasterCard
2012-01-01	09:00	Stockton	Men's Clothing	247.18	MasterCard
2012-01-01	09:00	Austin	Cameras	379.6	Visa
2012-01-01	09:00	New York	Consumer Electronics	296.8	Cash
2012-01-01	09:00	Corpus Christi	Toys	25.38	Discover
2012-01-01	09:00	Fort Worth	Toys	213.88	Visa


Création du dataframe Spark <mark>df</mark> à partir du jeu de données du fichier <u>purchases.txt</u> téléchargé

In [15]:
schemaTable = StructType([
    StructField("Date", DateType(), True),
    StructField("Heure", StringType(), True),
    StructField("Magasin", StringType(), True),
    StructField("Produit", StringType(), True),
    StructField("Montant", DoubleType(), True),
    StructField("ModePaiement", StringType(), True)
    ])

df = spark.read.load("purchases.txt", format="csv", sep="\t", schema=schemaTable, header=False)

Affichage du dataframe

In [16]:
display(df)

Unnamed: 0,Date,Heure,Magasin,Produit,Montant,ModePaiement
0,2012-01-01,09:00,San Jose,Men's Clothing,214.05,Amex
1,2012-01-01,09:00,Fort Worth,Women's Clothing,153.57,Visa
2,2012-01-01,09:00,San Diego,Music,66.08,Cash
3,2012-01-01,09:00,Pittsburgh,Pet Supplies,493.51,Discover
4,2012-01-01,09:00,Omaha,Children's Clothing,235.63,MasterCard
5,2012-01-01,09:00,Stockton,Men's Clothing,247.18,MasterCard
6,2012-01-01,09:00,Austin,Cameras,379.6,Visa
7,2012-01-01,09:00,New York,Consumer Electronics,296.8,Cash
8,2012-01-01,09:00,Corpus Christi,Toys,25.38,Discover
9,2012-01-01,09:00,Fort Worth,Toys,213.88,Visa


Matérialisation du dataframe comme une vue SQL avec la vue <mark>purchases</mark> qui pointe sur lui.

In [17]:
df.createOrReplaceTempView('purchases')

Test de notre première requête SQL. <b>Que fait-elle ?</b>

In [18]:
%%sql
select count(*) from purchases

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=20]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- FileScan csv [] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/purchases.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>




Unnamed: 0,count(1)
0,4138476


<b>Que fait cette deuxième requête ?</b>
N'hésitez pas ! Exécutez la &#128521;

In [19]:
%%sql
SELECT magasin, SUM(montant) FROM purchases GROUP BY magasin ORDER BY magasin

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [magasin#2 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(magasin#2 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=70]
      +- HashAggregate(keys=[magasin#2], functions=[sum(montant#4)])
         +- Exchange hashpartitioning(magasin#2, 200), ENSURE_REQUIREMENTS, [plan_id=67]
            +- HashAggregate(keys=[magasin#2], functions=[partial_sum(montant#4)])
               +- FileScan csv [Magasin#2,Montant#4] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/purchases.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Magasin:string,Montant:double>




Unnamed: 0,magasin,sum(montant)
0,Albuquerque,10052311.42
1,Anaheim,10076416.36
2,Anchorage,9933500.4
3,Arlington,10072207.97
4,Atlanta,9997146.7
5,Aurora,9992970.92
6,Austin,10057158.9
7,Bakersfield,10031208.92
8,Baltimore,10096521.45
9,Baton Rouge,10131273.23


## Activité 2
Nous continuons à travailler avec la même vue <mark>purchases</mark>. <u>Le but est d’écrire vos propres requêtes</u>.
<ol>
<li>Donner le nombre de paiement par mode de paiement.</li>
<li>Quel est le chiffre d'affaire réalisé selon les jours de la semaine ?</li>
<li>Quelle est la liste des magasins ?</li>
<li>Quel est le nombre total des ventes et la valeur totale des ventes de tous les magasins confondus ?</i>
<li>Quel magasin a fait le plus grand chiffre d'affaire ? <b>Comment devrions-nous y prendre avec MapReduce ?</b></li>
</ol>

#QUESTION1 : Donner le nombre de paiement par mode de paiement.

In [20]:
%%sql
SELECT ModePaiement, COUNT(ModePaiement) FROM purchases GROUP BY ModePaiement ORDER BY ModePaiement

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [ModePaiement#5 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(ModePaiement#5 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=135]
      +- HashAggregate(keys=[ModePaiement#5], functions=[count(ModePaiement#5)])
         +- Exchange hashpartitioning(ModePaiement#5, 200), ENSURE_REQUIREMENTS, [plan_id=132]
            +- HashAggregate(keys=[ModePaiement#5], functions=[partial_count(ModePaiement#5)])
               +- FileScan csv [ModePaiement#5] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/purchases.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ModePaiement:string>




Unnamed: 0,ModePaiement,count(ModePaiement)
0,Amex,826535
1,Cash,828770
2,Discover,827426
3,MasterCard,828524
4,Visa,827221


# QUESTION 2 : Le chiffre d'affaire réalisé selon les jours de la semaine 

In [24]:
from datetime import datetime

fichier = open('/content/purchases.txt', 'r')
fichier1 = open('/content/purchases1.txt', 'a')
file_line = fichier.readlines()
fichier.close()
for line in file_line:
  data = line.strip().split("\t")
  if len(data) == 6:
    date, time, store, item, montant, payment = data
    day = datetime.fromisoformat(date)
    data_file = day.strftime("%A") + "\t"+ time+"\t"+ store+"\t"+item+"\t"+montant+"\t"+ payment
    fichier1.write(data_file +"\n")

fichier1.close()

In [29]:
schemaTable1 = StructType([
    StructField("Date", StringType(), True),
    StructField("Heure", StringType(), True),
    StructField("Magasin", StringType(), True),
    StructField("Produit", StringType(), True),
    StructField("Montant", DoubleType(), True),
    StructField("ModePaiement", StringType(), True)
    ])

df1 = spark.read.load("purchases1.txt", format="csv", sep="\t", schema=schemaTable1, header=False)

In [None]:
display(df1)

In [34]:
df1.createOrReplaceTempView('purchases1')

In [35]:
%%sql
SELECT Date, SUM(Montant) FROM purchases1 GROUP BY Date ORDER BY Date

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [Date#60 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(Date#60 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=292]
      +- HashAggregate(keys=[Date#60], functions=[sum(Montant#64)])
         +- Exchange hashpartitioning(Date#60, 200), ENSURE_REQUIREMENTS, [plan_id=289]
            +- HashAggregate(keys=[Date#60], functions=[partial_sum(Montant#64)])
               +- FileScan csv [Date#60,Montant#64] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/purchases1.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Date:string,Montant:double>




Unnamed: 0,Date,sum(Montant)
0,Friday,147414900.0
1,Monday,150364100.0
2,Saturday,147410200.0
3,Sunday,150296800.0
4,Thursday,147353800.0
5,Tuesday,147246700.0
6,Wednesday,144371500.0


In [48]:
%%sql
SELECT date_format(date, 'E'), SUM(Montant) FROM purchases GROUP BY date_format(date, 'E')

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[_groupingexpression#177], functions=[sum(Montant#4)])
   +- Exchange hashpartitioning(_groupingexpression#177, 200), ENSURE_REQUIREMENTS, [plan_id=657]
      +- HashAggregate(keys=[_groupingexpression#177], functions=[partial_sum(Montant#4)])
         +- Project [Montant#4, date_format(cast(date#0 as timestamp), E, Some(Etc/UTC)) AS _groupingexpression#177]
            +- FileScan csv [Date#0,Montant#4] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/purchases.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Date:date,Montant:double>




Unnamed: 0,"date_format(date, E)",sum(Montant)
0,Sun,150296800.0
1,Mon,150364100.0
2,Thu,147353800.0
3,Sat,147410200.0
4,Wed,144371500.0
5,Tue,147246700.0
6,Fri,147414900.0


# QUESTION 3 : La liste des magasins

In [46]:
%%sql
SELECT disTINCT Magasin FROM purchases

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Magasin#2], functions=[])
   +- Exchange hashpartitioning(Magasin#2, 200), ENSURE_REQUIREMENTS, [plan_id=592]
      +- HashAggregate(keys=[Magasin#2], functions=[])
         +- FileScan csv [Magasin#2] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/purchases.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Magasin:string>




Unnamed: 0,Magasin
0,North Las Vegas
1,Phoenix
2,Omaha
3,Anchorage
4,Anaheim
5,Greensboro
6,Dallas
7,Oakland
8,Laredo
9,Scottsdale


# QUESTION 4 : Le nombre total des ventes et la valeur totale des ventes de tous les magasins confondus

In [49]:
%%sql
SELECT SUM(montant) AS ChiffreAffaire, COUNT(Produit) AS NbVente FROM purchases

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum(montant#4), count(Produit#3)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=722]
      +- HashAggregate(keys=[], functions=[partial_sum(montant#4), partial_count(Produit#3)])
         +- FileScan csv [Produit#3,Montant#4] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/purchases.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Produit:string,Montant:double>




Unnamed: 0,ChiffreAffaire,NbVente
0,1034458000.0,4138476


# QUESTION 5 : Quel magasin a fait le plus grand chiffre d'affaire ? Comment devrions-nous y prendre avec MapReduce ?

In [54]:
%%sql
CREATE OR REPLACE TEMP VIEW chiffreDaffaireParMagasin AS SELECT magasin, SUM(montant) AS chiffreDaffaire FROM purchases GROUP BY Magasin

ok


In [None]:
%%sql
SELECT magasin, chiffreDaffaire FROM chiffreDaffaireParMagasin WHERE chiffreDaffaire = (SELECT MAX(chiffreDaffaire) FROM chiffreDaffaireParMagasin)

**Deuxième manière de Faire**

In [57]:
%%sql
WITH chiffreDaffaireParMagasin AS (select magasin, sum(montant) as chiffreDaffaire from purchases group by magasin) select magasin, chiffreDaffaire from chiffreDaffaireParMagasin where chiffreDaffaire = (SELECT MAX(chiffreDaffaire) FROM chiffreDaffaireParMagasin)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (isnotnull(chiffreDaffaire#243) AND (chiffreDaffaire#243 = Subquery subquery#242, [id=#972]))
   :  +- Subquery subquery#242, [id=#972]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- HashAggregate(keys=[], functions=[max(chiffreDaffaire#243)])
   :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=970]
   :              +- HashAggregate(keys=[], functions=[partial_max(chiffreDaffaire#243)])
   :                 +- HashAggregate(keys=[magasin#2], functions=[sum(montant#4)])
   :                    +- Exchange hashpartitioning(magasin#2, 200), ENSURE_REQUIREMENTS, [plan_id=966]
   :                       +- HashAggregate(keys=[magasin#2], functions=[partial_sum(montant#4)])
   :                          +- FileScan csv [Magasin#2,Montant#4] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/purchases.txt], PartitionFilters: [], PushedFilters: 

Unnamed: 0,magasin,chiffreDaffaire
0,Philadelphia,10190080.26


# Références
**Réalisez des calculs distribués sur des données massives** : 
https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308666-prenez-spark-en-main

**Apache Spark : qu’est-ce que c’est et à quoi ça sert ?** : https://datascientest.com/apache-spark

**Traitement de données massives avec Apache Spark** : http://b3d.bdpedia.fr/spark-batch.html

**PySpark : Tout savoir sur la librairie Python** : https://datascientest.com/pyspark