## Joins

In [2]:
people_data = [
    (1,'People A'),
    (2,'People B'),
    (3,'People C'),
    (4,'People D'),
    (5,'People E')
]

people = sc.parallelize(people_data)

In [3]:
people

ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:195

In [4]:
people.collect()

[(1, 'People A'),
 (2, 'People B'),
 (3, 'People C'),
 (4, 'People D'),
 (5, 'People E')]

In [5]:
subject_data = [(1, 'Subject 1'),
               (2, 'Subject 2'),
               (200, 'Subject 1500'),
               (2, 'Subject 2 Repetido')]

subjects = sc.parallelize(subject_data)

In [6]:
subjects.collect()

[(1, 'Subject 1'),
 (2, 'Subject 2'),
 (200, 'Subject 1500'),
 (2, 'Subject 2 Repetido')]

## Inner Join (Join)

Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) con todos los pares de elementos para cada key. (especificamente los que hay en comun por esa clave en ambos sets de datos)

In [7]:
people.join(subjects).collect()

[(1, ('People A', 'Subject 1')),
 (2, ('People B', 'Subject 2')),
 (2, ('People B', 'Subject 2 Repetido'))]

## Left Outer Join

Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) asegurandonos que todos los datos del set de datos izquierdo estaran en el resultado del join.

In [8]:
people.leftOuterJoin(subjects).collect()

[(4, ('People D', None)),
 (1, ('People A', 'Subject 1')),
 (5, ('People E', None)),
 (2, ('People B', 'Subject 2')),
 (2, ('People B', 'Subject 2 Repetido')),
 (3, ('People C', None))]

## Right Outer Join

Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) asegurandonos que todos los datos del set de datos derecho estaran en el resultado del join.

In [9]:
people.rightOuterJoin(subjects).collect()

[(200, (None, 'Subject 1500')),
 (1, ('People A', 'Subject 1')),
 (2, ('People B', 'Subject 2')),
 (2, ('People B', 'Subject 2 Repetido'))]

## Outer/Full Join

Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) asegurandonos que todos los datos de ambos set de datos estaran aunque no haya match de keys.

In [10]:
people.fullOuterJoin(subjects).collect()

[(200, (None, 'Subject 1500')),
 (4, ('People D', None)),
 (1, ('People A', 'Subject 1')),
 (5, ('People E', None)),
 (2, ('People B', 'Subject 2')),
 (2, ('People B', 'Subject 2 Repetido')),
 (3, ('People C', None))]

## Broadcast Join (map-side join)

### Variable Broadcast

Una variable Broadcast nos permite mantener una variable solo lectura cacheada en cada una de las maquinas del cluster en vez de enviar esa informacion con cada una de las tareas que se envian al cluster.

Esto es particularmente util cuando cuando tareas a partir de multiples etapas (stages) necesitan la misma información o cuando cachear información de forma deserializada es importante.

Tener en cuenta que esto **es posible** cuando uno de los data sets o conjunto de datos **es lo suficientemente pequeño para ser broadcasteado a todos los nodos/workers del cluster**.

In [11]:
# Vamos a suponer que tenemos un RDD de productos por sus IDs identificando ventas de los mismos
prodsList = [1,11,1,4,5,11,2,3,4,5,6,4,5,4,3,2,1,11,2,3,4,5,6,4,3,2,1,1]
prods = sc.parallelize(prodsList,3)

In [12]:
# Un hash con los productos y sus nombres
productNames = {1:'papas',
                2:'cebollas',
                3:'tomates',
                4:'zanahorias',
                5:'batatas',
                6:'peras',
                7:'cilantro',
                8:'apio',
                9:'morrones',
                10:'manzanas',
                11:'naranjas'}

#hacemos un broadcast de la variable
bproductNames = sc.broadcast(productNames)

In [13]:
# Buscamos los productos que se vendieron 
# mas de 4 veces
popularProds = prods.map(lambda x:(x,1))\
    .reduceByKey(lambda x,y:x+y)\
    .filter(lambda x:x[1]>=4)
popularProds.collect()

[(3, 4), (1, 5), (4, 6), (2, 4), (5, 4)]

El join se realiza de forma implicita usando un map y dentro del mismo accediendo a la informacion de la variable a la que se realizo el broadcast via .value

In [14]:
# Buscamos los nombres
# map side-join
# a partir del broadcast la informacion de los nombres de los productos que fueron distribuidos
# a los nodos en el cluster y son consultados en el map (haciendo el join implicito)
popularProds = popularProds.map(
    lambda x:(bproductNames.value[x[0]],x[0],x[1]))
popularProds.collect()

[('tomates', 3, 4),
 ('papas', 1, 5),
 ('zanahorias', 4, 6),
 ('cebollas', 2, 4),
 ('batatas', 5, 4)]

### Ventajas

Cuando un valor es "broadcasteado" al cluster, este es copiado a los nodos/workers **sólo una vez** (en vez de múltiples veces si la información fuera a enviarse en cada task). De esta forma se resuelve la consulta más rapidamente.