# RDDs + Programación Funcional

# Creamos un contexo para crear RDDs

In [1]:
import pandas as pd
!pip install pyspark --quiet
from pyspark import SparkContext
sc = SparkContext(master = "local", appName = "Transformaciones sobre un RDD")

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


# Cargamos un RDDs

Para la realzación del ejercicio, con ayuda del archivo paises, realizaremos el equivalente de operaciones 'select','count','group by' y 'filter / where'

Cambia el valor de la ruta para que apunte a la ruta donde tienes los datos

In [2]:

#LEER CSV
equiposOlimpicosRDD = sc.textFile("/content/paises.csv").map(lambda line : line.split(","))

In [3]:
equiposOlimpicosRDD.take(10)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR']]

### Deshacemos un RDD

Con ayuda del método `collect()`, permite desparalelizar un RDD.


In [4]:
#equiposOlimpicosRDD.collect()

Creamos un dataframe a partir de un RDD mediante `pd.DataFrame()`

In [5]:
pd.DataFrame(equiposOlimpicosRDD.collect()[1:], columns = equiposOlimpicosRDD.collect()[0] )

Unnamed: 0,id,equipo,sigla
0,1,30. Februar,AUT
1,2,A North American Team,MEX
2,3,Acipactli,MEX
3,4,Acturus,ARG
4,5,Afghanistan,AFG
...,...,...,...
1179,1180,Yugoslavia-2,YUG
1180,1181,Zambia,ZAM
1181,1182,Zefyros,GRE
1182,1183,Zimbabwe,ZIM


# Operaciones sobre un RDDs

Para la realzación del ejercicio, con ayuda del archivo paises, realizaremos el equivalente de operaciones `select`,`count`,`group by` , `filter` y `where`

### Ejercicio 1:
**Mostrar los países sin repetidos**

*   Seleccionar la columna de las siglas PISTA: Usa un map y quedate con la columna con indice 2.
*   Usamos el `distinct()`



In [6]:
#Inserta aquí tu código
#solo mediante pyspark se hace con mapas
columna_siglas = equiposOlimpicosRDD.map(lambda fila : fila[2] )
unicos = columna_siglas.distinct()
unicos.take(5)

['sigla', 'AUT', 'MEX', 'ARG', 'AFG']

In [None]:
#forma abreviada
equiposOlimpicosRDD.map(lambda fila: fila[2] ).distinct().take(5)

### Ejercicio 2
**Contar la cantidad de siglas diferentes de los equipos olímpicos existentes**

PISTA: Puedes usar `count()`

In [38]:
#Inserta aquí tu código
#forma corta
unicos.count()

#forma larga
columna_siglas = equiposOlimpicosRDD.map(lambda fila : fila[2] )
conteo = columna_siglas.distinct()
conteo.count()

231

### Ejercicio 3 (Díficil, puedes hacerlo el último)
**Agrupamos datos para poder determinar cuantos equipos posee un pais**

Recuerda que los RDDs poseen una estructura de `clave-valor`, por lo cual debemos poner primero el valor 'clave' (la sigla del país).

Con 'mapValues', al componente 'valor' le indicamos que operación deseamos que se le aplique.

**Objetivo 1**: Tener un diccionario con `clave` la sigla del país y `valor` la lista de equipos de ese país

Con `groupByKey` podemos agrupar por `clave`, en este caso por sigla del país  

In [8]:
#Inserta aquí tu código
equiposOlimpicosRDD.map(lambda fila : (fila[2], fila[1])).groupByKey().mapValues(list)

**Objetivo 2**: Tener un diccionario con `clave` la sigla del país y `valor` la cantidad de equipos de ese país

In [9]:
#Inserta aquí tu código


### Operación filter para obtener un subconjunto

Con el método 'filter', reducimos el conjuntos de equipos.

Nos quedamos con la sigla equivalente de argentina

In [10]:
equiposArgentinos = equiposOlimpicosRDD.filter(lambda l : "ARG" in l)
equiposArgentinos.collect()

