# Comenzando a programar con Python y Spark

## Estructuras de datos lógicas

### RDDs

* Resilient Distributed Dataset


* Colecciones paralelas para la computación distribuida de programación funcional


* Una colección de datos (tipados), que se puede distribuir fácilmente sobre los nodos 'worker', de manera que cada nodo se hace cargo de un trozo de todo el conjunto de datos a procesar.


* Un RDD es una referencia lógica a un conjunto de datos que es fragmentado a través de muchos servidores en el cluster de Spark.


* Los RDD son fragmentados y distribuidos sobre los nodos 'worker' en el cluster de Spark de forma automática (sin la intervención del programador). Ver la sección anterior sobre la estructura física de un cluster de Spark.


* El esquema de fragmentación puede modificarse, pero por defecto Spark intenta minimizar el tráfico de la red entre los nodos cuando procesa los RDD. Por ejemplo: en un entorno local, hay normalmente una partición por cada nodo 'worker' (los cores de la CPU disponibles para Spark).


In [1]:
from pyspark.sql import *

spark = SparkSession.builder.master("local").appName("NOMBRE").getOrCreate()

In [2]:
rdd = spark.sparkContext.parallelize(range(1,1001))
rdd.reduce(lambda x, y: x + y)

500500

#### Funciones para RDDs:

Agregaciones: conteo, suma...

In [3]:
rdd.count()

1000

In [4]:
rdd.sum()

500500

Funciones de Map-Reduce: map, reduce, filter, flatmap...

In [67]:
rdd_2 = rdd.map(lambda x: x*2)

In [69]:
rdd_2.reduce(lambda x, y: x + y)

999000

Sacar por pantalla los elementos del RDD:

In [126]:
print("Elementos del RDD: %s" % rdd)

Elementos del RDD: PythonRDD[37] at collect at <ipython-input-22-0fbf847bdd64>:1


¡Ups! Error típico cuando empezamos a usar Spark: para aplicar una función de este tipo (lo explicaremos más adelante) primero debemos "recopilar" los datos en el *driver*:

In [125]:
print("Elementos del RDD: %s" % rdd.collect())

Elementos del RDD: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218

#### Otra opción es usar *take* para especificar cuántos elementos queremos "recolectar":

In [68]:
print("elementos del RDD: %s" % rdd.take(20))
print("elementos del RDD: %s" % rdd_2.take(20))

elementos del RDD: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
elementos del RDD: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40]


## DataFrames

* Conceptualmente equivalente a una tabla SQL


* Los DataFrame están compuestos por <strong>filas</strong> (sin tipo)


* Perdimos la flexibilidad de los RDD (los tipos) y las funciones definidos por el programador, contra un conjunto de tipos predefinidos (dependientes del lenguaje que se utilice para interactuar con Spark) y funciones relacionales (<em>SELECT, COUNT, WHERE...</em>)


* Por otro lado, obtenemos enormes <strong>optimizaciones</strong> en términos de eficiencia en tiempo gracias a estas fuertes restricciones.


* Catalyst es el componente Spark a cargo de las optimizaciones de esos métodos.


**NOTA:** Existe otra estructura de datos en Spark, los Datasets, pero no están disponibles en lenguajes con tipificación débil, como Python o R, y sí en lenguajes de tipificación fuerte, como Java o Scala.

#### Ejemplo: Leyendo de un fichero Json con Pyspark


Fichero Json de entrada:

### Instrucciones en Spark para leer el fichero y construir un Dataframe:

In [5]:
# volcar archivo JSON a un dataframe
df = spark.read.json("test.txt")
df.printSchema()
df.show()

root
 |-- authorId: string (nullable = true)
 |-- date: string (nullable = true)
 |-- idOriginal: string (nullable = true)
 |-- idTweet: string (nullable = true)
 |-- text: string (nullable = true)

+----------+--------------------+------------------+------------------+--------------------+
|  authorId|                date|        idOriginal|           idTweet|                text|
+----------+--------------------+------------------+------------------+--------------------+
|2885455811|Thu Oct 05 08:52:...|915523419281739776|915831976929714177|RT @Societatcc: A...|
|   2099361|Thu Oct 05 08:52:...|                  |915831940745441280|Yo ya he escogido...|
| 799792832|Thu Oct 05 08:52:...|915830958443687936|915831968301973504|RT @pedroveraOyP:...|
| 105157939|Thu Oct 05 08:52:...|915523419281739776|915831985582612480|RT @Societatcc: A...|
| 124248712|Thu Oct 05 08:52:...|915830958443687936|915832004658286593|RT @pedroveraOyP:...|
| 110117638|Thu Oct 05 08:48:...|                  |91583

