# Actividad Evaluada 3

En esta actividad vamos a evaluar que sepas escribir algoritmos bajo el paradigma de computación "Map-Reduce". Primero tendrás que escribir código en PySpark y después explicar ciertos puntos de cómo funciona tu código en un entorno distribuido.

## Parte 1 (4 puntos) - Escribiendo procedimientos en PySpark

En esta actividad vamos a trabajar con los datos de la actividad de BigQuery. Estos datos son los que representan el caso ficticio de una tienda online que vende frutas. Las tablas disponibilizadas son `Usuarios`, `Frutas` y `Compras`, y las puedes encontrar en el GitHub junto a este archivo. Recordemos que las compras disponibles en los datos son solo para el mes de febrero.

Para esta parte de la actividad vas a tener que escribir código en `pyspark` en celdas de código. Vas a tener permitido solamente usar las funciones más básicas de Spark, asociadas a [la API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html) de los objetos `RDD`. Por lo mismo, **no puedes cargar los datos en objetos de Python y hacer análisis locales**, ya que tu código debería funcionar para un conjunto de datos más grande que está en un sistema de archivos distribuido.

### Consultas

Tienes que escribir procedimientos en PySpark que respondan las siguientes consultas:

1. **(0.5 ptos)** Entrega todas las frutas con un precio unitario mayor o igual a 15.
2. **(1 pto)** Entrega cada id de compra, junto con el nombre de la persona que la realizó, junto con la fruta que compró. Puedes pensar en retornar una fila por cada fruta en una compra.
3. **(1 pto)** Entrega el el id de cada compra, junto al monto de cada compra.
4. **(1.5 ptos)** Para cada día, entrega el id y el monto de la compra más cara.

## Parte 2 (2 puntos) - Explicando cómo se procesa cada consulta

Ahora, pensando en un entorno distribuido donde hay muchos datos, explica cómo se resuelve cada uno de las consultas. En particular, queremos que te enfoques en cómo Spark saca ventaja de contar con varios "workers" en un entorno distribuido.

## Detalles administrativos

- El trabajo es individual. Puedes consultar y discutir con tus compañeras y compañeros, pero a la hora de escribir el código, no puedes compartirlo con nadie. Puedes usar recursos como internet y modelos fundacionales de procesamiento de texto, pero nuevamente, no puedes compartir tu código.

- La entrega es el viernes 26 de mayo a las 20:00 horas. Vas a tener que entregar el notebook en el buzón habilitado. **Para corregir la tarea vamos a usar Google Colab, así que te recomendamos que pruebes que tu tarea corre en ese entorno**.

- Si tienes dudas, aprovéchanos, vamos a estar en la sala para ayudar. Recuerda que la idea en estas actividades es aprender y evaluar al mismo tiempo, ¡está bien si no todo sale a la primera!.

### Instalar librerias

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=fa4e19d2b7373a96eb12c8bdc4b6fec3d46803e7bc1aa659bd67caceeffbe049
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .getOrCreate()
    
sc = spark.sparkContext
sc

### Obtener datos

In [3]:
!gdown 13nInfre0ljprp3baOqu9-ulw3E2aNWhz
!gdown 1zBFZnZ-CXbhDs1X0l6GhaMmw_aDZWFkc 
!gdown 107lHtJfXsCLfo83HFttAPDg7IEd6yhHF

Downloading...
From: https://drive.google.com/uc?id=13nInfre0ljprp3baOqu9-ulw3E2aNWhz
To: /content/compras.csv
100% 7.94k/7.94k [00:00<00:00, 27.9MB/s]
Downloading...
From: https://drive.google.com/uc?id=1zBFZnZ-CXbhDs1X0l6GhaMmw_aDZWFkc
To: /content/frutas.csv
100% 215/215 [00:00<00:00, 650kB/s]
Downloading...
From: https://drive.google.com/uc?id=107lHtJfXsCLfo83HFttAPDg7IEd6yhHF
To: /content/usuarios.csv
100% 582/582 [00:00<00:00, 2.15MB/s]


