<img src="images/intro-logo-scala-spa.png" align="left" width="600px"/>
<!--- ![alt text](heading.png "Heading with Scala logo") --->

---

# Índice


### [4. Ya sólo nos queda... ¡volar!](#seccion-4. Ya sólo nos queda... ¡volar!)


---

<a name="seccion-4. Ya sólo nos queda... ¡volar!"></a>
<table align="left" style="border-collapse: collapse; width: 100%; border: 5px double black">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 100px;">
<img src="icons/volar-m.png" align="left" width=85px/>
        </td>
        <td style="border:none !important; text-align:left;">
<h1>4. Ya sólo nos queda... ¡volar!</h1>
<br>
Después de conocer la sintaxis y las características más importantes de Scala, solo nos queda poner en práctica todo lo aprendido para llevar a cabo tareas más avanzadas e interesantes.
        </td>
    </tr>
</table>

---

<a name="subseccion-Introduccion a Spark"></a>
## Introducción a Spark

En este notebook presentaremos los conceptos básicos de Spark con Scala.

#### Hasta ahora:
Las tareas de "data science" y análisis de datos se ha llevado a cabo “a pequeña escala”, en R/Python/MATLAB, etc.


#### Hoy en día:
Los conjuntos de datos ya no caben en memoria, así que...

* Estos lenguajes/frameworks no nos permiten escalar. 

* Hay que reimplementarlo todo en algún otro lenguaje o sistema.


#### Además:

* La industria se está moviendo hacia una Inteligencia Empresarial basada en la toma de decisiones orientada a los datos, que se apoya en enormes conjuntos de datos.

* La API de Spark guarda una relación casi de 1-a-1 con las colecciones de Scala, pero ¡distribuidas!

### Spark + Scala


* Más expresivo. APIs modeladas tras las colecciones de Scala. ¡Parecen listas funcionales! 


* Más rico, con más operaciones componibles posibles que en MapReduce (Hadoop).


* Eficiente: en terminos de tiempo de ejecución... ¡Y también en términos de la productividad del desarrollador! 


* Bueno para 'data science'. No solo por el rendimiento, sino porque permite iteraciones (eficientes), algo requerido por la mayoría de los algoritmos presentes en la caja de herramientas de un 'data scientist'.


* Alta demanda de desarrolladores de Spark y Scala ¡y de 'data scientists'!


### Spark vs Hadoop




* Hadoop es una implementación 'open source' del MapReduce de Google.


* Una API simple para operaciones map y reduce sobre conjuntos de datos distribuidos.


* Tolerancia a fallos: entre cada operación map y reduce, escribe datos intermedios para ser capaz de recuperarse de fallos.


* La tolerancia a fallos de Spark es mucho más eficiente porque:
    - Mantiene todos los datos inmutables y en memoria
    - Las operaciones son transformaciones funcionales
    - Tolerancia a fallos: volver a aplicar las transformaciones a los datos originales


* Spark es compatible con HDFS (Hadoop Distributed FileSystem)
<br><br>



<a name="subseccion-Conceptos principales en Spark"></a>
## Conceptos principales en Spark

* Spark Session: una conexión a la API de Spark


* Estructura Hardware:
    - Cluster de master + workers
    - Workflow: shuffling


* Estructuras de datos lógicas:
    - RDDs
    - PairRDDs
    
    
* Operaciones básicas:
    - Transformaciones
    - Acciones
    
    
* Librerías interesantes:
    - Spark SQL: DataFrames y Datasets
    - Spark Streaming API
    - MlLib
    - GraphX
    - ...

### Spark Session

Conexión al cluster de Spark. 

Normalmente le "hablaremos" al nodo `master` del cluster, y este le envia las tareas a los nodos `worker`.

`SparkSession` es el objeto que usaremos para llevar a cabo las operaciones de configuración y entrada contra el cluster.

#### Configuración


In [1]:
val spark = SparkSession
      .builder()
      .appName("Spark basic example")     // Nombre para la sesión
      .master("local[2]")                 // Ruta y número de cores a utilizar
      .getOrCreate()

// Opcional: Ajustar el nivel de logging si log4j no está configurado
Logger.getRootLogger.setLevel(Level.ERROR)

Name: Compile Error
Message: <console>:17: error: not found: value SparkSession
       val spark = SparkSession
                   ^
StackTrace: 

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/notepad.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left;">
            <ul>
                <li>Las versiones anteriores de Spark usan <strong>SparkContext</strong> en lugar de <strong>SparkSession</strong></li>
                <li><strong>SparkContext</strong> todavía se usa, pero es transparente al desarrollador</li>
                <li>Se puede acceder a <strong>SparkContext</strong> a través de <strong>SparkSession</strong>: <strong>spark.sparkContext</strong></li>
                <li>En este notebook, ya tenemos a ambos, un <strong>SparkSession</strong> y su correspondiente  <strong>SparkContext</strong> en las variables inmutables: <strong>spark</strong> y <strong>sc</strong>, respectivamente, las cuales se usarán a lo largo del notebook</li>
            </ul>
        </td>
    </tr>
</table>

### Spark Hello World!


In [2]:
sc.parallelize(1 to 100).reduce(_ + _)

5050

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/notepad.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important;text-align:left;">
            <ul>
                <li>Usamos <strong>sc</strong> para construir una colección paralela en un cluster de Spark.</li>
                <li><strong>parallelize</strong> es una función para transformar una colección a su correspondiente versión paralela.</li>
                <li><strong>(1 to 100)</strong> es la definición de una colección de rango (una colección formada por los valores en el rango dado)</li>
                <li><strong>reduce</strong> tiene el mismo significado que en la API Collection de Scala</li>
            </ul>
        </td>
    </tr>
</table>


### Similitudes entre Spark y Scala



<br>
La API de Spark tiene casi una relación 1-a-1 con la API de colecciones de Scala. Veamos un ejemplo:
<br>

In [2]:
val lista = List("Juan", "María", "Pedro", "Elisa")                   // Construimos una List[String] de Scala 

val paresLista = lista.map(nombre => (nombre, nombre.length))         // Associar la longitud de cada string

paresLista.sortBy(-_._2).foreach(t => println(t._1 + " => " + t._2))  // Lo imprimimos

