# Chargement de données dans RDD, DataFrame

Par défaut, Spark ne remplacera pas le dossier de sortie. Pour `RDDs`, on peut passer un paramètre de configuration pour modifier que` .config ("spark.hadoop.validateOutputSpecs", "false") `pour` DataFrame`s nous définirons explicitement le mode d'écriture.

Tout d'abord, initialisez la session Spark et le contexte Spark:

In [1]:
import pyspark
try:
    sc
except NameError:    
    spark = pyspark.sql.SparkSession.builder.master("local[*]").appName("hadoop course").config("spark.hadoop.validateOutputSpecs", "false").getOrCreate()
    sc = spark.sparkContext

## Charger des données non structurées

L'une des premières choses que nous devons apprendre est de savoir comment lire les données dans les RDD et les dataframes Spark. Spark fournit une API  pour lire des données structurées dans la plupart des formats de données (CSV, JSON, ...etc) ainsi que des données non structurées (fichiers texte brut, journaux de serveur, etc.).

### Lecture de texte brut

`textFile` et` wholeTextFiles` sont des fonctions à lire le texte brut non structuré.

1. `textFile` lit les données ligne par ligne créant un RDD où chaque entrée correspond à une ligne (un peu comme readlines () en Python)
1. `wholeTextFiles` lit le fichier entier dans une paire RDD: (chemin du fichier, contexte du fichier entier sous forme de chaîne)


Le code suivant démontre cela sur un exemple du nombre de mots.

### 1. textFile

In [2]:
#from pyspark import SparkContext
import sys
import time
import os

inputfile="file:///home/mbds/spark/tpSpark/2_LoadingData/data/unstructured/"
outputdir="file:///home/mbds/spark/tpSpark/2_LoadingData/output/"
start = time.time()
input_rdd = sc.textFile(inputfile)
counts = input_rdd.flatMap(lambda line: line.split()). \
                                                    map(lambda word: (word, 1)). \
                                                    reduceByKey(lambda a, b: a + b)
print ("\n Prendre les 10 mots les plus fréquents dans le texte et les fréquences correspondantes")
print (counts.takeOrdered(10, key=lambda x: -x[1]))
counts.map(lambda x: (x[1],x[0])).sortByKey(0).map(lambda x: (x[1],x[0]))\
        .repartition(1).saveAsTextFile(outputdir+"output_loadunstructured1/")
end = time.time()
print ("Elapsed time: ", (end-start))


 Prendre les 10 mots les plus fréquents dans le texte et les fréquences correspondantes
[('the', 22635), ('of', 11167), ('and', 11086), ('to', 10707), ('a', 10433), ('I', 10183), ('in', 7006), ('that', 6911), ('was', 6779), ('his', 4955)]
Elapsed time:  10.182549715042114


### 2.wholeTextFiles

In [22]:
input_pair_rdd = sc.wholeTextFiles(inputfile)
counts = input_pair_rdd.map(lambda line: line[0].split())
counts.collect()

[['file:/home/mbds/spark/tpSpark/2_LoadingData/data/unstructured/retn.txt'],
 ['file:/home/mbds/spark/tpSpark/2_LoadingData/data/unstructured/case.txt'],
 ['file:/home/mbds/spark/tpSpark/2_LoadingData/data/unstructured/advs.txt'],
 ['file:/home/mbds/spark/tpSpark/2_LoadingData/data/unstructured/mems.txt'],
 ['file:/home/mbds/spark/tpSpark/2_LoadingData/data/unstructured/lstb.txt']]

In [5]:
input_pair_rdd = sc.wholeTextFiles(inputfile)
counts = input_pair_rdd.map(lambda line: line[1].split())
counts.take(1)[0][0]

'THE'

In [6]:
start = time.time()
input_rdd = sc.wholeTextFiles(inputfile)
counts = input_rdd.flatMap(lambda line: line[1].split()). \
                                                    map(lambda word: (word, 1)). \
                                                    reduceByKey(lambda a, b: a + b)
print ("\n Prendre les 10 mots les plus fréquents dans le texte et les fréquences correspondantes")
print (counts.takeOrdered(10, key=lambda x: -x[1]))
counts.map(lambda x: (x[1],x[0])).sortByKey(0).map(lambda x: (x[1],x[0]))\
        .repartition(1).saveAsTextFile(outputdir+"output_loadunstructured1/")
end = time.time()
print ("Elapsed time: ", (end-start))


 Prendre les 10 mots les plus fréquents dans le texte et les fréquences correspondantes
[('the', 22635), ('of', 11167), ('and', 11086), ('to', 10707), ('a', 10433), ('I', 10183), ('in', 7006), ('that', 6911), ('was', 6779), ('his', 4955)]
Elapsed time:  2.1484148502349854


## Chargement du CSV

Ensuite, nous allons apprendre à charger des données dans un format structuré comme CSV. Il existe au moins deux façons de procéder:

