# Ciencia de datos - Big Data

# Práctica 1.1 - Introducción a la programación en Apache Spark

En esta práctica es una introducción básica a los RDDs de Spark, de hecho, no es más que una guía. Sigue detenidamente todos los bloques y prueba a cambiar los valores establecidos para comprobar su funcionamiento.

Ten en cuenta que una vez tengas en marcha Spark, podrás visualizar la evolución de cada trabajo de Spark en  <http://localhost:4040>

## **Uso básico de los notebooks y su integración con Python**

In [2]:
# Esto es una celda ed Python, puedes ejecutar el código Python en estas celdas con ctrl + Intro
print('La suma de 1 y 1 es {0}'.format(1 + 1))

La suma de 1 y 1 es 2


In [3]:
# Otra celda Python que utiliza una variable x y un if
x = 28
if x > 18:
    print('La variable x es mayor que 18')

La variable x es mayor que 18


In [4]:
# Nueva celda, con misma funcionalidad que la anterior pero con otro formato
x = 28
if x > 18: print('La variable x es mayor que 18')

La variable x es mayor que 18


### Estado de un Notebook

El notebook tiene estado, implica que las variables y sus valores se mantienen hasta que el kernel del notebook se reinicia. Si no se ejecutan todas las celdas, las variables pueden no estar bien inicializadas y pueden provocar errores. Hay que prestar atención a esto si se ha modificado algún valor de una variable, ya que siempre se cargará el último valor ejecutado.

In [5]:
# Se utiliza la variable x definida en la celda anterior, si no ejecutamos la celda anaterior este código fallará
print(x * 2)

56


# Introducción a Spark

Inicializar el `SparkContext`

In [6]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("My App")
sc = SparkContext(conf = conf)

## Creación de un RDD
Podemos crear un RDD de dos formas

1. Cargando un conjunto de datos almacenado en un medio externo: `sc.textFile(fichero)`
2. Distribuyendo una colección de objetos existente: `sc.parallelize(colección_python)`

In [7]:
# Creación de un RDD desde un fichero
quijoteRDD = sc.textFile("./datos/pg2000.txt")

# Creación de un RDD desde una colección
datos = [1, 2, 3, 4, 5]
datoRDD = sc.parallelize(datos)

# Operaciones con RDDs

Podemos realizar dos tipos de operaciones sobre los RDDs
1. **Transformaciones:** Crean un nuevo RDD a partir de otro **(¡RECORDAR EVALUACIÓN VAGA!)** hasta que no se ejecuta una acción no se realiza la transformación
2. **Acciones:** Utilizan el RDD para lograr un resultado, recibido por el driver o escrito en el disco

## Transformaciones sobre RDDs

### Transformaciones básicas


Transformación | Descripción
------------- | -------------
*map(func)* | Devuelve un nuevo RDD formado aplicando a cada elemento del RDD original la función func
*filter(func)* | Devuelve un nuevo RDD formado por los elementos para los cuales el aplicarles la función func devuelve true
*distinct()* | Devuelve un nuevo RDD que contiene los elementos distintos dentro del RDD original
*flatMap(func)* | Similar al map, pero cada elemento de entrada puede ser mapeado a 0 o más elementos de salida (por tanto, func devuelve una secuencia en vez de un único elemento)



### `map(func)`

In [10]:
rdd = sc.parallelize([1, 2, 3, 4])
rdd.map(lambda x: x*2).collect() # Utilizamos collect() para ver el resultado en el driver, necesitamos de una acción para ver la transformación

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 5) (SERGIO-PC executor driver): java.io.IOException: Cannot run program "python3": CreateProcess error=2, El sistema no puede encontrar el archivo especificado
	at java.lang.ProcessBuilder.start(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:165)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: CreateProcess error=2, El sistema no puede encontrar el archivo especificado
	at java.lang.ProcessImpl.create(Native Method)
	at java.lang.ProcessImpl.<init>(Unknown Source)
	at java.lang.ProcessImpl.start(Unknown Source)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: Cannot run program "python3": CreateProcess error=2, El sistema no puede encontrar el archivo especificado
	at java.lang.ProcessBuilder.start(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:165)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.io.IOException: CreateProcess error=2, El sistema no puede encontrar el archivo especificado
	at java.lang.ProcessImpl.create(Native Method)
	at java.lang.ProcessImpl.<init>(Unknown Source)
	at java.lang.ProcessImpl.start(Unknown Source)
	... 16 more


