A continuación se muestra cómo crear un RDD en Spark:

# Diferentes formas de crear un RDD en Spark

In [62]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Sparkies').master('local[*]').getOrCreate()

In [63]:
sc = spark.sparkContext

In [64]:
#Crear un RDD vacío
rdd_1 = sc.emptyRDD()

In [65]:
#Crear un RDD con parallelize
rdd_2 = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 4)

#Número de particiones	
rdd_2.getNumPartitions()

4

In [66]:
#Número de elementos
rdd_2.count()

10

In [67]:
#Muestra los elementos del RDD
rdd_2.collect()  

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [68]:
#Muestra los elementos del RDD por particiones
rdd_2.glom().collect()

[[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]

In [69]:
#Crear un RDD a partir de un fichero
rdd_3 = sc.textFile("./work/sources/rdd_source.txt")

#Muestra los elementos del RDD
rdd_3.collect()

['Así podemos crear', 'un RDD desde un', 'archivo de texto!!!']

In [70]:
#Crear un RDD a partir de un dataframe
df = spark.createDataFrame([("a", 1), ("b", 2), ("c",  3), ("d", 4)], ["Col1", "Col2"])
rdd_5 = df.rdd

#Muestra los elementos del RDD  
rdd_5.collect()

[Row(Col1='a', Col2=1),
 Row(Col1='b', Col2=2),
 Row(Col1='c', Col2=3),
 Row(Col1='d', Col2=4)]

# Transformaciones en un RDD


In [71]:
#RDD usando map
rdd_4 = rdd_2.map(lambda x: x**2)

#Muestra los elementos del RDD
rdd_4.collect()

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

In [72]:
#RDD usando filter
rdd_5 = rdd_2.filter(lambda x: x%2 == 0)

#Muestra los elementos del RDD
rdd_5.collect()

[2, 4, 6, 8, 10]

In [73]:
#RDD usando flatMap
rdd_6 = rdd_2.flatMap(lambda x: (x, x**2))

#Muestra los elementos del RDD
rdd_6.collect()

[1, 1, 2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100]

In [74]:
#RDD usando coalesce
#Reduce el número de particiones
rdd_7 = rdd_2.coalesce(2)

#Número de particiones
rdd_7.getNumPartitions()

2

In [75]:
#RDD usando repartition
#Reduce el número de particiones
rdd_8 = rdd_2.repartition(2)

#Número de particiones
rdd_8.getNumPartitions()

2

In [76]:
#RDD usando reduceByKey
rdd_9 = rdd_2.map(lambda x: (x%2, x)).reduceByKey(lambda x,y: x+y)

#Muestra los elementos del RDD
rdd_9.collect()

[(0, 30), (1, 25)]

In [77]:
#Cree un RDD llamado lenguajes que contenga los siguientes lenguajes de programación: Python, R, C, Scala, Rugby y SQL.
lenguajes = sc.parallelize(["Python", "R", "C", "Scala", "Rugby", "SQL"])

In [78]:
lenguajes_upper = lenguajes.map(lambda x: x.upper())

#Muestra los elementos del RDD
lenguajes_upper.collect()

['PYTHON', 'R', 'C', 'SCALA', 'RUGBY', 'SQL']

In [79]:
lenguajes_lower = lenguajes.map(lambda x: x.lower())

#Muestra los elementos del RDD
lenguajes_lower.collect()

['python', 'r', 'c', 'scala', 'rugby', 'sql']

In [80]:
lenguajes_init_r = lenguajes.filter(lambda x: x.startswith("R"))

#Muestra los elementos del RDD
lenguajes_init_r.collect()

['R', 'Rugby']

In [81]:
#Cree un RDD llamado pares que contenga los números pares existentes en el intervalo [20;30].
pares = sc.parallelize(range(20,31)).filter(lambda x: x%2 == 0)

#Muestra los elementos del RDD
pares.collect()

[20, 22, 24, 26, 28, 30]

In [82]:
pares_sqrt = pares.map(lambda x: x**2)

#Muestra los elementos del RDD
pares_sqrt.collect()

[400, 484, 576, 676, 784, 900]

In [83]:
pares_list = pares.map(lambda x: (x, x**2))

#Muestra los elementos del RDD
pares_list.collect()

[(20, 400), (22, 484), (24, 576), (26, 676), (28, 784), (30, 900)]

In [84]:
pares_lista_2 = pares.flatMap(lambda x: (x, x**3))

#Muestra los elementos del RDD
pares_lista_2.collect()

[20, 8000, 22, 10648, 24, 13824, 26, 17576, 28, 21952, 30, 27000]

# Acciones en un RDD

In [98]:
#RDD usando reduce y reduceByKey
rdd_10 = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

#Suma de los elementos del RDD
rdd_sum = rdd_10.reduce(lambda x,y: x+y)

#Muestra los elementos del RDD
rdd_sum

55

In [88]:
#RDD usando reduceByKey
rdd_11 = sc.parallelize([(1,2),(1,3),(2,4),(2,5),(3,6),(3,7),(4,8),(4,9),(5,10),(5,11)])

#Suma de los elementos del RDD
rdd_11.reduceByKey(lambda x,y: x+y).collect()

[(4, 17), (1, 5), (5, 21), (2, 9), (3, 13)]

In [None]:
#Usando count, take, max y saveAsTextFile
#Número de elementos del RDD
rdd_11.count()

#Muestra los 3 primeros elementos del RDD
rdd_11.take(3)

#Máximo del RDD
rdd_11.max()

#Mínimo del RDD
rdd_11.min()

#Muestra los elementos del RDD
rdd_11.collect()

#Guarda el RDD en un fichero
rdd_11.saveAsTextFile("./work/results/rdd_11.txt")

# Aspectos avanzados sobre RDD

In [None]:
from pyspark.storagelevel import StorageLevel
#Almacenamiento en caché
rdd_12 = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

#Almacenamiento en caché
rdd_12.persist(StorageLevel.MEMORY_ONLY)

In [100]:
#Particionado del RDD
#Ejemplo de como funciona el particionado
rdd_13 = sc.parallelize(['x', 'y', 'z'])

#Número de particiones
rdd_13.getNumPartitions()

4

In [None]:
# Indice de la partición = hash(key) % numPartitions
# hash('x') = 120
# hash('y') = 121
# hash('z') = 122
# hash('x') % 4 = 0
# hash('y') % 4 = 1
# hash('z') % 4 = 2

In [None]:
#Broadcast variables
#Crea una variable broadcast
broadcast_var = sc.broadcast([1,2,3,4,5])

#Muestra los elementos de la variable broadcast
broadcast_var.value

#Accede a los elementos de la variable broadcast
broadcast_var.value[0]

#Destruye la variable broadcast
broadcast_var.destroy()

In [101]:
#Acumuladores
#Crea un acumulador
acumulador = sc.accumulator(0)

#Muestra el valor del acumulador
acumulador.value

#Incrementa el valor del acumulador
acumulador.add(1)

#Usa foreach para incrementar el valor del acumulador
rdd_14 = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd_14.foreach(lambda x: acumulador.add(x))

#Muestra el valor del acumulador
acumulador.value

56