1. Lisez les fichiers ligne par ligne avec la méthode `textFiles ()`, divisée en délimiteur (non recommandé). Cela produira un RDD qui est une structure de données optimisée pour l'analyse orientée lignes et des primitives fonctionnelles telles que `map` et` filter`
1. Lisez les fichiers CSV en utilisant le `DataFrameReader` intégré (recommandé). Cela produira un dataframe, qui est une structure de données optimisée pour l'analyse orientée colonnes et les primitives relationnelles

In [7]:
import csv
import sys
import os
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO

#this one is use when you use textFile
def loadRecord(line,header,delimiter):
    """Parse a CSV line"""
    input = StringIO(line)
    reader = csv.DictReader(input, delimiter=delimiter, fieldnames=header)
    return next(reader)

delimiter = "|"
inputfile="file:///home/mbds/spark/tpSpark/2_LoadingData"
outputdir="file:///home/mbds/spark/tpSpark/2_LoadingData/output"

input = sc.textFile(inputfile+"/data/csv/person_nodes.csv")
header = input.first().split(delimiter)
data = input.filter(lambda x: header[0] not in x).map(lambda x: loadRecord(x,header,delimiter))
data.repartition(1).saveAsTextFile(outputdir+"/output_csv/")

In [8]:
input_df = spark.read.options(header='true', inferschema='true',delimiter=delimiter).csv(inputfile+"/data/csv/person_nodes.csv")
input_df.write.mode("overwrite").option("header", "true").csv(outputdir+"/output_csv2/")

### Exemple: analyse DB de diamants

De la même manière qu'avant, nous allons lire le fichier CSV dans dataframe.
Spark DataFrameReader peut gérer les délimiteurs et peut éventuellement ignorer la ligne d'en-tête pour les fichiers CSV.

In [9]:
# Read csv data as DataFrame using spark csv dataframe reader
diamonds = spark.read.options(header='true', inferSchema='true').csv(inputfile+'/data/csv/diamonds.csv')

In [10]:
diamonds.show(10)

+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
|  8| 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|
|  9| 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49|
| 10| 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
only showing top 10 rows