### `filter(func)`

In [None]:
rdd.filter(lambda x: x != 1).collect()

In [None]:
rdd.filter(lambda x: x % 2 == 0).collect()

### `distinct()`

In [None]:
rdd = sc.parallelize([1, 4, 2, 2, 3])
rdd.distinct().collect()

### `map(func)` vs `flatMap(func)`
`map` devuelve tantos elementos como tenga el RDD original, mientras que `flatMap` devuelve una lista de elementos (que puede ser vacía o tener más de un elemento) y concatena todas las listas en un único RDD de elementos

In [10]:
rdd = sc.parallelize([1, 2, 3])
print("map: " + str(rdd.map(lambda x: [x, x+5]).collect()))
print("flatMap: " + str(rdd.flatMap(lambda x: [x, x+5]).collect()))

map: [[1, 6], [2, 7], [3, 8]]
flatMap: [1, 6, 2, 7, 3, 8]


In [11]:
lines = sc.parallelize(["hello world", "hi", "dime tu nombre", "hasta luego"])

wordsMap = lines.map(lambda line: line.split(" "))
wordsFlatMap = lines.flatMap(lambda line: line.split(" "))

print("map: " + str(wordsMap.collect()))
print("flatMap: " + str(wordsFlatMap.collect()))

map: [['hello', 'world'], ['hi'], ['dime', 'tu', 'nombre'], ['hasta', 'luego']]
flatMap: ['hello', 'world', 'hi', 'dime', 'tu', 'nombre', 'hasta', 'luego']


### Transformaciones con pseudo-conjuntos
Transformación | Descripción
------------- | -------------
*distinct()* | Devuelve el RDD sin elementos repetidos – ¡Cuidado! Requiere shuffle (enviar datos por red)
*union(rdd)* | Devuelve la unión de los elementos en los dos RDDs  (se mantienen los duplicados)
*intersection(rdd)* | Devuelve la instersección de los elementos en los dos RDDs (elimina los duplicados) – ¡Cuidado! Requiere shuffle (datos por red)
*subtract(rdd)* | Devuelve los elementos presentes en el primer RDD y no en el segundo – ¡Cuidado! También requiere de shuffle
*cartesian(rdd)* | Devuelve un RDD con todos los posibles pares entre elementos de ambos RDDs

In [12]:
rdd1 = sc.parallelize(["agua", "vino", "cerveza", "agua", "agua", "vino"])
rdd2 = sc.parallelize(["cerveza", "cerveza", "agua", "agua", "vino", "coca-cola", "naranjada"])

print("distinct: " + str(rdd1.distinct().collect()))
print("union: " + str(rdd1.union(rdd2).collect()))
print("intersection: " + str(rdd1.intersection(rdd2).collect()))
print("substract: " + str(rdd1.subtract(rdd2).collect()))
print("cartesian: " + str(rdd1.cartesian(rdd2).collect()))

