# TP 5

## Goals

### Utilisation de pyspark
* filter, map and reduce
* Utilisation des fichiers CSV et JSON
* SQL et pyspark
* Écriture et lecture de fichiers ORC et Parquet

## Exercice 1 \[★\]

Tout d'abord, nous allons installer pyspark

`$ pip install pyspark`

Si l'installation a réussi, le code suivant sera exécuté avec succès.

In [1]:
from pyspark import SparkConf
from pyspark.context import SparkContext

Nous allons utiliser la configuration par défaut pour créer un contexte spark.
Ce contexte spark sera utilisé tout au long de nos exemples.

Tout d'abord, nous lisons un fichier CSV qui contient deux colonnes : languageLabel et year.

In [2]:
sc = SparkContext.getOrCreate(SparkConf())
lines = sc.textFile("../../data/pl.csv")

Nous allons utiliser les méthodes map() et reduce() avec les expressions lambda.

Notez que dans l'exercice suivant, nous n'utilisons pas les fonctions intégrées de Python.
Le but de cet exemple est de compter le nombre de caractères ligne par ligne.

In [3]:
line_length = lines.map(lambda line: len(line))
total_length = line_length.reduce(lambda a, b: a + b)
print(total_length)

1686


Dans cet exemple, nous obtenons le nombre de tokens séparés par des virgules pour chaque ligne.
Enfin, nous calculons le nombre total de tokens.

In [4]:
line_token_count = lines.map(lambda line: len(line.split(",")))
total_token_count = line_token_count.reduce(lambda a, b: a + b)
print(total_token_count)

202


Nous modifions légèrement le code et utilisons l'espace comme séparateur pour compter le nombre de tokens.

In [5]:
line_token_count = lines.map(lambda line: len(line.split(" ")))
total_token_count = line_token_count.reduce(lambda a, b: a + b)
print(total_token_count)

174


Jusqu'à présent, nous n'avons considéré qu'un seul fichier à la fois.

Que faire si nous voulons manipuler et analyser plusieurs fichiers CSV?

La méthode textFile() peut également gérer de tels cas.

In [7]:
sc = SparkContext.getOrCreate(SparkConf())
lines = sc.textFile("../../data/pl.csv")

line_length = lines.map(lambda line: len(line))
total_length = line_length.reduce(lambda a, b: a + b)
print(total_length)

1686


Notre prochain objectif est de collecter toutes les lignes stockées dans plusieurs fichiers et d'y accéder en utilisant une seule variable.

Ceci est possible en utilisant la méthode collect().

In [8]:
sc = SparkContext.getOrCreate(SparkConf())
lines = sc.textFile("../../data/pl.csv")

all_lines = lines.collect()
print(len(all_lines))
for line in all_lines:
    print(line)

101
languageLabel,year
ENIAC coding system,1943
ENIAC Short Code,1946
Von Neumann and Goldstine graphing system,1946
ARC Assembly,1947
Plankalkül,1948
CPC Coding scheme,1948
Curry notation system,1948
Short Code,1949
assembly language,1949
Short Code,1950
G-code,1950
Birkbeck Assembler,1950
Superplan,1951
ALGAE,1951
Intermediate Programming Language,1951
Regional Assembly Language,1951
Boehm unnamed coding system,1951
Klammerausdrücke,1951
OMNIBAC Symbolic Assembler,1951
Stanislaus,1951
Whirlwind assembler,1951
Rochester assembler,1951
Sort Merge Generator,1951
autocode,1952
A-0 System,1952
Editing Generator,1952
COMPOOL,1952
Speedcoding,1953
READ/PRINT,1953
Fortran,1954
ARITH-MATIC,1954
autocode,1954
Laning and Zierler system,1954
MATH-MATIC,1954
MATRIX MATH,1954
FLOW-MATIC,1955
PACT,1955
BACAIC,1955
Freiburger Code,1955
Sequentielle Formelübersetzung,1955
Internal Translator,1955
PRINT,1955
Information Processing Language,1956
FORTRAN for the IBM 704,1956
Fortran,1957
COMTRAN,1957
GE

Comme la fonction intégrée de Python map(), 
il est également possible de passer une fonction définie par l'utilisateur comme entrée de la fonction map()
de pyspark.

