<center>
<a href="http://www.insa-toulouse.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/logo-insa.jpg" style="float:left; max-width: 120px; display: inline" alt="INSA"/></a> 

<a href="http://wikistat.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/wikistat.jpg" style="max-width: 250px; display: inline"  alt="Wikistat"/></a>

<a href="http://www.math.univ-toulouse.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/logo_imt.jpg" style="float:right; max-width: 200px; display: inline" alt="IMT"/> </a>
</center>

# <a href="http://spark.apache.org/"><img src="http://spark.apache.org/images/spark-logo-trademark.png" style="max-width: 100px; display: inline" alt="Spark"/> </a> [pour Statistique et *Science des* grosses *Données*](https://github.com/wikistat/Intro-Python)

# La classe *DataFrame* de <a href="http://spark.apache.org/"><img src="http://spark.apache.org/images/spark-logo-trademark.png" style="max-width: 100px; display: inline" alt="Spark"/> </a> [SQL](http://spark.apache.org/sql/)

**Résumé**: Ce tutoriel introduit la classe *DataFrame* proposée par la librairie [*SparkSQL*](http://spark.apache.org/sql/). Cette classe deviendra un standard pour toutes les manipulations de données structurées à partir de la version 3.0 de *Spark*. 

## 1 Introduction
### 1.1 Lecture des données

Ce tutoriel s'inspire de ceux proposés par [J. A. Dianes](https://github.com/jadianes/spark-py-notebooks) pour l'utilisation des données du concours [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) concernant près de 9M d'interactions dans un réseau. Elles sont décrites en détail [ici](http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names). L'objectif est d'apprendre à détecter des intrusions dans un réseau à partir d'un ensemble de variables ou *features* déjà calculées sur chaque transaction ou ineraction avec le réseau.

Un sous-échantillon est chargé localement avant de créer la RDD.

In [None]:
DATA_PATH="" 

import urllib.request
# Download data
urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz",DATA_PATH+"kddcup.data_10_percent.gz")
urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names",DATA_PATH+"kddcup.names")

Lis et créé la liste des colonne du fichiers

In [None]:
# Read col names
file_names = open(DATA_PATH+"kddcup.names","r").readlines()
col_names = [k.split(":")[0] for k in file_names[1:]]+["interactions"]

### 1. 2 *Spark SQL*

*Spark SQL* est un module de Spark permettant de manipuler des données structurées, contrairement au RDD. En effet Spark ne fait pas de différences entre un RDD consituté de listes ou de dictionnaires par exemple. Dans *Spark SQL* les données sont structurées selon un *schéma*. Grâce à cette structure plus d'optimisations peuvent être opérées sur les calculs.
Il existe plusieurs façons de manipuler ces données structurées, notamment directement avec des requêtes SQL ou avec l'API `dataset` (disponible via les APIs java et scala uniquement) ou encore avec les *DataFrames* disponibles par `pyspark`. C'est cette dernière option qui est détaillée dans ce calepin.

Le point d'entrée dans les fonctionnalités SQL en *Spark* est la classe *SQLContext*. 

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## 2 Construire un *DataFrame*

Un *DataFrame* Spark est une collection de données distribuées et organisées en *colonnes* identifiées par des noms. C'est conceptuellement  équivalent à une table dans une base de données relationnelle, un *DataFrame* en R ou en Python-pandas. Cette classe peut être obtenue de sources ou types de données variés : *Hive*, *json*, *xml*, *parquet*, *cassandra*... Ces fonctionnalités ne sont pas toutes introduites mais elles constituent un atout évident pour justifier du développement de cet environnement. Nous allons voir dans cette partie comment créer une *DataFrame* à partir d'un RDD, d'un fichier `.csv` ou d'un *DataFrame* `pandas`. 


### 2.1 Depuis un RDD

Lecture du fichier dans un RDD. 

In [None]:
data_file = DATA_PATH+"kddcup.data_10_percent.gz"
# Creation d'un RDD de chaines de caractères.
string_rdd = sc.textFile(data_file).cache()
# Creation d'un RDD de Liste de string.
list_rdd = string_rdd.map(lambda l: l.split(","))

#### Construction des ` Row` (lignes, une à une)
La première opération consiste à construire le schéma des données. 

SparkSQL convertit en *DataFrame* un RDD composés d'objets `Row`.
Une `Row` est comparable à un dictionnaire. Cet objet est construit en passant une liste de (clef, valeur) comme [kwargs](http://deusyss.developpez.com/tutoriels/Python/args_kwargs/).  La clef définit le nom de colonne et le type (entier, flottant...) est déduit de la première ligne. Il est donc important qu'il n'y ait pas de données manquantes dans la première ligne du RDD.


In [None]:
from pyspark.sql import Row
row_rdd = list_rdd.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5]),
    interactions = p[-1],
    )
)

Une fois le RDD créé par lignes, le schéma est inféré puis enregistré.

In [None]:
df_rdd_1 = sqlContext.createDataFrame(row_rdd)
df_rdd_1.take(2)

#### Spécifier le schéma