#### Vamos a jugar un poco con nuestro nuevo Dataframe

#### Seleccionar columnas

In [6]:
df.text

Column<'text'>

In [7]:
tweets = df.select(df.text)
tweets.show()

+--------------------+
|                text|
+--------------------+
|RT @Societatcc: A...|
|Yo ya he escogido...|
|RT @pedroveraOyP:...|
|RT @Societatcc: A...|
|RT @pedroveraOyP:...|
|#AmicsAmigos no p...|
|RT @gsemprunmdg: ...|
|RT @carmouna: Si ...|
|Si no lo arreglan...|
|La elegancia del ...|
+--------------------+



#### Filtra aquellos tuits que tengan cualquier hashtag

Con lo que sabemos hasta ahora, lo mejor es pasar un Dataframe a RDD y a partir de ahí actuar:

In [8]:
# RDD a partir de un Dataframe
testRDD = tweets.rdd.flatMap(list)

hashtagTweets = testRDD.filter(lambda t: "#" in t)
print(hashtagTweets.count())
print(hashtagTweets.collect())

6
['RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB', 'RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB', '#AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB', 'RT @gsemprunmdg: el desbroce    x      Davila\n\n#FelizJueves\n#AmicsAmigos\n#LaCafeteraPARLEM\n#DíaMundialDeLosDocentes https://t.co/trDDTvrjgr', 'RT @carmouna: Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… ', 'Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… https://t.co/ayQQEgCvVz']


### Pregunta: ¿Cómo podemos obtener los hashtags?

#### Recupera los tuits que han sido retuiteados

In [9]:
hashtagTweets = testRDD.filter(lambda t: t.startswith("RT"))
print(hashtagTweets.count())
print(hashtagTweets.collect())

6
['RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\n¿Com… ', 'RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB', 'RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\n¿Com… ', 'RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB', 'RT @gsemprunmdg: el desbroce    x      Davila\n\n#FelizJueves\n#AmicsAmigos\n#LaCafeteraPARLEM\n#DíaMundialDeLosDocentes https://t.co/trDDTvrjgr', 'RT @carmouna: Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… ']


## PairRDDs


* Intuición: versión paralela y distribuida de un Map


* Un RDD que contiene tuplas de (clave, valor)


* Muy útil porque los Map son una de las abstracciones de datos más utilizadas

### Caso de uso de PairRDDs: Contando palabras en un RDD


1.- Primero: vamos a dividir el contenido del RDD en palabras: usando <strong>flatMap</strong>

2.- Después, crea un PairRDD con: (Palabra, 1): usando <strong>map</strong>

3.- Finalmente, agrupa cada par en función de su primer componente (la palabra) y suma los segundos componentes (ocurrencias de las palabras): usando la función <strong>reduceByKey</strong> de los PairRDDs

In [62]:
# Mecanismo habitual para contar elementos mapeando un RDD a un PairRDD
countWords = testRDD.flatMap(lambda line: line.split(" ")).map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)

print("Palabras leídas: %d" % countWords.count())

# Imprimiendo
print(countWords.take(15))

Palabras leídas: 70
[('RT', 6), ('@Societatcc:', 2), ('Ayúdanos', 2), ('a', 8), ('difundir,', 2), ('necesitamos', 2), ('llegar', 2), ('todos', 4), ('los', 4), ('rincones,', 2), ('no', 7), ('tenemos', 4), ('TV3', 2), ('pero...', 2), ('¡¡os', 2)]


## Vamos a pensarlo dos veces...


* El primer paso va desde un RDD[String] a un RDD[String]: flatMap divide cada <em>Tweet</em> en una Collection[<em>palabras</em>], y luego las aplana, obteniendo un RDD[<em>palabras</em>].


* El segundo paso va de un RDD[String], donde cada String es una palabra, a un RDD[(String, Int)], que es un PairRDD[(String, Int)].

* Finalmente, <strong>reduceByKey</strong> agrupa todas las tuplas con la misma palabra, sumando sus valores, produciendo un PairRDD[(String, Int)] que representa un RDD[<em>(palabra, ocurrenciasDePalabra)</em>]



