# RDDs + Programación Funcional

# Creamos un contexo para crear RDDs

In [None]:
import pandas as pd
!pip install pyspark --quiet
from pyspark import SparkContext
sc = SparkContext(master = "local", appName = "Transformaciones sobre un RDD")

# Cargamos un RDDs

Para la realzación del ejercicio, con ayuda del archivo paises, realizaremos el equivalente de operaciones 'select','count','group by' y 'filter / where'

Cambia el valor de la ruta para que apunte a la ruta donde tienes los datos

In [32]:
from google.colab import drive
drive.mount('/content/drive/')
#LEER CSV
equiposOlimpicosRDD = sc.textFile("/content/drive/MyDrive/Colab Notebooks/paises.csv").map(lambda line : line.split(","))

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [33]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [34]:
equiposOlimpicosRDD.take(10)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR']]

### Deshacemos un RDD

Con ayuda del método `collect()`, permite desparalelizar un RDD.


In [35]:
equiposOlimpicosRDD.collect()

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR'],
 ['10', 'Alcyon-6', 'FRA'],
 ['11', 'Alcyon-7', 'FRA'],
 ['12', 'Aldebaran', 'ITA'],
 ['13', 'Aldebaran II', 'ITA'],
 ['14', 'Aletta', 'IRL'],
 ['15', 'Algeria', 'ALG'],
 ['16', 'Ali-Baba II', 'SWE'],
 ['17', 'Ali-Baba IV', 'SUI'],
 ['18', 'Ali-Baba IX', 'SUI'],
 ['19', 'Ali-Baba VI', 'SUI'],
 ['20', 'Allegro', 'FRA'],
 ['21', 'Almaz', 'URS'],
 ['22', 'Aloha II', 'SWE'],
 ['23', 'Amateur Athletic Association', 'AUS'],
 ['24', 'American Samoa', 'ASA'],
 ['25', 'Amolgavar', 'ESP'],
 ['26', 'Amstel Amsterdam', 'NED'],
 ['27', 'Amulet-3', 'FRA'],
 ['28', 'Amulet-7', 'FRA'],
 ['29', 'Ancora', 'GBR'],
 ['30', 'Andorinha', 'BRA'],
 ['31', 'Andorra', 'AND'],
 ['32', 'Andromeda', 'GBR'],
 ['33', 'Angelita', 'USA'

Creamos un dataframe a partir de un RDD mediante `pd.DataFrame()`

In [36]:
pd.DataFrame(equiposOlimpicosRDD.collect()[1:], columns = equiposOlimpicosRDD.collect()[0] )

Unnamed: 0,id,equipo,sigla
0,1,30. Februar,AUT
1,2,A North American Team,MEX
2,3,Acipactli,MEX
3,4,Acturus,ARG
4,5,Afghanistan,AFG
...,...,...,...
1179,1180,Yugoslavia-2,YUG
1180,1181,Zambia,ZAM
1181,1182,Zefyros,GRE
1182,1183,Zimbabwe,ZIM


# Operaciones sobre un RDDs

Para la realzación del ejercicio, con ayuda del archivo paises, realizaremos el equivalente de operaciones `select`,`count`,`group by` , `filter` y `where`

### Ejercicio 1:
**Mostrar los países sin repetidos**

*   Seleccionar la columna de las siglas PISTA: Usa un map y quedate con la columna con indice 2.
*   Usamos el `distinct()`



In [37]:
#Inserta aquí tu código
columna_siglas = equiposOlimpicosRDD.map( lambda fila : fila[2] )
columna_siglas_sin_repetidos = columna_siglas.distinct()
columna_siglas_sin_repetidos.take(5)


['sigla', 'AUT', 'MEX', 'ARG', 'AFG']

### Ejercicio 2
**Contar la cantidad de siglas diferentes de los equipos olímpicos existentes**

PISTA: Puedes usar `count()`

In [38]:
#Inserta aquí tu código
from pyspark.sql import SparkSession

# Crea una sesión de Spark
spark = SparkSession.builder.appName("EquiposOlimpicos").getOrCreate()

# Lee el archivo CSV y crea un DataFrame
equiposOlimpicosDF = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/paises.csv", header=True)

# Selecciona la columna de las siglas (columna con índice 2)
siglasDF = equiposOlimpicosDF.select(equiposOlimpicosDF.columns[2])

# Utiliza distinct() para obtener los valores distintos en la columna de siglas
siglas_distintas = siglasDF.distinct()

# Cuenta la cantidad de siglas diferentes
cantidad_siglas_diferentes = siglas_distintas.count()

# Imprime la cantidad de siglas diferentes
print("Cantidad de siglas diferentes:", cantidad_siglas_diferentes)

# Detén la sesión de Spark
spark.stop()


Cantidad de siglas diferentes: 230


### Ejercicio 3 (Díficil, puedes hacerlo el último)
**Agrupamos datos para poder determinar cuantos equipos posee un pais**

Recuerda que los RDDs poseen una estructura de `clave-valor`, por lo cual debemos poner primero el valor 'clave' (la sigla del país).

Con 'mapValues', al componente 'valor' le indicamos que operación deseamos que se le aplique.

**Objetivo 1**: Tener un diccionario con `clave` la sigla del país y `valor` la lista de equipos de ese país

Con `groupByKey` podemos agrupar por `clave`, en este caso por sigla del país  

In [39]:
#Inserta aquí tu código
from pyspark import SparkContext

# Crea el contexto de Spark
sc = SparkContext("local", "EquiposOlimpicos")

# Carga los datos desde el archivo CSV en tu Google Drive
equiposOlimpicosRDD = sc.textFile("/content/drive/MyDrive/Colab Notebooks/paises.csv").map(lambda fila: fila.split(","))

# Mapea para crear un RDD de pares clave-valor (sigla_del_pais, equipo_olimpico)
pares_pais_equipos = equiposOlimpicosRDD.map(lambda fila: (fila[2], fila[1]))

# Utiliza groupByKey para agrupar por la sigla del país
grupos_por_pais = pares_pais_equipos.groupByKey()

# Utiliza mapValues para obtener un diccionario con la sigla del país como clave y una lista de equipos como valor
diccionario_pais_equipos = grupos_por_pais.mapValues(list).collectAsMap()

# Imprime el diccionario con clave sigla del país y valor lista de equipos
for sigla, equipos in diccionario_pais_equipos.items():
    print(f"Sigla del país: {sigla}, Equipos: {equipos}")

# Detén el contexto de Spark
sc.stop()


Sigla del país: sigla, Equipos: ['equipo']
Sigla del país: AUT, Equipos: ['30. Februar', 'Austria', 'Austria-1', 'Austria-2', 'Breslau', 'Brigantia', 'Donar III', 'Evita VI', 'May-Be 1960', '"R.-V. Germania; Leitmeritz"', 'Surprise']
Sigla del país: MEX, Equipos: ['A North American Team', 'Acipactli', 'Chamukina', 'Mexico', 'Mexico-1', 'Mexico-2', 'Nausikaa 4', 'Tlaloc', 'Xolotl']
Sigla del país: ARG, Equipos: ['Acturus', 'Antares', 'Arcturus', 'Ardilla', 'Argentina', 'Argentina-1', 'Argentina-2', 'Blue Red', 'Covunco III', 'Cupidon III', 'Djinn', 'Gullvinge', 'Matrero II', 'Mizar', 'Pampero', 'Rampage', 'Tango', 'Wiking']
Sigla del país: AFG, Equipos: ['Afghanistan']
Sigla del país: IRL, Equipos: ['Akatonbo', 'Aletta', 'Ireland', 'Ireland-1', 'Ireland-2', 'The Cloud', 'Three Leaves']
Sigla del país: SUI, Equipos: ['Alain IV', 'Ali-Baba IV', 'Ali-Baba IX', 'Ali-Baba VI', 'Baccara', 'Ballerina IV', 'Fantasio III', 'Kln', 'Lerina', 'Pousse-Moi Pas VII', 'Switzerland', 'Switzerland-1', 'S

**Objetivo 2**: Tener un diccionario con `clave` la sigla del país y `valor` la cantidad de equipos de ese país

In [40]:
#Inserta aquí tu código
from pyspark import SparkContext

# Crea el contexto de Spark
sc = SparkContext("local", "EquiposOlimpicos")

# Carga los datos desde el archivo CSV en tu Google Drive
equiposOlimpicosRDD = sc.textFile("/content/drive/MyDrive/Colab Notebooks/paises.csv").map(lambda fila: fila.split(","))

# Mapea para crear un RDD de pares clave-valor (sigla_del_pais, 1) donde 1 representa un equipo
pares_pais_equipos = equiposOlimpicosRDD.map(lambda fila: (fila[2], 1))

# Utiliza reduceByKey para sumar la cantidad de equipos por país
cantidad_equipos_por_pais = pares_pais_equipos.reduceByKey(lambda a, b: a + b)

# Recoge los resultados en forma de diccionario
diccionario_pais_cantidad_equipos = cantidad_equipos_por_pais.collectAsMap()

# Imprime el diccionario con clave sigla del país y valor cantidad de equipos
for sigla, cantidad in diccionario_pais_cantidad_equipos.items():
    print(f"Sigla del país: {sigla}, Cantidad de equipos: {cantidad}")

# Detén el contexto de Spark
sc.stop()


Sigla del país: sigla, Cantidad de equipos: 1
Sigla del país: AUT, Cantidad de equipos: 11
Sigla del país: MEX, Cantidad de equipos: 9
Sigla del país: ARG, Cantidad de equipos: 18
Sigla del país: AFG, Cantidad de equipos: 1
Sigla del país: IRL, Cantidad de equipos: 7
Sigla del país: SUI, Cantidad de equipos: 17
Sigla del país: ALB, Cantidad de equipos: 1
Sigla del país: POR, Cantidad de equipos: 21
Sigla del país: FRA, Cantidad de equipos: 155
Sigla del país: ITA, Cantidad de equipos: 36
Sigla del país: ALG, Cantidad de equipos: 1
Sigla del país: SWE, Cantidad de equipos: 52
Sigla del país: URS, Cantidad de equipos: 16
Sigla del país: AUS, Cantidad de equipos: 23
Sigla del país: ASA, Cantidad de equipos: 1
Sigla del país: ESP, Cantidad de equipos: 11
Sigla del país: NED, Cantidad de equipos: 26
Sigla del país: GBR, Cantidad de equipos: 79
Sigla del país: BRA, Cantidad de equipos: 13
Sigla del país: AND, Cantidad de equipos: 1
Sigla del país: USA, Cantidad de equipos: 92
Sigla del país:

### Operación filter para obtener un subconjunto

Con el método 'filter', reducimos el conjuntos de equipos.

Nos quedamos con la sigla equivalente de argentina

In [41]:
from pyspark import SparkContext

# Crea el contexto de Spark
sc = SparkContext("local", "EquiposOlimpicos")

# Carga los datos desde el archivo CSV en tu Google Drive
equiposOlimpicosRDD = sc.textFile("/content/drive/MyDrive/Colab Notebooks/paises.csv").map(lambda fila: fila.split(","))

# Define la sigla de Argentina (ajústala según tus datos)
sigla_argentina = "ARG"

# Utiliza filter para quedarte con los equipos que tienen la sigla de Argentina
equipos_argentina = equiposOlimpicosRDD.filter(lambda fila: fila[2] == sigla_argentina)

# Recoge los resultados en forma de lista
lista_equipos_argentina = equipos_argentina.collect()

# Imprime los equipos con la sigla de Argentina
for equipo in lista_equipos_argentina:
    print(f"Equipo de Argentina: {equipo[1]}")

# Detén el contexto de Spark
sc.stop()


Equipo de Argentina: Acturus
Equipo de Argentina: Antares
Equipo de Argentina: Arcturus
Equipo de Argentina: Ardilla
Equipo de Argentina: Argentina
Equipo de Argentina: Argentina-1
Equipo de Argentina: Argentina-2
Equipo de Argentina: Blue Red
Equipo de Argentina: Covunco III
Equipo de Argentina: Cupidon III
Equipo de Argentina: Djinn
Equipo de Argentina: Gullvinge
Equipo de Argentina: Matrero II
Equipo de Argentina: Mizar
Equipo de Argentina: Pampero
Equipo de Argentina: Rampage
Equipo de Argentina: Tango
Equipo de Argentina: Wiking


# Accciones sobre RDDs

In [136]:
# Si ya tienes un contexto de Spark, deténlo
if 'sc' in locals() or 'sc' in globals():
    sc.stop()

In [None]:
deportistaOlimpicoRDD = sc.textFile("/content/drive/MyDrive/Colab Notebooks/deportista.csv").map(lambda line : line.split(","))
deportistaOlimpico2RDD = sc.textFile("/content/drive/MyDrive/Colab Notebooks/deportista2.csv").map(lambda line : line.split(","))

In [116]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpico2RDD)

### Formas de visualizar datos de un RDDs

La operación 'take' nos devuelve 'N' valores que encuentre spark.

La operación 'top', previo ordena respecto al valor llave y nos devuelve 'N' valores.

La operacion 'takeSample', nos devuelve una muestra aleatoria de los valores, Observa que recibe tres parametros

| Orden | Argumento | Descripción | Valor
|-------|--------|-----|--------|
|1|withReplacement|Indica si la muetra podrá traer replicados|Bool|
|2| num| Cantidad de valores a retornar|int|
|3|seed|semilla para el generador aleatorio|int|

Nota: Si encuentras complicado leer el código en los segmentos donde usamos indices en las listas, ejecutalo por partes para que visualizes que componentes seleccionamos

Nota: Para hacer join con RDDs, debemos tener selecionada la llave al inicio del RDD para poder realizar el cruce.

In [90]:
deportistaOlimpicoRDD.take(3)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199']]

In [91]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).take(3)

