RDDs con pares clave/valor (*Pair RDDs*)
--------------------------------------------

-   Tipos de datos muy usados en Big Data (MapReduce)

-   Spark dispone de operaciones especiales para su manejo
               

### Creación de *Pair RDDs*

-   A partir de una lista de tuplas

In [1]:
from test_helper import Test
from __future__ import print_function

prdd = sc.parallelize([('a',2), ('b',5), ('a',3)])
Test.assertEquals(prdd.collect(), [('a', 2), ('b', 5), ('a', 3)])
prdd = sc.parallelize(zip(['a', 'b', 'c'], range(3)))
Test.assertEquals(prdd.collect(), [('a', 0), ('b', 1), ('c', 2)])

1 test passed.
1 test passed.


-   A partir de otro RDD

In [3]:
linesrdd = sc.textFile("datos/quijote.txt")
prdd = linesrdd.map(lambda x: (x.split(" ")[0], x))
print("Par (1ª palabra, línea): {0}\n".format(prdd.takeSample(False, 1)[0]))

# keyBy(f): Crea tuplas de los elementos del RDD usando f para obtener la clave.
nrdd = sc.parallelize(xrange(2,5))
prdd = nrdd.keyBy(lambda x: x*x)
Test.assertEquals(prdd.collect(), [(4, 2), (9, 3), (16, 4)])

# zipWithIndex(): Zipea el RDD con los índices de sus elementos.
# Este método dispara un spark job cuando el RDD tiene más de una partición.
rdd = sc.parallelize(["a", "b", "c", "d", "e", "f", "g", "h"], 3)
prdd = rdd.zipWithIndex()
Test.assertEquals(prdd.collect(), [('a',0),('b',1),('c',2),('d',3),('e',4),('f',5),('g',6),('h',7)])

# zipWithUniqueId(): Zipea el RDD con identificadores únicos (long) para cada elemento.
# Los elementos en la partición k-ésima obtienen los ids k, n+k, 2*n+k,... siendo n = nº de particiones
# No dispara un trabajo spark
rdd = sc.parallelize(["a", "b", "c", "d", "e", "f", "g", "h"], 3)
print("Particionado del RDD: {0}".format(rdd.glom().collect()))
prdd = rdd.zipWithUniqueId()
Test.assertEquals(prdd.collect(), [('a',0),('b',3),('c',1),('d',4),('e',2),('f',5),('g',8),('h',11)])

Par (1ª palabra, línea): (u'-S\xed', u'-S\xed -respondi\xf3 don Quijote-, y muchos; y es raz\xf3n que los haya, para adorno')

1 test passed.
1 test passed.
Particionado del RDD: [['a', 'b'], ['c', 'd'], ['e', 'f', 'g', 'h']]
1 test passed.


- Mediante un zip de dos RDDs
    - Los RDDs deben tener el mismo número de particiones y el mismo número de elementos en cada partición

In [4]:
rdd1 = sc.parallelize(xrange(0,5), 2)
rdd2 = sc.parallelize(range(1000, 1005), 2)
prdd = rdd1.zip(rdd2)
Test.assertEquals(prdd.collect(), [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)])

1 test passed.


### Transformaciones sobre un RDD clave/valor

#### Transformaciones de agregación

-   `reduceByKey(func)`/`foldByKey(func)` devuelven un RDD con los valores con la misma
    clave reducidos usando `func`

In [5]:
from operator import add
prdd   = sc.parallelize([("a", 2), ("b", 5), ("a", 8)]).cache()
redrdd = prdd.reduceByKey(add)
Test.assertEquals(redrdd.collect(), [('a',10), ('b',5)])

1 test passed.


-   `groupByKey()` agrupa valores con la misma clave
    - Operación muy costosa en comunicaciones
    - Mejor usar operaciones de reducción

In [6]:
groupdd = prdd.groupByKey()
Test.assertEquals([(k, list(v)) for k,v in groupdd.collect()], [('a',[2,8]), ('b',[5])])

1 test passed.