In [4]:
frutas_1   = sc.textFile("frutas.csv")
header_1   = frutas_1.first()
frutas   = frutas_1.filter(lambda line: line != header_1).flatMap(lambda row: [ row.split(",") ])

compras_1  = sc.textFile("compras.csv")
header_2   = compras_1.first()
compras  = compras_1.filter(lambda line: line != header_2).flatMap(lambda row: [ row.split(",") ])

usuarios_1 = sc.textFile("usuarios.csv")
header_3   = usuarios_1.first()
usuarios = usuarios_1.filter(lambda line: line != header_3).flatMap(lambda row: [ row.split(",") ])


In [5]:
print(f"Header de las frutas: {header_1.split(',')}")
print(f"Header de las compras: {header_2.split(',')}")
print(f"Header de las usuarios: {header_3.split(',')}")

Header de las frutas: ['id', 'nombre', 'precio_unitario']
Header de las compras: ['id_compra', 'id_usuario', 'id_producto', 'fecha_compra', 'cantidad']
Header de las usuarios: ['id', 'nombre', 'direccion', 'tarjeta_credito']


## Parte 1

### **(0.5 ptos)** Entrega todas las frutas con un precio unitario mayor o igual a 15.


In [13]:
frutas.filter(lambda row: int(row[2]) >= 15).collect()

[['3', 'Uva', '20'],
 ['4', 'Sandía', '50'],
 ['5', 'Melón', '40'],
 ['7', 'Ciruela', '15'],
 ['8', 'Durazno', '17'],
 ['10', 'Guanábana', '30'],
 ['11', 'Mango', '25'],
 ['12', 'Fresa', '18'],
 ['13', 'Kiwi', '22'],
 ['14', 'Papaya', '35'],
 ['15', 'Maracuyá', '28']]

### **(1 pto)** Entrega cada id de compra, junto con el nombre de la persona que la realizó, junto con la fruta que compró. Puedes pensar en retornar una fila por cada fruta en una compra.



Cabe mencionar, que los primeros `map`'s usados, no son del todo necesarios, los use para ordenar mejor las ideas

In [15]:
fruta_id_nombre = frutas.map(lambda fruta: (fruta[0], fruta[1]))
    # --> (id, nombre)

usuario_id_nombre = usuarios.map(lambda usuario: (usuario[0], usuario[1]))
    # --> (id, nombre)

compras_format = compras.map(lambda compra: (compra[1], (compra[2], compra[0]))) 
    # --> id_usu, id_prod, id_compra

In [16]:
compras_format\
    .join(usuario_id_nombre)\
    .map( lambda row: (row[1][0][0], (row[1][0][1], row[1][1])))\
    .join(fruta_id_nombre)\
    .map( lambda row: (int(row[1][0][0]), row[1][0][1], row[1][1]))\
    .sortBy( lambda row: row[0])\
    .collect()