[['equipo_id',
  ['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso']],
 ['199', ['1', 'A Dijiang', '1', '24', '180', '80']],
 ['199', ['2', 'A Lamusi', '1', '23', '170', '60']]]

In [120]:
if 'sc' in locals() or 'sc' in globals():
    sc.stop()  # Detén el contexto de Spark existente si existe

# Crea un nuevo contexto de Spark
sc = SparkContext("local", "EquiposDeportistas")

# Realiza tus operaciones de Spark dentro de este nuevo contexto


In [None]:
equiposOlimpicosRDD.map(lambda x : [x[0],x[2]]).take(3)

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).take(6)

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).top(5)

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).takeSample(False,4,10)

### Ejercicio 4
**Muestra 17 filas del RDD deportistaOlimpicoRDD**


In [101]:
#Inserta aquí tu código
deportistaOlimpicoRDD.take(18)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705'],
 ['6', 'Per Knut Aaland', '1', '31', '188', '75', '1096'],
 ['7', 'John Aalberg', '1', '31', '183', '72', '1096'],
 ['8', 'Cornelia Cor Aalten Strannood ', '2', '18', '168', '0', '705'],
 ['9', 'Antti Sami Aalto', '1', '26', '186', '96', '350'],
 ['10', 'Einar Ferdinand Einari Aalto', '1', '26', '0', '0', '350'],
 ['11', 'Jorma Ilmari Aalto', '1', '22', '182', '76.5', '350'],
 ['12', 'Jyri Tapani Aalto', '1', '31', '172', '70', '350'],
 ['13', 'Minna Maarit Aalto', '2', '30', '159', '55.5', '350'],
 ['14', 'Pirjo Hannele Aalto Mattila ', '2', '32', '171', '65', '350'],
 ['15', 'Arvo Ossian Aaltonen', '1', '22', '0', '0', '35

## Importancia de countAprox

Debido a la cantidad de datos no siempre es recomendable hacer operaciones tipo count.

Por lo que 'countAprox' es la solución mas viable cuando solo queremos darnos una idea de cuantos datos podemos leer durante un tiempo determinado. Nota: el parametro está en milisegundos

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                            .map(lambda x : [x[0],x[2]])).count()

