# Proyecto: Uniones, Joins y Reducciones en RDDs

En la presente práctica veremos cómo:

* crear RDDs a partir de archivos .csv 

* filtrar registros en un RDD

* Unión entre dos RDDs. (Pegado vertical)

* JOIN (llave-valor) entre dos RDDs (Pegado Horizontal considerando una columna en común para dos RDDs)

* Accesar a las componentes de un RDD mediante corchetes, Ej: RDD[0][1][0]

* Extracción de valores en diccionarios (llave, valor)

Algunas funciones que veremos son:

`.filter(lambda x: 'some_value' not in x[k])` Filtra los registros que no contienen el valor 'some_value' en la k-ésima columna del RDD

`RDD1 = RDD1.union( RDD2 )` Une el RDD1 con el RDD2, extendiendo el número de registros (renglones) del RDD. Ambos RDDs deben tener el mismo número de columnas teniendo cuidado que el segundo RDD no tenga encabezado. 

`.takeSample()` Regresa una selección aleatoria de registros 

`.reduceByKey( add )` Regresa una lista de diccionarios de la forma (llave, número de valores correspondientes a la llave)

`.sortBy(lambda x: x[k], ascending=True)` Ordena un RDD con respecto a los valores de la k-ésima columna


In [1]:
from pyspark import SparkContext

In [2]:
# Inicializamos Contexto en nuestra computadora 'local'
sc = SparkContext(master='local', appName = 'transformacionesYAcciones')

## Creación de un RDD cargando un archivo .csv

In [3]:
# Visualizamos el contenido de la carpeta 'Data' 
!ls ./Data/

deporte.csv	 deportistaError.csv  modelo_relacional.jpg  salidatexto
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv


In [4]:
# Ruta en donde se encuentran los datos con los que trabajaremos:
path = './Data/'

In [5]:
# Creamos RDDs con distintos archivos 'textFile'
# .map(lambda l : l.split(',')) <-- Le asigan formato al contenido de archivos .csv

equiposOlimpicosRDD = sc.textFile(path+'paises.csv').map(lambda l : l.split(','))

deportistaOlimpicoRDD = sc.textFile(path+'deportista.csv').map(lambda l : l.split(','))

deportistaOlimpicoRDD2 = sc.textFile(path+'deportista2.csv').map(lambda l : l.split(','))

resultado = sc.textFile(path+'resultados.csv').map(lambda l : l.split(','))


# deportista.csv <-- datos con encabezado
# deportista2.csv <-- continuación de datos sin encabezado

In [6]:
# Visualizamos los primeros N registros de 'equiposOlimpicosRDD'
# Hay que notar que el encabezado cuenta como primer registro
N=10
equiposOlimpicosRDD.take(N)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR']]

In [7]:
# Visualizamos los primeros N registros de 'deportistaOlimpicoRDD'
deportistaOlimpicoRDD.take(N)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705'],
 ['6', 'Per Knut Aaland', '1', '31', '188', '75', '1096'],
 ['7', 'John Aalberg', '1', '31', '183', '72', '1096'],
 ['8', 'Cornelia Cor Aalten Strannood ', '2', '18', '168', '0', '705'],
 ['9', 'Antti Sami Aalto', '1', '26', '186', '96', '350']]

In [8]:
# Visualizamos los primeros N registros de 'deportistaOlimpicoRDD2'
deportistaOlimpicoRDD2.take(N)

[['67787', 'Lee BongJu', '1', '27', '167', '56', '970'],
 ['67788', 'Lee BuTi', '1', '23', '164', '54', '203'],
 ['67789', 'Anthony N. Buddy Lee', '1', '34', '172', '62', '1096'],
 ['67790', 'Alfred A. Butch Lee Porter', '1', '19', '186', '80', '825'],
 ['67791', 'Lee ByeongGu', '1', '22', '175', '68', '970'],
 ['67792', 'Lee ByeongGyu', '1', '21', '185', '82', '970'],
 ['67793', 'Lee ByeongIn', '2', '20', '174', '69', '970'],
 ['67794', 'Lee ByeongNam', '1', '24', '175', '75', '970'],
 ['67795', 'Lee ByeongYong', '1', '29', '197', '98', '970'],
 ['67796', 'Kar Wai Calvin Lee', '1', '29', '172', '68', '470']]