Dans l'exemple suivant, nous comptons le nombre d'occurrences d'un mot particulier dans tous les fichiers CSV.

Dans ce but, nous avons écrit une fonction count_Language() qui compte le nombre d'occurrences de "Language" dans une ligne. 


In [9]:
import re

sc = SparkContext.getOrCreate(SparkConf())
lines = sc.textFile("../../data/pl.csv")


def count_Language(line):
    return len(re.findall("Language", line))


line_length = lines.map(count_Language)
total_length = line_length.reduce(lambda a, b: a + b)
print(total_length)

4


**Question** Téléchargez 50 pages HTML. Écrivez un programme utilisant Spark pour compter le nombre total de fois où `<div>` ou `<div/>` est présent dans tous ces fichiers téléchargés.

In [11]:
import requests
import os
import re

def download_html_pages(urls, output_folder):
    # Créer le dossier de sortie s'il n'existe pas
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    # Télécharger les pages HTML
    for i, url in enumerate(urls):
        try:
            response = requests.get(url)
            # Vérifier si la requête a réussi
            if response.status_code == 200:
                # Enregistrer la page HTML dans un fichier
                with open(os.path.join(output_folder, f"page_{i+1}.html"), 'wb') as f:
                    f.write(response.content)
                print(f"Page {i+1} téléchargée avec succès.")
            else:
                print(f"Échec du téléchargement de la page {i+1}. Status code: {response.status_code}")
        except Exception as e:
            print(f"Une erreur est survenue lors du téléchargement de la page {i+1}: {e}")


# Liste d'URLs à télécharger
urls = [
    "https://www.google.com/",
    "https://www.facebook.com",
    # Ajoutez d'autres URLs ici...
]

# Dossier de sortie pour enregistrer les pages HTML
output_folder = "html_pages"

# Appel de la fonction pour télécharger les pages HTML
download_html_pages(urls, output_folder)


sc = SparkContext.getOrCreate(SparkConf())

#get all images
def get_all_html(source="html_pages"):
    htmls = []
    for filename in os.listdir(source):
        if filename.endswith(".html"):
            htmls.append(filename)
    return htmls


def count_Language(line):
    return len(re.findall("<div>", line))


htmls = get_all_html()
for html in htmls :
    lines = sc.textFile("html_pages/"+html)
    

line_length = lines.map(count_Language)
total_length = line_length.reduce(lambda a, b: a + b)
print(total_length)

Page 1 téléchargée avec succès.
Page 2 téléchargée avec succès.
4


## Exercice 2 \[★★\]

Notre prochain objectif est de travailler avec des fichiers JSON et de créer des dataframes pandas.

Cependant, nous aimerions utiliser le support des dataframes fourni par Spark.

Nous créons d'abord une session Spark.

In [12]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Nous lisons un fichier JSON et le convertissons en dataframe pandas.

Ce dataframe pandas est ensuite converti en dataframe Spark.

In [13]:
from pandas import json_normalize
import pandas as pd
import json

data = json.load(open("../../data/pl.json"))
dataframe = json_normalize(data)
dataframe = dataframe.astype(dtype={"languageLabel": "<U200", "year": "int64"})

df = spark.createDataFrame(dataframe)

Nous affichons maintenant le dataframe Spark.

In [14]:
df.show()

+--------------------+----+
|       languageLabel|year|
+--------------------+----+
| ENIAC coding system|1943|
|    ENIAC Short Code|1946|
|Von Neumann and G...|1946|
|        ARC Assembly|1947|
|         PlankalkÃ¼l|1948|
|   CPC Coding scheme|1948|
|Curry notation sy...|1948|
|          Short Code|1949|
|   assembly language|1949|
|          Short Code|1950|
|  Birkbeck Assembler|1950|
|              G-code|1950|
|           Superplan|1951|
|               ALGAE|1951|
|Intermediate Prog...|1951|
|Regional Assembly...|1951|
|Boehm unnamed cod...|1951|
|   KlammerausdrÃ¼cke|1951|
|OMNIBAC Symbolic ...|1951|
|          Stanislaus|1951|
+--------------------+----+
only showing top 20 rows



Il est également possible de spécifier le nombre de lignes à afficher.

In [15]:
df.show(30)