In [None]:
deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                        .map(lambda x : [x[0],x[2]])).countApprox(20)

# Acciones de modificacion

### Obtenemos el equipo y el deportista

Guardamos el RDD resultante en equipoDeportista

In [125]:
equipoDeportista = deportistaOlimpicoRDD.map(lambda x :[x[-1], x[:-1]]).join(equiposOlimpicosRDD \
                                            .map(lambda x : [x[0],x[2]]))

Py4JJavaError: ignored

In [None]:
equipoDeportista.take(1)


In [None]:
equipoDeportista.map(lambda x : (x[1][0][0],x[1][0][1:],x[1][1]) )

### Carga de resultados

Esta tabla posee las medallas que los jugadores han ganado

In [None]:
resultado = sc.textFile("/content/drive/MyDrive/Colab Notebooks/resultados.csv").map(lambda line : line.split(","))

Eliminamos todas las medallas no ganadoras y nos quedamos con el valor de la medalla y  del deportista_id

In [None]:
resultadoGanador = resultado.filter(lambda l : 'NA' not in l[1])
resultadoGanador = resultadoGanador.map(lambda l : [l[2],l[1]])

In [None]:
resultadoGanador.take(15)

### Obtenemos la relación buscada: deportista,pais y medalla.

In [108]:
jugadoresMedalla =  equipoDeportista.join(resultadoGanador)
jugadoresMedalla.take(1)

