In [1]:
from pyspark import SparkContext

In [2]:
sc = SparkContext.getOrCreate()
sc

# Creando un Pair RDD a partir de una lista de tuplas

In [3]:
lista_tuplas = [{'a',1}, {'b',2}, {'c', 3}]
lista_tuplas

[{1, 'a'}, {2, 'b'}, {'c', 3}]

In [4]:
pair_rdd = sc.parallelize(lista_tuplas)
pair_rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [5]:
pair_rdd.collect()

[{1, 'a'}, {2, 'b'}, {'c', 3}]

# utilizando zip()

In [6]:
pair_rdd2 = sc.parallelize(zip(['a', 'b', 'c'], range(1,4,1)))

In [7]:
pair_rdd2.collect()

[('a', 1), ('b', 2), ('c', 3)]

# Ejemplo utizando un archivo

In [8]:
rdd_celestina = sc.textFile('La+Celestina.txt')

In [9]:
pair_rdd_celestina = rdd_celestina.map(lambda x : (x.split(' ')[0], x))

In [10]:
pair_rdd_celestina.collect()

[('**This',
  '**This is a COPYRIGHTED Project Gutenberg Etext, Details Below**'),
 ('The', 'The Project Gutenberg Etext of La Celestina by Fernando de Rojas'),
 ('', ''),
 ('Copyright', 'Copyright 1998 R. S. Rudder'),
 ('', ''),
 ('Please', 'Please take a look at the important information in this header.'),
 ('We', 'We encourage you to keep this file on your own disk, keeping an'),
 ('electronic',
  'electronic path open for the next readers.  Do not remove this.'),
 ('', ''),
 ('', ''),
 ('**Welcome',
  '**Welcome To The World of Free Plain Vanilla Electronic Texts**'),
 ('', ''),
 ('**Etexts',
  '**Etexts Readable By Both Humans and By Computers, Since 1971**'),
 ('', ''),
 ('*These', '*These Etexts Prepared By Hundreds of Volunteers and Donations*'),
 ('', ''),
 ('Information',
  'Information on contacting Project Gutenberg to get Etexts, and'),
 ('further',
  'further information is included below.  We need your donations.'),
 ('', ''),
 ('', ''),
 ('LA', 'LA CELESTINA [In Spanish

# keyBy() 

In [11]:
rdd = sc.parallelize(range(5))

In [12]:
rdd.collect()

[0, 1, 2, 3, 4]

In [13]:
pair_rdd = rdd.keyBy(lambda x: x+1)

In [14]:
pair_rdd.collect()

[(1, 0), (2, 1), (3, 2), (4, 3), (5, 4)]

# zipWithIndex()

In [15]:
rdd = sc.parallelize(['a', 'b', 'c', 'd', 'e'], 3)

In [16]:
pair_rdd = rdd.zipWithIndex()

In [17]:
pair_rdd.collect()

[('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 4)]

# zipWithUniqueId

In [18]:
pair_rdd.zipWithUniqueId().glom().collect()

[[(('a', 0), 0)],
 [(('b', 1), 1), (('c', 2), 4)],
 [(('d', 3), 2), (('e', 4), 5)]]

In [19]:
rdd.zipWithUniqueId().collect()

[('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]

# zip() con dos rdd

In [20]:
rdd1 = sc.parallelize(range(5), 3)

In [21]:
rdd2 = sc.parallelize(range(100,105), 3)

In [22]:
rdd1.glom().collect()

[[0], [1, 2], [3, 4]]

In [23]:
rdd2.glom().collect()

[[100], [101, 102], [103, 104]]

In [24]:
pair_rdd = rdd1.zip(rdd2)

In [25]:
pair_rdd.collect()

[(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)]

In [26]:
pair_rdd.glom().collect()

[[(0, 100)], [(1, 101), (2, 102)], [(3, 103), (4, 104)]]

# Transformaciones de agregación para pair RDDs

# reduceByKey y foldByKey() - son transformaciones y no acciones

In [27]:
tupla = [('a', 1), ('b', 2), ('c', 3), ('a', 4), ('b', 5), ('c', 6), ('c', 7)]
tupla

[('a', 1), ('b', 2), ('c', 3), ('a', 4), ('b', 5), ('c', 6), ('c', 7)]

In [28]:
pair_rdd = sc.parallelize(tupla)

In [29]:
pair_rdd

ParallelCollectionRDD[21] at parallelize at PythonRDD.scala:195

In [30]:
pair_rdd.collect()

[('a', 1), ('b', 2), ('c', 3), ('a', 4), ('b', 5), ('c', 6), ('c', 7)]

In [31]:
from operator import add, sub

In [32]:
rdd_reduce = pair_rdd.reduceByKey(add)

In [33]:
rdd_reduce.collect()

[('a', 5), ('b', 7), ('c', 16)]

In [34]:
rdd_fold = pair_rdd.foldByKey(0, add).collect()

In [35]:
rdd_fold

[('a', 5), ('b', 7), ('c', 16)]

In [36]:
pair_rdd.collect()

[('a', 1), ('b', 2), ('c', 3), ('a', 4), ('b', 5), ('c', 6), ('c', 7)]

# groupByKey() agrupa los valores para cada clave()

In [37]:
rdd_group = pair_rdd.groupByKey()

In [38]:
rdd_group.collect()

[('a', <pyspark.resultiterable.ResultIterable at 0x7f9feafd2c88>),
 ('b', <pyspark.resultiterable.ResultIterable at 0x7f9feafd2518>),
 ('c', <pyspark.resultiterable.ResultIterable at 0x7f9feafd2ac8>)]

In [39]:
lista = [(x, list(y)) for x,y in rdd_group.collect()]

In [40]:
lista

[('a', [1, 4]), ('b', [2, 5]), ('c', [3, 6, 7])]

# combineByKey(createCombiner(), mergeValue(), mergeCombiners()) - parecido a aggregate

In [41]:
rdd_suma_cuenta = pair_rdd.combineByKey(lambda x : (x,1),
                                       lambda x, y: (x[0]+y, x[1]+1),
                                       lambda x, y: (x[0], y[0], x[1]+y[1]))

In [42]:
rdd_suma_cuenta.collect()

[('a', (5, 2)), ('b', (7, 2)), ('c', (16, 3))]

In [43]:
rdd_media = rdd_suma_cuenta.mapValues(lambda v: (v[0]/v[1]))
rdd_media.collect()

[('a', 2.5), ('b', 3.5), ('c', 5.333333333333333)]

# Transformaciones para claves o valores

In [44]:
pair_rdd.collect()

[('a', 1), ('b', 2), ('c', 3), ('a', 4), ('b', 5), ('c', 6), ('c', 7)]

In [45]:
pair_rdd.keys().collect() #Obtiene las claves de un RDD

['a', 'b', 'c', 'a', 'b', 'c', 'c']

In [46]:
pair_rdd.values().collect() #obtiene los valores de un rdd

[1, 2, 3, 4, 5, 6, 7]

In [47]:
pair_rdd.sortByKey().collect()

[('a', 1), ('a', 4), ('b', 2), ('b', 5), ('c', 3), ('c', 6), ('c', 7)]

# mapValues()- aplica la función sobre los valores 
# flatMapValues() - aplica la función sobre los valores y crea una lista simplificada

In [48]:
pair_rdd.mapValues(lambda x: (x, x*10)).collect()

[('a', (1, 10)),
 ('b', (2, 20)),
 ('c', (3, 30)),
 ('a', (4, 40)),
 ('b', (5, 50)),
 ('c', (6, 60)),
 ('c', (7, 70))]

In [49]:
pair_rdd.flatMapValues(lambda x: (x, x*10)).collect()

[('a', 1),
 ('a', 10),
 ('b', 2),
 ('b', 20),
 ('c', 3),
 ('c', 30),
 ('a', 4),
 ('a', 40),
 ('b', 5),
 ('b', 50),
 ('c', 6),
 ('c', 60),
 ('c', 7),
 ('c', 70)]

# Joins

In [50]:
rdd1 = sc.parallelize([('a',1),('b',2)])
rdd1.collect()

[('a', 1), ('b', 2)]

In [51]:
rdd2 = sc.parallelize([('a',4),('b',5), ('c',6)])
rdd2.collect()

[('a', 4), ('b', 5), ('c', 6)]

In [52]:
rdd1.join(rdd2).collect() # igual a un inner join

[('b', (2, 5)), ('a', (1, 4))]

In [53]:
rdd1.leftOuterJoin(rdd2).collect()

[('b', (2, 5)), ('a', (1, 4))]

In [54]:
rdd1.rightOuterJoin(rdd2).collect()

[('c', (None, 6)), ('b', (2, 5)), ('a', (1, 4))]

In [56]:
rdd1.fullOuterJoin(rdd2).collect()

[('c', (None, 6)), ('b', (2, 5)), ('a', (1, 4))]

# subtractByKey()

In [60]:
rdd1.subtractByKey(rdd2).collect()

[]

In [63]:
rdd2.subtractByKey(rdd1).collect()

[('c', 6)]

# cogroup()

In [65]:
rdd3 = rdd1.cogroup(rdd2)
rdd3.collect()

[('c',
  (<pyspark.resultiterable.ResultIterable at 0x7f9feafeee10>,
   <pyspark.resultiterable.ResultIterable at 0x7f9feb2ff6d8>)),
 ('b',
  (<pyspark.resultiterable.ResultIterable at 0x7f9feb2ff9b0>,
   <pyspark.resultiterable.ResultIterable at 0x7f9feb2ff898>)),
 ('a',
  (<pyspark.resultiterable.ResultIterable at 0x7f9feb2ff198>,
   <pyspark.resultiterable.ResultIterable at 0x7f9feb2ffa90>))]

In [67]:
rdd3.mapValues(lambda v: [list(l) for l in v]).collectAsMap()

{'a': [[1], [4]], 'b': [[2], [5]], 'c': [[], [6]]}

In [69]:
rdd3.mapValues(lambda v: [list(l) for l in v]).collect()

[('c', [[], [6]]), ('b', [[2], [5]]), ('a', [[1], [4]])]

# countByKey()

In [73]:
rdd2.countByKey()

defaultdict(int, {'a': 1, 'b': 1, 'c': 1})

# lookup()

In [77]:
rdd1.lookup('b')

[2]