In [1]:
from pyspark import SparkContext

In [2]:
# getOrCreate() reuses an already running SparkContext, so re-running this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [3]:
# Sample data: the integers 1..9 followed by 0.
lista = list(range(1, 10)) + [0]

# crear rdd con parallelize

In [4]:
# Distribute the local list as an RDD; the collect() output below shows the original order is kept.
rdd = sc.parallelize(lista)

In [5]:
 rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 0]

# filter -> devuelve un nuevo rdd solo con los elementos que cumplen la condición

In [6]:
# Keep only the elements strictly greater than 3.
filtrado_rdd = rdd.filter(lambda valor: valor > 3)

In [7]:
# Elements greater than 3, in their original order.
filtrado_rdd.collect()

[4, 5, 6, 7, 8, 9]

In [8]:
# Keep only the even elements (0 included, since 0 % 2 == 0).
filtrado_rdd_pares = rdd.filter(lambda valor: valor % 2 == 0)

In [9]:
# Even elements only; 0 appears last because it is last in the input.
filtrado_rdd_pares.collect()

[2, 4, 6, 8, 0]

# map -> aplica una función a cada elemento de un rdd y devuelve un nuevo rdd con los resultados

In [10]:
def addOne(x):
    """Return the input incremented by one (used as a map function on the RDD)."""
    incremented = x + 1
    return incremented

In [11]:
# map applies addOne to every element; despite the variable name, nothing is filtered here.
filtrado_sumado_rdd = rdd.map(addOne)

In [12]:
# Every element incremented by one; the order follows the input RDD.
filtrado_sumado_rdd.collect()

[2, 3, 4, 5, 6, 7, 8, 9, 10, 1]

In [13]:
def removeOneIfElementIsPrime(x):
    """Return ``x`` unchanged if it is odd, otherwise return ``x - 1``.

    Despite its name, this function tests *parity*, not primality: every even
    input (including 0) is decremented by one, e.g. 2 -> 1, 9 -> 9, 0 -> -1.
    The original name is kept so existing cells that call it keep working;
    ``removeOneIfElementIsEven`` below is an accurately named alias.

    Parameters
    ----------
    x : int
        Value to transform.

    Returns
    -------
    int
        ``x`` if ``x`` is odd, ``x - 1`` if ``x`` is even.
    """
    # For Python ints, x % 2 is 1 for odd values and 0 for even ones (negatives included).
    return x if x % 2 != 0 else x - 1


# Accurately named alias for the parity-based behaviour above.
removeOneIfElementIsEven = removeOneIfElementIsPrime

In [14]:
# map with a named function: odd values pass through unchanged, even values (including 0) lose one.
filtrado_multioperacion_rdd = rdd.map(removeOneIfElementIsPrime)

In [15]:
# 0 is even, so it becomes -1 (last element of the output).
filtrado_multioperacion_rdd.collect()

[1, 1, 3, 3, 5, 5, 7, 7, 9, -1]

Aplicar más de una función (encadenando varios map) en una misma transformación

In [17]:
# Chain two maps: addOne first, then removeOneIfElementIsPrime on each result.
# NOTE(review): despite the name "cuadrado", nothing is squared here.
cuadrado_rdd = (
    rdd
    .map(addOne)
    .map(removeOneIfElementIsPrime)
)

In [18]:
# Net effect of the chain: odd inputs come back unchanged, even inputs end up incremented by one.
cuadrado_rdd.collect()

[1, 3, 3, 5, 5, 7, 7, 9, 9, 1]

In [19]:
# Increment each element, then pair it with its square.
ejemplo_rdd = (
    rdd
    .map(addOne)
    .map(lambda valor: (valor, valor ** 2))
)

In [20]:
# Each element became a (value, value**2) tuple, so the result is a list of pairs.
ejemplo_rdd.collect()

[(2, 4),
 (3, 9),
 (4, 16),
 (5, 25),
 (6, 36),
 (7, 49),
 (8, 64),
 (9, 81),
 (10, 100),
 (1, 1)]

# flatMap -> igual que map, pero cada resultado (un iterable) se aplana, dando un único rdd de elementos simples

In [21]:
# Same pairs as ejemplo_rdd, but flatMap emits both members of each tuple as separate elements.
fm_rdd = (
    rdd
    .map(addOne)
    .flatMap(lambda valor: (valor, valor ** 2))
)

In [22]:
# Flat list: each value is immediately followed by its square.
fm_rdd.collect()

[2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100, 1, 1]

# sample(withReplacement, fraction, seed)

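withReplacement -> si hay reemplazo: False -> cada elemento se puede escoger como máximo una vez; True -> se puede escoger varias veces
fraction -> tamaño esperado de la muestra como fracción del rdd: sin reemplazo es la probabilidad de escoger cada elemento (entre 0 y 1); con reemplazo es el número esperado de veces que se escoge cada elemento (>= 0). No garantiza un tamaño exacto.
seed -> semilla del generador de números aleatorios (opcional); fijarla hace que la muestra sea reproducible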

In [23]:
# Without replacement, fraction=1 keeps every element, so no seed is needed here.
cuadrado_rdd.sample(withReplacement=False, fraction=1).collect()

[1, 3, 3, 5, 5, 7, 7, 9, 9, 1]

Los escogió todos: sin reemplazo, fraction es la probabilidad de escoger cada elemento, y con fraction=1 se escogen todos.

In [25]:
# Fixed seed so the sample (and its displayed output) is reproducible on Restart & Run All.
cuadrado_rdd.sample(withReplacement=False, fraction=0.5, seed=42).collect()

[1, 3, 5, 1]

In [26]:
# Fixed seed so the sample (and its displayed output) is reproducible on Restart & Run All.
cuadrado_rdd.sample(withReplacement=False, fraction=0.8, seed=42).collect()

[3, 5, 7, 7, 9, 9]

In [27]:
# With replacement, fraction=2 means each element is expected to be picked about twice.
# Fixed seed so the output is reproducible on Restart & Run All.
cuadrado_rdd.sample(withReplacement=True, fraction=2, seed=42).collect()

[1, 1, 1, 3, 3, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5, 7, 7, 7, 9, 9, 9, 9, 9, 1, 1]

In [29]:
# With replacement, fraction=5 means each element is expected to be picked about five times.
# Fixed seed so sample_rdd (and the distinct() result derived from it) is reproducible.
sample_rdd = cuadrado_rdd.sample(withReplacement=True, fraction=5, seed=42)

In [30]:
# Sampled with replacement and fraction=5, so values repeat many times in the output.
sample_rdd.collect()

[1,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 3,
 3,
 3,
 3,
 3,
 3,
 3,
 3,
 3,
 5,
 5,
 5,
 5,
 5,
 5,
 5,
 5,
 5,
 9,
 9,
 9,
 9,
 9,
 9,
 9,
 9,
 9,
 9,
 9,
 9,
 1,
 1,
 1,
 1,
 1,
 1,
 1]

# distinct -> devuelve un nuevo rdd quitando duplicados

In [31]:
# distinct() drops duplicate values; treat the order of the result as unspecified.
sample_rdd.distinct().collect()

[1, 3, 5, 9]

# groupBy -> devuelve un rdd de pares (clave, valores) agrupando los elementos según la clave que calcula la función

In [32]:
# Key each element by the boolean (element > 1): False groups 0 and 1, True groups the rest.
rdd_grupo = rdd.groupBy(lambda valor: valor > 1)

In [33]:
# Each group's values come back as ResultIterable objects, so only their reprs are shown;
# the next cell turns them into sorted lists.
rdd_grupo.collect()

[(False, <pyspark.resultiterable.ResultIterable at 0x7f4ec404b160>),
 (True, <pyspark.resultiterable.ResultIterable at 0x7f4ec404b9b0>)]

In [35]:
# Sort each group's values inside Spark with mapValues and let the cell display
# the resulting list of (key, sorted values) pairs, instead of print()-ing a
# comprehension built on the collected ResultIterables.
rdd_grupo.mapValues(sorted).collect()

[(False, [0, 1]), (True, [2, 3, 4, 5, 6, 7, 8, 9])]


# transformaciones sobre 2 rdd

union -> rrda.union(rrdb): devuelve un rdd con los elementos de ambos

In [36]:
# Two small, disjoint RDDs used as operands for the two-RDD transformations below.
rrda = sc.parallelize([1,2,3,4])
rrdb = sc.parallelize([5,6,7,8])

In [37]:
# union: the elements of rrda followed by those of rrdb (see the output below).
rrdu = rrda.union(rrdb)

In [38]:
# Inspect the union of rrda and rrdb.
rrdu.collect()

[1, 2, 3, 4, 5, 6, 7, 8]

intersección -> rrda.intersection(rrdb): devuelve los elementos presentes en ambos rdd

In [39]:
# rrda and rrdb share no elements, so their intersection is empty ([]).
# Display it directly rather than overwriting rrdu, which holds the union computed above.
rrda.intersection(rrdb).collect()

In [40]:
# NOTE: this is rdda (not rrda). It shares 5 and 6 with rrdb, so the intersection below is non-empty.
rdda = sc.parallelize([1,2,3,4,5,6])

In [41]:
# NOTE(review): reuses the name rrdu (previously the union) for the intersection result.
rrdu = rdda.intersection(rrdb)

In [42]:
# Values common to rdda and rrdb; the output order ([6, 5]) shows intersection does not keep input order.
rrdu.collect()

[6, 5]

subtract -> devuelve los elementos de a que no están en b

In [45]:
# Elements of rdda that do not appear in rrdb.
rrds = rdda.subtract(rrdb)

rdda.union(rrdb)

In [46]:
# The output order ([2, 4, 1, 3]) differs from rdda's, so do not rely on ordering after subtract.
rrds.collect()

[2, 4, 1, 3]

In [50]:
# cartesian() -> producto cartesiano -> operacion costosa

In [48]:
# Every (a, b) pair with a from rrda and b from rrdb: 4 x 4 = 16 tuples.
rddc = rrda.cartesian(rrdb)

In [49]:
# One tuple per pair; the result size is len(rrda) * len(rrdb).
rddc.collect()

[(1, 5),
 (1, 6),
 (1, 7),
 (1, 8),
 (2, 5),
 (2, 6),
 (2, 7),
 (2, 8),
 (3, 5),
 (3, 6),
 (3, 7),
 (3, 8),
 (4, 5),
 (4, 6),
 (4, 7),
 (4, 8)]