NameError: ignored

### Agrupamos las medallas respecto a la sigla del pais jugador

In [107]:
d = {'Gold':7, 'Silver':5, 'Bronze':4}
paisesMedallas = jugadoresMedalla.map(lambda x : (x[1][0][-1],d[x[1][1]]) )
paisesMedallas.takeSample(False,10)

NameError: ignored

### Obtenemos los valores de los puntuajes históricos de los paises jugadores

In [None]:
from operator import add
conclusion = paisesMedallas.reduceByKey((add)).sortBy(lambda x : x[1],ascending=False)
conclusion.take(10)

# Estadística básica sobre los RDDs

Spark posee una suite integrada de forma natural para poder obtener estadísticas básicas.

In [None]:
conclusion.map(lambda l : l[1]).stats()

In [None]:
conclusion.map(lambda l : l [1]).mean()

In [None]:
conclusion.map(lambda l : l [1]).sum()

In [None]:
conclusion.map(lambda l : l [1]).histogram(10)

## Ejercicios Programación funcional

### Ejercicio 5
**Escribir una función que aplique el IVA a un precio.**
* Nombre de la función: apply_iva
* Argumentos: precio, iva
* Resultado: precio con iva aplicado


* Prueba a llamar a la función


In [126]:
#Inserta aquí tu código
def apply_iva(precio, iva):
    if 0 <= iva <= 100:  # Asegurarse de que el IVA esté en un rango válido (0-100%)
        precio_con_iva = precio + (precio * (iva / 100))
        return precio_con_iva
    else:
        return "El valor del IVA debe estar en el rango de 0-100."

