<a href="https://colab.research.google.com/github/RHCarrasco/PySpark/blob/main/PySpark_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark
from pyspark import SparkContext

In [2]:
sc=SparkContext()

In [23]:
textFile = sc.textFile('ejemplo.txt')

In [4]:
lista = [1,2,3,4,5]
rdd = sc.parallelize(lista)

#Acciones

In [None]:
#Devuelve el contenido del RDD
textFile.collect()

['primera linea', 'segunda linea', 'tercera linea', 'cuarta linea']

In [None]:
#Devuelve el numero de elementos del RDD
textFile.count()

4

In [None]:
#Devuelve el primer elemento del RDD
textFile.first()

'primera linea'

#Transformaciones

### filter() filtra los elementos de un RDD

In [None]:
#Filtra cualquier linea del RDD que tenga la palabra 'segunda'
segunda = textFile.filter(lambda linea: 'segunda' in linea)
segunda.collect()

['segunda linea']

In [None]:
filtrado_rdd = rdd.filter(lambda x: x > 1)
filtrado_rdd.collect()

[2, 3, 4, 5]

### map() aplica una función a todos los elementos de un RDD

In [None]:
#Suma 1 a todos los elmentos de un RDD
def suma1(x):
  return (x+1)

filtrado_sumado_rdd = filtrado_rdd.map(suma1)
filtrado_sumado_rdd.collect()

[3, 4, 5, 6]

In [None]:
#Haz el cuadrado de los elementos del RDD
cuadrado_rdd = filtrado_sumado_rdd.map(lambda x: (x, x**2))
print(cuadrado_rdd.collect())
#Tambien se podrian poner todas las operaciones anteriores seguidas
print(rdd.filter(lambda x:x>1).map(suma1).map(lambda x: (x, x**2)).collect())

[(3, 9), (4, 16), (5, 25), (6, 36)]
[(3, 9), (4, 16), (5, 25), (6, 36)]


### flatMap() es igual que map() pero te convierte el resultado a una lista simple

In [None]:
#Haz el cuadrado de los elementos del RDD
cuadrado_rdd = filtrado_sumado_rdd.flatMap(lambda x: (x, x**2))
cuadrado_rdd.collect()

[3, 9, 4, 16, 5, 25, 6, 36]

### union() une dos RDD

In [None]:
#Unir los RDD cuadrado_rdd y filtrado_sumado_rdd en un unico RDD
union = filtrado_sumado_rdd.union(cuadrado_rdd)
union.collect()

[3, 4, 5, 6, 3, 9, 4, 16, 5, 25, 6, 36]

### zip() crea un RDD de tipo clave valor a partir de una lista de tuplas

In [None]:
#Se puede crear una lista de tuplas a partir de una lista
lista_tuplas = [('a', 1), ('b', 2), ('c', 3)]
pair_rdd = sc.parallelize(lista_tuplas)
#Pero tambien se puede crear una lista de tuplas mediante zip
lista_tuplas2 = zip(['a','b','c'], range(1,4))
pair_rdd2 = sc.parallelize(lista_tuplas2)
pair_rdd2.collect()

[('a', 1), ('b', 2), ('c', 3)]

In [None]:
#Se puede utilizar tambien con dos RDD
#Tienen que tener el mismo numero de elementos(5),
#el mismo numero de particiones(3)
#y el mismo numero de elementos por particion.
rdd1 = sc.parallelize(range(5), 3)
rdd2 = sc.parallelize(range(100,105,1), 3)
print(rdd1.glom().collect())
print(rdd2.glom().collect())
#el resultado tendra el mismo numero de particiones y el mismo numero de elementos por particion
pair_rdd = rdd1.zip(rdd2)
pair_rdd.collect()

[[0], [1, 2], [3, 4]]
[[100], [101, 102], [103, 104]]


[(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)]

### keyBy() crea una clave para cada valor del RDD

In [None]:
#Crear una lista clave valor en la que la clave sea la valor+1
rdd = sc.parallelize(range(5))
pair_rdd = rdd.keyBy(lambda x: x+1)
print(pair_rdd.collect())
#Sería igual que si hacemos un map en el que para cada elemento del rdd pongas una tupla de si mismo y su valor+1
print(rdd.map(lambda x: (x+1,x)).collect())

[(1, 0), (2, 1), (3, 2), (4, 3), (5, 4)]
[(1, 0), (2, 1), (3, 2), (4, 3), (5, 4)]


#Persistencia

###cache()  guarda por defecto en la memoria

In [5]:
rdd = sc.parallelize(range(100), 10)

In [6]:
rdd.is_cached

False

In [7]:
rdd.cache()

PythonRDD[4] at RDD at PythonRDD.scala:53

In [9]:
rdd.is_cached

True

###persist()  guarda donde lo indica el usuario

In [10]:
from pyspark import StorageLevel
rdd2 = rdd.map(lambda x:x*2)
rdd2.persist(StorageLevel.MEMORY_AND_DISK_DESER)

PythonRDD[5] at RDD at PythonRDD.scala:53

In [11]:
rdd2.is_cached

True

#Particionado

In [12]:
rdd = sc.parallelize([1,1,2,2,3,3,4,4], 4)

### getNumPartitions() obtenemos el numero de particiones

In [13]:
rdd.getNumPartitions()

4

### glom() obtenemos cómo están distribuidos los datos en las particiones

In [14]:
rdd.glom().collect()

[[1, 1], [2, 2], [3, 3], [4, 4]]

In [15]:
#El particionado se hereda
rdd2 = rdd.map(lambda x:x*2)
rdd2.glom().collect()

[[2, 2], [4, 4], [6, 6], [8, 8]]

### reduceByKey()

In [20]:
pair_rdd = rdd.map(lambda x:(x,x))
pair_rdd.glom().collect()

[[(1, 1), (1, 1)], [(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]

In [21]:
pair_rdd.reduceByKey(lambda x,y: x+y).glom().collect()

[[(4, 8)], [(1, 2)], [(2, 4)], [(3, 6)]]

In [37]:
textFile.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a,b: a + b) \
             .collect()

[('primera', 1), ('tercera', 1), ('linea', 4), ('segunda', 1), ('cuarta', 1)]

In [36]:
textFile.flatMap(lambda line: line.split(" ")).count()

8

###repartition() devuelve un nuevo RDD que tiene n particiones

In [39]:
pair_rdd6 = pair_rdd.repartition(6)
pair_rdd6.getNumPartitions()

6

### coalesce() para reducir el numero de particiones

In [40]:
pair_rdd2 = pair_rdd6.coalesce(2)
pair_rdd2.getNumPartitions()

2

### partitionBy() utiliza una funcion de particion para RDDs de clave-valor

In [46]:
pair_rdd3=pair_rdd.partitionBy(3)
pair_rdd3.glom().collect()

[[(3, 3), (3, 3)], [(1, 1), (1, 1), (4, 4), (4, 4)], [(2, 2), (2, 2)]]

In [47]:
pair_rdd2=pair_rdd.partitionBy(2)
pair_rdd2.glom().collect()

[[(2, 2), (2, 2), (4, 4), (4, 4)], [(1, 1), (1, 1), (3, 3), (3, 3)]]