- `combineByKey(createCombiner(func1), mergeValue(func2), mergeCombiners(func3))`
    - Función general para agregación por clave, similar a `aggregate`
    - Especifica tres funciones:

     1.  `createCombiner` al recorrer los elementos de cada partición, si
        nos encontramos una clave nueva se crea un acumulador y se
        inicializa con `func1`

     2.  `mergeValue` mezcla los valores de cada clave en cada partición usando `func2`

     3.  `mergeCombiners` mezcla los resultados de las diferentes
        particiones mediante `func3`

In [7]:
# Calcula la media por clave usando combineByKey()
l = sc.parallelize([("a",2), ("b",5), ("a",8), ("b", 6), ("b",1)])
# Para cada clave, suma y cuenta los valores
sumCount = l.combineByKey(
           (lambda x: (x, 1)),
           (lambda x, y: (x[0]+y, x[1]+1)),
           (lambda x, y: (x[0]+y[0], x[1]+y[1])))
Test.assertEquals(sumCount.collect(), [('a', (10, 2)), ('b', (12, 3))])

# Obtiene la media de los valores
m = sumCount.mapValues(lambda v: v[0]/v[1])
Test.assertEquals(m.collect(), [('a', 5), ('b', 4)])

1 test passed.
1 test passed.


#### Transformaciones sobre claves o valores

-   `keys()` devuelve un RDD con las claves
-   `values()` devuelve un RDD con los valores
-   `sortByKey()` devuelve un RDD clave/valor con las claves ordenadas

In [8]:
print("RDD completo: {0:>46s}".format(prdd.collect()))
print("RDD con las claves: {0:>25s}".format(prdd.keys().collect()))
print("RDD con los valores: {0:>18}".format(prdd.values().collect()))
print("RDD con las claves ordenadas: {0}".format(prdd.sortByKey().collect()))

RDD completo:                 [('a', 2), ('b', 5), ('a', 8)]
RDD con las claves:           ['a', 'b', 'a']
RDD con los valores:          [2, 5, 8]
RDD con las claves ordenadas: [('a', 2), ('a', 8), ('b', 5)]


-   `mapValues(func)` devuelve un RDD aplicando una función sobre los
    valores
-   `flatMapValues(func)` devuelve un RDD aplicando una función sobre
    los valores y “aplanando” la salida

In [9]:
mapv = prdd.mapValues(lambda x: (x, 10*x))
Test.assertEquals(mapv.collect(),  [('a', (2, 20)), ('b', (5, 50)), ('a', (8, 80))])
fmapv = prdd.flatMapValues(lambda x: (x, 10*x))
Test.assertEquals(fmapv.collect(), [('a', 2), ('a', 20), ('b', 5), ('b', 50), ('a', 8), ('a', 80)])

1 test passed.
1 test passed.


### Transformaciones sobre dos RDDs clave/valor

-   `join`/`rightOuterJoin`/`leftOuterJoin`/`fullOuterJoin` realiza
    inner/outer joins entre los dos RDDs

In [10]:
rdd1 = sc.parallelize([("a",2), ("b",5), ("a",8)]).cache()
rdd2 = sc.parallelize([("c",7), ("a",1)]).cache()

rdd3 = rdd1.join(rdd2)
Test.assertEquals(rdd3.collect(), [('a',(2,1)),('a',(8,1))])

rdd4 = rdd1.leftOuterJoin(rdd2)
Test.assertEquals(rdd4.collect(), [('a',(2,1)),('a',(8,1)),('b',(5,None))])

rdd5 = rdd1.rightOuterJoin(rdd2)
Test.assertEquals(rdd5.collect(), [('a',(2,1)),('a',(8,1)),('c',(None,7))])

rdd6 = rdd1.fullOuterJoin(rdd2)
Test.assertEquals(rdd6.collect(), [('a',(2,1)),('a',(8,1)),('c',(None,7)),('b',(5,None))])

1 test passed.
1 test passed.
1 test passed.
1 test passed.


-   `subtractByKey` elimina elementos con una clave presente en otro RDD

