<a href="https://colab.research.google.com/github/crystalloide/Spark/blob/master/Big_Data_and_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Big Data

Le Big Data n'est que de la donnée, mais en quantité ?

Le Big Data peut être décrit à l'aide des «quatre V» (on évoque même 5 ou 7 V). 

Les quatre premiers V sont les suivants :
1. Volume
2. Vitesse
3. Variété
4. Véracité

Les mégadonnées sont décrites comme étant volumineuses, en volume donc (quantité de données : de l'ordre de zétaoctets par exemple), comme arrivant à grande vitesse (données en streaming, par exemple, les données des capteurs ou des téraoctets d'informations commerciales), comme étant de différentes variétés (différentes formes de données, par exemple les vidéos et les tweets) et comme étant d'une véracité plus ou moins fiable, on parle alors d'incertitude de la valeur des données (par exemple, mauvaise qualité des données).

En raison de la charge de traitement des mégadonnées, il est nécessaire d'utiliser des outils spéciaux optimisés pour des calculs de cette taille.



# Clusters de Stockage et de Calcul

Un seul ordinateur ne sera pas en mesure de stocker seul l'ensemble des données à traiter. C'est ce qui est appelé un cluster de stockage : un groupe d'ordinateurs qui se répartissent les fragments de fichiers pour faire stocker la totalité des informations à traiter.

L'overhead du traitement des données Big Data  provient du fait qu'un seul ordinateur ne fera pas le travail de traitement des données «big data», mais il faudra faire fonctionner plusieurs ordinateurs ensemble. 
C'est ce qui est appelé un cluster de calcul : un groupe d'ordinateurs qui travaillent ensemble pour faire le travail attendu.

Comment gérer un ensemble d'ordinateurs pour travailler ensemble afin d' accomplir une tâche? 

Cette gestion du travail en cluster est en réalité difficile à gérer, du fait de la concurrence, de la communication interprocessus, de la planification des tâches, etc. A cela s'ajoute les problèmes "normaux" rencontrés sur des systèmes distribués comme les pannes d'ordinateur ou la latence du réseau, etc.

# Hadoop

Pour permettre ces usages liés au Big Data, ** Apache Hadoop ** a été mis au point et est constitué d'un framework, une collection d'outils, destinés à gérer les clusters et les utiliser.

- Yarn : gestion des tâches de calcul dans le cluster
- HDFS (Hadoop Distributed File System) : stockage des données réparties sur les nœuds du cluster (ordinateurs ou nodes)
- Spark : framework permettant de réaliser des calculs sur les données

# Débutons avec Spark
1. Télécharger [Spark (2.4.3)](https://spark.apache.org/) (ou la version pre-built la plus récente)
2. Positionner une variable d'environnement (avec un terminal). Si on utilise  Python 3, cela donne :
> export PYSPARK_PYTHON=python3
3. Positionner également le path : (chemin d'accès vers les binaires)
> export PATH=${PATH}:/home/you/spark-2.4.4-bin-hadoop2.7/bin
4. Si vous obtenez une  erreur 'Py4JJavaError', vous devrez installer Java ou OpenJDK version 8

Voilà comment configurer Spark.

De plus, Pour exécuter un programme spark, il suffit ensuite d'entrer la commande suivante en ligne de commande dans une terminal :

> spark-submit spark-program.py

# Notre 1er programme Spark

Il faut ensuite cliquer sur "Ajouter des données" en haut à droite et de choisir "Los Angeles Traffic Collision Dataset".

A noter : 
N'hésitez pas à utiliser un autre dataset lors de futures expériences. 
Il faut alors vérifiez le type de fichier et changer la méthode de lecture spark en conséquence.

In [0]:
import os
input_dir = '../input'
os.listdir(input_dir)
file = 'traffic-collision-data-from-2010-to-present.csv'
path = os.path.join(input_dir,file)
print(path)

In [0]:
!pip install pyspark

In [0]:
import sys
from pyspark.sql import SparkSession, functions, types
 
spark = SparkSession.builder.appName('example 1').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

assert sys.version_info >= (3, 5) # make sure we have Python 3.5+
assert spark.version >= '2.3' # make sure we have Spark 2.3+

data = spark.read.csv(path, header=True,
                      inferSchema=True)
data.show()

Hum .. Comme cela n'est pas très joli, voyons ce que l'on peut faire.
Tout d'abord, explorons certaines méthodes avec un dataframe Spark.

In [0]:
# Regardons le schéma
data.printSchema()

In [0]:
# Sélectionnons quelques colonnes
data.select(data['Crime Code'], data['Victim Age']).show()

In [0]:
# Filtrons les données
data.filter(data['Victim Age'] < 40).select('Victim Age', 'Victim Sex').show()

In [0]:
# Ecrivons dans un fichier en format json
json_file = data.filter(data['Victim Age'] < 40).select('Victim Age', 'Victim Sex')
json_file.write.json('json_output', mode='overwrite')

Si vous vous attendiez à un simple fichier json, vous allez être surpris.

Vous obtenez plutôt un **répertoire** de plusieurs fichiers json. 

La concaténation de ces fichiers est la sortie réelle. 

Cela provient de la façon dont Spark calcule. 

Cela sera détaillé plus tard.

In [0]:
!ls json_output

In [0]:
# Encore quelques informations

# Opérons un calcul sur une colonne et renommons-là
data.select((data['Council Districts']/2).alias('CD_dividedBy2')).show()

# Renommons des colonnes 
data.withColumnRenamed('Victim Sex', 'Gender').select('Gender').show()

# Supprimons des colonnes et appliquons un format vertical plus approprié pour les 10 premiers
d = data.drop('Neighborhood Councils')
d.show(n=10, truncate=False, vertical=True)

# Partitionnement
Nous avons vu précédemment que la sortie du fichier json est obtenue avec un répertoire de plusieurs fichiers json. C'est parce que, comme nous l'avons vu également, les (méga)données sont trop volumineuses pour être traitées sur un seul ordinateur.

C'est pourquoi les outils fournis dans Apache Hadoop sont là pour : 
pouvoir travailler avec toutes les données en entrée
répartir les calculs sur plusieurs ordinateurs
mais aussi rassembler les résultats sous un seul résultat comme si les données avaient été traitées sur une seule machine. 
C'est pourquoi tous les dataframes Spark sont partitionnés de cette façon, et cela quelle que soit la taille des données.

Habituellement, vous indiqueriez un répertoire en entrée contenant vos fichiers de "données" où chaque thread / processus / noyau / exécuteur lirait individuellement un fichier d'entrée. 
Lors de la création de la sortie, chaque écriture est réalisée en parallèle, et lorsque chacun des fichiers de sortie est combiné, ils forment le résultat de sortie unique. C'est là que HDFS joue un rôle -en tant que système de fichiers partagé- pour que tout ce parallélisme fonctionne.

YARN est responsable de la gestion du calcul sur chaque ordinateur individuel lorsqu'il travaille effectivement au sein d'un cluster de nœuds. 
YARN gère les ressources CPU et mémoire. Très important quand on sait que le réseau est une ressource limitante, plutôt que de déplacer les données vers différents nœuds, YARN peut déplacer le travail de calcul là où se trouvent les données.

Remaruqe : sur une seule machine locale, nous pouvons utiliser simplement le système de fichiers local

# Conclusion

Ce TP est un 1er et bref aperçu d'un usage de Spark dans un contexte Big Data.