paresLista.map(_._2).reduce(_ + _)                                    // Sumar las longitudes de los strings

María => 5
Pedro => 5
Elisa => 5
Juan => 4


lista: List[String] = List(Juan, María, Pedro, Elisa)
paresLista: List[(String, Int)] = List((Juan,4), (María,5), (Pedro,5), (Elisa,5))
res1: Int = 19


<br>
El equivalente en Spark, de manera distribuida, podría ser así:
<br>

In [3]:
val parlista = sc.parallelize[String](lista)                      // Crear el equivalente: ParallelCollectionRDD[String]

val pares = parlista.map(nombre => (nombre, nombre.length))       // pares: MapPartitionRDD[(String, Int)]

pares.sortBy(-_._2).collect.foreach(t => println(t._1 + " => " + t._2))

pares.map(_._2).reduce(_ + _)              

María => 5
Pedro => 5
Elisa => 5
Juan => 4


parlista: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27
pares: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:29
res2: Int = 19


<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/notepad.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important;text-align:left;">
            <ul>
                <li>Correspondencia 1-a-1 entre Scala y Spark</li>
                <li><strong>collect</strong> recopila en el nodo 'master' los bloques de datos distribuidos por los nodos 'worker', para así poder procesar todo el RDD</li>
                <li>Spark usa evaluación 'lazy' para algunas funciones (map, sortBy) y evaluación 'eager' para otras (reduce, collect). Lo estudiaremos con más detalle a continuación.</li>
            </ul>
        </td>
    </tr>
</table>

<a name="subseccion-Estructura hardware en Spark"></a>
## Estructura hardware en Spark

<img src="images/spark_structure.png" width="80%"/>

### Workflow

* El nodo 'master' distribuye los datos en bloques sobre los nodos 'worker', envía las tareas e integra los resultados de la computación.


* Los nodos 'worker' reciben los fragmentos de datos y las tareas y llevan a cabo las transformaciones y acciones sobre sus bloques de datos.


* Cada vez que nuestro proceso requiere todo el conjunto de datos para llevar a cabo una acción, el nodo 'master' recupera los bloques de datos de los 'workers', y reconstruye los datos en memoria.





<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/warning.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important;text-align:left;">
            <ul>
                <li>Cuando los datos viajan a través de la red, se denomina <em>shuffle</em> y es <strong>realmente costoso</strong></li>
                <li>Debemos minimizar el número de veces que se requiere un <em>shuffle</em> en nuestra aplicación.</li>
                <li>Pero vamos a tomarlo con calma por ahora: necesitamos saber más conceptos relacionados con Spark para estudiar con mayor profundidad este tema.</li>
            </ul>
        </td>
    </tr>
</table>

<a name="subseccion-Estructuras de datos lógicas RDDs"></a>
## Estructuras de datos lógicas: RDDs


* Resilient Distributed Dataset


* Colecciones paralelas para la computación distribuida de programación funcional


* Una colección de datos (tipados), que se puede distribuir fácilmente sobre los nodos 'worker', de manera que cada nodo se hace cargo de un trozo de todo el conjunto de datos a procesar.


* Un RDD es una referencia lógica a un conjunto de datos que es fragmentado a través de muchos servidores en el cluster de Spark.


* Los RDD son fragmentados y distribuidos sobre los nodos 'worker' en el cluster de Spark de forma automática (sin la intervención del programador). Ver la sección anterior sobre la estructura física de un cluster de Spark.


* El esquema de fragmentación puede modificarse, pero por defecto Spark intenta minimizar el tráfico de la red entre los nodos cuando procesa los RDD. Por ejemplo: en un entorno local, hay normalmente una partición por cada nodo 'worker' (los cores de la CPU disponibles para Spark).

#### Ejemplo: Leyendo de un fichero Json a un RDD


Fichero Json de entrada:

Instrucciones en Spark para leer el fichero y construir el RDD:

In [5]:
val testRDD = sc.textFile("test.txt")            // Lee del fichero a un RDD
val nRecords = testRDD.count                     // Devuelve el número de registros leidos del fichero Json
val nPartitions = testRDD.partitions.size        // Devuelve el número de particiones de testRDD

println("Number of records in the file: " + nRecords)
println("Number of partitions in the RDD: " + nPartitions)

Number of records in the file: 10
Number of partitions in the RDD: 2


### Vamos a jugar un poco con nuestro nuevo RDD

#### Imprime los primeros 5 elementos

In [6]:
testRDD.take(5).foreach(elem => println("\nRow: " + elem ))    // Toma 5 elementos del RDD y los imprime en la consola


Row: {"idTweet":"915831976929714177","text":"RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\n¿Com… ","date":"Thu Oct 05 08:52:13 CEST 2017","authorId":"2885455811","idOriginal":"915523419281739776"}

Row: {"idTweet":"915831940745441280","text":"Yo ya he escogido mediador. https://t.co/D7xS4MHbDG","date":"Thu Oct 05 08:52:04 CEST 2017","authorId":"2099361","idOriginal":""}

Row: {"idTweet":"915831968301973504","text":"RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB","date":"Thu Oct 05 08:52:11 CEST 2017","authorId":"799792832","idOriginal":"915830958443687936"}

Row: {"idTweet":"915831985582612480","text":"RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\n¿Com… ","date":"Thu Oct 05 08:52:15 CEST 2017","authorId":"105157939","idOriginal":"915523419281739776"}