distinct: ['agua', 'vino', 'cerveza']
union: ['agua', 'vino', 'cerveza', 'agua', 'agua', 'vino', 'cerveza', 'cerveza', 'agua', 'agua', 'vino', 'coca-cola', 'naranjada']
intersection: ['vino', 'cerveza', 'agua']


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 15.0 failed 1 times, most recent failure: Lost task 4.0 in stage 15.0 (TID 153, DESKTOP-1NS7HVN, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 14 more


## Acciones sobre RDDs

### Acciones básicas


Acción | Descripción
------------- | -------------
reduce(func) | Agrega los elementos del RDD usando la función func. func toma dos argumentos y devuelve uno, y es conmutativa y asociativa de tal forma que puede calcularse correctamente en paralelo
*take(n)* | Devuelve una lista con los n primeros elementos del RDD
*collect()* | Devuelve todos los elementos del RDD como una lista. **CUIDADO: Hay que asegurarse de que vayan a caber en el driver**
*takeOrdered(n, key=func)* | Devuelve n elementos ordenados de manera ascendente o en el orden especificado por la función de orden opcional func
*foreach(func)* | Aplica la función func a cada elemento del RDD. No devuelve nada, puede valer para realizar inserciones a BBDD por ejemplo
*count()* | Cuenta el número de elementos en el RDD



In [16]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])

prod = rdd.reduce(lambda a, b: a * b)
print("Resultado del producto: " + str(prod))  # función conmutativa y asociativa!!!
print("Dos primeros valores con take(2): " + str( rdd.take(2) ))
print("Todo el RDD con collect(): " + str( rdd.collect() ))

rdd = sc.parallelize([5, 3, 1, 2]) 
print("Los tres elementos más grandes con takeOrdered(3, func): " + str( rdd.takeOrdered(3, lambda s: s) ))

Resultado del producto: 40320
Dos primeros valores con take(2): [1, 2]
Todo el RDD con collect(): [1, 2, 3, 4, 5, 6, 7, 8]
Los tres elementos más grandes con takeOrdered(3, func): [1, 2, 3]


## Transformaciones clave-valor

### Transformaciones básicas


Transformación | Descripción
------------- | -------------
reduceByKey(func)  | Devuelve un nuevo RDD de tuplas (K, V) donde los valores para cada clave K son agregados usando una función de reducción func, cuyo tipo debe ser (V, V) à V
*sortByKey()* | Devuelve un nuevo RDD de tuplas (K, V) ordenadas por clave en orden ascendente
*groupByKey() * | Devuelve un nuevo RDD de tuplas (K, iterable(V)) **¡Cuidado! Puede ser muy costoso – datos por red**



In [14]:
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)]) 
print("reduceByKey: " + str( rdd.reduceByKey(lambda a, b: a + b).collect() ))

rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
print("reduceByKey: " + str( rdd2.sortByKey().collect() ))
    
rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')]) 
print("reduceByKey: " + str( rdd2.groupByKey().collect() ) )

reduceByKey: [(1, 2), (3, 10)]
reduceByKey: [(1, 'a'), (1, 'b'), (2, 'c')]
reduceByKey: [(1, <pyspark.resultiterable.ResultIterable object at 0x000001E17CA0AC10>), (2, <pyspark.resultiterable.ResultIterable object at 0x000001E17CA0ABE0>)]


### Transformaciones Join tipo SQL

Transformación | Descripción
------------- | -------------
join(rdd)  | Inner join entre dos RDDs
*leftOuterJoin(rdd)* | Realiza un join entre los dos RDDs donde la clave debe estar presente en el segundo de ellos
*rightOuterJoin(rdd)* | Realiza un join entre los dos RDDs donde la clave debe estar presente en el primero de ellos
*fullOuterJoin(rdd)* | Realiza un join entre los dos RDDs donde la clave debe estar presente alguno de ellos

### `join`

In [None]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())

### `leftOuterJoin`

In [None]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
sorted(x.leftOuterJoin(y).collect())

### `rightOuterJoin`

In [None]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
sorted(x.rightOuterJoin(y).collect())

### `fullOuterJoin`

In [None]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
sorted(x.fullOuterJoin(y).collect())

## Acciones clave-valor

### Acciones básicas


Acción | Descripción
------------- | -------------
countByKey() | Cuenta el número de elementos para cada clave
*collectAsMap()* | Recolecta el RDD como un Map para facilitar las búsquedas
*lookup(key)* | Devuelve el valor asociado con la clave dada




In [13]:
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])

