# Uso de RDD's en Spark

## Creación de un RDD 
Lo primero es obtener el objeto SparkContext. Recomendación personal: Hacerlo a partir del objeto SparkSession

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("01-rdd1") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "file:///opt/spark/logs/history") \
    .config("spark.history.fs.logDirectory", "file:///opt/spark/logs/history") \
    .getOrCreate()

spark.version  # Verifica la versión de Spark

#spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### A partir de una colección local
Para crear un RDD a partir de una colección se usa el método parallelize. A continuación se muestran dos ejemplos: Uno para enteros y otro para strings

In [2]:
rdd_int = sc.parallelize(range(10))
rdd_st = sc.parallelize ("Big Data aplicado. Curso de especialización de Inteligencia Artificial y Big Data".split())

También hay algún parámetro opcional, como el número de particiones.

In [3]:
rdd_int_par = sc.parallelize(range(20),4)


rdd_st_par = sc.parallelize ("Big Data aplicado. Curso de especialización de Inteligencia Artificial y Big Data".split(),5)


### A partir de fuentes de datos
Para ello tenemos dos opciones:
- textfile: Para crear un RDD a partir de un archivo de texto
- wholeTextFiles: Para crear un RDD a partir de varios archivos de texto

In [11]:
rdd_file = sc.textFile("/home/jovyan/work/data/flight-data/csv/2015-summary.csv")
rdd_whole_files = sc.wholeTextFiles("/home/jovyan/work/data/flight-data/csv")

### A partir de DataFrames existentes
Una forma muy sencilla de crear RDD's es a partir de un DataFrame o DataSet existente (se verán en próximas sesiones).

In [5]:
spark.range(10).rdd.collect()

                                                                                

[Row(id=0),
 Row(id=1),
 Row(id=2),
 Row(id=3),
 Row(id=4),
 Row(id=5),
 Row(id=6),
 Row(id=7),
 Row(id=8),
 Row(id=9)]

Así obtenemos un RDD formado por objetos de tipo *Row*. Para poder manejar estos datos, es necesario convertir estos objetos tipo *Row* al tipo correcto o extraer los valores, como se muestra en el sigiuente ejemplo:

In [12]:
spark.range(10).toDF("id").rdd.map(lambda row: row[0]).collect()

                                                                                

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

## Acciones
A continuación vamos a ver algunas de las acciones más frecuentes:
### collect
Permite mostrar todos los elementos de un RDD


In [17]:
print (rdd_int.collect())
print (rdd_st.collect())
#print (rdd_file.collect())
#print (rdd_whole_files.collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
['Big', 'Data', 'aplicado.', 'Curso', 'de', 'especialización', 'de', 'Inteligencia', 'Artificial', 'y', 'Big', 'Data']


### take
Permite obtener un número determinado de elmentos del RDD

In [15]:
print (rdd_int.take(3))
print (rdd_st.take(3))
#print (rdd_file.take(2))
#print (rdd_whole_files.take(2))

[0, 1, 2]
['Big', 'Data', 'aplicado.']


### count
Devuelve el número de elementos de un RDD

In [18]:
print (rdd_int.count())
print (rdd_st.count())
print (rdd_file.count())
print (rdd_whole_files.count())

10
12


25/03/10 13:04:03 WARN TaskSetManager: Lost task 0.0 in stage 24.0 (TID 50) (172.21.0.4 executor 1): java.io.FileNotFoundException: File file:/home/jovyan/work/data/flight-data/csv/2015-summary.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
	at org.apache.hadoop.fs.ChecksumFileSystem.lambda$openFileWithOptions$0(ChecksumFileSystem.java:896)
	at org.apache.hadoop.util.LambdaUtils.eval(LambdaUtils.java:52)
	at org.apache.hadoop.fs.ChecksumFileSystem.openFileWithOptions(Checksum

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 24.0 failed 4 times, most recent failure: Lost task 0.3 in stage 24.0 (TID 57) (172.21.0.3 executor 0): java.io.FileNotFoundException: File file:/home/jovyan/work/data/flight-data/csv/2015-summary.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
	at org.apache.hadoop.fs.ChecksumFileSystem.lambda$openFileWithOptions$0(ChecksumFileSystem.java:896)
	at org.apache.hadoop.util.LambdaUtils.eval(LambdaUtils.java:52)
	at org.apache.hadoop.fs.ChecksumFileSystem.openFileWithOptions(ChecksumFileSystem.java:894)
	at org.apache.hadoop.fs.FileSystem$FSDataInputStreamBuilder.build(FileSystem.java:4768)
	at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:115)
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:290)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:289)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:247)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:99)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.FileNotFoundException: File file:/home/jovyan/work/data/flight-data/csv/2015-summary.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
	at org.apache.hadoop.fs.ChecksumFileSystem.lambda$openFileWithOptions$0(ChecksumFileSystem.java:896)
	at org.apache.hadoop.util.LambdaUtils.eval(LambdaUtils.java:52)
	at org.apache.hadoop.fs.ChecksumFileSystem.openFileWithOptions(ChecksumFileSystem.java:894)
	at org.apache.hadoop.fs.FileSystem$FSDataInputStreamBuilder.build(FileSystem.java:4768)
	at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:115)
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:290)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:289)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:247)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:99)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [10]:
rdd_st.count()