+--------------------+----+
|       languageLabel|year|
+--------------------+----+
| ENIAC coding system|1943|
|    ENIAC Short Code|1946|
|Von Neumann and G...|1946|
|        ARC Assembly|1947|
|         PlankalkÃ¼l|1948|
|   CPC Coding scheme|1948|
|Curry notation sy...|1948|
|          Short Code|1949|
|   assembly language|1949|
|          Short Code|1950|
|  Birkbeck Assembler|1950|
|              G-code|1950|
|           Superplan|1951|
|               ALGAE|1951|
|Intermediate Prog...|1951|
|Regional Assembly...|1951|
|Boehm unnamed cod...|1951|
|   KlammerausdrÃ¼cke|1951|
|OMNIBAC Symbolic ...|1951|
|          Stanislaus|1951|
| Whirlwind assembler|1951|
| Rochester assembler|1951|
|Sort Merge Generator|1951|
|   Editing Generator|1952|
|             COMPOOL|1952|
|            autocode|1952|
|          A-0 System|1952|
|          READ/PRINT|1953|
|         Speedcoding|1953|
|             Fortran|1954|
+--------------------+----+
only showing top 30 rows



Le code suivant affichera le schéma du dataframe.

In [16]:
df.printSchema()

root
 |-- languageLabel: string (nullable = true)
 |-- year: long (nullable = true)



Dans le code suivant, nous écrivons une fonction filtre pour obtenir les langages de programmation
qui ont été publiés en 1952.

In [17]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.year == 1952]


df.mapInPandas(pandas_filter_func, schema=df.schema).show()

Py4JJavaError: An error occurred while calling o202.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 28) (host.docker.internal executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
	at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
	at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
	at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
	at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
	at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
	at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140)
	at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


Dans le code suivant, nous appliquons une fonction définie par l'utilisateur sur la colonne année pour obtenir le siècle où un langage de programmation a été publié pour la première fois.

In [None]:
from pyspark.sql.functions import pandas_udf


@pandas_udf("long")
def century(series: pd.Series) -> pd.Series:
    return (series / 100) + 1


df.select(century(df.year)).show()

Comme pour pandas, il est possible de regrouper les données par valeurs de colonnes spécifiques.

Dans l'exemple suivant, nous voulons obtenir le nombre de langages de programmation publiés pour la première fois chaque année.

In [None]:
df.groupby("year").count().show()

Nous avons jusqu'à présent utilisé des fichiers CSV et JSON.

Mais pour une performance optimisée, les fichiers ORC et parquet sont suggérés.

Dans les exemples suivants, nous voyons comment nous pouvons écrire des fichiers ORC et parquet en utilisant un dataframe Spark.

In [None]:
df.write.orc("languages.orc")
spark.read.orc("languages.orc").show()

In [None]:
df.write.parquet("languages.parquet")
spark.read.parquet("languages.parquet").show()

In [None]:
df.write.csv("languages.csv")
spark.read.csv("languages.csv").show()

Avant de continuer, vérifiez votre répertoire actuel et voyez comment ces dataframes ont été écrits.

**Question** Écrire une requête sur Wikidata pour télécharger les noms de tous les logiciels ainsi que leur première date de sortie. Écrivez un programme utilisant Spark pour obtenir le nombre de logiciels sortis chaque année.

## Exercice 3 \[★★\]

Il est possible de travailler en utilisant le langage SQL sur des dataframes spark.

Pour cela, nous allons créer des vues temporaires et exécuter des requêtes SQL.

L'exemple suivant va retourner et afficher toutes les langues.

In [None]:
df.createOrReplaceTempView("languages")
spark.sql("SELECT * from languages").show()

L'exemple suivant utilise une requête SQL pour obtenir le nombre de langages de programmation.

In [None]:
spark.sql("SELECT count(*) from languages").show()

La requête suivante renvoie le nombre de langues publiées pour la première fois en 1952.

In [None]:
spark.sql("SELECT count(*) from languages where year=1952").show()

La requête suivante est similaire à l'exemple groupby vu précédemment.

In [None]:
spark.sql("SELECT year, count(*) from languages Group by year ORDER by year").show(100)

**Question** À la place des fonctions d'agrégation de Spark, utilisez SQL et Spark pour obtenir le nombre de logiciels sortis chaque année en utilisant les données téléchargées dans l'exercice précédent.

Pour plus d'exemples, 
consultez la [documentation officielle](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart.html)