## Pero, realmente... es un poco tedioso, ¿no es cierto?


   1.- No son buenos para procesar datos estructurados o semi-estructurados: 
    
   - En el ejemplo, intentamos leer un fichero **estructurado**, en formato Json
    
   - Así que perdimos toda esa <strong>información preciada</strong> (campos, valores, etc.) transformándolo en una colección (resiliente y distribuible) de strings.
    
   - Y luego usamos los mismos <strong>split-get-replace</strong> viejos y aburridos de la clase String para extraer las partes interesantes del string.


   2.- El shuffling puede convertirse en el cuello de botella de nuestra aplicación y a veces no es fácil de evitar.
    

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/optimizar.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>En relación al <strong>data shuffle</strong>, en la siguiente sección estudiaremos las operaciones básicas en Spark (<em>Transformaciones</em> y <em>Acciones</em>), sus efectos y la manera en que se gestionan en el cluster de Spark.</li>
                <li>Respecto al procesado de <strong>información estructurada y semi-estructurada</strong>: Spark ofrece una manera mucho mejor de lidiar con este tipo de datos a través de la librería <em>Spark SQL</em> y sus estructuras de datos relacionales: <em>DataFrames</em> y <em>Datasets</em>. Las estudiaremos más adelante.</li>
            </ul>
        </td>
    </tr>
</table>

<br><br>
<img src="images/sparkSQL.png" width="30%" align="left"/>





# Características de Spark SQL


* Librería de Spark que integra la sintaxis basada en SQL para realizar operaciones en datos distribuidos.


* Define estructuras de datos para facilitar la implementación de operaciones relacionales (select, group-by, order-by, max, min, average, count, etc.): DataFrames y Datasets.


* Estas estructuras de datos integran optimizaciones de rendimiento del álgebra relacional de SQL.



## Jugando con DataFrames y Spark SQL

### Los DataFrames pueden usarse casi como una base de datos relacional SQL:

In [78]:
# Registrar el DataFrame como una vista SQL temporal
df.createOrReplaceTempView("tweets")

# Seleccionar tweets que NO son retweets
originalTweetsQueryDF = spark.sql("SELECT * FROM tweets WHERE idOriginal LIKE ''")

originalTweetsQueryDF.show()

+---------+--------------------+----------+------------------+--------------------+
| authorId|                date|idOriginal|           idTweet|                text|
+---------+--------------------+----------+------------------+--------------------+
|  2099361|Thu Oct 05 08:52:...|          |915831940745441280|Yo ya he escogido...|
|110117638|Thu Oct 05 08:48:...|          |915830958443687936|#AmicsAmigos no p...|
|184865048|Thu Oct 05 07:18:...|          |915808416639143936|Si no lo arreglan...|
|142775869|Thu Oct 05 09:10:...|          |915836526789046273|La elegancia del ...|
+---------+--------------------+----------+------------------+--------------------+



* El equivalente, usando funciones Spark SQL:

In [102]:
# Seleccionar tweets que NO son retweets
df.select(df.idOriginal, df.date, df.authorId, df.idTweet, df.text).where("idOriginal LIKE''").show()

+----------+--------------------+---------+------------------+--------------------+
|idOriginal|                date| authorId|           idTweet|                text|
+----------+--------------------+---------+------------------+--------------------+
|          |Thu Oct 05 08:52:...|  2099361|915831940745441280|Yo ya he escogido...|
|          |Thu Oct 05 08:48:...|110117638|915830958443687936|#AmicsAmigos no p...|
|          |Thu Oct 05 07:18:...|184865048|915808416639143936|Si no lo arreglan...|
|          |Thu Oct 05 09:10:...|142775869|915836526789046273|La elegancia del ...|
+----------+--------------------+---------+------------------+--------------------+



#### Agregaciones:

* Una de las tareas más comunes con las bases de datos relacionales es agrupar y/o agregar atributos con ciertas condiciones para realizarle algunas acciones al resultado, como contar, sumar, calcular la media, etc.


* Spark SQL proporciona la función <strong>groupBy</strong>, que devuelve un <em>RelationalGroupedDataset</em>


* Este tipo tiene una serie de funciones de agregación relacional: sum, count, avg, max, min.

In [88]:
# Ejemplo de agrupación:
grouped = df.groupBy(df.idOriginal)

In [90]:
# Contando por idOriginal:
groupedCount = grouped.count()
groupedCount.printSchema()

root
 |-- idOriginal: string (nullable = true)
 |-- count: long (nullable = false)



In [97]:
# Ordenando los resultados
groupedCount.orderBy(groupedCount["count"].desc()).show()


+------------------+-----+
|        idOriginal|count|
+------------------+-----+
|                  |    4|
|915830958443687936|    2|
|915523419281739776|    2|
|915830945785237504|    1|
|915808416639143936|    1|
+------------------+-----+