12

### reduce
Permite, mediante una función especificada por el programador reducir el RDD a un único valor. Esta función recibe dos parámetros y devuelve un único resultado. Se pueden emplear también funciones *lambda*.


In [11]:
# Sumamos todos los elementos
print(rdd_int.reduce (lambda x,y: x+y))

def word_length_reducer(word1,word2):
    if (len(word1) > len (word2)):
        return word1
    else:
        return word2


print (rdd_st.reduce (word_length_reducer))
print ( rdd_file.reduce (word_length_reducer)) # Cambiar?

45
especialización
"Bonaire, Sint Eustatius, and Saba",United States,58


### first
Devuelve el primer elemento de un RDD

In [12]:
rdd_int.first()

0

In [13]:
rdd_st.first()

'Big'

### max/min
Devuelve el valor máximo/mínimo de un RDD



In [14]:
rdd_int.min()

0

In [15]:
rdd_st.max()

'y'

## Transformaciones
### map
Permite aplicar una función especificada por el programador a cada uno de los elementos del RDD devolviendo un RDD del mismo tamaño que el original

In [16]:
rdd_int.map (lambda x: 2*x).collect()

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

In [17]:
rdd_st.map (lambda x: list(x)).collect()

[['B', 'i', 'g'],
 ['D', 'a', 't', 'a'],
 ['a', 'p', 'l', 'i', 'c', 'a', 'd', 'o', '.'],
 ['C', 'u', 'r', 's', 'o'],
 ['d', 'e'],
 ['e', 's', 'p', 'e', 'c', 'i', 'a', 'l', 'i', 'z', 'a', 'c', 'i', 'ó', 'n'],
 ['d', 'e'],
 ['I', 'n', 't', 'e', 'l', 'i', 'g', 'e', 'n', 'c', 'i', 'a'],
 ['A', 'r', 't', 'i', 'f', 'i', 'c', 'i', 'a', 'l'],
 ['y'],
 ['B', 'i', 'g'],
 ['D', 'a', 't', 'a']]

### flatMap
Permite realizar operaciones map que no sean 1:1. Por ejemplo, en el siguiente código aplicamos la misma función que en el caso anterior pero el resultado es muy distinto:


In [18]:
rdd_st.flatMap(lambda x: list(x)).collect()

['B',
 'i',
 'g',
 'D',
 'a',
 't',
 'a',
 'a',
 'p',
 'l',
 'i',
 'c',
 'a',
 'd',
 'o',
 '.',
 'C',
 'u',
 'r',
 's',
 'o',
 'd',
 'e',
 'e',
 's',
 'p',
 'e',
 'c',
 'i',
 'a',
 'l',
 'i',
 'z',
 'a',
 'c',
 'i',
 'ó',
 'n',
 'd',
 'e',
 'I',
 'n',
 't',
 'e',
 'l',
 'i',
 'g',
 'e',
 'n',
 'c',
 'i',
 'a',
 'A',
 'r',
 't',
 'i',
 'f',
 'i',
 'c',
 'i',
 'a',
 'l',
 'y',
 'B',
 'i',
 'g',
 'D',
 'a',
 't',
 'a']

### distinct
Elimina los duplicados

In [19]:
rdd_st.distinct().collect()

['Curso',
 'aplicado.',
 'especialización',
 'Big',
 'de',
 'y',
 'Inteligencia',
 'Artificial',
 'Data']

### filter
Permite seleccionar los elementos del RDD que cumplen determinada condición


In [20]:
rdd_int.filter(lambda x: x % 2 == 0).collect()

[0, 2, 4, 6, 8]

In [21]:
rdd_st.filter(lambda x: len(x) >= 5).collect()

