# Guía práctica de uso de MapReduce

En la sesión práctica se presentarán los siguientes contenidos:

* Revisión del `clúster` de Hadoop levantado con `docker-compose`.
* Cálculo de la nota media con MapReduce.
* Clasificación de La Liga de fútbol con MapReduce.

# Revisión del `clúster` de Hadoop levantado con `docker-compose`

Levantamos la infraestructura del `clúster` de Hadoop que ya se utilizó en la guía práctica anterior. Para
ello abrimos un terminal y ejecutamos:

```bash
docker-compose -f hadoop.yml up
```

Deberíamos ver algo así:

![docker-compose up](./img/docker-compose.png)

En otro terminal comprobamos que se han levantado los seis contenedores ejecutando:

```bash
docker-compose -f hadoop.yml ps
```

Deberíamos ver algo similar a esto:

![docker-compose ps](./img/docker-compose-ps.png)

Como vemos, hay seis contenedores con las siguientes configuraciones:

* namenode: es el maestro del sistema de ficheros distribuido HDFS. También tiene el servidor de Jupyter en el que crearemos los `notebooks`.
* yarnmaster: es el ResourceManager de YARN.
* guia_datanode_1, guia_datanode_2, guia_datanode_3, guia_datanode_4: son 4 contenedores que realizan la función de `datanodes` de HDFS y simultáneamente de `nodeManagers` de YARN. Se puede controlar el número de contenedores creados modificando el parámetro `replicas` del servicio `datanode` en el fichero `hadoop.yml`.

En los siguientes apartados vamos a explicar brevemente como acceder por Web y por línea de comandos a estos contenedores. 

## Interfaz Web de HDFS

