# Plantilla para la Tarea online BDA02

# Alberto Diéguez Álvarez:

En esta tarea deberás completar las celdas que están incompletas. Se muestra el resultado esperado de la ejecución. Se trata de que implementes un proceso MapReduce que produzca ese resultado. Puedes implementar el proceso MapReduce con el lenguaje y librería que prefieras (`Bash`, Python, `mrjob` ...). Los datos de entrada del proceso son meros ejemplos y el proceso que implementes debería funcionar con esos y cualquier otro fichero de entrada que tenga la misma estructura.

## 1.- Partiendo del fichero de `notas.txt`, calcula la nota más alta obtenida por cada alumno con un proceso MapReduce.

Es decir, que si tenemos el fichero de notas:

In [1]:
%%writefile notas.txt
pedro 6 7
luis 0 4
ana 7
pedro 8 1 3
ana 5 6 7
ana 10
luis 3

Overwriting notas.txt


Se espera obtener el siguiente resultado:

![solución 1](./img/1.png)

Leemos cada línea, extraemos el nombre y tratamos la nota.

In [2]:
%%writefile mapperNotas.py
#!/usr/bin/python3

import sys

# Leemos línea a línea de la entrada estándar
for line in sys.stdin:  
    # Extraemos el nombre y las notas
    name, *marks = line.split()
    marks = list(map(int, marks))
    
    # Emitimos cada nota con el nombre del estudiante como clave
    for mark in marks:
        print(f'{name}\t{mark}')

Overwriting mapperNotas.py


In [3]:
! chmod ugo+x mapperNotas.py

In [4]:
! cat notas.txt | ./mapperNotas.py

pedro	6
pedro	7
luis	0
luis	4
ana	7
pedro	8
pedro	1
pedro	3
ana	5
ana	6
ana	7
ana	10
luis	3


En el siguiente script comprobamos que el nombre no se repite y que la nota es la más alta.

In [5]:
%%writefile reducerNotas.py
#!/usr/bin/python3

import sys

prev_name=''
max_mark = -1

# Leemos línea a línea de la entrada estándar
for line in sys.stdin: 
    
    name, mark = line.split()
    mark = float(mark)
    
    # Si el nombre es igual al de la anterior línea o es la primera iteración, acumulamos la suma de notas y el nḿero de notas
    if not prev_name or prev_name == name:                
        max_mark  = max(max_mark, mark)
    
    # Cuando el nombre sea diferente, emitimos el nombre anterior y la nota más alta
    else:
        print(f'"{prev_name}"\t{max_mark:.1f}')
        max_mark = mark
        
    prev_name=name
           
# Emitimos el nombre y la nota más alta del último nombre
print(f'"{prev_name}"\t{max_mark:.1f}')

Overwriting reducerNotas.py


In [6]:
! chmod ugo+x reducerNotas.py

In [7]:
! cat notas.txt | ./mapperNotas.py | sort | ./reducerNotas.py

"ana"	10.0
"luis"	4.0
"pedro"	8.0


En los siguientes dos scripts lo que hago es borrar los archivos, así al ejecutar el libro entero no me da error.

In [8]:
! hadoop fs -rm /user/root/notas.txt

Deleted /user/root/notas.txt


In [9]:
! hadoop fs -rm -r /user/root/output/

Deleted /user/root/output


In [10]:
! hadoop fs -copyFromLocal notas.txt /user/root/

In [11]:
! hadoop fs -ls /user/root

Found 2 items
-rw-r--r--   3 root supergroup         61 2025-01-07 16:13 /user/root/notas.txt
drwxr-xr-x   - root supergroup          0 2025-01-07 11:14 /user/root/tmp


In [12]:
! mapred streaming \
    -files /media/notebooks/mapperNotas.py,/media/notebooks/reducerNotas.py \
    -input /user/root/notas.txt \
    -output /user/root/output \
    -mapper mapperNotas.py \
    -reducer reducerNotas.py

packageJobJar: [] [/app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar] /tmp/streamjob3439386721766760603.jar tmpDir=null
2025-01-07 16:13:19,557 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmaster/172.20.0.2:8032
2025-01-07 16:13:19,633 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmaster/172.20.0.2:8032
2025-01-07 16:13:19,732 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1736255952742_0035
2025-01-07 16:13:19,878 INFO mapred.FileInputFormat: Total input files to process : 1
2025-01-07 16:13:19,906 INFO mapreduce.JobSubmitter: number of splits:2
2025-01-07 16:13:19,968 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1736255952742_0035
2025-01-07 16:13:19,968 INFO mapreduce.JobSubmitter: Executing with tokens: []
2025-01-07 16:13:20,055 INFO conf.Configuration: resource-types.xml not found
2025-01-07 16:13:20,055