[['4', 'Acturus', 'ARG'],
 ['37', 'Antares', 'ARG'],
 ['42', 'Arcturus', 'ARG'],
 ['43', 'Ardilla', 'ARG'],
 ['45', 'Argentina', 'ARG'],
 ['46', 'Argentina-1', 'ARG'],
 ['47', 'Argentina-2', 'ARG'],
 ['119', 'Blue Red', 'ARG'],
 ['238', 'Covunco III', 'ARG'],
 ['252', 'Cupidon III', 'ARG'],
 ['288', 'Djinn', 'ARG'],
 ['436', 'Gullvinge', 'ARG'],
 ['644', 'Matrero II', 'ARG'],
 ['672', 'Mizar', 'ARG'],
 ['774', 'Pampero', 'ARG'],
 ['843', 'Rampage', 'ARG'],
 ['1031', 'Tango', 'ARG'],
 ['1162', 'Wiking', 'ARG']]

# Accciones sobre RDDs

In [11]:
deportistaOlimpicoRDD = sc.textFile("/content/deportista.csv").map(lambda line : line.split(","))
deportistaOlimpico2RDD = sc.textFile("/content/deportista2.csv").map(lambda line : line.split(","))

In [12]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpico2RDD)

### Formas de visualizar datos de un RDDs

La operación 'take' nos devuelve 'N' valores que encuentre spark.

La operación 'top', previo ordena respecto al valor llave y nos devuelve 'N' valores.

La operacion 'takeSample', nos devuelve una muestra aleatoria de los valores, Observa que recibe tres parametros

| Orden | Argumento | Descripción | Valor
|-------|--------|-----|--------|
|1|withReplacement|Indica si la muetra podrá traer replicados|Bool|
|2| num| Cantidad de valores a retornar|int|
|3|seed|semilla para el generador aleatorio|int|

Nota: Si encuentras complicado leer el código en los segmentos donde usamos indices en las listas, ejecutalo por partes para que visualizes que componentes seleccionamos

Nota: Para hacer join con RDDs, debemos tener selecionada la llave al inicio del RDD para poder realizar el cruce.

In [13]:
deportistaOlimpicoRDD.take(3)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199']]

In [14]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).take(3)

[['equipo_id',
  ['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso']],
 ['199', ['1', 'A Dijiang', '1', '24', '180', '80']],
 ['199', ['2', 'A Lamusi', '1', '23', '170', '60']]]

In [15]:
equiposOlimpicosRDD.map(lambda x : [x[0],x[2]]).take(3)

[['id', 'sigla'], ['1', 'AUT'], ['2', 'MEX']]

In [16]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).take(6)

[('199', (['1', 'A Dijiang', '1', '24', '180', '80'], 'CHN')),
 ('199', (['2', 'A Lamusi', '1', '23', '170', '60'], 'CHN')),
 ('199', (['602', 'Abudoureheman', '1', '22', '182', '75'], 'CHN')),
 ('199', (['1463', 'Ai Linuer', '1', '25', '160', '62'], 'CHN')),
 ('199', (['1464', 'Ai Yanhan', '2', '14', '168', '54'], 'CHN')),
 ('199', (['3605', 'An Weijiang', '1', '22', '178', '72'], 'CHN'))]

In [17]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).top(5)

[('999', (['92679', 'Trygve Bjarne Pedersen', '1', '35', '0', '0'], 'NOR')),
 ('999', (['1144', 'Henrik Agersborg', '1', '47', '0', '0'], 'NOR')),
 ('999', (['10765', 'Einar Berntsen', '1', '28', '0', '0'], 'NOR')),
 ('998',
  (['111659', 'G. Bernard Bernie Skinner', '1', '34', '182', '82'], 'CAN')),
 ('996', (['116030', 'Edward Eddy Stutterheim', '1', '39', '0', '0'], 'NED'))]

In [18]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).takeSample(False,4,10)

