# 1. Introduction

Since Apache Spark 2.0, `SparkSession` has been the single entry point that unifies access to RDDs, DataFrames and Datasets.

# 2. RDD (Resilient Distributed Datasets)

An RDD is a distributed collection of records. Its main characteristics include:

* Immutable: once created, an RDD never changes. To modify its contents, you apply a transformation, which returns a new RDD.
* Fault-tolerant: an RDD knows how to rebuild and recompute its data thanks to its lineage, recorded as a DAG (Directed Acyclic Graph).
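
To make the immutability property concrete, here is a minimal sketch. It assumes a local Spark session (the `local[*]` master and the application name are choices made for this illustration, not part of the original notebook). Applying `map` leaves the source RDD untouched and produces a new one.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session is enough for this illustration.
val spark = SparkSession.builder.master("local[*]").appName("rdd-immutability").getOrCreate()

val numbers = spark.sparkContext.parallelize(Seq(1, 2, 3))

// map does not modify `numbers`; it returns a new RDD.
val doubled = numbers.map(_ * 2)

val originalValues = numbers.collect()
val doubledValues = doubled.collect()

println(originalValues.mkString(", "))  // 1, 2, 3
println(doubledValues.mkString(", "))   // 2, 4, 6
```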

RDDs support two types of operations:
* Transformation: an operation that returns a new RDD
    * examples: `map`, `filter`, `flatMap`, `groupByKey`, `reduceByKey`, `aggregateByKey`.

* Action: an operation that triggers the computation and returns a value
    * examples: `count`, `take`, `first`, `countByKey`, `collect`, ...
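
The key-based operations listed above are not used in the rest of this notebook, so here is a small sketch of one transformation (`reduceByKey`) and one action (`countByKey`). The sample pairs and the local session settings are assumptions made for this illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session is enough for this illustration.
val spark = SparkSession.builder.master("local[*]").appName("rdd-by-key").getOrCreate()

// Hypothetical (department, salary) pairs.
val pairs = spark.sparkContext.parallelize(
  Seq(("Sales", 3000), ("Finance", 3300), ("Sales", 4600)))

// Transformation: returns a new RDD[(String, Int)] with one summed value per key.
val totalByDept = pairs.reduceByKey(_ + _)

// Actions: both bring plain Scala values back to the driver.
val totals = totalByDept.collect().toMap  // Sales -> 7600, Finance -> 3300
val counts = pairs.countByKey()           // Sales -> 2, Finance -> 1

println(totals)
println(counts)
```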
    
Transformations are lazy (lazy evaluation): they are computed only when an action is applied to the RDD.
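
One way to observe this laziness is to count how many times the function passed to `map` actually runs. The sketch below uses an accumulator for that purpose. The accumulator name, the sample words and the local session settings are assumptions made for this illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session is enough for this illustration.
val spark = SparkSession.builder.master("local[*]").appName("rdd-lazy").getOrCreate()

// Hypothetical counter, used only to observe when the map function runs.
val mapCalls = spark.sparkContext.longAccumulator("mapCalls")

val words = spark.sparkContext.parallelize(Seq("spark", "rdd", "lazy"))

// Transformation: only recorded in the lineage, nothing is executed yet.
val upperWords = words.map { w => mapCalls.add(1); w.toUpperCase }
val callsBeforeAction: Long = mapCalls.value  // 0: no action has been triggered

// Action: triggers the computation of the whole lineage.
val total = upperWords.count()
val callsAfterAction: Long = mapCalls.value   // one call per element (3 here, barring task retries)

println(s"before: $callsBeforeAction, after: $callsAfterAction, count: $total")
```

Each action recomputes the lineage unless the RDD is cached, so calling a second action on `upperWords` would increase the counter again.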

In [2]:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("AppName") 
                        .enableHiveSupport().getOrCreate

import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@539e0445


**Creating an RDD from a Scala collection**

In [3]:
val langages = spark.sparkContext.parallelize(Seq("Java", "Scala", "R", "Python", "Ruby", "JavaScript"))
langages.foreach(println)

Java
Python
Ruby
R
JavaScript
Scala


langages: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27


**Creating an RDD from a data source**

In [4]:
val data = spark.sparkContext.textFile("../data/frostroad.txt")

data: org.apache.spark.rdd.RDD[String] = ../data/frostroad.txt MapPartitionsRDD[2] at textFile at <console>:27


**A few transformations and actions on RDDs**

In [5]:
data.collect

res1: Array[String] = Array(Two roads diverged in a yellow wood,, And sorry I could not travel both, And be one traveler, long I stood, And looked down one as far as I could, To where it bent in the undergrowth;, "", Then took the other, as just as fair,, And having perhaps the better claim,, Because it was grassy and wanted wear;, Though as for that the passing there, Had worn them really about the same,, "", And both that morning equally lay, In leaves no step had trodden black., Oh, I kept the first for another day!, Yet knowing how way leads on to way,, I doubted if I should ever come back., "", I shall be telling this with a sigh, Somewhere ages and ages hence:, Two roads diverged in a wood, and I--, I took the one less traveled by,, And that has made all the difference.)


In [6]:
data.count

res2: Long = 23


In [7]:
for (sent <- data.take(2)) 
    println(sent)

Two roads diverged in a yellow wood,
And sorry I could not travel both


In [8]:
data.map(line => line.toUpperCase).take(3).foreach(println)

TWO ROADS DIVERGED IN A YELLOW WOOD,
AND SORRY I COULD NOT TRAVEL BOTH
AND BE ONE TRAVELER, LONG I STOOD


In [9]:
data.filter(line => line.startsWith("I")).foreach(println)

In leaves no step had trodden black.
I doubted if I should ever come back.
I shall be telling this with a sigh
I took the one less traveled by,


**Creating an RDD from another RDD**

In [10]:
val dataFiltre = data.map(line => line.toUpperCase).filter(line => line.startsWith("I"))

dataFiltre: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at <console>:27


In [11]:
dataFiltre.foreach(println)

IN LEAVES NO STEP HAD TRODDEN BLACK.
I DOUBTED IF I SHOULD EVER COME BACK.
I SHALL BE TELLING THIS WITH A SIGH
I TOOK THE ONE LESS TRAVELED BY,


In [12]:
/** The toDebugString method returns a String describing
    the DAG of transformations that produces this RDD */

In [13]:
dataFiltre.toDebugString

res8: String =
(2) MapPartitionsRDD[6] at filter at <console>:27 []
 |  MapPartitionsRDD[5] at map at <console>:27 []
 |  ../data/frostroad.txt MapPartitionsRDD[2] at textFile at <console>:27 []
 |  ../data/frostroad.txt HadoopRDD[1] at textFile at <console>:27 []


# 3. DataFrame and Dataset

Since Spark 2.0, the two APIs (DataFrame and Dataset) have been unified into a single one.
* In Scala, `DataFrame` is a type alias for `Dataset[Row]`.
* In Java, you mainly work with `Dataset` (a DataFrame is a `Dataset<Row>`).
* R and Python only expose DataFrames, because these two languages have no compile-time type checking.

<img src="../images/databricks.png" style="float: left; margin-right: 10px;" />
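
The alias relationship in Scala can be checked directly: a `DataFrame` value can be assigned to a `Dataset[Row]` variable without any conversion. This is a minimal sketch. The column names, sample values and local session settings are assumptions made for this illustration.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Assumption: a local session is enough for this illustration.
val spark = SparkSession.builder.master("local[*]").appName("dataframe-alias").getOrCreate()
import spark.implicits._

val df: DataFrame = Seq(("Scala", 23), ("Java", 10)).toDF("language", "score")

// Compiles without any conversion because DataFrame is defined as Dataset[Row].
val asDataset: Dataset[Row] = df

// Rows are untyped: fields are looked up by name (or position) at runtime.
val languages = asDataset.collect().map(row => row.getAs[String]("language"))

println(languages.mkString(", "))  // Scala, Java
```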

## DataFrame

A DataFrame is a distributed collection of data organized into named columns:
* it shares the characteristics of an RDD (immutability, fault tolerance, transformations and actions, etc.),
* it also benefits from the Catalyst optimizer, which optimizes query execution.


**Creating a DataFrame from a Scala collection**

In [14]:
import spark.implicits._

val simpleData = Seq(("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  )

val dfm = simpleData.toDF("employee_name", "department", "salary")
dfm.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



import spark.implicits._
simpleData: Seq[(String, String, Int)] = List((James,Sales,3000), (Michael,Sales,4600), (Robert,Sales,4100), (Maria,Finance,3000), (James,Sales,3000), (Scott,Finance,3300), (Jen,Finance,3900), (Jeff,Marketing,3000), (Kumar,Marketing,2000), (Saif,Sales,4100))
dfm: org.apache.spark.sql.DataFrame = [employee_name: string, department: string ... 1 more field]


**A few transformations and actions on DataFrames**

In [15]:
import org.apache.spark.sql.functions.upper
dfm.withColumn("employee_name", upper($"employee_name")).show(false)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|JAMES        |Sales     |3000  |
|MICHAEL      |Sales     |4600  |
|ROBERT       |Sales     |4100  |
|MARIA        |Finance   |3000  |
|JAMES        |Sales     |3000  |
|SCOTT        |Finance   |3300  |
|JEN          |Finance   |3900  |
|JEFF         |Marketing |3000  |
|KUMAR        |Marketing |2000  |
|SAIF         |Sales     |4100  |
+-------------+----------+------+



import org.apache.spark.sql.functions.upper


In [16]:
dfm.filter(dfm("department") === "Sales").show(false)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|James        |Sales     |3000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [17]:
dfm.describe("salary").show()

+-------+-----------------+
|summary|           salary|
+-------+-----------------+
|  count|               10|
|   mean|           3400.0|
| stddev|765.9416862050705|
|    min|             2000|
|    max|             4600|
+-------+-----------------+



**Creating a DataFrame from another DataFrame**

In [18]:
val dfm2 = dfm.withColumn("employee_name", upper($"employee_name"))
              .filter(dfm("department") === "Sales")

dfm2.show(false)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|JAMES        |Sales     |3000  |
|MICHAEL      |Sales     |4600  |
|ROBERT       |Sales     |4100  |
|JAMES        |Sales     |3000  |
|SAIF         |Sales     |4100  |
+-------------+----------+------+



dfm2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [employee_name: string, department: string ... 1 more field]


In [19]:
dfm2.explain(true)

== Parsed Logical Plan ==
Filter (department#11 = Sales)
+- Project [upper(employee_name#10) AS employee_name#130, department#11, salary#12]
   +- Project [_1#3 AS employee_name#10, _2#4 AS department#11, _3#5 AS salary#12]
      +- LocalRelation [_1#3, _2#4, _3#5]

== Analyzed Logical Plan ==
employee_name: string, department: string, salary: int
Filter (department#11 = Sales)
+- Project [upper(employee_name#10) AS employee_name#130, department#11, salary#12]
   +- Project [_1#3 AS employee_name#10, _2#4 AS department#11, _3#5 AS salary#12]
      +- LocalRelation [_1#3, _2#4, _3#5]

== Optimized Logical Plan ==
LocalRelation [employee_name#130, department#11, salary#12]

== Physical Plan ==
LocalTableScan [employee_name#130, department#11, salary#12]



**Creating a DataFrame from a data source**

In [20]:
// Define custom schema
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StringType}
val schema = new StructType()
      .add("brand", StringType, true)
      .add("name", StringType, true)
      .add("device", StringType, true)
      .add("model", StringType, true)
val dfm = spark.read.schema(schema).option("multiline", "true").json("../data/devices.json")
dfm.show(5)

+-----+----------+----------+--------------------+
|brand|      name|    device|               model|
+-----+----------+----------+--------------------+
|     |          |    AD681H|Smartfren Androma...|
|     |          |     FJL21|               FJL21|
|     |          |  hws7721g|  MediaPad 7 Youth 2|
|  1&1|  1&1 Puck|diw362_1u1|         DIW362P 1U1|
|  1&1|1&1 TV Box|diw387_1u1|          DIW387 1U1|
+-----+----------+----------+--------------------+
only showing top 5 rows



import spark.implicits._
import org.apache.spark.sql.types.{StructType, StringType}
schema: org.apache.spark.sql.types.StructType = StructType(StructField(brand,StringType,true), StructField(name,StringType,true), StructField(device,StringType,true), StructField(model,StringType,true))
dfm: org.apache.spark.sql.DataFrame = [brand: string, name: string ... 2 more fields]


## Dataset

A Dataset is an extension of the DataFrame API:
* it is a strongly typed, immutable collection of objects mapped to a relational schema,
* it relies on an encoder to handle and validate the types of its objects,
* it catches syntax errors and analysis errors at compile time.
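
The difference between compile-time and runtime detection can be sketched as follows. The `Language` case class, its sample values and the local session settings are hypothetical and only serve this illustration. With the typed API, a misspelled field is rejected by the compiler. With the untyped, column-based API, the same typo only surfaces when Spark analyzes the query.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record type, used only for this illustration.
case class Language(name: String, score: Int)

// Assumption: a local session is enough for this illustration.
val spark = SparkSession.builder.master("local[*]").appName("dataset-typed").getOrCreate()
import spark.implicits._

val ds = Seq(Language("Scala", 23), Language("R", 5)).toDS()

// Typed API: the lambda receives a Language, so field names and types
// are checked by the Scala compiler.
val highScores = ds.filter(l => l.score > 10).map(_.name).collect()

// ds.filter(l => l.scor > 10)  // would not compile: `scor` is not a member of Language

// Untyped API: the column name is just a string, so the typo is only
// detected when Spark analyzes the query, at runtime.
val typoFailsAtRuntime = scala.util.Try(ds.toDF().select("scor")).isFailure

println(highScores.mkString(", "))  // Scala
println(typoFailsAtRuntime)         // true
```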

**Creating a Dataset from a Scala collection**

In [21]:
val dataset = Seq("Java", "Python", "R", "Scala", "Ruby").toDS()
dataset.show()

+------+
| value|
+------+
|  Java|
|Python|
|     R|
| Scala|
|  Ruby|
+------+



dataset: org.apache.spark.sql.Dataset[String] = [value: string]


**Creating a Dataset from an RDD**

In [22]:
val simpleData = Seq(("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  )

val rdd = sc.parallelize(simpleData)
val ds = rdd.toDS()
ds.show()

+-------+---------+----+
|     _1|       _2|  _3|
+-------+---------+----+
|  James|    Sales|3000|
|Michael|    Sales|4600|
| Robert|    Sales|4100|
|  Maria|  Finance|3000|
|  James|    Sales|3000|
|  Scott|  Finance|3300|
|    Jen|  Finance|3900|
|   Jeff|Marketing|3000|
|  Kumar|Marketing|2000|
|   Saif|    Sales|4100|
+-------+---------+----+



simpleData: Seq[(String, String, Int)] = List((James,Sales,3000), (Michael,Sales,4600), (Robert,Sales,4100), (Maria,Finance,3000), (James,Sales,3000), (Scott,Finance,3300), (Jen,Finance,3900), (Jeff,Marketing,3000), (Kumar,Marketing,2000), (Saif,Sales,4100))
rdd: org.apache.spark.rdd.RDD[(String, String, Int)] = ParallelCollectionRDD[16] at parallelize at <console>:50
ds: org.apache.spark.sql.Dataset[(String, String, Int)] = [_1: string, _2: string ... 1 more field]


**Creating a Dataset from a DataFrame**

In [23]:
// Without a case class

val rdd = sc.parallelize(Seq(("Java", 10), ("R", 5), ("Scala", 23), ("Python", 15)))
val df = rdd.toDF("Langage", "Note")

val ds = df.as[(String, Int)]
ds.show()

+-------+----+
|Langage|Note|
+-------+----+
|   Java|  10|
|      R|   5|
|  Scala|  23|
| Python|  15|
+-------+----+



rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[20] at parallelize at <console>:41
df: org.apache.spark.sql.DataFrame = [Langage: string, Note: int]
ds: org.apache.spark.sql.Dataset[(String, Int)] = [Langage: string, Note: int]


In [24]:
// With a case class

import spark.implicits._

case class Employee (employee_name: String, department: String, salary: Long)

val simpleData = Seq(Employee("James", "Sales", 3000),
    Employee("Michael", "Sales", 4600),
    Employee("Robert", "Sales", 4100),
    Employee("Maria", "Finance", 3000),
    Employee("James", "Sales", 3000),
    Employee("Scott", "Finance", 3300),
    Employee("Jen", "Finance", 3900),
    Employee("Jeff", "Marketing", 3000),
    Employee("Kumar", "Marketing", 2000),
    Employee("Saif", "Sales", 4100)
  )

val df = simpleData.toDF()

val empDS = df.as[Employee]
empDS.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



import spark.implicits._
defined class Employee
simpleData: Seq[Employee] = List(Employee(James,Sales,3000), Employee(Michael,Sales,4600), Employee(Robert,Sales,4100), Employee(Maria,Finance,3000), Employee(James,Sales,3000), Employee(Scott,Finance,3300), Employee(Jen,Finance,3900), Employee(Jeff,Marketing,3000), Employee(Kumar,Marketing,2000), Employee(Saif,Sales,4100))
df: org.apache.spark.sql.DataFrame = [employee_name: string, department: string ... 1 more field]
empDS: org.apache.spark.sql.Dataset[Employee] = [employee_name: string, department: string ... 1 more field]


Sources:

[Databricks documentation](https://databricks.com/fr/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)