In [13]:
! hadoop fs -cat /user/root/output/*

"ana"	10.0
"luis"	4.0
"pedro"	8.0


Añado una foto con la salida del script.
![image](./img/alberto_1.png)

## 2.- Usando un proceso MapReduce muestra las 10 palabras más utilizadas en `El Quijote`.

Lo primero será descargar El Quijote:

In [14]:
! wget -O '2000-0.txt' https://www.gutenberg.org/files/2000/2000-0.txt

--2025-01-07 16:13:33--  https://www.gutenberg.org/files/2000/2000-0.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47, 2610:28:3090:3000:0:bad:cafe:47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2226045 (2.1M) [text/plain]
Saving to: ‘2000-0.txt’


2025-01-07 16:13:34 (2.47 MB/s) - ‘2000-0.txt’ saved [2226045/2226045]



Al igual que hicimos en la primera práctica, eliminamos aquellas líneas que son metadata y no forman parte de la obra. Sobrescribimos el fichero sin esas líneas.

In [15]:
with open('2000-0.txt') as f:
    lines = f.readlines()

head = 24
tail = 360
book = lines[head:-tail]

with open('2000-0.txt', 'w') as f:
    for line in book:
        f.write(f"{line}\n")


El resultado debería ser el mismo que el que obtuvimos en la primera práctica.

![solución 2](./img/2.png)

Creamos el mapper que inicia el contador, convierte a minúsculas y al final muestra la frecuencia de las palabras.

In [16]:
%%writefile mapperWords.py
#!/usr/bin/python3

import sys
import re
from collections import Counter

# Inicializamos un contador para las palabras
word_count = Counter()

# Leemos línea por línea de la entrada estándar
for line in sys.stdin:
    # Convertir la línea a minúsculas y eliminar caracteres no alfabéticos
    line = re.sub(r'[^a-zA-Z\s]', '', line.lower())
    # Dividir la línea en palabras
    words = line.split()
    
    # Actualizamos el contador de palabras
    word_count.update(words)

# Después de procesar todas las líneas, emitimos la frecuencia de las palabras
for word, count in word_count.items():
    print(f'{count}\t{word}')


Overwriting mapperWords.py


In [17]:
! chmod ugo+x mapperWords.py

Hago esto en el código porque si no me da una lista enorme. Hago esto solo para este ejemplo.

In [18]:
! cat 2000-0.txt | ./mapperWords.py 2>/dev/null | head -n 5

8265	el
29	ingenioso
76	hidalgo
2714	don
2241	quijote


Creamos el reduce que acumula la frecuencia de las palabras y las ordena.

In [19]:
%%writefile reducerWords.py
#!/usr/bin/python3
import sys

# Creamos un diccionario para almacenar las palabras y sus frecuencias
word_count = {}

# Leemos línea a línea de la entrada estándar
for line in sys.stdin:
    # Limpiamos los espacios en blanco antes y después de la línea
    line = line.strip()
    
    # Si la línea no está vacía, procesamos
    if line:
        try:
            count, word = line.split("\t")
            count = int(count)
            
            # Acumulamos la frecuencia de cada palabra
            if word in word_count:
                word_count[word] += count
            else:
                word_count[word] = count
        except ValueError:
            print(word_count)
            continue

# Convertimos las palabras y frecuencias a una lista de tuplas
sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)

# Imprimimos la salida en el formato esperado: lista de pares [frecuencia, palabra]
output = "["
output += ", ".join([f"[{freq}, '{word}']" for word, freq in sorted_words[:10]])
output += "]"
print(output)



Overwriting reducerWords.py


In [20]:
! chmod ugo+x reducerWords.py

In [21]:
! cat 2000-0.txt | ./mapperWords.py | sort | ./reducerWords.py

[[20767, 'que'], [18410, 'de'], [18271, 'y'], [10492, 'la'], [9875, 'a'], [8284, 'en'], [8265, 'el'], [6346, 'no'], [4769, 'los'], [4768, 'se']]


In [22]:
! hadoop fs -rm /user/root/2000-0.txt

rm: `/user/root/2000-0.txt': No such file or directory


In [23]:
! hadoop fs -rm -r /user/root/output/

Deleted /user/root/output


In [44]:
! hadoop fs -copyFromLocal 2000-0.txt /user/root/

In [25]:
! hadoop fs -ls /user/root

Found 3 items
-rw-r--r--   3 root supergroup    2205995 2025-01-07 16:13 /user/root/2000-0.txt
-rw-r--r--   3 root supergroup         61 2025-01-07 16:13 /user/root/notas.txt
drwxr-xr-x   - root supergroup          0 2025-01-07 11:14 /user/root/tmp


In [45]:
! mapred streaming \
    -files /media/notebooks/mapperWords.py,/media/notebooks/reducerWords.py \
    -input /user/root/2000-0.txt \
    -output /user/root/output \
    -mapper mapperWords.py \
    -reducer reducerWords.py

packageJobJar: [] [/app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar] /tmp/streamjob4200082116501785518.jar tmpDir=null
2025-01-07 16:33:27,678 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmaster/172.20.0.2:8032
2025-01-07 16:33:27,755 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at yarnmaster/172.20.0.2:8032
2025-01-07 16:33:27,855 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1736255952742_0045
2025-01-07 16:33:27,994 INFO mapred.FileInputFormat: Total input files to process : 1
2025-01-07 16:33:28,019 INFO mapreduce.JobSubmitter: number of splits:2
2025-01-07 16:33:28,085 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1736255952742_0045
2025-01-07 16:33:28,085 INFO mapreduce.JobSubmitter: Executing with tokens: []
2025-01-07 16:33:28,173 INFO conf.Configuration: resource-types.xml not found
2025-01-07 16:33:28,173

In [27]:
! hadoop fs -cat /user/root/output/*

[[20767, 'que'], [18410, 'de'], [18271, 'y'], [10492, 'la'], [9875, 'a'], [8284, 'en'], [8265, 'el'], [6346, 'no'], [4769, 'los'], [4768, 'se']]	


Añado una foto con la salida del script.
![image](./img/alberto_2.png)

## 3.- Muestra la clasificación de temporada 2021/2022 de La Liga pero únicamente de los puntos obtenidos como visitante.

En [esta Web](https://resultados.as.com/resultados/futbol/primera/2021_2022/clasificacion/) puedes consultar cuántos puntos obtuvo cada equipo fuera de casa.

Empezamos descargando el fichero de resultados de la temporada 2021/2022 y renombrándolo a `laliga2122.csv`.

In [28]:
! wget -O laliga2122.csv https://www.football-data.co.uk/mmz4281/2122/SP1.csv

--2025-01-07 16:13:57--  https://www.football-data.co.uk/mmz4281/2122/SP1.csv
Resolving www.football-data.co.uk (www.football-data.co.uk)... 217.160.0.246
Connecting to www.football-data.co.uk (www.football-data.co.uk)|217.160.0.246|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 172174 (168K) [text/csv]
Saving to: ‘laliga2122.csv’


2025-01-07 16:13:57 (1.09 MB/s) - ‘laliga2122.csv’ saved [172174/172174]



Se espera este resultado:

![solución 3](./img/3.png)

In [29]:
! hadoop fs -rm -r /user/root/*

Deleted /user/root/2000-0.txt
Deleted /user/root/notas.txt
Deleted /user/root/output
Deleted /user/root/tmp


Mostramos las dos primeras líneas del fichero. Observa que la primera línea es la cabecera y la siguiente es la información sobre un partido de fútbol. Ambas líneas tienen los campos separados por comas (es lo que significa csv: "comma-separated values").

In [30]:
! head -2 laliga2122.csv

Div,Date,Time,HomeTeam,AwayTeam,FTHG,FTAG,FTR,HTHG,HTAG,HTR,HS,AS,HST,AST,HF,AF,HC,AC,HY,AY,HR,AR,B365H,B365D,B365A,BWH,BWD,BWA,IWH,IWD,IWA,PSH,PSD,PSA,WHH,WHD,WHA,VCH,VCD,VCA,MaxH,MaxD,MaxA,AvgH,AvgD,AvgA,B365>2.5,B365<2.5,P>2.5,P<2.5,Max>2.5,Max<2.5,Avg>2.5,Avg<2.5,AHh,B365AHH,B365AHA,PAHH,PAHA,MaxAHH,MaxAHA,AvgAHH,AvgAHA,B365CH,B365CD,B365CA,BWCH,BWCD,BWCA,IWCH,IWCD,IWCA,PSCH,PSCD,PSCA,WHCH,WHCD,WHCA,VCCH,VCCD,VCCA,MaxCH,MaxCD,MaxCA,AvgCH,AvgCD,AvgCA,B365C>2.5,B365C<2.5,PC>2.5,PC<2.5,MaxC>2.5,MaxC<2.5,AvgC>2.5,AvgC<2.5,AHCh,B365CAHH,B365CAHA,PCAHH,PCAHA,MaxCAHH,MaxCAHA,AvgCAHH,AvgCAHA
SP1,13/08/2021,20:00,Valencia,Getafe,1,0,H,1,0,H,4,22,2,4,24,15,1,9,6,3,1,1,2.55,3,3.1,2.65,3,2.95,2.65,2.9,3.05,2.7,3.03,3.11,2.55,3,3,2.63,3,3,2.73,3.2,3.23,2.64,3.01,3.06,2.62,1.5,2.75,1.5,2.75,1.51,2.65,1.49,0,1.82,2.11,1.83,2.11,1.88,2.13,1.81,2.08,2.37,3,3.3,2.45,3,3.25,2.4,2.95,3.4,2.47,3.04,3.48,2.35,3,3.3,2.45,3,3.3,2.57,3.1,3.58,2.42,3,3.34,2.75,1.44,2.84,1.48,2.84,1.51,2.68,1.47,-0.25,2.06

Para entender el significado de cada campo, la Web tiene un fichero de [metadata](https://www.football-data.co.uk/notes.txt). En la imagen se muestran los campos relevantes para el proceso que queremos realizar.

Concretamente, los campos 4º y 5º contienen los nombres de los equipos local y visitante respectivamente y el campo 8º informa cuál de ellos obtuvo la victoria. Así 'H' significa que ganó el equipo local, 'A' que lo hizo el visitante y 'D' que empataron. Sabiendo que el equipo que gana obtiene 3 puntos, el que pierde 0 puntos y si empatan ambos equipos se llevan 1 punto, podemos calcular la clasificación final de la liga.

Este ejercicio lo vamos a resolver únicamente con mrjob. En este caso hemos tenido que hacer uso del método steps de mrjob que permite definir etapas. En este ejercicio es necesario ya que tenemos dos reductores, uno para calcular la suma de los puntos de un equipo y otro para calcular la clasificación. Esta es una descripción del proceso MapReduce:

La primera etapa comienza con el mapper al que hemos llamado mapper_points que lo que hace es procesar cada línea que corresponde a un partido y extraer los equipos que se enfrentan. Emite como clave el nombre del equipo y como valor los puntos que ha obtenido.
El combiner_points es un combinador. Es un proceso que hace una función parecida al reductor y que permite optimizar el funcionamiento ya que hace agregaciones parciales e intermedias antes de enviarlas al reductor.
El reducer_points recibe como clave cada equipo y como valor un iterador con los puntos que ha obtenido ese equipo. Emite como clave None y como valor una tupla que contiene el nombre del equipo y la suma de los puntos. Al emitir una clave None todos las tuplas emitidas serán procesadas en un único reductor en la próxima etapa. Es muy importante asegurar que el volumen de datos que reciba ese reductor sea pequeño.
La segunda etapa sólo consta de un reductor llamado reducer_classification. Este reductor ignora la clave ya que no contiene información útil y como valor recibe un iterador de tuplas equipo,puntos emitido por el reductor de la primera etapa, reducer_points. Lo que hace es emitir una clave nula con los equipos ordenados por puntos de mayor a menor.

Creamos el script en python con mapper, combiner y reducer.
Simplemente sumamos los puntos si el equipo es visitante. Luego los ordenamos.

In [46]:
%%writefile laligaMR.py
#!/usr/bin/python3

from mrjob.job import MRJob
from mrjob.step import MRStep
    
class LaLigaMR(MRJob):
        
    # Mapper: En esta etapa aún no hay clave (_), el valor lo recibimos en la variable line
    def mapper_points(self, _, line):
        #Por cada línea, esta se divide en los campos que forman las columnas
        _, _, _, home_team, away_team, _, _, result, *rest = line.split(',')
        
        # Si es la cabecera no emitimos nada
        if home_team == "HomeTeam":
            return
        
        if result == 'D':            
            yield away_team, 1
        elif result == 'H':
            yield away_team, 0
        else:
            yield away_team, 3
            
    def combiner_points(self, team, points):
        yield team, sum(points)
            
    def reducer_points(self, team, points):
        yield None, (team, sum(points))
        
    def reducer_classification(self, _, points):
        yield None, sorted(points, key=lambda t: t[1], reverse=True)
        
    def steps(self):
        return [
            MRStep(mapper=self.mapper_points,
                   combiner=self.combiner_points,
                   reducer=self.reducer_points),
            MRStep(reducer=self.reducer_classification)
        ]
         
if __name__=='__main__':
    LaLigaMR.run()


Overwriting laligaMR.py


In [47]:
! chmod ugo+x laligaMR.py

In [48]:
! python3 laligaMR.py -r hadoop laliga2122.csv

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/laligaMR.root.20250107.153847.801214
uploading working dir files to hdfs:///user/root/tmp/mrjob/laligaMR.root.20250107.153847.801214/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/laligaMR.root.20250107.153847.801214/files/
Running step 1 of 2...
  packageJobJar: [/tmp/hadoop-unjar6735978285436285139/] [] /tmp/streamjob1510403203801731.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.20.0.2:8032
  Connecting to ResourceManager at yarnmaster/172.20.0.2:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1736255952742_0

## 4.- Muestra la diferencia de goles entre el equipo que más goles ha marcado y el que menos goles ha marcado en la temporada 2021/2022 de La Liga.

Se espera que el proceso MapReuce produzca una salida similar a la siguiente:

![solución 4](./img/4.png)

Creamos el script. Sumamos todos los goles, tanto marcados como visitanto como jugando en casa.
Los ordenamos, hallamos la diferencia y mostramos.

In [35]:
%%writefile laligaMRDiff.py
#!/usr/bin/python3

from mrjob.job import MRJob
from mrjob.step import MRStep

class LaLigaMR(MRJob):
        
    # Mapper: Extraemos goles de cada equipo en casa y fuera
    def mapper(self, _, line):
        # Extraemos las columnas necesarias (asegurándonos de saltar las cabeceras)
        _, _, _, home_team, away_team, home_goals, away_goals, *rest = line.split(',')
        
        # Si la línea es la cabecera, no la procesamos
        if home_team == "HomeTeam":
            return

        # Emitimos los goles marcados por cada equipo
        yield home_team, int(home_goals)  # Goles de casa
        yield away_team, int(away_goals)  # Goles de visitante
        
    # Combiner: Suma de los goles de cada equipo (para reducir la cantidad de datos enviados al reducer)
    def combiner(self, team, goals):
        yield team, sum(goals)
    
    # Reducer: Sumamos los goles por equipo y encontramos los equipos con más y menos goles
    def reducer(self, team, goals):
        total_goals = sum(goals)
        yield None, (team, total_goals)

    # Reducer final para ordenar los resultados y calcular la diferencia de goles
    def reducer_final(self, _, teams_goals):
        sorted_teams = sorted(teams_goals, key=lambda t: t[1], reverse=True)
        
        # El primer equipo tiene la mayor cantidad de goles y el último la menor cantidad
        max_goals_team = sorted_teams[0]
        min_goals_team = sorted_teams[-1]
        
        # Calculamos la diferencia de goles
        diff_goals = max_goals_team[1] - min_goals_team[1]
        
        yield f"{max_goals_team[0]} vs {min_goals_team[0]}", f'diferencia de goles {diff_goals}'

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   combiner=self.combiner,
                   reducer=self.reducer),
            MRStep(reducer=self.reducer_final)
        ]
         
if __name__=='__main__':
    LaLigaMR.run()


Writing laligaMRDiff.py


In [36]:
! chmod ugo+x laligaMRDiff.py

In [37]:
! python3 laligaMRDiff.py -r hadoop laliga2122.csv

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/laligaMRDiff.root.20250107.151438.686876
uploading working dir files to hdfs:///user/root/tmp/mrjob/laligaMRDiff.root.20250107.151438.686876/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/laligaMRDiff.root.20250107.151438.686876/files/
Running step 1 of 2...
  packageJobJar: [/tmp/hadoop-unjar4790187191258124331/] [] /tmp/streamjob887234798919385259.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.20.0.2:8032
  Connecting to ResourceManager at yarnmaster/172.20.0.2:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1

## 5.- Calcula la racha de los últimos cinco partidos de cada equipo en la clasificación final de La Liga en la temporada 2021/2022.

[Observa](https://www.google.com/search?q=clasificacion+liga+2021+2022&oq=clasificacion+liga+2021+2022#sie=lg) que las últimas columnas de la clasificación muestran cuál ha sido el resultado de los últimos 5 partidos de cada equipo.

![clasificacion](./img/clasificacion.png)

Se trata de que muestres la clasificación final junto con los resultados de los últimos 5 partidos. Este ejercicio es un poco más difícil y laborioso que los otros. Si usas `mrjob` probablemente te sea útil utilizar [ordenación secundaria por valor](https://mrjob.readthedocs.io/en/latest/job.html#secondary-sort), aunque también se puede resolver sin hacer uso de ella.

Se espera este resultado:

![solución 5](./img/5.png)

Este me ha costado bastante. He probado mil maneras y al final encontré una que funciona. No se me ordenaban los 5 ultimos resultados, me los ponia mezclados y más problemas que he tenido.
Hay algunas partes del código que tengo que estudiarlas más, ya que no las entiendo del todo.


In [38]:
%%writefile laligaMRLast5Matches.py
#!/usr/bin/python3

from mrjob.job import MRJob
from mrjob.step import MRStep
from datetime import datetime
    
class laligaMRLast5Matches(MRJob):

    # Mapper: En esta etapa aún no hay clave (_), el valor lo recibimos en la variable line
    def mapper_points(self, _, line):
        # Por cada línea, esta se divide en los campos que forman las columnas
        _, date, _, home_team, away_team, _, _, result, *rest = line.split(',')
        
        # Si es la cabecera no emitimos nada
        if home_team == "HomeTeam":
            return
        
        # Convertir la fecha para ordenar correctamente
        date = datetime.strptime(date, "%d/%m/%Y").strftime("%Y/%m/%d")
        
        # Emitimos la fecha y los puntos para cada equipo
        if result == 'D':            
            yield home_team, (date, 1)
            yield away_team, (date, 1)
        elif result == 'H':
            yield home_team, (date, 3)
            yield away_team, (date, 0)
        else:
            yield home_team, (date, 0)
            yield away_team, (date, 3)
            
    def combiner_points(self, team, values):
        # Combina los puntos y fechas por equipo
        combined_points = list(values)
        yield team, combined_points
            
    def reducer_points(self, team, values):
        # Aplanamos y ordenamos por fecha
        all_games = sorted([item for sublist in values for item in sublist], key=lambda x: x[0])

        # Obtenemos los puntos de los últimos 5 partidos
        last_five_points = [points for date, points in all_games[-5:]]
        last_five_points.reverse()
        # Calculamos el total de puntos
        total_points = sum(points for date, points in all_games)

        yield None, (team, total_points, last_five_points)
    
    def reducer_classification(self, _, points):
        # Ordenamos los equipos por los puntos totales
        yield None, sorted(points, key=lambda t: t[1], reverse=True)
            
    def steps(self):
        return [
            MRStep(mapper=self.mapper_points,
                   combiner=self.combiner_points,
                   reducer=self.reducer_points),
            MRStep(reducer=self.reducer_classification)
        ]
         
if __name__=='__main__':
    laligaMRLast5Matches.run()





Overwriting laligaMRLast5Matches.py


In [39]:
! chmod ugo+x laligaMRLast5Matches.py

In [40]:
! python3 laligaMRLast5Matches.py -r hadoop laliga2122.csv

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/laligaMRLast5Matches.root.20250107.151517.162001
uploading working dir files to hdfs:///user/root/tmp/mrjob/laligaMRLast5Matches.root.20250107.151517.162001/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/laligaMRLast5Matches.root.20250107.151517.162001/files/
Running step 1 of 2...
  packageJobJar: [/tmp/hadoop-unjar5092812142297854453/] [] /tmp/streamjob8165514791368388460.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.20.0.2:8032
  Connecting to ResourceManager at yarnmaster/172.20.0.2:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/st