Il est également possible de spécifier le schéma présent dans un RDD avant de le convertir en *DataFrame*
Le schéma est créé grâce à l'objet `StructType` composé de `StructField` qui décrit les champs de valeur rencontrés dans le RDD.


In [None]:
from pyspark.sql.types import *
fields = [StructField("duration", IntegerType(), True),
         StructField("protocol_type", StringType(), True),
          StructField("service", StringType(), True),
          StructField("flag", StringType(), True),
          StructField("src_bytes", IntegerType(), True),
          StructField("dst_bytes", IntegerType(), True),
          StructField("interactions", StringType(), True)]
schema = StructType(fields)

In [None]:
subList_rdd = list_rdd.map(lambda p: (int(p[0]), p[1], p[2], p[3], int(p[4]), int(p[5]), p[-1]))

In [None]:
df_rdd = sqlContext.createDataFrame(subList_rdd, schema)
df_rdd.take(2)

### 2.2 Depuis un fichier `.csv`

Les méthodes précédentes impliquent de connaitre la structure présente dans les RDD afin de spécifier le *schéma*.

Lorsque le fichier source est dans un certain format connu (*parquet*, *json*, *csv*), on peut utiliser la fonction *spark.read.load* afin d'inférer automatiquement ce format dans un *dataframe*.

Le fichier *kddcup.data_10_percent.gz* est organisé comme un fichier `.csv`. Le *dataframe* peut donc directement être lu à partir du fichier en intégrant la structure par colonne avec leur nom.

In [None]:
df_csv = spark.read.load(data_file, format="csv", sep=",", inferSchema="true", header="False")
#Specify columns names
df_csv=df_csv.toDF(*col_names)

In [None]:
df_csv.take(2)

### 2.3 Depuis un *DataFrame* de `pandas`


La conversion d'un *DataFrame* `pandas` vers un *DataFrame* `PysSpark` peut être effectuée grâce à la librairie *pyarrow* qui permet des transferts d'objets entre la *JVM* et *python*.

Pour l'utiliser il est necessaire de changer la configuration `spark.sql.execution.arrow.enabled` à `true`. Celle-ci est à `False`par défaut.

In [None]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

Lecture du fichier dans un *DataFrame pandas*.

In [None]:
import pandas as pd
pandas_df = pd.read_csv(DATA_PATH+"kddcup.data_10_percent.gz", sep=",", names=col_names)

In [None]:
df = sqlContext.createDataFrame(pandas_df)

In [None]:
df.take(2)

## 3 Requête SQL
### 3.1 Préalable

*SparkSQL* permet d'appliquer une requête SQL sur un objet et de retourner le resultats ce cette requête sous le format *DataFrame*

Pour cela, on enregistre dans un premier temps le *DataFrame* sous le format *SQL temporary view*. 

In [None]:
df_rdd.createOrReplaceTempView("interactions")

### 3.2 Exemple de requête
Des requêtes SQL peuvent ensuite être exécutées.