In [9]:
# Visualizamos los primeros N registros de 'deportistaOlimpicoRDD2'
resultado.take(N)

# 'NA' <-- indica que el jugador no ganó medallas

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['1', 'NA', '1', '39', '1'],
 ['2', 'NA', '2', '49', '2'],
 ['3', 'NA', '3', '7', '3'],
 ['4', 'Gold', '4', '2', '4'],
 ['5', 'NA', '5', '36', '5'],
 ['6', 'NA', '5', '36', '6'],
 ['7', 'NA', '5', '38', '5'],
 ['8', 'NA', '5', '38', '6'],
 ['9', 'NA', '5', '40', '5']]

In [10]:
# Extraemos unicamente los registros que sí ganaron alguna medalla:
resultadoGanador = resultado.filter(lambda x: 'NA' not in x[1])

resultadoGanador.take(N)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['4', 'Gold', '4', '2', '4'],
 ['38', 'Bronze', '15', '7', '19'],
 ['39', 'Bronze', '15', '7', '20'],
 ['41', 'Bronze', '16', '50', '14'],
 ['42', 'Bronze', '17', '17', '21'],
 ['43', 'Gold', '17', '17', '22'],
 ['45', 'Gold', '17', '17', '24'],
 ['49', 'Gold', '17', '17', '28'],
 ['51', 'Bronze', '17', '19', '22']]

In [11]:
# realizamos un reconteo (con tiempo limitado) de los registros en los RDDs:

limit_time = 20 #<-- limite de tiempo en milisegundos

equiposOlimpicosRDD.countApprox(limit_time)

1185

In [12]:
deportistaOlimpicoRDD.countApprox(limit_time)

67787

In [13]:
deportistaOlimpicoRDD2.countApprox(limit_time)

67785

In [14]:
resultadoGanador.countApprox(limit_time)

39784

## Unión de dos RDDs

La unión entre 2 RDDs extiende el número de registros (renglones), por lo que ambos RDDs deben tener el mismo número de columnas (el segundo RDD debe ser continuación del primero)

In [15]:
# Unimos los 2 RDDs, deportistaOlimpicoRDD y deportistaOlimpicoRDD2
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union( deportistaOlimpicoRDD2 )

In [16]:
deportistaOlimpicoRDD.countApprox(limit_time)

135572

## Join de dos RDDs

El JOIN entre dos RDDs extiende el número de columnas

In [17]:
# Primeros 5 registros del RDD 'deportistaOlimpicoRDD': 
deportistaOlimpicoRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [18]:
# Primeros 5 registros del RDD 'equiposOlimpicosRDD': 
equiposOlimpicosRDD.take(5)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG']]

Notemos que la última columna x[-1] del RDD 'deportistaOlimpicoRDD' corresponde a la primer columna x[0] del RDD 'equiposOlimpicosRDD', 

dichas columnas serán las llaves por las que haremos el JOIN entre ambos RDDs

La forma de la función JOIN en términos de llave-valor, es:

`RDD1.map(lambda x: (columna_llave_RDD1,columnas_valores_RDD1) ).join( RDD2.map(lambda x: (columna_llave_RDD2,columnas_valores_RDD2) )).take(N)`

In [19]:
# Hacmemos un JOIN de 'deportistaOlimpicoRDD' y 'equiposOlimpicosRDD'
deportistaPaises = deportistaOlimpicoRDD.map(lambda x: (x[-1],x[: -1]) ).join(  # llave para conexión:'columna x[-1]', Valores: columnas x[: -1]
    equiposOlimpicosRDD.map(lambda x: (x[0],x[2]) )) # llave para conexión:'columna x[0]', Valores: columna x[2]