HDFS tiene una [interfaz Web](http://localhost:9870) que se ha mapeado en el puerto 9870 del equipo anfitrión.
Abrimos un navegador y copiamos en la barra de direcciones:

```
http://localhost:9870
```

Deberíamos ver algo similar a esto:

![hdfs overview](./img/hdfs-overview.png)

En la sección `summary` podemos ver en número de `datanodes`:

![hdfs summary](./img/hdfs-summary.png)

Pulsando sobre en enlace `Live Nodes` o sobre la pestaña `Datanodes` accedemos a la
información de cada `datanode`.

![hdfs datanodes](./img/hdfs-datanodes.png)


## Línea de comandos HDFS

El sistema de ficheros HDFS distribuido admite una serie de comandos que podemos ejecutar desde el terminal. Debemos abrir un terminal en el contenedor de `docker` en el que se ejecuta el `namenode`. Para hacerlo, abrimos otro terminal y ejecutamos:

```bash
docker exec -it namenode bash
```

En el contenedor, con el comando:

```bash
hadoop fs -help
```

se listan todos los comandos que se pueden ejecutar con HDFS:


![hdfs help](./img/hdfs-help.png)

Muchos de los comandos de la relación anterior te resultarán familiares ya que se corresponden con los equivalentes de la `bash` de Linux: `-cat`, `-rm`, `-mkdir`, `-ls`, `-head`, ....

No te preocupes si no sabes para qué se utilizan estos comandos ya que usaremos algunos de ellos en la parte práctica.

En realidad podemos ejecutar HDFS en cualquiera de los nodos del `clúster` de Hadoop. Puedes
probarlo accediendo con `docker exec` a cualquiera de los contenedores y ejecutando desde Jupyter la siguiente instrucción que listará el directorio raíz de sistema distribuido HDFS:

In [7]:
! hadoop fs -ls /

Found 2 items
drwxrwxrwx   - root supergroup          0 2022-12-04 16:18 /tmp
drwxr-xr-x   - root supergroup          0 2022-12-04 16:18 /user


En el uso de HDFS es importante diferenciar el sistema de ficheros local del sistema de ficheros distribuido. Por ejemplo, el siguiente comando es un simple comando de la `Bash` que lista la raíz del directorio local del contenedor. Observa que la salida es diferente de la instrucción anterior.

In [8]:
! ls /

app  boot     dev  hdfs  lib	lib64	media  opt   root  sbin  sys  usr
bin  dataset  etc  home  lib32	libx32	mnt    proc  run   srv	 tmp  var


## Interfaz Web de Yarn

El contenedor [`yarnmaster`](http://localhost:8088) también tiene una interfaz Web. Para acceder a ella debemos abrir en el navegador la siguiente dirección:

```
http://localhost:8088
```

Veremos lo siguiente:

![YARN](./img/yarn.png)

La información presentada no es muy interesante ya que todavía no hemos ejecutado ningún trabajo. Posteriormente, cuando realicemos algún proceso MapReduce, podrás comprobar el estado de ejecución de los trabajos en esta dirección.

# MapReduce

Es el modelo de programación que usa Hadoop. Está inspirado en el paradigma de programación funcional. Permite procesar grandes cantidades de datos al realizar una computación paralela y distribuida. La filosofía de `MapReduce` es que los datos se encuentren distribuidos en los nodos en los que se van a computar, evitando costosas operaciones de copiado de datos.  Básicamente se compone de tres fases:

* Map: En esta fase se transforman y filtran los datos de forma paralela. En esta fase no podemos hacer ninguna suposición de donde se encuentran los datos que se van a procesar ni del nodo que los va a procesar. La salida de la fase `map` es una tupla `clave, valor`.
* Shuffle & sort: Tras la fase `map`, se realiza la reunión de todos los datos que son ordenados según la clave.
* Reduce: Resume los resultados de la fase `map`.

Ninguna de las fases es obligatoria (pude haber un `map` sin `reduce` y un `reduce` sin un `map`). Además, a un proceso MapReduce puede seguir otro proceso MapReduce. Veremos ejemplo de todo ello en la parte práctica.

La descripción anterior puede resultar un tanto abstracta. Por eso vamos a implementar varios ejemplos prácticos que nos permitan entender mejor como funciona MapReduce. Como ya se ha dicho, la implementación nativa de MapReduce está hecha en Java. Para evitar la complejidad que tendría
usar MapReduce con Java, vamos a hacer los ejemplos con [Hadoop Streaming](https://hadoop.apache.org/docs/r1.2.1/streaming.html). Hadoop Streaming permite ejecutar trabajos MapReduce con cualquier proceso capaz de leer de la entrada estándar, con lo que virtualmente podemos usarlo con cualquier lenguaje de programación.

# Cálculo de la nota media con MapReduce

Supongamos que queremos saber la nota media de una serie de alumnos ordenada de mayor a menor nota media. Creamos un fichero llamado `notas.txt` de la siguiente manera:

In [1]:
! echo "pedro 6 7\nluis 0 4\nana 7\npedro 8 1 3\nana 5 6 7\nana 10\nluis 3" > notas.txt

In [2]:
! cat notas.txt

pedro 6 7
luis 0 4
ana 7
pedro 8 1 3
ana 5 6 7
ana 10
luis 3


Como vemos, en cada fila aparece el nombre del alumno y una lista de notas separadas por espacios. Un mismo alumno puede aparecer en varias filas y una fila puede tener varias notas. Se trata de calcular la nota media por alumno.

 Vamos a realizar el mismo proceso de tres formas diferentes:

* Usando la `Bash` de Linux.
* Usando Python.
* Usando una librería de Python para MapReduce denominada `mrjob`.

## Cálculo de la nota media con MapReduce con `scripts` de la `Bash`

En este ejercicio vamos a implementar el `map` y el `reduce` en la `bash`. No te preocupes si no
entiendes alguno de los comandos ya que realizaremos la misma implementación con Python.

Creamos un fichero el fichero que hará la función `map`. Imprimirá en la consola una línea con el nombre del alumno y cada una de las notas que haya obtenido. La clave será el nombre del alumno y su valor la nota obtenida. Hemos separado nombre el nombre del alumno y la nota mediante una coma. El delimitador por defecto que usa Hadoop no es la coma sino el tabulador. Más adelante verás que tenemos que indicar a MapReduce que use como delimitador la coma.

In [259]:
%%writefile mapper.sh
#!/bin/bash

# Leemos línea a línea de la entrada estándar
while read line; do
    
    # Extraemos el nombre de la línea
    name=${line%% *}
    
    # Procesamos nota a nota
    for mark in ${line#* }; do
                  
        # para cada nota emitimos nombre,nota
        echo -e "$name,$mark"
    done    
done

Overwriting mapper.sh


Damos permiso de ejecución y lo probamos desde la `Bash`. Siempre es conveniente que pruebes tus programas antes de enviarlos a Hadoop ya que te será más fácil depurar y corregir los errores.

In [504]:
! chmod +x mapper.sh

Ejecutamos el `script` para comprobar los resultados que produce. Hacemos la ejecución en la `Bash` pero siempre teniendo presente que cuando se ejecute en un `clúster` de Hadoop no se puede garantizar qué nodo procesará cada línea del proceso `map`.

In [505]:
! cat notas.txt | ./mapper.sh 

pedro,6
pedro,7
luis,0
luis,4
ana,7
pedro,8
pedro,1
pedro,3
ana,5
ana,6
ana,7
ana,10
luis,3


## Cálculo de la nota media con MapReduce con `scripts` de `Bash`

Ahora creamos el `reduce`. Tampoco podemos saber qué nodos van a procesar los datos, pero en el caso del `reducer`, tenemos garantizado que todas las claves con el mismo valor serán procesadas en el mismo nodo. La dificultad estriba en que un mismo nodo podría recibir los datos de varias claves, aunque siempre estarán ordenados por clave.

In [294]:
%%writefile reducer.sh
#!/bin/bash
prev_name=
acc=0
n_marks=0

# Leemos línea a línea
while read line; do
    # Extraemos el nombre y la nota
    name=${line%,*}
    mark=${line#*,}
    
    # Si el nombre es igual al de la anterior línea o es la primera iteración, acumulamos la suma de notas y el nḿero de notas
    if [ -z "$prev_name" -o "$prev_name" == "$name" ]; then                
        let n_marks++
        acc=$(($acc + $mark))
    
    # Cuando el nombre sea diferente, emitimos el nombre anterior,la nota media anterior
    else
        echo $prev_name,$(($acc/n_marks))
        acc=$mark
        n_marks=1
    fi
    prev_name=$name
done
           
# Emitimos el nombre y la nota media del último nombre
echo $prev_name,$(($acc/n_marks))

Overwriting reducer.sh


Damos permisos de ejecución.

In [295]:
! chmod +x reducer.sh

Ejecutamos el fichero en la `Bash` antes de ejecutarlo en Hadoop. Entre el proceso `map` y el `reduce`, Hadoop ordenará el fichero por clave y repartirá las claves entre nodos del `clúster`. Nosotros simulamos este proceso con una tubería `sort` intermedia.

In [506]:
! cat notas.txt | ./mapper.sh | sort | ./reducer.sh

ana,7
luis,2
pedro,5


Observa que se ha calculado la nota media sin decimales. Esta es una debilidad de nuestra implementación en la `Bash` que se solucionará posteriormente en la implementación con Python.

Ya casi estamos preparados para ejecutar el proceso en Hadoop. Para poder trabajar con Hadoop, el fichero de datos hay que "subirlo" a HDFS. También hay que indicar el directorio de salida del proceso MapReduce. En posteriores ejecuciones, si vuelves a subir el mismo fichero o a utilizar el mismo directorio de salida, se producirá un error. Por eso lo primero que vamos a hacer es asegurarnos de borrar el fichero de datos y el directorio de salida en caso de que ya existieran. 

In [279]:
! hadoop fs -rm -f -r /user/root/notas.txt /user/root/output

Deleted /user/root/notas.txt
Deleted /user/root/output


Copiamos el fichero de notas del equipo local a HDFS.

In [280]:
! hadoop fs -copyFromLocal notas.txt /user/root/

Listamos el directorio para ver el archivo copiado.

In [239]:
! hadoop fs -ls /user/root

Found 1 items
-rw-r--r--   3 root supergroup         61 2022-12-04 21:09 /user/root/notas.txt


La salida es similar a la que produce el comando `ls -l` en la `Bash`. El número 3 que ves sin embargo tiene un significado diferente. En este caso lo que indica el 3 es que el fichero se ha replicado en tres nodos. 3 es el factor de replicación por defecto en Hadoop.

Podemos examinar el fichero `notas.txt` en HDFS.

In [240]:
! hadoop fs -cat /user/root/notas.txt

pedro 6 7
luis 0 4
ana 7
pedro 8 1 3
ana 5 6 7
ana 10
luis 3


También podemos conocer en qué nodos está almacenado el fichero.

In [241]:
! hdfs fsck /user/root/notas.txt -files -locations -blocks

Connecting to namenode via http://namenode:9870/fsck?ugi=root&files=1&locations=1&blocks=1&path=%2Fuser%2Froot%2Fnotas.txt
FSCK started by root (auth:SIMPLE) from /172.18.0.7 for path /user/root/notas.txt at Sun Dec 04 21:09:07 CET 2022

/user/root/notas.txt 61 bytes, replicated: replication=3, 1 block(s):  OK
0. BP-858413720-172.17.0.2-1624180586568:blk_1073741849_1025 len=61 Live_repl=3  [DatanodeInfoWithStorage[172.18.0.2:9866,DS-7746e6b9-217a-497e-95c9-d05614f224b3,DISK], DatanodeInfoWithStorage[172.18.0.6:9866,DS-3ba3a233-9506-4e45-864c-c7f3852ee857,DISK], DatanodeInfoWithStorage[172.18.0.3:9866,DS-3b152d79-c3ff-4577-8e69-1d13e3976bc7,DISK]]


Status: HEALTHY
 Number of data-nodes:	4
 Number of racks:		1
 Total dirs:			0
 Total symlinks:		0

Replicated Blocks:
 Total size:	61 B
 Total files:	1
 Total blocks (validated):	1 (avg. block size 61 B)
 Minimally replicated blocks:	1 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		

Ya podemos ejecutar el proceso MapReduce con el programa `mapred`. Observa que hemos indicado que vamos a usar la coma como delimitador; le tenemos que dar la ruta de acceso a los ficheros  `map` y `reduce` y qué función tendrán cada uno; por último, tenemos que indicar el directorio de salida.

In [281]:
! mapred streaming \
    -D mapred.textoutputformat.separator="," \
    -files /media/notebooks/mapper.sh,/media/notebooks/reducer.sh \
    -input /user/root/notas.txt \
    -output /user/root/output \
    -mapper mapper.sh \
    -reducer reducer.sh

packageJobJar: [] [/app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar] /tmp/streamjob7345304166545004129.jar tmpDir=null
2022-12-04 21:23:14,526 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmaster/172.18.0.4:8032
2022-12-04 21:23:14,810 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmaster/172.18.0.4:8032
2022-12-04 21:23:15,133 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1670167100255_0011
2022-12-04 21:23:15,419 INFO mapred.FileInputFormat: Total input files to process : 1
2022-12-04 21:23:15,491 INFO mapreduce.JobSubmitter: number of splits:2
2022-12-04 21:23:15,517 INFO Configuration.deprecation: mapred.textoutputformat.separator is deprecated. Instead, use mapreduce.output.textoutputformat.separator
2022-12-04 21:23:15,625 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1670167100255_0011
2022-12-04 21:23:1

Tras la ejecución podemos mostrar el contenido del directorio de salida.

In [252]:
! hadoop fs -ls /user/root/output

Found 2 items
-rw-r--r--   3 root supergroup          0 2022-12-04 21:13 /user/root/output/_SUCCESS
-rw-r--r--   3 root supergroup         18 2022-12-04 21:13 /user/root/output/part-00000


Si imprimimos el contendido, comprobamos que se ha calculado la media de notas de cada usuario.

In [282]:
! hadoop fs -cat /user/root/output/*

ana,7,
luis,2,
pedro,5,


## Cálculo de la nota media con MapReduce con `scripts` de `Python`

Podemos traducir los `scripts` de la `Bash` a sintaxis `Python`, que es más fácil e intuitiva. En este caso no vamos a cambiar el delimitador por defecto, que es el tabulador.

In [3]:
%%writefile mapper.py
#!/usr/bin/python3

import sys

# Leemos línea a línea de la entrada estándar
for line in sys.stdin:  
    # Extraemos el nombre y las notas
    name, *marks = line.split()
    
    # Procesamos nota a nota
    for mark in marks:
        print(f'{name}\t{mark}')

Writing mapper.py


Comprobamos que el `mapper` funciona correctamente.

In [4]:
! chmod ugo+x mapper.py

In [6]:
! cat notas.txt | ./mapper.py | sort

ana	10
ana	5
ana	6
ana	7
ana	7
luis	0
luis	3
luis	4
pedro	1
pedro	3
pedro	6
pedro	7
pedro	8


Traducimos también el `reducer` a `Python`. Hay que tener la precaución de convertir las notas a un tipo numérico (`float` en este caso) ya que `split()` las devuelve como cadenas.

In [7]:
%%writefile reducer.py
#!/usr/bin/python3

import sys

prev_name=''
acc=0
n_marks=0

# Leemos línea a línea de la entrada estándar
for line in sys.stdin: 
    
    name, mark = line.split()
    
    # Si el nombre es igual al de la anterior línea o es la primera iteración, acumulamos la suma de notas y el nḿero de notas
    if not prev_name or prev_name == name:                
        n_marks = n_marks + 1
        acc = acc + float(mark)
    
    # Cuando el nombre sea diferente, emitimos el nombre anterior,la nota media anterior
    else:
        print(f'{prev_name}\t{acc/n_marks}')
        acc=float(mark)
        n_marks=1
    prev_name=name
           
# Emitimos el nombre y la nota media del último nombre
print(f'{prev_name}\t{acc/n_marks}')

Writing reducer.py


Damos permisos de ejecución y probamos el `script`. Vemos que se calcula correctamente la media corrigiendo el problema de los decimales.

In [8]:
! chmod ugo+x reducer.py

In [9]:
! cat notas.txt | ./mapper.py | sort | ./reducer.py

ana	7.0
luis	2.3333333333333335
pedro	5.0


Ahora ya podemos ejecutar el proceso MapReduce en Hadoop. Haciendo exactamente lo mismo que en el caso de la implementación con la `Bash`:

* Borramos el fichero `notas.txt` y el directorio de salida.
* Copiamos el ficheros `notas.txt` en HDFS.
* Llamamos al programa `mapred`.
* Examinamos los resultados.

In [10]:
! hadoop fs -rm -f -r /user/root/notas.txt /user/root/output

Deleted /user/root/notas.txt
Deleted /user/root/output


In [11]:
! hadoop fs -copyFromLocal notas.txt /user/root/

In [12]:
! mapred streaming \
    -files /media/notebooks/mapper.py,/media/notebooks/reducer.py \
    -input /user/root/notas.txt \
    -output /user/root/output \
    -mapper mapper.py \
    -reducer reducer.py

packageJobJar: [] [/app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar] /tmp/streamjob8235456726287670683.jar tmpDir=null
2023-01-10 11:04:28,127 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmaster/172.20.0.6:8032
2023-01-10 11:04:28,447 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmaster/172.20.0.6:8032
2023-01-10 11:04:28,680 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1673341397441_0002
2023-01-10 11:04:29,110 INFO mapred.FileInputFormat: Total input files to process : 1
2023-01-10 11:04:29,210 INFO mapreduce.JobSubmitter: number of splits:2
2023-01-10 11:04:29,366 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1673341397441_0002
2023-01-10 11:04:29,366 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-01-10 11:04:29,594 INFO conf.Configuration: resource-types.xml not found
2023-01-10 11:04:29,595

In [13]:
! hadoop fs -cat /user/root/output/*

ana	7.0
luis	2.3333333333333335
pedro	5.0


## Cálculo de la nota media con MapReduce con `mrjob`

[`mrjob`](https://mrjob.readthedocs.io/en/latest/) es una librería de Python que facilita enormemente el proceso MapReduce.

En `mrjob` simplemente tenemos que escribir un único fichero que contenga una clase que extienda
`MRJob`y que defina los métodos `mapper` y `reduce`. Ninguno de los dos métodos es obligatorio. 

El método `mapper` inicial será llamado una vez por cada línea con dos parámetros: una clave `None` y el valor con el contenido completo de la línea. El `mapper` debe procesar la línea extrayendo la información que requiera y emitiendo (usando `yield`) una tupla "clave, valor". Aunque no es imprescindible puedes ampliar información sobre los generadores de Python y lo que significa `yield` [aquí](https://realpython.com/introduction-to-python-generators/).

El método `reduce` recibe la clave emitida por el `mapper` y un iterador con los valores de esa clave. Al recibir todos los valores de cada clave en un único parámetro se simplifica enormemente el proceso que realizamos en los apartados anteriores.


In [14]:
%%writefile marksMR.py
#!/usr/bin/python3

from mrjob.job import MRJob
from statistics import mean
    
#Definimos una clase MrJob
class MarksMR(MRJob):
        
    # Mapper: En esta etapa aún no hay clave (_), el valor lo recibimos en la variable line
    def mapper(self, _, line):
        #Por cada línea, esta se divide en los campos que forman las columnas
        name, *marks = line.split()
        for mark in marks:            
            yield name, float(mark)
         
    #Reducer: La clave será el nombre y los valores las notas
    def reducer(self, name, marks):
        yield name, mean(marks)
        
if __name__=='__main__':
    MarksMR.run()

Writing marksMR.py


Damos permisos de ejecución al `script`.

In [15]:
! chmod ugo+x marksMR.py

Y lo probamos localmente.

In [16]:
! python3 marksMR.py notas.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/marksMR.root.20230110.101042.336837
Running step 1 of 1...
job output is in /tmp/marksMR.root.20230110.101042.336837/output
Streaming final output from /tmp/marksMR.root.20230110.101042.336837/output...
"ana"	7.0
"pedro"	5.0
"luis"	2.3333333333333335
Removing temp directory /tmp/marksMR.root.20230110.101042.336837...


Para probarlo en el `clúster` de Hadoop simplemente tenemos que añadir una opción. Observa que no es necesario copiar previamente el fichero en HDFS ya que `mrjob` lo hace por nosotros. 

In [17]:
! python3 marksMR.py -r hadoop hdfs:///user/root/notas.txt

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/marksMR.root.20230110.101118.787064
uploading working dir files to hdfs:///user/root/tmp/mrjob/marksMR.root.20230110.101118.787064/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/marksMR.root.20230110.101118.787064/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar7726622170482410347/] [] /tmp/streamjob7406220662964122941.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.20.0.6:8032
  Connecting to ResourceManager at yarnmaster/172.20.0.6:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1673341397441_0

# Clasificación de La Liga de fútbol con MapReduce

Vamos a hacer un ejercicio un poco más complicado con MapReduce. En esta ocasión se trata de calcular la clasificación de la Liga de Fútbol de Primera División Española de la temporada 2021/2022 a partir de los resultados de los partidos que se disputaron.

En [esta Web](https://www.football-data.co.uk/spainm.php) podemos descargar los resultados de los partidos de las últimas temporadas.

![www.football-data.co.uk](./img/www.football-data.co.uk.png)

Descargamos el fichero de resultados de la temporada 2021/2022 y lo renombramos a `laliga2122.csv`.

In [18]:
! wget -O laliga2122.csv https://www.football-data.co.uk/mmz4281/2122/SP1.csv

--2023-01-10 11:13:24--  https://www.football-data.co.uk/mmz4281/2122/SP1.csv
Resolving www.football-data.co.uk (www.football-data.co.uk)... 217.160.0.246, ::ffff:217.160.0.246
Connecting to www.football-data.co.uk (www.football-data.co.uk)|217.160.0.246|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 172174 (168K) [text/csv]
Saving to: ‘laliga2122.csv’


2023-01-10 11:13:25 (1.04 MB/s) - ‘laliga2122.csv’ saved [172174/172174]



Mostramos las dos primeras líneas del fichero. Observa que la primera línea es la cabecera y la siguiente es la información sobre un partido de fútbol. Ambas líneas tienen los campos separados por comas (es lo que significa `csv`: "comma-separated values").

In [19]:
! head -2 laliga2122.csv

Div,Date,Time,HomeTeam,AwayTeam,FTHG,FTAG,FTR,HTHG,HTAG,HTR,HS,AS,HST,AST,HF,AF,HC,AC,HY,AY,HR,AR,B365H,B365D,B365A,BWH,BWD,BWA,IWH,IWD,IWA,PSH,PSD,PSA,WHH,WHD,WHA,VCH,VCD,VCA,MaxH,MaxD,MaxA,AvgH,AvgD,AvgA,B365>2.5,B365<2.5,P>2.5,P<2.5,Max>2.5,Max<2.5,Avg>2.5,Avg<2.5,AHh,B365AHH,B365AHA,PAHH,PAHA,MaxAHH,MaxAHA,AvgAHH,AvgAHA,B365CH,B365CD,B365CA,BWCH,BWCD,BWCA,IWCH,IWCD,IWCA,PSCH,PSCD,PSCA,WHCH,WHCD,WHCA,VCCH,VCCD,VCCA,MaxCH,MaxCD,MaxCA,AvgCH,AvgCD,AvgCA,B365C>2.5,B365C<2.5,PC>2.5,PC<2.5,MaxC>2.5,MaxC<2.5,AvgC>2.5,AvgC<2.5,AHCh,B365CAHH,B365CAHA,PCAHH,PCAHA,MaxCAHH,MaxCAHA,AvgCAHH,AvgCAHA
SP1,13/08/2021,20:00,Valencia,Getafe,1,0,H,1,0,H,4,22,2,4,24,15,1,9,6,3,1,1,2.55,3,3.1,2.65,3,2.95,2.65,2.9,3.05,2.7,3.03,3.11,2.55,3,3,2.63,3,3,2.73,3.2,3.23,2.64,3.01,3.06,2.62,1.5,2.75,1.5,2.75,1.51,2.65,1.49,0,1.82,2.11,1.83,2.11,1.88,2.13,1.81,2.08,2.37,3,3.3,2.45,3,3.25,2.4,2.95,3.4,2.47,3.04,3.48,2.35,3,3.3,2.45,3,3.3,2.57,3.1,3.58,2.42,3,3.34,2.75,1.44,2.84,1.48,2.84,1.51,2.68,1.47,-0.25,2.06

Para entender el significado de cada campo, la Web tiene un fichero de [`metadata`](https://www.football-data.co.uk/notes.txt). En la imagen se muestran los campos relevantes para el proceso que queremos realizar.

![metadata](./img/metadata.png)

Concretamente, los campos 4º y 5º contienen los nombres de los equipos local y visitante respectivamente y el campo 8º informa cuál de ellos obtuvo la victoria. Así 'H' significa que ganó el equipo local, 'A' que lo hizo el visitante y 'D' que empataron. Sabiendo que el equipo que gana obtiene 3 puntos, el que pierde 0 puntos y si empatan ambos equipos se llevan 1 punto, podemos calcular la clasificación final de la liga.

Este ejercicio lo vamos a resolver únicamente con `mrjob`. En este caso hemos tenido que hacer uso del método `steps` de `mrjob` que permite definir etapas. En este ejercicio es necesario ya que tenemos dos reductores, uno para calcular la suma de los puntos de un equipo y otro para calcular la clasificación. Esta es una descripción del proceso MapReduce:

* La primera etapa comienza con el `mapper` al que hemos llamado `mapper_points` que lo que hace es procesar cada línea que corresponde a un partido y extraer los equipos que se enfrentan. Emite como clave el nombre del equipo y como valor los puntos que ha obtenido.
* El `combiner_points` es un combinador. Es un proceso que hace una función parecida al reductor y que permite optimizar el funcionamiento ya que hace agregaciones parciales e intermedias antes de enviarlas al reductor.
* El `reducer_points` recibe como clave cada equipo y como valor un iterador con los puntos que ha obtenido ese equipo. Emite como clave `None` y como valor una tupla que contiene el nombre del equipo y la suma de los puntos. Al emitir una clave `None` todos las tuplas emitidas serán procesadas en un único reductor en la próxima etapa. Es muy importante asegurar que el volumen de datos que reciba ese reductor sea pequeño.
* La segunda etapa sólo consta de un reductor llamado `reducer_classification`. Este reductor ignora la clave ya que no contiene información útil y como valor recibe un iterador de tuplas `equipo,puntos` emitido por el reductor de la primera etapa, `reducer_points`. Lo que hace es emitir una clave nula con los equipos ordenados por puntos de mayor a menor.

In [20]:
%%writefile laligaMR.py
#!/usr/bin/python3

from mrjob.job import MRJob
from mrjob.step import MRStep
    
class LaLigaMR(MRJob):
        
    # Mapper: En esta etapa aún no hay clave (_), el valor lo recibimos en la variable line
    def mapper_points(self, _, line):
        #Por cada línea, esta se divide en los campos que forman las columnas
        _, _, _, home_team, away_team, _, _, result, *rest = line.split(',')
        
        # Si es la cabecera no emitimos nada
        if home_team == "HomeTeam":
            return
        
        if result == 'D':            
            yield home_team, 1
            yield away_team, 1
        elif result == 'H':
            yield home_team, 3
        else:
            yield away_team, 3
            
    def combiner_points(self, team, points):
        yield team, sum(points)
            
    def reducer_points(self, team, points):
        yield None, (team, sum(points))
        
    def reducer_classification(self, _, points):
        yield None, sorted(points, key=lambda t: t[1], reverse=True)
        
    def steps(self):
        return [
            MRStep(mapper=self.mapper_points,
                   combiner=self.combiner_points,
                   reducer=self.reducer_points),
            MRStep(reducer=self.reducer_classification)
        ]
         
if __name__=='__main__':
    LaLigaMR.run()

Writing laligaMR.py


Ejecutamos el `script` y obtenemos la clasificación final. 

In [21]:
! python3 laligaMR.py laliga2122.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/laligaMR.root.20230110.102631.150652
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/laligaMR.root.20230110.102631.150652/output
Streaming final output from /tmp/laligaMR.root.20230110.102631.150652/output...
null	[["Real Madrid", 86], ["Barcelona", 73], ["Ath Madrid", 71], ["Sevilla", 70], ["Betis", 65], ["Sociedad", 62], ["Villarreal", 59], ["Ath Bilbao", 55], ["Valencia", 48], ["Osasuna", 47], ["Celta", 46], ["Vallecano", 42], ["Elche", 42], ["Espanol", 42], ["Mallorca", 39], ["Getafe", 39], ["Cadiz", 39], ["Granada", 38], ["Levante", 35], ["Alaves", 31]]
Removing temp directory /tmp/laligaMR.root.20230110.102631.150652...


Podemos comprobar que la clasificación calculada con MapReduce se corresponde con la [oficial de la temporada 2021/222](https://www.google.com/search?q=clasificacion+liga+2021+2022&oq=clasificacion+liga+2021+2022#sie=lg) con la salvedad de los equipos que han obtenido los mismos puntos en los que la posición oficial puede diferir de la que hemos calculado nosotros. Esto es perfectamente lógico ya que en caso de que dos equipos empaten a puntos, se tienen en cuenta criterios auxiliares como los resultados que han obtenido en enfrentamientos directos o la diferencia de goles, que no ha computado nuestro proceso MapReduce.

![clasificación](./img/clasificacion.png)

Comprobada que la ejecución local es correcta, podemos probar la ejecución en el `clúster`:

In [22]:
! python3 laligaMR.py -r hadoop laliga2122.csv

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/laligaMR.root.20230110.102718.695888
uploading working dir files to hdfs:///user/root/tmp/mrjob/laligaMR.root.20230110.102718.695888/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/laligaMR.root.20230110.102718.695888/files/
Running step 1 of 2...
  packageJobJar: [/tmp/hadoop-unjar3788282063341730512/] [] /tmp/streamjob5947904667860205735.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.20.0.6:8032
  Connecting to ResourceManager at yarnmaster/172.20.0.6:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_167334139744

In [23]:
%%writefile laligaMR.py
#!/usr/bin/python3

from mrjob.job import MRJob
from mrjob.step import MRStep
from datetime import datetime
    
class LaLigaMR(MRJob):
    
    SORT_VALUES = True
        
    # Mapper: En esta etapa aún no hay clave (_), el valor lo recibimos en la variable line
    def mapper_points(self, _, line):
        #Por cada línea, esta se divide en los campos que forman las columnas
        _, date, _, home_team, away_team, _, _, result, *rest = line.split(',')
        
        # Si es la cabecera no emitimos nada
        if home_team == "HomeTeam":
            return
        
        date = datetime.strptime(date, "%d/%m/%Y").strftime("%Y/%m/%d")

        if result == 'D':            
            yield home_team, (date, 1)
            yield away_team, (date, 1)
        elif result == 'H':
            yield home_team, (date, 3)
            yield away_team, (date, 0)
        else:
            yield home_team, (date, 0)
            yield away_team, (date, 3)
            
    def reducer_points(self, team, points):
        points = list(points)
        points = [p for date, p in points]
        five_latest_points = points[-5:]
        five_latest_points.reverse()
        yield None, (team, sum(points), five_latest_points)
    
    
    def reducer_classification(self, _, points):
            yield None, sorted(points, key=lambda t: t[1], reverse=True)
            
    def steps(self):
        return [
            MRStep(mapper=self.mapper_points, reducer=self.reducer_points),
            MRStep(reducer=self.reducer_classification)
        ]
         
if __name__=='__main__':
    LaLigaMR.run()

Overwriting laligaMR.py


In [24]:
! python3 laligaMR.py laliga2122.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/laligaMR.root.20230110.102837.425932
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/laligaMR.root.20230110.102837.425932/output
Streaming final output from /tmp/laligaMR.root.20230110.102837.425932/output...
null	[["Real Madrid", 86, [1, 1, 3, 0, 3]], ["Barcelona", 73, [0, 1, 3, 3, 3]], ["Ath Madrid", 71, [3, 1, 3, 3, 0]], ["Sevilla", 70, [3, 1, 1, 1, 1]], ["Betis", 65, [1, 3, 3, 0, 1]], ["Sociedad", 62, [0, 3, 3, 0, 1]], ["Villarreal", 59, [3, 0, 3, 1, 0]], ["Ath Bilbao", 55, [0, 3, 0, 1, 3]], ["Valencia", 48, [3, 1, 0, 1, 1]], ["Osasuna", 47, [0, 0, 1, 1, 1]], ["Celta", 46, [0, 3, 0, 3, 1]], ["Elche", 42, [3, 0, 0, 0, 1]], ["Espanol", 42, [1, 1, 0, 1, 0]], ["Vallecano", 42, [0, 0, 0, 1, 1]], ["Cadiz", 39, [3, 1, 0, 3, 1]], ["Getafe", 39, [0, 1, 1, 1, 1]], ["Mallorca", 39, [3, 3, 1, 0, 0]], ["Granada", 38, [1, 0, 3, 3, 1]], ["Levante", 35, [3, 3