In [None]:
# Sélectionner les interactions "tcp" de plus de 1 s et sans transfert.
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes 
    = 0""")
tcp_interactions

Les résultats d'une requête SQL sont des *DataFrames*.

In [None]:
type(tcp_interactions)

In [None]:
# Sortie des durées avec les dst_bytes
tcp_interactions_out = tcp_interactions.rdd.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
  print (ti_out)

Impression du schéma du *DataFrame*.

In [None]:
tcp_interactions.printSchema()

###  3.3  Autres requêtes SQL

SparkSQL inclut un langage pour la manipulation de données structurées. Il permet de combiner des méthodes de sélection, filtrage, regroupement... des données. 
 
## 4 Opérations sur un *DataFrame* 
### 4.1 Opérations élémentaires

`printSchema` permet d'afficher le type de chaque colonne.

In [None]:
df_rdd.printSchema()

*select* permet d'extraire les colonnes d'une dataframe

In [None]:
df_rdd.select("interactions").take(5)

In [None]:
df_rdd.select("interactions","duration").take(5)

`groupBy` est équivalent à la fonction `groupby` de pandas.

L'exemple ci-dessous compte le nombre d'interactions par type de protocole à l'aide de cette fonction. 

In [None]:
from time import time

t0 = time()
df_rdd.groupBy("protocol_type").count().show()
tt = time() - t0

print ("Requete executee en {} secondes".format(round(tt,3)))

`filter` permet de selectionner les lignes d'un *DataFrame* selon une condition fixée sur une colonne.

Pour compter les interactions de moins d'une seconde sans transfert de données et groupées par protocole, il suffit d'ajouter des filtres. 

In [None]:
t0 = time()
df_rdd.filter(df_rdd.duration>1000).filter(df_rdd.dst_bytes==0).groupBy("protocol_type").count().show()
tt = time() - t0

print ("Requete executee en {} secondes".format(round(tt,3)))

### 4.2 `map` et *fonction utilisateur* pour *DataFrame*

La fonction `map` n'est pas disponible sur les objets *DataFrame*. Une première façon de procéder consiste à convertir le *DataFrame* en objet RDD, pour appliquer la fonction `map`.

In [None]:
df_rdd.rdd.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes)).take(5)

Une autre façon consiste à convertir la fonction `lambda` en une *user defined function*. 

In [None]:
from pyspark.sql.functions import udf

function = udf(lambda x,y: "Duration: {}, Dest. bytes: {}".format(x,y))

Ce format permet ensuite d'appliquer cette fonction à la colonne ciblée du *DataFrame*.

In [None]:
output_dataframe = df_rdd.select(function("duration","dst_bytes").alias("string_output"))
output_dataframe.take(3)[0]

### 4.3 Creation d'une colonne

La section précedente montre comment appliquer une fonction `udf` sur un *DataFrame*. Le résultat de cette fonction est un *DataFrame* d'une seule colonne. Il est possible d'ajouter cette colonne au *DataFrame* existant grâce à la fonction `withColumn`.


Cette possibiité est illustrée, en ajoutant une colonne `label` au *DataFrame* `df`. Label ou variable qualitative avec deux modalités: `attack` et `normal`. 

Voici, dans un premier temps la fonction permettant de catégoriser chaque typr d'interaction.

In [None]:
def attack_or_normal_func(s):
    return "normal" if s == "normal." else "attack"

Cette fonction est convertie en *user defined function*.

In [None]:
attack_or_normal = udf(attack_or_normal_func)

Création de la nouvelle colonne `label` a partir du resultat de cette fonction à l'aide de la fonction `withColumn`.

In [None]:
df_with_label = df.withColumn("label", attack_or_normal(df.interactions) )

Vérification que la colonne "label" a bien été créée. Si le nom de colonne est déjà présent, celle-ci est remplacée.

In [None]:
df_with_label.printSchema()

Dénombrement du nombre d'attaques et d'interactions normales.

In [None]:
t0 = time()
df_with_label.select("label").groupBy("label").count().show()
tt = time() - t0

print ("Requete executee en {} secondes".format(round(tt,3)))

Dénombrement par label et type de protocole pour souligner le pouvoir discriminant de cette variable.

In [None]:
t0 = time()
df_with_label.select("label", "protocol_type").groupBy("label", "protocol_type").count().show()
tt = time() - t0

print ("Requete executee en {} secondes".format(round(tt,3)))

**Q** Que dire du protocole `udp`?

Ajouter la prise en compte du transfert de données à partir de la cible.

In [None]:
t0 = time()
df_with_label.select("label", "protocol_type", "dst_bytes").groupBy("label", "protocol_type", df_with_label.dst_bytes==0).count().show()
tt = time() - t0

print ("Requete executee en {} secondes".format(round(tt,3)))

Consulter les très nombreuses autres fonctionnalités, présentes ou à venir (version 2.3) dans la [documentation en ligne](http://spark.apache.org/docs/latest/sql-programming-guide.html).

### 4.4 Fonction Pandas `udf`

Une fonction `pandas udf` est simialire à une `udf` définie précédemment. Mais elle permet de prendre en entrée une colonne d'un *DataFrame* qui sera traitée comme une *Séries* de *pandas*.

Cette nouvelle classe de fonction permet une amélioration significative des [performances](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html).

Il existe deux types de `pandas udf`: `Scalar` et `Grouped Map`.

#### `Scalar`

Les *Scalar Pandas UDFs* sont utilisées pour vectoriser efficactement des opérations scalaires. Cette fonction prend en compte une *Series* de *pandas* comme argument et retourne une autre *Series* de *pandas* de la même taille tout en bénificiant des fonctions natives de *pandas*. 

Illustration avec la fonction `cumsum` de *pandas*.

In [None]:
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def cum_sum(x):
    return x.cumsum()

cum_sum_udf = pandas_udf(cum_sum, returnType=IntegerType())

In [None]:
cum_sum_duration = df.select(cum_sum_udf(col("duration")))

In [None]:
cum_sum_duration.take(1000)[-10:]

### Fonction *Grouped Map*

Les fonctions *Grouped map Pandas UDFs* sont à utiliser avec la fonction `groupBy().apply()` permettant d'appliquer le pattern `split-apply-combine`. Ceci séalise en trois étapes:

 * *Split* des données en groupes à l'aide de `DataFrame.groupBy`.
 * *Apply* de la fonction sur chaque groupe. Les entrées et les sorties de la fonction sont des *DataFrames* de *pandas*. Les entrées de cette étape contiennent toutes les lignes et colonnes de chaque groupe. 
 * *Combine* les résultats dans un nouveau *DataFrame*.

Afin d'utiliser `groupBy().apply()`, il faut préalablement définir:

* Une fonction python qui précise le calcul à exécuter sur chaque groupe.
* Un objet `StructType`  ou une `string` qui définit le schéma du *DataFrame* de sortie.

L'exemple suivant montre comment centrer la variable `duration` de chacun des groupes: `attack` et `normal`.

In [None]:
@pandas_udf("label string, duration int", PandasUDFType.GROUPED_MAP)
def substract_mean(pdf):
    # pdf is a pandas.DataFrame
    duration = pdf.duration
    return pdf.assign(duration=duration - duration.mean())

df_with_label.select("label","duration").groupby("label").apply(substract_mean).show()