# Muestra los primeros N registros:
deportistaPaises.take(N)

[('199', (['1', 'A Dijiang', '1', '24', '180', '80'], 'CHN')),
 ('199', (['2', 'A Lamusi', '1', '23', '170', '60'], 'CHN')),
 ('199', (['602', 'Abudoureheman', '1', '22', '182', '75'], 'CHN')),
 ('199', (['1463', 'Ai Linuer', '1', '25', '160', '62'], 'CHN')),
 ('199', (['1464', 'Ai Yanhan', '2', '14', '168', '54'], 'CHN')),
 ('199', (['3605', 'An Weijiang', '1', '22', '178', '72'], 'CHN')),
 ('199', (['3610', 'An Yulong', '1', '19', '173', '70'], 'CHN')),
 ('199', (['3611', 'An Zhongxin', '2', '23', '170', '65'], 'CHN')),
 ('199', (['4639', 'Ao Changrong', '1', '25', '173', '71'], 'CHN')),
 ('199', (['4641', 'Ao Tegen', '1', '21', '181', '90'], 'CHN'))]

In [20]:
# Usamos .takeSample(False,N,25) para mostrar una muestra aletoria de tamaño N
# False <-- indica no se tomarán valores repetidos
# 25 <-- representa una semilla para generar muestras aleatorias

deportistaPaises.takeSample(False,N,25)

[('273',
  (['56134', 'Niels Christian Kold Jrgensen', '1', '21', '0', '0'], 'DEN')),
 ('624', (['13109', 'Harry Arthur Bonavia', '1', '20', '0', '0'], 'MLT')),
 ('399', (['98241', 'Eberhard Radzik', '1', '25', '168', '75'], 'GER')),
 ('96', (['83781', 'Albert Muylle', '1', '0', '0', '0'], 'BEL')),
 ('1096',
  (['101476', 'Clarence Franklin Robison', '1', '25', '192', '77'], 'USA')),
 ('967', (['18615', 'Lesley Carstens', '2', '27', '0', '0'], 'RSA')),
 ('362', (['78848', 'mile Mercier', '1', '0', '0', '0'], 'FRA')),
 ('810',
  (['94563', 'Andrzej Ryszard Pitkowski', '1', '22', '169', '68'], 'POL')),
 ('1096', (['114820', 'Jerome Steinert', '1', '28', '0', '0'], 'USA')),
 ('576', (['32099', 'Assaf ElMurr', '1', '24', '0', '0'], 'LIB'))]

In [21]:
# Hacemos un join simple entre 'deportistaPaises' y 'resultadoGanador'

deportistaPaises.join(resultadoGanador).take(N)

[('74',
  ((['65', 'Patimat Abakarova', '2', '21', '165', '49'], 'AZE'), 'Gold')),
 ('74', ((['129', 'Ruslan Abbasov', '1', '22', '181', '74'], 'AZE'), 'Gold')),
 ('74', ((['130', 'Tural Abbasov', '1', '18', '182', '76'], 'AZE'), 'Gold')),
 ('74', ((['131', 'Tran Abbasova', '2', '33', '159', '53'], 'AZE'), 'Gold')),
 ('74',
  ((['335', 'Abdulqdir Abdullayev', '1', '28', '188', '91'], 'AZE'), 'Gold')),
 ('74',
  ((['336', 'Arif Yadulla Abdullayev', '1', '27', '164', '60'], 'AZE'),
   'Gold')),
 ('74',
  ((['339', 'Namiq Yadulla Abdullayev', '1', '25', '167', '55'], 'AZE'),
   'Gold')),
 ('74',
  ((['340', 'Laye Abdullayeva', '2', '21', '170', '47'], 'AZE'), 'Gold')),
 ('74', ((['341', 'lqar Abdulov', '1', '27', '175', '74'], 'AZE'), 'Gold')),
 ('74', ((['450', 'Ceyhun Abiyev', '1', '29', '163', '48'], 'AZE'), 'Gold'))]