In [101]:
# Average, max, min...
from pyspark.sql import functions as F
groupedCount.agg(F.avg(groupedCount["count"])).show()
groupedCount.agg(F.max(groupedCount["count"])).show()
groupedCount.agg(F.min(groupedCount["count"])).show()

+----------+
|avg(count)|
+----------+
|       2.0|
+----------+

+----------+
|max(count)|
+----------+
|         4|
+----------+

+----------+
|min(count)|
+----------+
|         1|
+----------+



### Con esto último que hemos aprendido, ¿cómo sacamos los tweets que tengan algún hashtag?

<br><br>
<img src="images/spark_transformations_actions.png" width="40%" align="left"/>





# Operaciones Básicas de Spark: Transformaciones y Acciones



Los RDD de Apache Spark soportan dos tipos de operaciones: Transformaciones y Acciones.



## Transformaciones


* Son funciones que producen nuevos RDD a partir de los ya existentes. Eejemplos: map(), filter().


* Dado que los RDD de entrada no pueden modificarse (son inmutables por naturaleza), cada vez que aplicamos una transformación se crean nuevos RDD.


* Las transformaciones se evalúan con evaluación "perezosa", lo que significa que no se ejecutan de inmediato. Una transformación se ejecuta efectivamente cuando llamamos a una acción.


* Por lo tanto, aplicar una (cantidad de) transformaciones no produce ningún efecto inmediato. En cambio, se crea un linaje de RDD, que va del RDD original (que invoca la primera transformación) a los RDD finales (resultado de todas las transformaciones). El linaje de RDD, representado por un <strong>DAG</strong> (Directed Acyclic Graph o Grafo acíclico dirigido), es un plan de ejecución lógica de todas las transformaciones.

Ejemplos de transformaciones y del DAG:

In [76]:
print(countWords.toDebugString().decode('UTF-8'))            # imprimir el plan de ejecución (DAG) de las transformaciones

(1) PythonRDD[107] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[99] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[98] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(1) PairwiseRDD[97] at reduceByKey at <ipython-input-62-5b85601e5432>:2 []
    |  PythonRDD[96] at reduceByKey at <ipython-input-62-5b85601e5432>:2 []
    |  MapPartitionsRDD[65] at javaToPython at NativeMethodAccessorImpl.java:0 []
    |  MapPartitionsRDD[64] at javaToPython at NativeMethodAccessorImpl.java:0 []
    |  SQLExecutionRDD[63] at javaToPython at NativeMethodAccessorImpl.java:0 []
    |  MapPartitionsRDD[62] at javaToPython at NativeMethodAccessorImpl.java:0 []
    |  FileScanRDD[61] at javaToPython at NativeMethodAccessorImpl.java:0 []


### Tipos de transformaciones:



* Transformaciones estrechas ('narrow'): no implican una mezcla de datos. Se pueden calcular por cada nodo 'worker' con sus propias particiones de datos.
     - Ejemplos: map, filter, flatMap, union, sample...



* Transformaciones amplias ('wide'): la lógica de procesamiento depende de los datos de múltiples particiones, por lo que es necesario combinar los datos para reunirlos en un solo lugar.
     - Ejemplos: distinct, join, reduceByKey, groupByKey...
   

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/notepad.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>Spark implementa un mecanismo para optimizar el plan de ejecución de las transformaciones con el fin de minimizar la combinación de datos ('data shuffling')</li>
                <li>Recuerda que las transformaciones son <strong>lazy</strong>: no se ejecutan cuando se declaran</li>
                <li>Una forma de realizar un conjunto de transformaciones es aplicar una acción al RDD de salida</li>
            </ul>
        </td>
    </tr>
</table>

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/warning.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>DAG es el mecanismo que permite que Spark sea tolerante a fallos, <strong>sin</strong> tener que escribir datos en el disco como una copia de seguridad</li>
                <li>Spark se recupera de las fallos volviendo a calcular las particiones perdidas, siguiendo el <strong>DAG</strong></li>
                <li>Es realmente <strong>rápido</strong> recuperar datos de transformaciones <strong>narrow</strong>, pero <strong>lento</strong> hacerlo de transformaciones <strong>wide</strong></li>
            </ul>
        </td>
    </tr>
</table>


## Acciones:



* Son operaciones Spark sobre RDD que producen valores que no son RDD.


* Los resultados de las acciones se almacenan en los nodos 'master' o en el sistema de almacenamiento externo. Por lo tanto, una acción es una de las maneras de enviar datos desde los nodos 'worker' al 'master'.


* Pone el modo 'lazy' de los RDD en movimiento, lo que significa que una acción provoca la ejecución de las transformaciones asociadas en el RDD.