[('93',
  (['76502', 'Yelena Viktorovna Matoshko', '2', '30', '177', '80'], 'BLR')),
 ('487', (['110997', 'Ajit Singh', '1', '23', '185', '73'], 'IND')),
 ('249',
  (['41654', 'Toms Pedro Gonzlez Barrios', '1', '21', '178', '75'], 'CUB')),
 ('259', (['7157', 'Jlius Bal', '1', '22', '0', '0'], 'TCH'))]

### Ejercicio 4
**Muestra 17 filas del RDD deportistaOlimpicoRDD**


In [19]:
#Inserta aquí tu código
deportistaOlimpicoRDD.take(17)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705'],
 ['6', 'Per Knut Aaland', '1', '31', '188', '75', '1096'],
 ['7', 'John Aalberg', '1', '31', '183', '72', '1096'],
 ['8', 'Cornelia Cor Aalten Strannood ', '2', '18', '168', '0', '705'],
 ['9', 'Antti Sami Aalto', '1', '26', '186', '96', '350'],
 ['10', 'Einar Ferdinand Einari Aalto', '1', '26', '0', '0', '350'],
 ['11', 'Jorma Ilmari Aalto', '1', '22', '182', '76.5', '350'],
 ['12', 'Jyri Tapani Aalto', '1', '31', '172', '70', '350'],
 ['13', 'Minna Maarit Aalto', '2', '30', '159', '55.5', '350'],
 ['14', 'Pirjo Hannele Aalto Mattila ', '2', '32', '171', '65', '350'],
 ['15', 'Arvo Ossian Aaltonen', '1', '22', '0', '0', '35

## Importancia de countAprox

Debido a la cantidad de datos no siempre es recomendable hacer operaciones tipo count.

Por lo que 'countAprox' es la solución mas viable cuando solo queremos darnos una idea de cuantos datos podemos leer durante un tiempo determinado. Nota: el parametro está en milisegundos

In [20]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                            .map(lambda x : [x[0],x[2]])).count()

135427

In [21]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).countApprox(20)

135427

# Acciones de modificacion

### Obtenemos el equipo y el deportista

Guardamos el RDD resultante en equipoDeportista

In [22]:
equipoDeportista = deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                                            .map(lambda x : [x[0],x[2]]))

In [23]:
equipoDeportista.take(1)


[('199', (['1', 'A Dijiang', '1', '24', '180', '80'], 'CHN'))]

In [24]:
equipoDeportista.map(lambda x : (x[1][0][0],x[1][0][1:],x[1][1]) )

PythonRDD[77] at RDD at PythonRDD.scala:53

### Carga de resultados

Esta tabla posee las medallas que los jugadores han ganado

In [25]:
resultado = sc.textFile("/content/resultados.csv").map(lambda line : line.split(","))

Eliminamos todas las medallas no ganadoras y nos quedamos con el valor de la medalla y  del deportista_id

In [26]:
resultadoGanador = resultado.filter(lambda l : 'NA' not in l[1])
resultadoGanador = resultadoGanador.map(lambda l : [l[2],l[1]])

In [27]:
resultadoGanador.take(15)

[['deportista_id', 'medalla'],
 ['4', 'Gold'],
 ['15', 'Bronze'],
 ['15', 'Bronze'],
 ['16', 'Bronze'],
 ['17', 'Bronze'],
 ['17', 'Gold'],
 ['17', 'Gold'],
 ['17', 'Gold'],
 ['17', 'Bronze'],
 ['20', 'Gold'],
 ['20', 'Bronze'],
 ['20', 'Silver'],
 ['20', 'Bronze'],
 ['20', 'Silver']]

### Obtenemos la relación buscada: deportista,pais y medalla.

In [28]:
jugadoresMedalla =  equipoDeportista.join(resultadoGanador)
jugadoresMedalla.take(1)

[('716',
  ((['553', 'John Charles Abrams', '1', '22', '183', '0'], 'NZL'), 'Gold'))]

### Agrupamos las medallas respecto a la sigla del pais jugador