## Accesando a componetes de un RDD usando corchetes

In [22]:
# Definimos un diccionaro con valores para las clases de medallas
valoresMedallas = {'Gold':7 , 'Silver':5 , 'Bronze':4}

In [23]:
# Guardamos el JOIN simple entre 'deportistaPaises' y 'resultadoGanador'
paisesMedallas = deportistaPaises.join(resultadoGanador)

In [24]:
# Accedemos al valor de un registro del RDDs para ver su estructura:
r = paisesMedallas.take(1)[0]
r

('74', ((['65', 'Patimat Abakarova', '2', '21', '165', '49'], 'AZE'), 'Gold'))

In [25]:
# Accedemos a las componentes del valor del registro:

# si r = ('74', ((['65', 'Patimat Abakarova', '2', '21', '165', '49'], 'AZE'), 'Gold'))

# r[1] <-- accede a: ((['65', 'Patimat Abakarova', '2', '21', '165', '49'], 'AZE'), 'Gold')
# r[1][0] <-- accede a: (['65', 'Patimat Abakarova', '2', '21', '165', '49'], 'AZE')
# r[1][0][-1] <--accede a: 'AZE'
r[1][0][-1]

'AZE'

In [26]:
# Accedemos a las componentes del valor del registro:

# si r = ('74', ((['65', 'Patimat Abakarova', '2', '21', '165', '49'], 'AZE'), 'Gold'))

# r[1] <-- accede a: ((['65', 'Patimat Abakarova', '2', '21', '165', '49'], 'AZE'), 'Gold')
# r[1][1] <-- accede a: 'Gold'
r[1][1]

'Gold'

## Extracción de valores específicos en diccionarios (llave , valor)

In [27]:
# Extraemos las siglas del país y el tipo de medalla:
paisesMedallas.map(lambda x: (x[1][0][-1] , x[1][1]) ).take(N)

[('AZE', 'Gold'),
 ('AZE', 'Gold'),
 ('AZE', 'Gold'),
 ('AZE', 'Gold'),
 ('AZE', 'Gold'),
 ('AZE', 'Gold'),
 ('AZE', 'Gold'),
 ('AZE', 'Gold'),
 ('AZE', 'Gold'),
 ('AZE', 'Gold')]

In [28]:
# Extraemos las siglas del país y el valor de la medalla (accediendo a los valores del diccionario 'valoresMedallas'):
paisesMedallas = paisesMedallas.map(lambda x: (x[1][0][-1] , valoresMedallas[x[1][1]] ) )

paisesMedallas.take(N)

[('AZE', 7),
 ('AZE', 7),
 ('AZE', 7),
 ('AZE', 7),
 ('AZE', 7),
 ('AZE', 7),
 ('AZE', 7),
 ('AZE', 7),
 ('AZE', 7),
 ('AZE', 7)]

In [29]:
# A continuación sumamos el valor de las medallas acumuladas en cada país:
from operator import add

# Hacemos una reducción por llaves aplicando la operación suma 'add':
paisesMedallas.reduceByKey( add ).take(N)

[('ARG', 12520),
 ('FIN', 42),
 ('USA', 1342),
 ('GRE', 203),
 ('IRL', 14),
 ('STP', 52),
 ('BHU', 147),
 ('CAN', 32538),
 ('ANZ', 172),
 ('THA', 14)]

In [30]:
# Calculamos lo anterior pero ordenado de forma descendente de acuerdo al valor de la segunda componente 'x[1]'
conclusion = paisesMedallas.reduceByKey( add ).sortBy(lambda x: x[1], ascending=False)

conclusion.take(N)

[('CAN', 32538),
 ('ARG', 12520),
 ('HUN', 10860),
 ('MEX', 6124),
 ('RSA', 3788),
 ('BLR', 3580),
 ('LTU', 1535),
 ('MGL', 1460),
 ('USA', 1342),
 ('AZE', 1218)]

In [31]:
# Cerramos sesión para liberar memoria:
sc.stop()