# Ejemplo de uso de la función
precio = 100  # Precio sin IVA
iva = 18     # Tasa de IVA del 18%

precio_final = apply_iva(precio, iva)
print(f"Precio con {iva}% de IVA: ${precio_final:.2f}")


Precio con 18% de IVA: $118.00


### Ejercicio 6
** Escribe una función que aplica la función cuadrado() a todos los elementos de una lista.**

    Parámetros:
        funcion: Recibe la función a aplicar.
        lista: Es una lista con valores que se pasarán como argumentos a funcion.
    Devuelve:
        Una lista con el resultado de aplicar la función a los valores de la lista.

In [127]:
#Inserta aquí tu código
def aplicar_funcion_a_lista(funcion, lista):
    # Aplicar la función a cada elemento de la lista y guardar los resultados en una nueva lista
    resultados = [funcion(elemento) for elemento in lista]
    return resultados

# Ejemplo de uso de la función cuadrado
def cuadrado(x):
    return x ** 2

# Lista de números de entrada
numeros = [1, 2, 3, 4, 5]

# Aplicar la función cuadrado a la lista de números
resultados_cuadrado = aplicar_funcion_a_lista(cuadrado, numeros)

print(resultados_cuadrado)


[1, 4, 9, 16, 25]


### Ejercicio 7
**Detectar y corregir los errores del siguiente programa que aplica el iva a una factura:**


In [None]:
base = input('Introduce la base imponible de la factura: ')
print(aplica_iva(base, iva))

def aplica_iva(base, iva = 21):
    base = base * iva
    return base

# errores corregidos

In [None]:
def aplica_iva(base, iva=21):
    base_con_iva = base + (base * iva / 100)
    return base_con_iva

try:
    base = float(input('Introduce la base imponible de la factura: '))  # Convierte la entrada a un número de punto flotante

    iva = 21  # Puedes cambiar el valor del IVA si es necesario

    resultado = aplica_iva(base, iva)
    print(f'El precio con IVA del {iva}% es: {resultado:.2f}')
except ValueError:
    print('La base imponible debe ser un número válido.')