[(1, 'Carlos Rodriguez', 'Maracuyá'),
 (2, 'Isabel Torres', 'Papaya'),
 (3, 'Manuel Herrera', 'Maracuyá'),
 (3, 'Manuel Herrera', 'Fresa'),
 (3, 'Manuel Herrera', 'Guanábana'),
 (4, 'Diego Sánchez', 'Kiwi'),
 (4, 'Diego Sánchez', 'Naranja'),
 (4, 'Diego Sánchez', 'Ciruela'),
 (5, 'Patricia Gómez', 'Durazno'),
 (5, 'Patricia Gómez', 'Guanábana'),
 (6, 'Francisco Martínez', 'Sandía'),
 (6, 'Francisco Martínez', 'Uva'),
 (6, 'Francisco Martínez', 'Fresa'),
 (6, 'Francisco Martínez', 'Plátano'),
 (7, 'Juan Perez', 'Durazno'),
 (8, 'Manuel Herrera', 'Sandía'),
 (8, 'Manuel Herrera', 'Pera'),
 (8, 'Manuel Herrera', 'Durazno'),
 (9, 'Isabel Torres', 'Kiwi'),
 (9, 'Isabel Torres', 'Manzana'),
 (9, 'Isabel Torres', 'Melón'),
 (9, 'Isabel Torres', 'Mango'),
 (10, 'María Fernández', 'Manzana'),
 (10, 'María Fernández', 'Ciruela'),
 (11, 'Francisco Martínez', 'Maracuyá'),
 (11, 'Francisco Martínez', 'Uva'),
 (11, 'Francisco Martínez', 'Pera'),
 (11, 'Francisco Martínez', 'Guanábana'),
 (12, 'Juan 

In [11]:
# Idea descartada pero que no quise borrar por si acaso
# usuario_compra = usuario_id_nombre.join(compras_format).map(
#     lambda row: (row[1][1][0], (row[1][1][1], row[1][0]))
#     ) 
#     # --> id_prod, id_compr, nombre

# sorted(usuario_compra.join(fruta_id_nombre).map(
#     lambda row: (int(row[1][0][0]), row[1][0][1], row[1][1])
#     ).collect())
#     # --> id_compra, comprador, fruta

### **(1 pto)** Entrega el el id de cada compra, junto al monto de cada compra.

In [17]:
compras_format = compras.map(lambda row: (row[2], (row[0], row[4])))
    # --> id_producto, (id_compra, cantidad)

frutas_format = frutas.map(lambda row: (row[0], row[2]))
    # --> id_producto, precio_unidad

compras_format.join(frutas_format)\
    .map(lambda row: (int(row[1][0][0]), int(row[1][0][1]) * int(row[1][1])))\
    .reduceByKey(lambda r1, r2: r1 + r2)\
    .sortBy(lambda row: row[0])\
    .collect()

[(1, 28),
 (2, 210),
 (3, 704),
 (4, 360),
 (5, 188),
 (6, 295),
 (7, 119),
 (8, 299),
 (9, 619),
 (10, 40),
 (11, 564),
 (12, 856),
 (13, 215),
 (14, 266),
 (15, 530),
 (16, 206),
 (17, 248),
 (18, 316),
 (19, 261),
 (20, 102),
 (21, 852),
 (22, 120),
 (23, 411),
 (24, 92),
 (25, 55),
 (26, 427),
 (27, 213),
 (28, 1028),
 (29, 60),
 (30, 542),
 (31, 320),
 (32, 284),
 (33, 150),
 (34, 16),
 (35, 287),
 (36, 535),
 (37, 460),
 (38, 190),
 (39, 784),
 (40, 140),
 (41, 454),
 (42, 704),
 (43, 224),
 (44, 482),
 (45, 294),
 (46, 398),
 (47, 287),
 (48, 50),
 (49, 452),
 (50, 350),
 (51, 122),
 (52, 307),
 (53, 16),
 (54, 402),
 (55, 342),
 (56, 352),
 (57, 300),
 (58, 143),
 (59, 223),
 (60, 460),
 (61, 812),
 (62, 694),
 (63, 80),
 (64, 226),
 (65, 100),
 (66, 480),
 (67, 184),
 (68, 120),
 (69, 368),
 (70, 194),
 (71, 371),
 (72, 53),
 (73, 280),
 (74, 150),
 (75, 56),
 (76, 200),
 (77, 75),
 (78, 76),
 (79, 320),
 (80, 360),
 (81, 180),
 (82, 148),
 (83, 530),
 (84, 22),
 (85, 50),
 (8

### **(1.5 ptos)** Para cada día, entrega el id y el monto de la compra más cara.

In [18]:
compras_format = compras.map(lambda row: (row[2], (row[0], row[3], row[4])))
    # --> id_producto, (id_compra, fecha_compra, cantidad)

frutas_format = frutas.map(lambda row: (row[0], row[2]))
    # --> id_producto, precio_unidad

compra_fruta_precio_fecha = compras_format.join(frutas_format).map(lambda row: (*row[1][0], row[1][1]))
    # --> id_compra, fecha_compra, cantidad, precio_unitario

compra_fruta_precio_fecha\
    .map(lambda row: ((int(row[0]), row[1]), int(row[2]) * int(row[3])))\
    .reduceByKey( lambda r1, r2: r1 + r2)\
    .map( lambda row: (row[0][1], (row[0][0], row[1])))\
    .groupByKey()\
    .mapValues(list)\
    .map(lambda day_row: (day_row[0], *max(day_row[1], key=lambda pair: pair[1])))\
    .sortBy(lambda day: day[0])\
    .collect()
    # --> (fecha, id_compra, monto)

[('2023-02-01', 3, 704),
 ('2023-02-02', 9, 619),
 ('2023-02-03', 12, 856),
 ('2023-02-04', 15, 530),
 ('2023-02-05', 21, 852),
 ('2023-02-06', 28, 1028),
 ('2023-02-07', 36, 535),
 ('2023-02-08', 39, 784),
 ('2023-02-09', 42, 704),
 ('2023-02-10', 49, 452),
 ('2023-02-11', 54, 402),
 ('2023-02-12', 60, 460),
 ('2023-02-13', 61, 812),
 ('2023-02-14', 66, 480),
 ('2023-02-15', 71, 371),
 ('2023-02-16', 74, 150),
 ('2023-02-17', 80, 360),
 ('2023-02-18', 83, 530),
 ('2023-02-19', 91, 483),
 ('2023-02-20', 95, 151),
 ('2023-02-21', 96, 400),
 ('2023-02-22', 103, 552),
 ('2023-02-23', 112, 499),
 ('2023-02-24', 113, 590),
 ('2023-02-25', 122, 625),
 ('2023-02-26', 130, 587),
 ('2023-02-27', 137, 780),
 ('2023-02-28', 142, 489)]

## Parte 2

Ahora, pensando en un entorno distribuido donde hay muchos datos, explica cómo se resuelve cada uno de las consultas. En particular, queremos que te enfoques en cómo Spark saca ventaja de contar con varios "workers" en un entorno distribuido.

Como fuente de información para algunas de las siguientes respuestas se uso ChatGPT. Algunas definiciones quedaron tan buenas que las coloqué entre comillas. Se complementó con la [documentación de spark](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html).

`Spark` saca la ventaja principalmente en los pasos en que particiona la información. Esto permite varias cosas, por un lado, trabajar con menos datos hace los procesos mucho más rápidos en cada computador de manera independiente. Por otro lado, el trabajar con menos datos permite que todas las operaciones puedan ser realizadass en memoria (lo que también guarda relación con la anterior). 

Supongamos que quiero obtener lo vendido por día en un año en un supermercado con mucho público, utilizando `spark` podríamos trabajar en particiones por día en distintos computadores, lo que haría que en vez de trabajar ver la suma de las ventas de 365 días en un solo computador, se vea la venta de cada día en un solo computador. Podremos tener pérdida de tiempo entre la comunicación de computadores, pero al trabajar con tantos computadores en paralelo es como si se tuviese un procesador y memoria combinada muchísimo más potente.

### Consulta 1

`frutas.filter(lambda row: int(row[2]) >= 15).collect()`



Esta consulta puede resolverse dividiendo el conjunto del RDD en varias máquinas, en cada una ejecutar la función para ver si el elemento cumple la condición (un filtro aplicando la funcióna a cada fila y quedandome con los que cumplen). Luego, se vuelven a unir los elementos que sí hayan cumplido con la condición.

### Consulta 2

```
fruta_id_nombre   = frutas.map(lambda fruta: (fruta[0], fruta[1]))

usuario_id_nombre = usuarios.map(lambda usuario: (usuario[0], usuario[1]))

compras_format    = compras.map(lambda compra: (compra[1], (compra[2], compra[0]))) 

compra_usuario    = compras_format.join(usuario_id_nombre).map(
    lambda row: (row[1][0][0], (row[1][0][1], row[1][1])))

sorted(compra_usuario.join(fruta_id_nombre).map(
    lambda row: (int(row[1][0][0]), row[1][0][1], row[1][1])
    ).collect())
```

Los 3 `map`'s iniciales sirven para trabajar con menos columnas de los datos y usar el fomato que permite unir por llaves de manera más fácil. 

Luego, para crear `compra_usuario`, se hace un `join` entre las compras y los usuarios que se les configuró el formato con anterioridad. El `join` trabaja sobre los `id` de las frutas. Luego se realiza un `map` para dejar la salida en un formato que sea fácil de trabajar después. 
Para esta consulta, `spark` divide la información de entrada en particiones más pequeñas según la clave con bajo la que se hace el join. Luego, en cada partición se hace un mapeo para unir los diferentes RDD involucrados según la clave. Luego, se vuelven a unir en un mismo RDD todos los datos procesados que fueron separados en las particiones.

### Consulta 3

```
compras_format = compras.map(lambda row: (row[2], (row[0], row[4])))

frutas_format = frutas.map(lambda row: (row[0], row[2]))

compras_format.join(frutas_format)\
    .map(lambda row: (int(row[1][0][0]), int(row[1][0][1]) * int(row[1][1])))\
    .reduceByKey(lambda r1, r2: r1 + r2)\
    .collect()
```

Al igual que antes, se hacen dos mapeos para dejar los datos en un formato con mayor facilidad de ser trabajados. Luego, al igual que antes, se hace un `join` sobre el identificador de las frutas entre las compras y las frutas (se explicó antes cómo funciona en profundidad el `join`). Luego, se hace un `map` sobre los datos unidos, en los que se multiplica la cantidad de frutas en la compra por su precio, obteniendo una nueva tupla `(id_compra, pagado_por_fruta_x_en_esa_compra)`. Finalmente, se aplica la operación `reduceByKey`, la que funciona
> agrupando los valores de un RDD por clave y luego aplicando una función de reducción a esos valores. Primero, los datos se dividen en particiones y se asigna una clave a cada elemento. Luego, los datos se reorganizan para asegurar que los elementos con la misma clave estén en la misma máquina. Finalmente, se aplica la función de reducción a los valores correspondientes a cada clave para obtener un resultado reducido por clave (ChatGPT)

### Consulta 4

```
compras_format = compras.map(lambda row: (row[2], (row[0], row[3], row[4])))

frutas_format = frutas.map(lambda row: (row[0], row[2]))

compra_fruta_precio_fecha = compras_format.join(frutas_format).map(lambda row: (*row[1][0], row[1][1]))

compra_fruta_precio_fecha\
    .map(lambda row: ((int(row[0]), row[1]), int(row[2]) * int(row[3])))\
    .reduceByKey( lambda r1, r2: r1 + r2)\
    .map( lambda row: (row[0][1], (row[0][0], row[1])))\
    .groupByKey()\
    .mapValues(list)\
    .map(lambda day_row: (day_row[0], *max(day_row[1], key=lambda pair: pair[1])))\
    .sortBy(lambda day: day[0])\
    .collect()
```

Luego de las primeras dos filas en que mediante un `map` dejamos los datos en un formato más liviano y fácil y de manipular, se raliza un `join` entre ambos RDD para así obtener tuplas con la información que nos interesa. Luego, al igual que antes, unimos por clave los datos, obteniéndose el monto total de cada compra. A este RDD obtenido, lo dejamos en el formato con los datos necesarios. A continuación, `groupByKey` particiona los datos mediante la clave, dejando a los elementos con llaves iguales en mismos clusters, los cuales agrupan los valores de las claves en un RDD, teniendo un RDD por clave, este es cambiado a formato de lista con `mapValues(list)`, que es un mapeo a cada linea aplicando la función `list`. Finalmente, se aplica un `map` que de esta lista por llave obtiene el que tiene la tupla con el mayor monto para ese día (y el id de la compra). Finalmente, mediante `sortBy`
>los datos se dividen en particiones, se reorganizan agrupando los elementos con el mismo valor de clasificación, se realiza la ordenación dentro de cada partición y finalmente se fusionan los datos en un nuevo RDD ordenado (ChatGPT)