# TP SPARK - EPSI

#### Lien utiles

Guide Développement :
https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

Spark Context :
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html?highlight=sparkcontext#pyspark.SparkContext

Resilient Distributed Dataset (RDD) :
https://spark.apache.org/docs/latest/rdd-programming-guide.html


In [1]:
#!pip install pyspark

# Initialisation PySpark


In [2]:
from pyspark import SparkConf, SparkContext
import collections

print('Spark Conf :', SparkConf.__doc__)
print('Spark Context v %s :'%SparkContext.version, SparkContext.__doc__)

Spark Conf : 
    Configuration for a Spark application. Used to set various Spark
    parameters as key-value pairs.

    Most of the time, you would create a SparkConf object with
    ``SparkConf()``, which will load values from `spark.*` Java system
    properties as well. In this case, any parameters you set directly on
    the :class:`SparkConf` object take priority over system properties.

    For unit tests, you can also call ``SparkConf(false)`` to skip
    loading external settings and get the same configuration no matter
    what the system properties are.

    All setter methods in this class support chaining. For example,
    you can write ``conf.setMaster("local").setAppName("My app")``.

    Parameters
    ----------
    loadDefaults : bool
        whether to load values from Java system properties (True by default)
    _jvm : class:`py4j.java_gateway.JVMView`
        internal parameter used to pass a handle to the
        Java VM; does not need to be set by users
    _jc

In [3]:
#local[N] = Pas de cluster Manager. N Nombre de processeur. Si non défini tous les processeurs. Si juste local, un processeur
# Une nouvelle exécution nécessite un redémarrage du kernel Jupyter.
conf = SparkConf().setMaster("local[2]").setAppName("EPSI-TP1")
sc = SparkContext(conf = conf)
print(sc.applicationId,sc.version,'\n---------------\n',sc.getConf().getAll())

local-1741073154695 3.5.5 
---------------
 [('spark.driver.host', '25a080625eb3'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'), ('spark.app.name', 'EP

## Exercice 1 :  Tableau de nombre.

#### 1.1 Créer un RDD avec 1000 chiffres. Afficher 5 exemples pris au hasard.

In [4]:
rdd = sc.parallelize(range(1000)) #RDD = Données Distribuées Résilientes. Stockage des chiffres de 0 à 999 avec range.
rdd.takeSample(False, 5) # 5 valeurs au hasard. False car sans remplacement.

[97, 880, 566, 161, 856]

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## Exercice 2 : WordCount.

#### 2.1 Charger le fichier data/montaigne.txt ligne par ligne. Afficher 5 examples pris au hasard.

In [None]:
text_file = sc.textFile("/content/drive/MyDrive/Colab Notebooks/Montaigne.txt")
text_file.takeSample(False, 5)

#### 2.2 Segmenter chaque ligne en mots. Afficher 2 exemples au hasard.

In [None]:
words = text_file.flatMap(lambda line: line.split())
words.takeSample(False, 2)

#### 2.3 Segmenter chaque ligne en mots en rendant chaque mot indépendant. Afficher 5 exemples au hasard.

In [None]:
words = text_file.flatMap(lambda line: line.split())
words.takeSample(False, 5)

#### 2.4 Afficher le nombre total de mot, et la taille du vocabulaire (nombre de mots distinct)

In [None]:
total_words = words.count()
vocabulary_size = words.distinct().count()

print(f"Total number of words: {total_words}")
print(f"Vocabulary size (number of distinct words): {vocabulary_size}")

#### 2.5 Transformer chaque mot w en une clé-valeur (w,1). Afficher 5 exemples de clé-valeur

In [None]:
word_counts = words.map(lambda word: (word, 1))
word_counts.take(5)

#### 2.6 Sommer  par clé . Afficher 5 valeurs.

In [None]:
word_counts = word_counts.reduceByKey(lambda a, b: a + b)
word_counts.take(5)

#### 2.7 Ne conserver que les mots de plus de 6 caractères apparaissant au moins 3 fois

In [None]:
filtered_word_counts = word_counts.filter(lambda x: len(x[0]) > 6 and x[1] >= 3)
filtered_word_counts.take(10)

####  2.8 Récupérer le résultat des actions précédentes. Afficher le type de la valeur renvoyée

In [None]:
result = filtered_word_counts.take(10)

print("Result:", result)
print("Type of result:", type(result))

result2 = word_counts.take(5)
print("Result:", result2)
print("Type of result:", type(result2))

#### 2.9 Trier et afficher les mots trouvés.

In [None]:
sorted_word_counts = filtered_word_counts.sortBy(lambda x: (-x[1], x[0]))

sorted_results = sorted_word_counts.collect()

for word, count in sorted_results:
    print(f"{word}: {count}")

## Exercice 3 optionnel : Analyse numériques

#### 3.1 Charger dans Spark le fichier Titanic.csv

#### 3.2 Creer une fonction python qui transforme une ligne en une liste de variables avec le type correct. tester sur une ligne.

#### 3.3 Charger les segmentations dans un RDD. Afficher les trois premiers

#### 3.4 Creér un RDD composé de la classe (élément 2)  et de la survie (élement 1). Afficher les 10 premiers.

#### 3.5 En utilisant la fonction mapValues, créer une liste avec la classe du passager et un doublet (survie,1). Afficher les 10 premiers.
Exemple pour les deux premiers passagers : <br>
(3, (0, 1)), <br>
(1, (1, 1)

#### 3.6. Pour chaque classe afficher le nombre de survivant et le nombre total.

#### 3.7 afficher le taux de survie par Classe, trié de 1 à 3

#### 3.8 Créer un rdd des survivants

#### 3.9 : Trouver le survivant le plus agé