In [29]:
d = {'Gold':7, 'Silver':5, 'Bronze':4}
paisesMedallas = jugadoresMedalla.map(lambda x : (x[1][0][-1],d[x[1][1]]) )
paisesMedallas.takeSample(False,10)

[('ITA', 4),
 ('NED', 4),
 ('EUN', 5),
 ('RSA', 5),
 ('LES', 7),
 ('NED', 7),
 ('NED', 5),
 ('ITA', 5),
 ('NED', 7),
 ('MDA', 4)]

### Obtenemos los valores de los puntuajes históricos de los paises jugadores

In [30]:
from operator import add
conclusion = paisesMedallas.reduceByKey((add)).sortBy(lambda x : x[1],ascending=False)
conclusion.take(10)

[('ITA', 74920),
 ('NED', 65560),
 ('GER', 22323),
 ('JPN', 19950),
 ('NZL', 9220),
 ('TCH', 8160),
 ('BLR', 5012),
 ('RSA', 4735),
 ('TUR', 3965),
 ('GHA', 3430)]

# Estadística básica sobre los RDDs

Spark posee una suite integrada de forma natural para poder obtener estadísticas básicas.

In [31]:
conclusion.map(lambda l : l[1]).stats()

(count: 65, mean: 3735.6153846153843, stdev: 12505.143665022015, max: 74920.0, min: 5.0)

In [32]:
conclusion.map(lambda l : l [1]).mean()

3735.6153846153843

In [33]:
conclusion.map(lambda l : l [1]).sum()

242815

In [34]:
conclusion.map(lambda l : l [1]).histogram(10)

([5.0,
  7496.5,
  14988.0,
  22479.5,
  29971.0,
  37462.5,
  44954.0,
  52445.5,
  59937.0,
  67428.5,
  74920],
 [59, 2, 2, 0, 0, 0, 0, 0, 1, 1])

## Ejercicios Programación funcional

### Ejercicio 5
**Escribir una función que aplique el IVA a un precio.**
* Nombre de la función: apply_iva
* Argumentos: precio, iva
* Resultado: precio con iva aplicado


* Prueba a llamar a la función


In [35]:
#Inserta aquí tu código
#def apply_iva(precio, iva):
#  return precio + iva
#apply_iva(20, 12)

def apply_iva(base_imponible, porcentaje_iva):
  iva = 100 * porcentaje_iva
  return base_imponible * iva / 100


32

### Ejercicio 6
** Escribe una función que aplica la función cuadrado() a todos los elementos de una lista.**

    Parámetros:
        funcion: Recibe la función a aplicar.
        lista: Es una lista con valores que se pasarán como argumentos a funcion.
    Devuelve:
        Una lista con el resultado de aplicar la función a los valores de la lista.

In [40]:
def cuadrado(n):
    return n ** 2


In [41]:
#Inserta aquí tu código
lista = [1,2,3,4,5]

def alcuadrado(lista, funcion):
  resultado= []
  for numero in lista:
    resultado.append(funcion(numero))
  return resultado

In [42]:
alcuadrado([1,2,3,4,5,6], cuadrado)

[1, 4, 9, 16, 25, 36]

In [43]:
list(map(lambda numero: numero *3, [1,2,3,4,5,6]))

[3, 6, 9, 12, 15, 18]

In [46]:
rdd_lista = sc.parallelize([1,2,3,4,5,6])
rdd_lista.map(lambda numero: numero * 3).collect()

[3, 6, 9, 12, 15, 18]

### Ejercicio 7
**Detectar y corregir los errores del siguiente programa que aplica el iva a una factura:**


In [48]:
base = float(input('Introduce la base imponible de la factura: '))
iva = int(input('Introduce el iva de la factura: '))
def aplica_iva(base, iva = 21):
    base = base * ((iva+ 100)/100)
    return base

print(aplica_iva(base, iva))

Introduce la base imponible de la factura: 50
Introduce el iva de la factura: 21
1050.0