Row: {"idTweet":"91583200465

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/warning.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important;text-align:left;">
            <ul>
                <li>Esto es solo parte del fichero de entrada que se usará en los próximos ejemplos</li>
                <li>Con esta instrucción leemos un Json como un fichero de texto plano</li>
            </ul>
        </td>
    </tr>
</table>

#### Filtra aquellos tuits que tengan cualquier hashtag

In [7]:
val hashtagTweets = testRDD.filter(t => t.contains("#"))
hashtagTweets.collect.foreach(elem => println("Row: " + elem ))

Row: {"idTweet":"915831968301973504","text":"RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB","date":"Thu Oct 05 08:52:11 CEST 2017","authorId":"799792832","idOriginal":"915830958443687936"}
Row: {"idTweet":"915832004658286593","text":"RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB","date":"Thu Oct 05 08:52:19 CEST 2017","authorId":"124248712","idOriginal":"915830958443687936"}
Row: {"idTweet":"915830958443687936","text":"#AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB","date":"Thu Oct 05 08:48:10 CEST 2017","authorId":"110117638","idOriginal":""}
Row: {"idTweet":"915832008936509440","text":"RT @gsemprunmdg: el desbroce    x      Davila\n\n#FelizJueves\n#AmicsAmigos\n#LaCafeteraPARLEM\n#DíaMundialDeLosDocentes https://t.co/trDDTvrjgr","date":"Thu Oct 05 08:52:20 CEST 2017","authorId":"150587014","idOriginal":"915830945785237504"}
Row: {"idTweet":"915832057288433664","text"

#### Extrae los ids de los tuits y sus autores y el tuit original (si es un retuit)

In [8]:
val ids = testRDD.map(t => t.split("\",\"")).map(fields => (fields(0), fields(3), fields(4)))

ids.collect.foreach(elem => println("[" + elem._1 + ", " + elem._2 + ", " + elem._3 + "]" ))

[{"idTweet":"915831976929714177, authorId":"2885455811, idOriginal":"915523419281739776"}]
[{"idTweet":"915831940745441280, authorId":"2099361, idOriginal":""}]
[{"idTweet":"915831968301973504, authorId":"799792832, idOriginal":"915830958443687936"}]
[{"idTweet":"915831985582612480, authorId":"105157939, idOriginal":"915523419281739776"}]
[{"idTweet":"915832004658286593, authorId":"124248712, idOriginal":"915830958443687936"}]
[{"idTweet":"915830958443687936, authorId":"110117638, idOriginal":""}]
[{"idTweet":"915832008936509440, authorId":"150587014, idOriginal":"915830945785237504"}]
[{"idTweet":"915832057288433664, authorId":"273360453, idOriginal":"915808416639143936"}]
[{"idTweet":"915808416639143936, authorId":"184865048, idOriginal":""}]
[{"idTweet":"915836526789046273, authorId":"142775869, idOriginal":""}]


#### Limpia los elementos del RDD

In [9]:
val cleanIds = ids.map(tuple => {
    (tuple._1.replace("{\"idTweet\":\"", ""), 
    tuple._2.replace("authorId\":\"",""), 
    tuple._3.replace("idOriginal\":\"","").replace("\"}",""))
    })

cleanIds.collect.foreach(elem => println("[" + elem._1 + ", " + elem._2 + ", " + elem._3 + "]" ))

[915831976929714177, 2885455811, 915523419281739776]
[915831940745441280, 2099361, ]
[915831968301973504, 799792832, 915830958443687936]
[915831985582612480, 105157939, 915523419281739776]
[915832004658286593, 124248712, 915830958443687936]
[915830958443687936, 110117638, ]
[915832008936509440, 150587014, 915830945785237504]
[915832057288433664, 273360453, 915808416639143936]
[915808416639143936, 184865048, ]
[915836526789046273, 142775869, ]


#### Convierte el RDD[(String, String, String)] en un RDD[(Long, Long, Long)]

In [10]:
val longIds = cleanIds.map(tuple => {
    (tuple._1.toLong, 
    tuple._2.toLong, 
    tuple._3.toLong)
    })

longIds.collect.foreach(elem => println("[" + elem._1 + ", " + elem._2 + ", " + elem._3 + "]" ))

Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 1 in stage 10.0 failed 1 times, most recent failure: Lost task 1.0 in stage 10.0 (TID 20, localhost, executor driver): java.lang.NumberFormatException: For input string: ""
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:601)
	at java.lang.Long.parseLong(Long.java:631)
	at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
	at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
	at $line42.$read$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:28)
	at $line42.$read$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mut

#### ¿Lo volvemos a intentar?

In [11]:
val longIds = cleanIds.map(tuple => {
    (tuple._1.toLong, 
    tuple._2.toLong, 
    if(tuple._3 == "") 0 else tuple._3.toLong)
    })

longIds.collect.foreach(elem => println("[" + elem._1 + ", " + elem._2 + ", " + elem._3 + "]" ))

[915831976929714177, 2885455811, 915523419281739776]
[915831940745441280, 2099361, 0]
[915831968301973504, 799792832, 915830958443687936]
[915831985582612480, 105157939, 915523419281739776]
[915832004658286593, 124248712, 915830958443687936]
[915830958443687936, 110117638, 0]
[915832008936509440, 150587014, 915830945785237504]
[915832057288433664, 273360453, 915808416639143936]
[915808416639143936, 184865048, 0]
[915836526789046273, 142775869, 0]


#### Recupera los tuits que han sido retuiteados

In [12]:
val retweetedIDS = longIds.filter(t => t._3!=0).map(_._3).distinct.collect

val original = longIds.filter(t => retweetedIDS.contains(t._1))


original.collect.foreach(println _)
retweetedIDS

(915830958443687936,110117638,0)
(915808416639143936,184865048,0)


Array(915523419281739776, 915830945785237504, 915830958443687936, 915808416639143936)

<a name="subseccion-Estructuras de datos lógicas PairRDDs"></a>
## Estructuras de datos lógicas: PairRDDs


* Intuición: versión paralela y distribuida de un Map


* Un RDD que contiene tuplas de (clave, valor)


* Muy útil porque los Map son una de las abstracciones de datos más utilizadas

### Caso de uso de PairRDDs: Contando palabras en un RDD


1.- Primero: vamos a dividir el contenido del RDD en palabras: usando <strong>flatMap</strong>

2.- Después, crea un PairRDD con: (Palabra, 1): usando <strong>map</strong>

3.- Finalmente, agrupa cada par en función de su primer componente (la palabra) y suma los segundos componentes (ocurrencias de las palabras): usando la función <strong>reduceByKey</strong> de los PairRDDs

In [13]:
// Mecanismo habitual para contar elementos mapeando un RDD a un PairRDD
val countWords = testRDD.flatMap(line => line.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)

println("Filas leídas: " + countWords.count)

// Imprimiendo
countWords.take(15).foreach(t => println("Word: " + t._1 + "\tOccurrences: " + t._2))