['aplicado.', 'Curso', 'especialización', 'Inteligencia', 'Artificial']

### sortBy
Permite reordenar el RDD en función de un criterio que puede ser especificado mediante una función lambda. Si queremos hacer orden inverso tenemos que poner el valor en negativo.

In [22]:
# Orden ascendente
rdd_st.sortBy(lambda x: len(x)).collect()

# Orden descendente
# rdd_st.sortBy(lambda x: len(x)*-1).collect()


['y',
 'de',
 'de',
 'Big',
 'Big',
 'Data',
 'Data',
 'Curso',
 'aplicado.',
 'Artificial',
 'Inteligencia',
 'especialización']

### randomsplit
Permite dividir un RDD convirtiéndolo en un array de RDD’s en función de un array de pesos especificado por el programador

In [23]:
for rdd in rdd_int.randomSplit([0.4, 0.6]):
    print(rdd.collect())

for rdd in rdd_st.randomSplit([0.5,0.5]):
    print(rdd.collect())


[0, 2, 5, 7, 9]
[1, 3, 4, 6, 8]
['aplicado.', 'de', 'especialización', 'de', 'y', 'Big', 'Data']
['Big', 'Data', 'Curso', 'Inteligencia', 'Artificial']


## Otras operaciones
### foreachPartition
Permite especificar que función aplicar a cada partición

In [5]:
def f (iterator):
    for x in iterator:
        print(type(x))
        print (x)

rdd_int_par.pipe("wc -l").collect()
print(rdd_int_par.foreachPartition(f))


None


In [8]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
partitions = rdd.glom().collect()  # glom() devuelve una lista de particiones
for i, partition in enumerate(partitions):
    print(f"Partition {i}: {partition}")


Partition 0: []
Partition 1: []
Partition 2: []
Partition 3: [1]
Partition 4: []
Partition 5: []
Partition 6: []
Partition 7: [2]
Partition 8: []
Partition 9: []
Partition 10: []
Partition 11: [3]
Partition 12: []
Partition 13: []
Partition 14: []
Partition 15: [4]
Partition 16: []
Partition 17: []
Partition 18: []
Partition 19: [5]


### glom
La función *glom* convierte cada partición del DataFrame en Arrays. Es una función que puede ser muy útil, pero si los arrays son muy grandes podría causar errores.

In [41]:
rdd_st_par.glom().collect()

[['Big', 'Data'],
 ['aplicado.', 'Curso'],
 ['de', 'especialización'],
 ['de', 'Inteligencia'],
 ['Artificial', 'y', 'Big', 'Data']]

### Almacenar RDD's en archivos
Es posible almacenar los RDD's en archivos de texto. Dos métodos:
- **saveAsTextFile**: Almacena el RDD en archivos de texto. Hay que especificar la ruta y (opcionalmente) un códec de compresión:

In [42]:
rdd_st.saveAsTextFile("/home/jovyan/texto.txt")

- **saveAsPickleFile**: En un entorno *HDFS* una *secuenceFile* es un archivo de texto formado por pares clave-valor binarios. Este método permite escribir un RDD como un *sequenceFile*:

In [44]:
rdd_st.saveAsPickleFile("/home/jovyan/secuencia")

### Checkpointing
Permite almacenar estados intermedios para no tener que repetir la secuencia de operaciones desde el principio.

In [45]:
sc.setCheckpointDir("/home/jovyan/checkpoints")
rdd_st.checkpoint()

### MapPartitions
El método **pipe** nos permite enviar, de forma similar a las tuberías de un shell, RDD's a la entrada de un comando del SO. A continuación se muestra un ejemplo sencillo con el comando **wc**:

In [29]:
rdd_st_par.pipe("wc -l").collect()

['2', '2', '2', '2', '4']

Observando el código anterior destaca el hecho de que Spark ejecuta operaciones a nivel de partición. La función *map* es realmente un alias para ejecutar la función *mapPartitions* a nivel de fila. Este hecho hace posible ejecutar la operación *map* a nivel de fila, empleando la función *mapPartitions* e iteradores.

In [27]:
cosa = rdd_st_par.mapPartitions(lambda x: [list(x)])
print(type(cosa))
cosa.collect()


<class 'pyspark.rdd.PipelinedRDD'>


[['Big', 'Data'],
 ['aplicado.', 'Curso'],
 ['de', 'especialización'],
 ['de', 'Inteligencia'],
 ['Artificial', 'y', 'Big', 'Data']]