print("countByKey: " + str( rdd.countByKey() ))
print("collectAsMap: " + str( rdd.collectAsMap() ))
print("lookup(3): " + str( rdd.lookup(3) ))

countByKey: defaultdict(<class 'int'>, {1: 1, 3: 2})
collectAsMap: {1: 2, 3: 6}
lookup(3): [4, 6]


# Caché de RDDs
Si se va a reusar un RDD es conveniente cachearlo para que no se recalcule cada vez

In [None]:
quijoteRDD = sc.textFile("./datos/pg2000.txt")
palabrasQuijoteRDD = quijoteRDD.flatMap(lambda line: line.split(' ')).cache()
print("Cabeza aparece " + str( quijoteRDD.filter(lambda line: "cabeza" in line).count() ) + " veces")
print("Lanza aparece " + str( quijoteRDD.filter(lambda line: "Lanza" in line).count() ) + " veces")

# Aspectos avanzados

## Variables broadcast
Permiten enviar eficientemente valores de gran tamaño a los workers (solo lectura)

In [None]:
tablaLookUp = {1: "a", 2: "b", 3: "c", 4: "d"} # suponer que es muy grande

tablaLookUp[1]

In [None]:
tablaLookUp = {1: "a", 2: "b", 3: "c", 4: "d"} # suponer que es muy grande
rdd = sc.parallelize([1, 2, 3, 4])

sinBroadcast = rdd.map(lambda v: tablaLookUp[v]).collect()

tablaLookUpBroadcast = sc.broadcast(tablaLookUp) # creamos la variable tipo broadcast que se distribuye a los workers
conBroadcast = rdd.map(lambda v: tablaLookUpBroadcast.value[v]).collect() # con .value accedemos al valor de la variable broadcast

print("Sin broadcast: " + str(sinBroadcast))
print("Con broadcast: " + str(conBroadcast))

## Acumuladores
Agregan valores del os executors en el driver. Solo el driver puede leer las variables, para los workers son solo de escritura

In [None]:
accum = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4])

def f(x):
    global accum 
    accum += x
    
rdd.foreach(f)
accum.value

In [None]:
quijoteRDD = sc.textFile("./datos/pg2000.txt")
#Creamos un Accumulator[Int] inicializado a 0
blankLines = sc.accumulator(0)

def extraePalabrasBlankLines(line):
    global blankLines # Hacemos la variable global accesible
    if (line == ""):
        blankLines += 1
    return line.split(" ")
    
palabrasQuijoteRDD = quijoteRDD.flatMap(extraePalabrasBlankLines)
# Provocamos que se ejecute la transformación
palabrasQuijoteRDD.count()
print("Líneas en blancos: %d" % blankLines.value)

## Trabajar con datos por particiones
Trabajamos con todos los datos de una partición a la vez.
En vez de aplicar una función por elemento se aplica una función para el iterador de elementos de la partición.
Permite evitar rehacer trabajos de configuración con cada elemento o trabajar con todos los elementos a la vez.

Transformación / Acción | Descripción
------------- | -------------
*mapPartitions(func)* | Aplica la función func a cada partición del RDD. La función func recibe un iterador de elementos y devuelve otro iterador con elementos que pueden ser de diferente tipo
*mapPartitionsWithIndex(func)* | Aplica la función func a cada partición del RDD. La función func recibe una tupla (entero, iterador) donde el entero representa el índice de la partición y el iterador contiene todos los elementos de la partición.
*foreachPartition(func)* | Aplica la función func a cada partición del RDD. No devuelve nada. Puede usarse para realizar inserciones en una BBDD por ejemplo. func recibe un iterador de elementos y no devuelve nada.


### Media sin `mapPartitions`

In [None]:
def combineCtrs(c1, c2):
    return (c1[0] + c2[0], c1[1] + c2[1])

def basicAvg(nums):
    """Compute the average"""
    sumCount = nums.map(lambda num: (num, 1)).reduce(combineCtrs)
    return sumCount[0] / float(sumCount[1])

a = sc.parallelize([1, 2, 3, 4, 5, 6, 7])
basicAvg(a)