* Ejemplos: count, collect, first, take...

### Repasemos el ejemplo de RDD desde un archivo de texto:

In [113]:
originalRDD = spark.sparkContext.textFile("test.txt")           # Leer fichero de texto plano

firstTransformation = originalRDD.map(lambda row: row.split("\","))

secondTransformation = firstTransformation.map(lambda row: row[1].replace("\"text\":\"", ""))

thirdTransformation = secondTransformation.filter(lambda text: "@" in text)

fourthTransformation = secondTransformation.flatMap(lambda text: text.split(" "))

fifthTransformation = fourthTransformation.filter(lambda word: word.startswith("#"))

sixthTransformation = fifthTransformation.map(lambda x: x.lower()).distinct()

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/question.jpg" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>¿Qué hemos hecho hasta el momento?</li>
                <li>¿Cuál es el contenido de cada RDD?</li>
            </ul>
        </td>
    </tr>
</table>


In [119]:
print(thirdTransformation.take(10))         # Transformación a computar: 3, 2 y 1
print()
print(fifthTransformation.take(10))         # Transformación a computar: 5, 4, 2 y 1
print()
print(sixthTransformation.take(10))         # Transformación a computar: 6, 5, 4, 2 y 1

['RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\\n¿Com… ', 'RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB', 'RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\\n¿Com… ', 'RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB', 'RT @gsemprunmdg: el desbroce    x      Davila\\n\\n#FelizJueves\\n#AmicsAmigos\\n#LaCafeteraPARLEM\\n#DíaMundialDeLosDocentes https://t.co/trDDTvrjgr', 'RT @carmouna: Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… ', 'Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… https://t.co/ayQQEgCvVz']

['#AmicsAmigos', '#ranciofacts', '#AmicsAmigos', '#ranciofacts', '#AmicsAmigos', '#ranciofacts', '#amics

<table align="left" style="border-collapse: collapse; border: none !important; width: 100%;">
    <tr style="border:none !important;">
        <td style="border:none !important; width: 60px;">
<img src="icons/optimizar.png" align="left" width="50px"> 
        </td>
        <td style="border:none !important; text-align:left">
            <ul>
                <li>Recuerda que las transformaciones se evaluan mediante evaluación perezosa, así que... </li>
                <li>Fíjate en que las transformaciones 2 y 1 se evalúan ¡¡tres veces!!</li>
                <li>Spark proporciona un mecanismo para ayudar a los programadores a evitar esta situación: <strong>caching</strong>. Vamos a reescribir nuestro código:</li>
            </ul>
        </td>
    </tr>
</table>

In [123]:
originalRDD2 = spark.sparkContext.textFile("test.txt")         # leer el fichero de texto plano

firstT = originalRDD2.map(lambda row: row.split("\","))

secondT = firstT.map(lambda row: row[1].replace("\"text\":\"", "")).cache()    # ¡¡Guardar el resultado en la caché!!

thirdT = secondT.filter(lambda text: "@" in text)

fourthT = secondT.flatMap(lambda text: text.split(" "))

fifthT = fourthT.filter(lambda word: word.startswith("#"))

sixthT = fifthT.map(lambda x: x.lower()).distinct()

In [124]:
print(thirdT.take(10))         # Transformación a computar: 3, 2 y 1, y guarda en la caché la transformación 2
print()
print(fifthT.take(10))         # Transformación a computar: 5 and 4 sobre la ya ya evaluada y guardada 2
print()
print(sixthT.take(10))         # Transformación a computar: 6, 5, 4 sobre la ya ya evaluada y guardada 2

['RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\\n¿Com… ', 'RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB', 'RT @Societatcc: Ayúdanos a difundir, necesitamos llegar a todos los rincones, no tenemos TV3 pero... ¡¡os tenemos a vosotros!!\\n¿Com… ', 'RT @pedroveraOyP: #AmicsAmigos no pelearsen que es muy #ranciofacts https://t.co/mjMhHQfHuB', 'RT @gsemprunmdg: el desbroce    x      Davila\\n\\n#FelizJueves\\n#AmicsAmigos\\n#LaCafeteraPARLEM\\n#DíaMundialDeLosDocentes https://t.co/trDDTvrjgr', 'RT @carmouna: Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… ', 'Si no lo arreglan los que mandan, lo haremos todos nosotros. Juntos. Envía tu canción a #amicsamigos @radio3_rne… https://t.co/ayQQEgCvVz']

['#AmicsAmigos', '#ranciofacts', '#AmicsAmigos', '#ranciofacts', '#AmicsAmigos', '#ranciofacts', '#amics