Filas leídas: 101
Word: #ranciofacts	Occurrences: 3
Word: recodo	Occurrences: 1
Word: 2017","authorId":"124248712","idOriginal":"915830958443687936"}	Occurrences: 1
Word: arreglan	Occurrences: 2
Word: https://t.co/trDDTvrjgr","date":"Thu	Occurrences: 1
Word: Si	Occurrences: 1
Word: donde	Occurrences: 1
Word: ya	Occurrences: 1
Word: los	Occurrences: 4
Word: {"idTweet":"915830958443687936","text":"#AmicsAmigos	Occurrences: 1
Word: 2017","authorId":"142775869","idOriginal":""}	Occurrences: 1
Word: x	Occurrences: 1
Word: @pedroveraOyP:	Occurrences: 2
Word: #AmicsAmigos	Occurrences: 2
Word: pero...	Occurrences: 2


### Vamos a pensarlo dos veces...


* El primer paso va desde un RDD[String] a un RDD[String]: flatMap divide cada <em>Tweet</em> en una Collection[<em>palabras</em>], y luego las aplana, obteniendo un RDD[<em>palabras</em>].


* El segundo paso va de un RDD[String], donde cada String es una palabra, a un RDD[(String, Int)], que es un PairRDD[(String, Int)].

* Finalmente, <strong>reduceByKey</strong> agrupa todas las tuplas con la misma palabra, sumando sus valores, produciendo un PairRDD[(String, Int)] que representa un RDD[<em>(palabra, ocurrenciasDePalabra)</em>]

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/optimizar.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>En el ejemplo de arriba, cogemos cada linea del fichero Json como un String.</li>
                <li>Eso significa que estamos contando todas las palabras en los textos, incluyendo los nombres de los campos Json, y cosas que no son solo texto.</li>
                <li>¿Cómo podemos arreglarlo? Vamos a verlo en las siguientes secciones.</li>
            </ul>
        </td>
    </tr>
</table>

### ¿Qué estamos haciendo realmente?


Cuando leemos un fichero con <strong>textFile</strong>, Spark crea un RDD[String]. No infiere la estructura del Json, ni de los pares (campo, valor).


La primera idea para solucionarlo podría ser algo como lo siguiente:

In [14]:
val texts = testRDD.map(row => row.split("\",")).map(row => row(1).replace("\"text\":\"", ""))

#### ¿Qué hace la instrucción de arriba con el RDD[String] original?

Vamos a imprimirlo y lo vemos...

In [15]:
texts.take(5).foreach(println(_))

RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\n¿Com… 
Yo ya he escogido mediador. https://t.co/D7xS4MHbDG
RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB
RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\n¿Com… 
RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB


#### Ahora, podemos contar las palabras de cada texto siguiendo el mismo procedimiento que antes...

In [16]:
val countWordsTexts = texts.flatMap(line => line.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)

In [17]:
// Printing it out, sorted by the counting
countWordsTexts.sortBy(-_._2).take(15).foreach(t => println("Word: " + t._1 + "\tOccurrences: " + t._2))

Word: 	Occurrences: 8
Word: a	Occurrences: 8
Word: no	Occurrences: 7
Word: RT	Occurrences: 6
Word: que	Occurrences: 5
Word: los	Occurrences: 4
Word: tenemos	Occurrences: 4
Word: lo	Occurrences: 4
Word: todos	Occurrences: 4
Word: #ranciofacts	Occurrences: 3
Word: https://t.co/mjMhHQfHuB	Occurrences: 3
Word: #AmicsAmigos	Occurrences: 3
Word: es	Occurrences: 3
Word: pelearsen	Occurrences: 3
Word: muy	Occurrences: 3


### Pros de los RDD y los PairRDD


    1.- Fácil usar la API, basados en las colecciones de Scala (map, reduce, filter, flatMap...)
    
    2.- Optimizados para ser usados en un cluster distribuido de Spark
    
    3.- Colecciones tipadas: apoyándose en la inferencia de tipos de Scala
    


### Pero, realmente... es un poco tedioso, ¿no es cierto?

    1.- No son buenos para procesar datos estructurados o semi-estructurados: 
        - En el ejemplo, intentamos leer un fichero <strong>estructurado</strong>, en formato Json
        - Así que perdimos toda esa <strong>información preciada</strong> (campos, valores, etc.) transformándolo en una colección (resiliente y distribuible) de strings.
        - Y luego usamos los mismos <strong>split-get-replace</strong> viejos y aburridos de la clase String para extraer las partes interesantes del string.


    2.- El shuffling puede convertirse en el cuello de botella de nuestra aplicación y a veces no es fácil de evitar.
    

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/optimizar.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>En relación al <strong>data shuffle</strong>, en la siguiente sección estudiaremos las operaciones básicas en Spark (<em>Transformaciones</em> y <em>Acciones</em>), sus efectos y la manera en que se gestionan en el cluster de Spark.</li>
                <li>Respecto al procesado de <strong>información estructurada y semi-estructurada</strong>: Spark ofrece una manera mucho mejor de lidiar con este tipo de datos a través de la librería <em>Spark SQL</em> y sus estructuras de datos relacionales: <em>DataFrames</em> y <em>Datasets</em>. Las estudiaremos más adelante.</li>
            </ul>
        </td>
    </tr>
</table>

<a name="subseccion-Operaciones Básicas de Spark: Transformaciones y Acciones"></a>
## Operaciones Básicas de Spark: Transformaciones y Acciones


Los RDD de Apache Spark soportan dos tipos de operaciones: Transformaciones y Acciones.



### Transformaciones


* Son funciones que producen nuevos RDD a partir de los ya existentes. Ejemplos: map(), filter().


* Dado que los RDD de entrada no pueden modificarse (son inmutables por naturaleza), cada vez que aplicamos una transformación se crean nuevos RDD.


* Las transformaciones se evalúan con evaluación "perezosa", lo que significa que no se ejecutan de inmediato. Una transformación se ejecuta efectivamente cuando llamamos a una acción.


* Por lo tanto, aplicar una (cantidad de) transformaciones no produce ningún efecto inmediato. En cambio, se crea un linaje de RDD, que va del RDD original (que invoca la primera transformación) a los RDD finales (resultado de todas las transformaciones). El linaje de RDD, representado por un <strong>DAG</strong> (Directed Acyclic Graph o Grafo acíclico dirigido), es un plan de ejecución lógica de todas las transformaciones.

Ejemplos de transformaciones y del DAG:

In [2]:
// Recordar el código visto anteriormente: 
    // val texts = testRDD.map(row => row.split("\",")).map(row => row(1).replace("\"text\":\"", ""))
    // val countWordsTexts = texts.flatMap(line => line.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)

countWordsTexts.toDebugString            // imprimir el plan de ejecución (DAG) de las transformaciones

res0: String =
(2) ShuffledRDD[6] at reduceByKey at <console>:33 []
 +-(2) MapPartitionsRDD[5] at map at <console>:33 []
    |  MapPartitionsRDD[4] at flatMap at <console>:33 []
    |  MapPartitionsRDD[3] at map at <console>:32 []
    |  MapPartitionsRDD[2] at map at <console>:32 []
    |  test.txt MapPartitionsRDD[1] at textFile at <console>:25 []
    |  test.txt HadoopRDD[0] at textFile at <console>:25 []


### Tipos de transformaciones:



* Transformaciones estrechas ('narrow'): no implican una mezcla de datos. Se pueden calcular por cada nodo 'worker' con sus propias particiones de datos.
     - Ejemplos: map, filter, flatMap, union, sample...



* Transformaciones amplias ('wide'): la lógica de procesamiento depende de los datos de múltiples particiones, por lo que es necesario combinar los datos para reunirlos en un solo lugar.
     - Ejemplos: distinct, join, reduceByKey, groupByKey...
   

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/notepad.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>Spark implementa un mecanismo para optimizar el plan de ejecución de las transformaciones con el fin de minimizar la combinación de datos ('data shuffling')</li>
                <li>Recuerda que las transformaciones son <strong>lazy</strong>: no se ejecutan cuando se declaran</li>
                <li>Una forma de realizar un conjunto de transformaciones es aplicar una acción al RDD de salida</li>
            </ul>
        </td>
    </tr>
</table>

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/warning.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>DAG es el mecanismo que permite que Spark sea tolerante a fallos, <strong>sin</strong> tener que escribir datos en el disco como una copia de seguridad</li>
                <li>Spark se recupera de las fallos volviendo a calcular las particiones perdidas, siguiendo el <strong>DAG</strong></li>
                <li>Es realmente <strong>rápido</strong> recuperar datos de transformaciones <strong>narrow</strong>, pero <strong>lento</strong> hacerlo de transformaciones <strong>wide</strong></li>
            </ul>
        </td>
    </tr>
</table>


### Acciones:



* Son operaciones Spark sobre RDD que producen valores que no son RDD.


* Los resultados de las acciones se almacenan en los nodos 'master' o en el sistema de almacenamiento externo. Por lo tanto, una acción es una de las maneras de enviar datos desde los nodos 'worker' al 'master'.


* Pone el modo 'lazy' de los RDD en movimiento, lo que significa que una acción provoca la ejecución de las transformaciones asociadas en el RDD.

* Ejemplos: count, collect, first, take...

#### Repasemos el ejemplo de RDD desde un archivo de texto:

In [3]:
val originalRDD = sc.textFile("./test.txt")           // Leer fichero de texto plano

val firstTransformation = originalRDD.map(row => row.split("\","))

val secondTransformation = firstTransformation.map(row => row(1).replace("\"text\":\"", ""))

val thirdTransformation = secondTransformation.filter(text => text.contains("@"))

val fourthTransformation = secondTransformation.flatMap(text => text.split(" "))

val fifthTransformation = fourthTransformation.filter(word => word.startsWith("#"))

val sixthTransformation = fifthTransformation.map(_.toLowerCase).distinct

originalRDD: org.apache.spark.rdd.RDD[String] = ./test.txt MapPartitionsRDD[8] at textFile at <console>:25
firstTransformation: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[9] at map at <console>:27
secondTransformation: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at map at <console>:29
thirdTransformation: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:31
fourthTransformation: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at flatMap at <console>:33
fifthTransformation: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:35
sixthTransformation: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at distinct at <console>:37


<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/question.jpg" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>¿Qué hemos hecho, hasta el momento?</li>
                <li>¿Cuál es el contenido de cada RDD?</li>
            </ul>
        </td>
    </tr>
</table>


In [2]:
thirdTransformation.collect.foreach(println)         // Transformación a computar: 3, 2 y 1
println
fifthTransformation.collect.foreach(println)         // Transformación a computar: 5, 4, 2 y 1
println
sixthTransformation.collect.foreach(println)         // Transformación a computar: 6, 5, 4, 2 y 1

RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\n¿Com… 
RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB
RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\n¿Com… 
RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB
RT @gsemprunmdg: el desbroce    x      Davila\n\n#FelizJueves\n#AmicsAmigos\n#LaCafeteraPARLEM\n#DíaMundialDeLosDocentes https://t.co/trDDTvrjgr
RT @carmouna: Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… 
Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… https://t.co/ayQQEgCvVz
#AmicsAmigos
#ranciofacts
#AmicsAmigos
#ranciofacts
#AmicsAmigos
#ranciofacts
#amicsamigos
#amicsamigos
#ranciofacts
#amicsamigos


<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/optimizar.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>Recuerda que las transformaciones se evaluan mediante evaluación perezosa, así que... </li>
                <li>Fíjate en que las transformaciones 2 y 1 se evalúan ¡¡tres veces!!</li>
                <li>Spark proporciona un mecanismo para ayudar a los programadores a evitar esta situación: <strong>caching</strong>. Vamos a reescribir nuestro código:</li>
            </ul>
        </td>
    </tr>
</table>

In [3]:
val originalRDD2 = sc.textFile("test.txt")           // leer el fichero de texto plano

val firstT = originalRDD2.map(row => row.split("\","))

val secondT = firstT.map(row => row(1).replace("\"text\":\"", "")).cache    // ¡¡Guardar el resultado en la caché!!

val thirdT = secondT.filter(text => text.contains("@"))

val fourthT = secondT.flatMap(text => text.split(" "))

val fifthT = fourthT.filter(word => word.startsWith("#"))

val sixthT = fifthT.map(_.toLowerCase).distinct

In [4]:
thirdT.collect.foreach(println)         // Transformación a computar: 3, 2 y 1, y guarda en la caché la transformación 2
println
fifthT.collect.foreach(println)         // Transformación a computar: 5 and 4 sobre la ya ya evaluada y guardada 2
println
sixthT.collect.foreach(println)         // Transformación a computar: 6, 5, 4 sobre la ya ya evaluada y guardada 2

RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\n¿Com… 
RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB
RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\n¿Com… 
RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB
RT @gsemprunmdg: el desbroce    x      Davila\n\n#FelizJueves\n#AmicsAmigos\n#LaCafeteraPARLEM\n#DíaMundialDeLosDocentes https://t.co/trDDTvrjgr
RT @carmouna: Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… 
Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… https://t.co/ayQQEgCvVz
#AmicsAmigos
#ranciofacts
#AmicsAmigos
#ranciofacts
#AmicsAmigos
#ranciofacts
#amicsamigos
#amicsamigos
#ranciofacts
#amicsamigos


<a name="subseccion-Spark SQL"></a>
## Spark SQL: DataFrames and Datasets



### Características de Spark SQL


* Librería de Spark que integra la sintaxis basada en SQL para realizar operaciones en datos distribuidos.


* Define estructuras de datos para facilitar la implementación de operaciones relacionales (select, group-by, order-by, max, min, average, count, etc.): DataFrames y Datasets.


* Estas estructuras de datos integran optimizaciones de rendimiento del álgebra relacional de SQL.



<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/optimizar.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>Para utilizar la sintaxis optimizada de Spark SQL debemos incluir la siguiente línea de código: <em>import spark.implicits._</em></li>
                <li>También es útil para transformar los RDD en DataFrames</li>
            </ul>
        </td>
    </tr>
</table>

In [24]:
// En el notebook de Jupyter esta línea deberá ser diferente
val sqlC = new org.apache.spark.sql.SQLContext(sc)
import sqlC.implicits._

<a name="subsubseccion-DataFrames"></a>
### DataFrames

* Conceptualmente equivalente a una tabla SQL


* Los DataFrame <strong>no tienen tipo</strong>: Scala no puede inferir el tipo de sus elementos, porque los Dataframe están compuestos por <strong>filas</strong> (sin tipo)


* Perdimos la flexibilidad de los RDD y los tipos y funciones definidos por el programador, contra un conjunto de tipos predefinidos (<em>Int, Long, String...</em>) y funciones relacionales (<em>SELECT, COUNT, WHERE...</em>)


* Por otro lado, obtenemos enormes <strong>optimizaciones</strong> en términos de complejidad de tiempo gracias a estas fuertes restricciones.


* Catalyst es el componente Spark a cargo de las optimizaciones de esos métodos.


#### Creando DataFrames

* Los dataframes se pueden crear leyendo directamente de un archivo de texto, usando la variable SparkSession:

In [2]:
val df = spark.read.json("test.txt")
df

[authorId: string, date: string ... 3 more fields]

In [26]:
df.printSchema                                // Imprime el schema del DataFrame, inferido del fichero Json

root
 |-- authorId: string (nullable = true)
 |-- date: string (nullable = true)
 |-- idOriginal: string (nullable = true)
 |-- idTweet: string (nullable = true)
 |-- text: string (nullable = true)



In [27]:
df.show                                       // Imprime los primeros 20 elementos en el DataFrame

+----------+--------------------+------------------+------------------+--------------------+
|  authorId|                date|        idOriginal|           idTweet|                text|
+----------+--------------------+------------------+------------------+--------------------+
|2885455811|Thu Oct 05 08:52:...|915523419281739776|915831976929714177|RT @Societatcc: A...|
|   2099361|Thu Oct 05 08:52:...|                  |915831940745441280|Yo ya he escogido...|
| 799792832|Thu Oct 05 08:52:...|915830958443687936|915831968301973504|RT @pedroveraOyP:...|
| 105157939|Thu Oct 05 08:52:...|915523419281739776|915831985582612480|RT @Societatcc: A...|
| 124248712|Thu Oct 05 08:52:...|915830958443687936|915832004658286593|RT @pedroveraOyP:...|
| 110117638|Thu Oct 05 08:48:...|                  |915830958443687936|#AmicsAmigos no p...|
| 150587014|Thu Oct 05 08:52:...|915830945785237504|915832008936509440|RT @gsemprunmdg: ...|
| 273360453|Thu Oct 05 08:52:...|915808416639143936|915832057288433664

* Los dataframes pueden ser creados a partir de un RDD existente:

In [28]:
val dfFromRawRDD = originalRDD.toDF                // Función importada de spark.implicits._
dfFromRawRDD.printSchema
dfFromRawRDD.show

root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|{"idTweet":"91583...|
|{"idTweet":"91583...|
|{"idTweet":"91583...|
|{"idTweet":"91583...|
|{"idTweet":"91583...|
|{"idTweet":"91583...|
|{"idTweet":"91583...|
|{"idTweet":"91583...|
|{"idTweet":"91580...|
|{"idTweet":"91583...|
+--------------------+



In [143]:
case class Tweet(idTweet:Long, text:String, date:String, AuthorId:Long, idOriginal:Long)

val formattedRDD = originalRDD.map(line => line.replace("{", "").replace("}", "")).
    map(line => line.split("\",\"")).
    map(columns => columns.map(e => e.split("\":\"")(1).replace("\"",""))).
    map(attributes => {
        // Warning: idOriginal puede estar vacío!!
        val idorig = if(attributes(4)=="")  0 else attributes(4).toLong
        
        Tweet(attributes(0).toLong, attributes(1), attributes(2), attributes(3).toLong, idorig)
    })
formattedRDD

MapPartitionsRDD[487] at map at <console>:102

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/warning.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>Este método no funciona bien en el <strong>notebook de Jupyter</strong></li>
                <li>Compruébalo en el laboratorio</li>
            </ul>
        </td>
    </tr>
</table>

* Los dataframes se pueden crear a partir de un RDD, especificando un schema:

In [141]:
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// Leer fichero de texto plano
val originalRDD = spark.sparkContext.textFile("test.txt")

// Generar el schema especificando cada campo y su tipo
val fields = List(
  StructField("idTweet", LongType, nullable = true),
  StructField("text", StringType, nullable = true),
  StructField("date", StringType, nullable = true),
  StructField("authorId", LongType, nullable = true),
  StructField("idOriginal", LongType, nullable = true))

val schema = StructType(fields)

// Leer el RDD de un fichero de texto
val rowRDD = originalRDD.map(line => line.replace("{", "").replace("}", "")).
  map(line => line.split("\",\"")).
  map(columns => columns.map(e => e.split("\":\"")(1).replace("\"",""))).
  map(attributes => {
    // Warning: idOriginal puede estar vacío!!
    val idOrig = if(attributes(4)=="") 0 else attributes(4).toLong

    Row(attributes(0).toLong, attributes(1), attributes(2), attributes(3).toLong, idOrig)
  })

// Aplicar el schema al RDD
val tweetDF = spark.createDataFrame(rowRDD, schema)

#### Jugando con DataFrames

Los DataFrames pueden usarse casi como una base de datos relacional SQL:

In [10]:
// Registrar el DataFrame como una vista SQL temporal
df.createOrReplaceTempView("tweets")

// Seleccionar tweets que NO son retweets
val originalTweetsQueryDF
= spark.sql("SELECT * FROM tweets WHERE idOriginal LIKE ''")

originalTweetsQueryDF.show

+---------+--------------------+----------+------------------+--------------------+
| authorId|                date|idOriginal|           idTweet|                text|
+---------+--------------------+----------+------------------+--------------------+
|  2099361|Thu Oct 05 08:52:...|          |915831940745441280|Yo ya he escogido...|
|110117638|Thu Oct 05 08:48:...|          |915830958443687936|#AmicsAmigos no p...|
|184865048|Thu Oct 05 07:18:...|          |915808416639143936|Si no lo arreglan...|
|142775869|Thu Oct 05 09:10:...|          |915836526789046273|La elegancia del ...|
+---------+--------------------+----------+------------------+--------------------+



* El equivalente, usando funciones Spark SQL y $_notation:

In [12]:
val sqlC = new org.apache.spark.sql.SQLContext(sc)           // Los notebooks de Jupyter requieren estas frases
import sqlC.implicits._                                      // para utilizar la $_notation

// Seleccionar tweets que NO son retweets
// $"colName" => acceso a la columna colName del DataFrame
df.select($"idOriginal", $"date", $"authorId", $"idTweet", $"text").where("idOriginal LIKE''").show

// Alternativa:
df.select($"idOriginal", $"date", $"authorId", $"idTweet", $"text").filter($"idOriginal".like("")).show

+----------+--------------------+---------+------------------+--------------------+
|idOriginal|                date| authorId|           idTweet|                text|
+----------+--------------------+---------+------------------+--------------------+
|          |Thu Oct 05 08:52:...|  2099361|915831940745441280|Yo ya he escogido...|
|          |Thu Oct 05 08:48:...|110117638|915830958443687936|#AmicsAmigos no p...|
|          |Thu Oct 05 07:18:...|184865048|915808416639143936|Si no lo arreglan...|
|          |Thu Oct 05 09:10:...|142775869|915836526789046273|La elegancia del ...|
+----------+--------------------+---------+------------------+--------------------+

+----------+--------------------+---------+------------------+--------------------+
|idOriginal|                date| authorId|           idTweet|                text|
+----------+--------------------+---------+------------------+--------------------+
|          |Thu Oct 05 08:52:...|  2099361|915831940745441280|Yo ya he esco

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/warning.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>Fuera de los <strong>notebooks de Jupyter</strong> debemos reemplazar los primeros import por: <em>import spark.implicits._</em></li>
                <li><em>spark</em> es el conector de la Spark Session.</li>
                <li>Compruébalo en el laboratorio.</li>
            </ul>
        </td>
    </tr>
</table>

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/notepad.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>Spark SQL proporciona funciones equivalentes a la directivas SQL: <em>where, like, select, count</em>...</li>
                <li>La <strong>$_notation</strong> nos permite acceder a las columnas de un DataFrame por sus nombres.</li>
                <li>Las funciones de la Spark API, como <em>filter</em>, están también sobreescritas en la Spark SQL API, con el objetivo de aplicar las optimizaciones donde sea posible.</li>
            </ul>
        </td>
    </tr>
</table>

#### Agregaciones:

* Una de las tareas más comunes con las bases de datos relacionales es agrupar y/o agregar atributos con ciertas condiciones para realizarle algunas acciones al resultado, como contar, sumar, calcular la media, etc.


* Spark SQL proporciona la función <strong>groupBy</strong>, que devuelve un <em>RelationalGroupedDataset</em>


* Este tipo tiene una serie de funciones de agregación relacional: sum, count, avg, max, min.

In [78]:
import org.apache.spark.sql.functions._       // NECESARIO para las funciones de agrupación

// Ejemplo de agrupación:
val grouped = df.groupBy($"idOriginal")
grouped

org.apache.spark.sql.RelationalGroupedDataset@222e7463

In [42]:
// Contando por idOriginal:
val groupedCount = grouped.count
groupedCount.printSchema

root
 |-- idOriginal: string (nullable = true)
 |-- count: long (nullable = false)



In [43]:
c.show

+------------------+-----+
|        idOriginal|count|
+------------------+-----+
|915830945785237504|    1|
|915808416639143936|    1|
|915830958443687936|    2|
|                  |    4|
|915523419281739776|    2|
+------------------+-----+



In [90]:
// Ordenando los resultados
groupedCount.orderBy($"count".desc).show


+------------------+-----+
|        idOriginal|count|
+------------------+-----+
|                  |    4|
|915523419281739776|    2|
|915830958443687936|    2|
|915808416639143936|    1|
|915830945785237504|    1|
+------------------+-----+



In [91]:
// Average, max, min...
groupedCount.agg(avg($"count")).show
groupedCount.agg(max($"count")).show
groupedCount.agg(min($"count")).show

+----------+
|avg(count)|
+----------+
|       2.0|
+----------+

+----------+
|max(count)|
+----------+
|         4|
+----------+

+----------+
|min(count)|
+----------+
|         1|
+----------+



<a name="subsubseccion-DataSets"></a>
### DataSets


* Resumiendo: DataFrames con tipo


* Los DataSets son una versión <strong>con tipo</strong> de los DataFrames: tenemos que especificar los tipos de cada columna en un DataSet.


* En realidad: DataFrame = DataSet\[Row\]


* Recuperamos la <strong>flexibilidad</strong> de los RDD y los tipos y funciones definidos por el programador, pero también manteniendo los tipos predefinidos en <strong>SparkSQL</strong> (<em>Int, Long, String...</em>) y las funciones relacionales (<em>SELECT, COUNT, WHERE...</em>)


* Por otro lado, obtenemos <strong>parte</strong> de las optimizaciones de los DataFrames.


* DataSets pueden verse como un compromiso entre los RDD y los DataFrames.

#### Creando DataSets

* Los DataSets pueden crearse a partir de un RDD existente:

In [31]:
val ds = spark.createDataset(originalRDD)
ds

[value: string]

* Los DataSets pueden crearse a partir de un DataFrame existente a través de una <strong>conversión de tipos a medida</strong>:

In [127]:
val tweetDs = df.map(row => {
    // Warning: idOriginal puede estar vacío!!
    val idorig = if(row.getAs[String]("idOriginal")=="") 0 else row.getAs[String]("idOriginal").toLong
    
    Tweet(row.getAs[String]("idTweet").toLong, 
          row.getAs[String]("text"), 
          row.getAs[String]("date"), 
          row.getAs[String]("authorId").toLong, 
          idorig)
})

tweetDs.printSchema

root
 |-- idTweet: long (nullable = false)
 |-- text: string (nullable = true)
 |-- date: string (nullable = true)
 |-- AuthorId: long (nullable = false)
 |-- idOriginal: long (nullable = false)



<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/warning.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>Este método no funciona bien en los <strong>notebook de Jupyter</strong></li>
                <li>Compruébalo en el laboratorio.</li>
            </ul>
        </td>
    </tr>
</table>

* Los DataSets pueden crearse a partir de un DataFrame existente mediante una <strong>conversión de tipos implícita</strong>:

In [139]:
val ds = tweetDF.as[Tweet]
ds.printSchema
ds.show

root
 |-- idTweet: long (nullable = true)
 |-- text: string (nullable = true)
 |-- date: string (nullable = true)
 |-- authorId: long (nullable = true)
 |-- idOriginal: long (nullable = true)

+------------------+--------------------+--------------------+----------+------------------+
|           idTweet|                text|                date|  authorId|        idOriginal|
+------------------+--------------------+--------------------+----------+------------------+
|915831976929714177|RT @Societatcc: A...|Thu Oct 05 08:52:...|2885455811|915523419281739776|
|915831940745441280|Yo ya he escogido...|Thu Oct 05 08:52:...|   2099361|                 0|
|915831968301973504|RT @pedroveraOyP:...|Thu Oct 05 08:52:...| 799792832|915830958443687936|
|915831985582612480|RT @Societatcc: A...|Thu Oct 05 08:52:...| 105157939|915523419281739776|
|915832004658286593|RT @pedroveraOyP:...|Thu Oct 05 08:52:...| 124248712|915830958443687936|
|915830958443687936|#AmicsAmigos no p...|Thu Oct 05 08:48:...| 

#### Jugando con DataSets

In [153]:
ds.groupByKey(t => t.idOriginal).                // RDD API!!
    count.show                                   // DataFrames API!!

+------------------+--------+
|             value|count(1)|
+------------------+--------+
|                 0|       4|
|915808416639143936|       1|
|915523419281739776|       2|
|915830958443687936|       2|
|915830945785237504|       1|
+------------------+--------+



In [196]:
val mentions = ds.flatMap(t => t.text.split(" ").map(w => w.replaceAll(":$",""))).filter(text => text.startsWith("@"))

mentions.distinct.show
mentions.groupBy($"value").count.show

+-------------+
|        value|
+-------------+
| @gsemprunmdg|
|@pedroveraOyP|
|    @carmouna|
| @radio3_rne…|
|  @Societatcc|
+-------------+

+-------------+-----+
|        value|count|
+-------------+-----+
| @gsemprunmdg|    1|
|@pedroveraOyP|    2|
|    @carmouna|    1|
| @radio3_rne…|    2|
|  @Societatcc|    2|
+-------------+-----+



In [184]:
mentions.groupBy($"value").agg(count($"value").as[Double]).show

+-------------+------------+
|        value|count(value)|
+-------------+------------+
| @gsemprunmdg|           1|
|@pedroveraOyP|           2|
|    @carmouna|           1|
| @radio3_rne…|           2|
|  @Societatcc|           2|
+-------------+------------+



In [217]:
val groupedMentions = ds.groupByKey(tweet => tweet.idOriginal)

groupedMentions.agg(count($"idTweet").as[Double]).show

+------------------+--------------+
|             value|count(idTweet)|
+------------------+--------------+
|                 0|             4|
|915808416639143936|             1|
|915523419281739776|             2|
|915830958443687936|             2|
|915830945785237504|             1|
+------------------+--------------+



<a name="subsubseccion-Uso de RDDs DataFrames y Datasets"></a>
### Uso de RDDs, DataFrames y Datasets


Entonces, ¿dónde debería usar RDD, Datasets o DataFrames en mi aplicación? Resumiremos las características de cada estructura de datos. Deberías usar...


* RDD cuando...

    - tus datos no están estructurados, por ejemplo, flujos binarios (media) o flujos de texto
    - deseas controlar tu conjunto de datos y usar transformaciones y acciones de bajo nivel
    - está bien pasar por alto las optimizaciones para DataFrames y Datasets para datos estructurados y semiestructurados que están disponibles de manera inmediata
    - no te importa el schema, el formato de columna y las construcciones de programación funcional que están listas para su uso


* DataFrames cuando...

    - tus datos están estructurados (entrada RDBMS) o semiestructurados (json, csv)
    - deseas obtener el mejor rendimiento obtenido del motor de ejecución optimizado de SQL
    - necesitas ejecutar consultas de hive
    - aprecias la API de 'domain specific language' (.groupBy, .agg, .orderBy)
    - estás usando R o Python


* DataSets cuando...

    - tus datos están estructurados o semiestructurados
    - aprecias la seguridad de tipos en tiempo de compilación y una API fuertemente tipada
    - necesitas un buen rendimiento (sobre todo mayor que con RDD), pero no el mejor (generalmente más bajo que con DataFrames)