### Media con `mapPartitions`

In [None]:
def partitionCtr(nums):
    """Compute sumCounter for partition"""
    sumCount = [0, 0]
    for num in nums:
        sumCount[0] += num
        sumCount[1] += 1
    return [sumCount]

def fastAvg(nums):
    """Compute the avg"""
    sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
    return sumCount[0] / float(sumCount[1])

a = sc.parallelize([1, 2, 3, 4, 5, 6, 7])
fastAvg(a)

### `mapPartitionsWithIndex`

In [None]:
parallel = sc.parallelize(range(1, 10), 3)
def show(index, iterator): 
    return ['index: ' + str(index) + " values: " + str(list(iterator))] # debemos devolver una lista ya que requiere un iterador

parallel.mapPartitionsWithIndex(show).collect()  


## Operaciones con RDD numéricos



### Método `stats()`
Método `stats()` devuelve un `StatsCounter` con todas las estadísticas calculadas mediante una única pasada por todo el RDD

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7])
rdd.stats() # devuelve un StatsCounter

### Métodos propios sobre el RDD

Método (acción) | Descripción
------------- | -------------
*Método* | Descripción
*count()* | Número de elementos en el RDD
*mean()* | Media de los elementos en el RDD
*sum()* | Suma total de los elementos en el RDD
*max()* | Máximo valor
*min()* | Mínimo valor
*variance()* | Varianza de los elementos
*sampleVariance()* | Variance de los elementos calculada para una muestra
*stdev()* | Desviación estándar de los elementos
*sampleStdev()* | Desviación estándar para una muestra



In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7])

# ESTA FORMA ES MUCHO MENOS ÓPTIMA QUE STATS() DEBIDO A QUE SE REALIZA UNA PASADA POR EL DATASET PARA CADA ESTADÍSTICA
print("Count: " + str(rdd.count()))
print("Mean: " + str(rdd.mean()))
print("Sum: " + str(rdd.sum()))
print("Max: " + str(rdd.max()))
print("Min: " + str(rdd.min()))
print("Variance: " + str(rdd.variance()))
print("Smaple variance: " + str(rdd.sampleVariance()))
print("Standard deviation: " + str(rdd.stdev()))
print("Sample standard deviation: " + str(rdd.sampleStdev()))

# Lectura y escritura de ficheros

## Ficheros de texto

In [None]:
input1 = sc.textFile("datos/pg2000.txt")
input2 = sc.textFile("datos/")
input3 = sc.textFile("datos/*.txt")
input4 = sc.wholeTextFiles("datos/*.txt")

print("Elementos en RDD a partir de pg2000.txt: " + str(input1.count()))
# print "Elementos en RDD a partir de datos/: " + str(input2.count())    #  datasets muy grandes - mejor no esperar
print("Elementos en RDD a partir de datos/*.txt: " + str(input3.count()))
print("Elementos en RDD a partir de datos/.*txt con wholeTextFiles: " + str(input4.count()))

# CUIDADO, Falla si la carpeta ya existe
input1.saveAsTextFile("datos/salida")
print("Ver datos escritos en datos/salida")

## Ficheros JSON

In [None]:
import json
data = sc.textFile("datos/json.json").map(lambda x: json.loads(x))
print("Elementos en RDD a partir de datos/json.json: " + str(data.count()))


data.map(lambda x: json.dumps(x)).saveAsTextFile("datos/salida.json")
print("Ver datos escritos en datos/salida.json")

## Ficheros CSV

In [None]:
import csv
import StringIO
def loadRecord(line):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["nombre", "tel", "email"])
    return reader.next()

inputCSV = sc.textFile("datos/personas.csv").map(loadRecord)
print("Primeras 10 personas: ")
print(str(inputCSV.take(10)))


def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["nombre", "tel", "email"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
    
inputCSV.mapPartitions(writeRecords).saveAsTextFile("datos/salida.csv")
print("Ver datos escritos en datos/salida.csv")