In [11]:
diamonds.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- carat: double (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- price: integer (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



recuperer combine de ligne

In [12]:
diamonds.count()

53940

In [13]:
diamonds.select('color').distinct().show()

+-----+
|color|
+-----+
|    F|
|    E|
|    D|
|    J|
|    G|
|    I|
|    H|
+-----+



Ensuite, essayons d'estimer un prix moyen par carat. Comme vous l'avez remarqué, la colonne de prix est un entier. Cela peut entraîner une perte de précision lors du calcul de la moyenne. Donc, tout d'abord, nous allons convertir cette colonne en type double:

In [14]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import *

# Convert Price column to type DoubleType
diamondsdf = diamonds.withColumn("price", diamonds["price"].cast(DoubleType()))
diamondsdf.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- carat: double (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- price: double (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



Nous utiliserons la "fonction groupby-aggregate" pour calculer la moyenne. Cela crée une colonne avec le nom par défaut "avg (prix)" que nous renommons en quelque chose de plus facile à taper. Enfin, nous classons la sortie par prix dans l'ordre décroissant:

In [15]:
# Calculate average price per carat value
carat_avgPrice = (diamondsdf
                  .groupBy("carat")
                  .avg("price")
                  .withColumnRenamed("avg(price)", "avgPrice")
                  .orderBy(desc("avgPrice")))

# View top10 highest carat value
carat_avgPrice.show(10)

+-----+------------------+
|carat|          avgPrice|
+-----+------------------+
| 3.51|           18701.0|
| 2.67|           18686.0|
|  4.5|           18531.0|
| 5.01|           18018.0|
| 2.57|17841.666666666668|
|  2.6|           17535.0|
| 2.64|           17407.0|
| 4.13|           17329.0|
| 2.39|17182.428571428572|
| 2.71|           17146.0|
+-----+------------------+
only showing top 10 rows



### (approche non recommandée) Analyse des fichiers CSV en Python en tant que RDD 

En principe, on peut également utiliser `RDD` pour analyser des données structurées, mais cela semble moins utiles, surtout si la logique de votre analyse peut être exprimée à l'aide des fonctions relationnelles de  SQL.

Nous allons maintenant convertir nos diamants DataFrame en RDD:

In [16]:
# We can convert the DataFrame directly into an RDD
diamonds_rdd = diamonds.rdd

In [46]:
# View first 3 rows of the diamonds RDD
diamonds_rdd.take(3)

[Row(_c0=1, carat=0.23, cut='Ideal', color='E', clarity='SI2', depth=61.5, table=55.0, price=326, x=3.95, y=3.98, z=2.43),
 Row(_c0=2, carat=0.21, cut='Premium', color='E', clarity='SI1', depth=59.8, table=61.0, price=326, x=3.89, y=3.84, z=2.31),
 Row(_c0=3, carat=0.23, cut='Good', color='E', clarity='VS1', depth=56.9, table=65.0, price=327, x=4.05, y=4.07, z=2.31)]

Vous pouvez maintenant utiliser les opérations RDD pour analyser les données:

In [17]:
# Diamond counts by cuts
countByGroup = diamonds_rdd.map(lambda x: (x.cut, 1)).reduceByKey(lambda x,y: x+y)
print (countByGroup.take(3))

[('Ideal', 21551), ('Premium', 13791), ('Good', 4906)]


In [18]:
# Distinct diamond clarities in dataset
distinctClarity = diamonds_rdd.map(lambda x: x.clarity).distinct()
print (distinctClarity.collect())

['SI2', 'SI1', 'VS1', 'VS2', 'VVS2', 'VVS1', 'I1', 'IF']


In [19]:
# Average price per diamond cut
avgPrice = diamonds_rdd.map(lambda x: (x.cut, float(x.price))).reduceByKey(lambda x,y: (x+y)/2)
print (avgPrice.collect())

[('Ideal', 2756.7240663718817), ('Premium', 2756.654813661215), ('Good', 2755.647409027791), ('Very Good', 2756.7183661747795), ('Fair', 2743.567771968392)]


# Exercice: charger un fichier CSV et l'analyser

Utilisez ce que vous avez appris pour charger un ensemble de jeux de données `CSV`. Ouvrez ** load_csv_exercise.ipynb ** et suivez le devoir qui y figure.

1. Acteur
1. Film
1. Acteur jouant dans un film (relations)

et trouvez des films dans lesquels ** Tom Hanks ** a joué.

Enregistrez la réponse au format «JSON».

# Chargement de JSON

Le meilleur et le seul moyen raisonnable de charger des fichiers JSON consiste à utiliser Spark DataFrameReader.
Spark SQL prend en charge la lecture de fichiers JSON contenant un objet JSON distinct et autonome par ligne.

** Remarque: les fichiers JSON multilignes ne sont actuellement pas compatibles avec Spark SQL. **

In [31]:
testJsonData = spark.read.json(inputfile+"/data/json/test.json")

In [32]:
testJsonData.printSchema()

root
 |-- array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- dict: struct (nullable = true)
 |    |-- extra_key: string (nullable = true)
 |    |-- key: string (nullable = true)
 |-- int: long (nullable = true)
 |-- string: string (nullable = true)



In [23]:
testJsonData.show()

+---------+--------------------+---+-------+
|    array|                dict|int| string|
+---------+--------------------+---+-------+
|[1, 2, 3]|          [, value1]|  1|string1|
|[2, 4, 6]|          [, value2]|  2|string2|
|[3, 6, 9]|[extra_value3, va...|  3|string3|
+---------+--------------------+---+-------+



Spark SQL peut déduire le schéma automatiquement à partir de vos données JSON. Pour afficher le schéma, utilisez `printSchema`.

Essayons maintenant de faire quelques requêtes de base pour mieux comprendre l'ensemble de données.

In [24]:
# Count number of rows in dataset
print (testJsonData.count())

3


Les données JSON peuvent contenir des structures de données imbriquées auxquelles on peut accéder par "."

In [25]:
testJsonData.select('dict.key').show()

+------+
|   key|
+------+
|value1|
|value2|
|value3|
+------+



We can also perform DataFrame operations such as filtering queries according to some criteria:

In [38]:
testJsonData.filter(testJsonData["int"] > 1).show()

+---------+--------------------+---+-------+
|    array|                dict|int| string|
+---------+--------------------+---+-------+
|[2, 4, 6]|       [null,value2]|  2|string2|
|[3, 6, 9]|[extra_value3,val...|  3|string3|
+---------+--------------------+---+-------+



### Analyse des fichiers JSON en Python avec SQL
Tout DataFrame, y compris ceux créés avec des données JSON, peut être inscrit en tant que table Spark SQL pour interroger avec SQL.

In [26]:
# Create a Spark SQL temp table
# Note that temp tables are not global across clusters and will not persist across cluster restarts
testJsonData.registerTempTable("test_json")

Nous pouvons exécuter toutes les requêtes SQL sur cette table avec Spark SQL:

In [27]:
spark.sql("SELECT * FROM test_json").show()

+---------+--------------------+---+-------+
|    array|                dict|int| string|
+---------+--------------------+---+-------+
|[1, 2, 3]|          [, value1]|  1|string1|
|[2, 4, 6]|          [, value2]|  2|string2|
|[3, 6, 9]|[extra_value3, va...|  3|string3|
+---------+--------------------+---+-------+



### Mini-exercise

Basculez vers le répertoire de travail du TP, ouvrez le fichier: ** load_json.ipynb **
et suivez les instructions en ligne.

In [33]:
input = spark.read.json(inputfile+"/data/json/test.json")