In [11]:
rdd7 = rdd1.subtractByKey(rdd2)
Test.assertEquals(rdd7.collect(), [('b', 5)])

1 test passed.


-   `cogroup` agrupa los datos que comparten la misma clave en ambos
    RDDs

In [12]:
rdd8 = rdd1.cogroup(rdd2)
Test.assertEquals(rdd8.mapValues(lambda v: [list(l) for l in v]).collectAsMap(),
                 {'a': [[2, 8], [1]], 'b': [[5], []],'c': [[], [7]]})

1 test passed.


### Acciones sobre RDDs clave/valor

-   `countByKey()` devuelve un mapa indicando el número de ocurrencias de cada clave

In [13]:
prdd = sc.parallelize([("a", 2), ("b", 5), ("a", 8)]).cache()
countMap = prdd.countByKey()
Test.assertEquals(countMap, {'a': 2, 'b': 1})

1 test passed.


-   `collectAsMap()` obtiene el RDD en forma de mapa

In [14]:
rddMap = prdd.collectAsMap()
Test.assertEquals(rddMap, {'a': 8, 'b': 5})

1 test passed.


-   `lookup(key)` devuelve una lista con los valores asociados con una clave

In [15]:
listA = prdd.lookup('a')
Test.assertEquals(listA, [2, 8])

1 test passed.


## Práctica 2

A partir del fichero cite75_99.txt obtener el número de citas que recibe cada patente.

In [434]:
rdd = sc.textFile("datos/patentes-mini/cite75_99.txt")

prdd = sc.parallelize(rdd.map(lambda x: (x.split(",")[1].encode('utf-8'),x.split(",")[0].encode('utf-8'))).collect()).cache()

ncitas = prdd.combineByKey(
           (lambda x: (x, 1)),
           (lambda x, y: (x[0]+y, x[1]+1)),
           (lambda x, y: (x[0]+y[0], x[1]+y[1]))).mapValues(lambda v: v[1])


Test.assertEquals(ncitas.lookup('3986997'), [2])
Test.assertEquals(ncitas.lookup('4418284'), [2])
Test.assertEquals(ncitas.lookup('4314227'), [3])
Test.assertEquals(ncitas.lookup('3911418'), [3])

1 test passed.
1 test passed.
1 test passed.
1 test passed.


A partir del fichero apat63_99.txt, obten la media de reivindicaciones ("claims", campo 8) de las patentes por países ("country", campo 4)

In [433]:
def pasarEntero(valor):
    if(valor == ""): return 0;
    else: return int(valor)

rdd = sc.textFile("datos/patentes-mini/apat63_99.txt")

prdd = rdd.map(lambda x: (x.split(",")[4].encode('utf-8'),x.split(",")[8].encode('utf-8')))

#print (prdd.collect())

prdd_filtrado = prdd.filter(lambda x: x[0] != '\"COUNTRY\"')

# Aplica un filtro para eliminar la cabecera y crea un RDD clave valor con (country, claims)
# además, hay que pasar los valores a entero para poder sumarlos
countryclaims = prdd_filtrado.map(lambda x: (x[0],pasarEntero(x[1])))
#print (countryclaims.collect())
#print (countryclaims.lookup('"GB"'))
# Usa combineByKey para que, para cada clave, sume y cuente los valores
sumCount = countryclaims.combineByKey(
           (lambda x: (x, 1)),
           (lambda x, y: (x[0]+y, x[1]+1)),
           (lambda x, y: (x[0]+y[0], x[1]+y[1])))

Test.assertEquals(sumCount.lookup('"GB"'), [(4650, 504)])
Test.assertEquals(sumCount.lookup('"US"'), [(135880, 11651)])

# Obtiene la media de los valores
mean = sumCount.mapValues(lambda v: float(v[0])/v[1])

Test.assertEquals(mean.lookup('"GB"'), [float(4650)/504])
Test.assertEquals(mean.lookup('"US"'), [float(135880)/11651])

1 test passed.
1 test passed.
1 test passed.
1 test passed.
