# Práctica 1: Irene J. Ventura Farias

##  Ejercicio1.3: Mejorando el país con mejores clientes.

Devuelve el numero de clientes valorados con "bueno" por país.

### Diseño MapReduce
#### ¿Cuántos pasos MapReduce son necesarios?
Para la solución de este problema  se van a realizar tres pasos: 
- Paso 1: un mapper y un reduce (Reduce_Count_Bueno).
- Paso 2: un reducer (Reducer_Max_Country)
#### ¿Qué hace cada función de cada paso?
- _Mapper_: extrae los datos más relevantes de la información que recibimos
    - Caso Paises: se obtiene el nombre completo del país.
    - Caso Clientes: solo considera los clientes que califican 'bueno' a un país, contabilizando al cliente como 1 voto.
- _Reducer_Count_Bueno: Obtiene el nombre completo y contabiliza el total de votos de cada país
- _Reducer_Max_Country: Obtiene el máximo país con mayor votos
#### ¿Qué datos se pasan de una función a la siguiente?
- _Mapper_: 
    - Caso Paises: Sigla_País, [Símbolo='A', Nombre Completo del País]
    - Caso Clientes: Sigla_País, [Símbolo='B', 1]
- _Reducer_Count_Bueno: None , [contador_clientes, [Nombre Completo del País]]
- _Reducer_Max_Country:  [contador_clientes, [Nombre Completo del País]] siendo el contador como el máximo general

In [1]:
! mkdir -p mrjob/join

In [2]:
import os
os.chdir("/media/notebooks/mrjob/join")

In [3]:
! pwd

/media/notebooks/mrjob/join


## Los ficheros countries.csv y clients.csv deben estar descargados en la carpeta /media/notebooks/mrjob/join

In [4]:
cat countries.csv

Name,Code
Afghanistan,AF
Åland Islands,AX
Albania,AL
Algeria,DZ
American Samoa,AS
Andorra,AD
Angola,AO
Anguilla,AI
Antarctica,AQ
Antigua and Barbuda,AG
Argentina,AR
Armenia,AM
Aruba,AW
Australia,AU
Austria,AT
Azerbaijan,AZ
Bahamas,BS
Bahrain,BH
Bangladesh,BD
Barbados,BB
Belarus,BY
Belgium,BE
Belize,BZ
Benin,BJ
Bermuda,BM
Bhutan,BT
"Bolivia, Plurinational State of",BO
"Bonaire, Sint Eustatius and Saba",BQ
Bosnia and Herzegovina,BA
Botswana,BW
Bouvet Island,BV
Brazil,BR
British Indian Ocean Territory,IO
Brunei Darussalam,BN
Bulgaria,BG
Burkina Faso,BF
Burundi,BI
Cambodia,KH
Cameroon,CM
Canada,CA
Cape Verde,CV
Cayman Islands,KY
Central African Republic,CF
Chad,TD
Chile,CL
China,CN
Christmas Island,CX
Cocos (Keeling) Islands,CC
Colombia,CO
Comoros,KM
Congo,CG
"Congo, the Democratic Republic of the",CD
Cook Islands,CK
Costa Rica,CR
Côte d'Ivoire,CI
Croatia,HR
Cuba,CU
Curaçao,CW
Cyprus,CY
Czech Republic,CZ
Denmark,DK
Djibouti,DJ
Dominica,DM
Dominican Republic,DO
Ecuador,EC
Egypt,EG
El Salvad

In [5]:
cat clients.csv

Bertram Pearcy  ,bueno,SO
Steven Ulman  ,regular,ZA
Enid Follansbee  ,malo,GS
Candie Jacko  ,malo,SS
Alana Zufelt  ,regular,ES
Craig Pinkett  ,malo,LK
Carson Levey  ,bueno,GU
Reanna Calabrese  ,regular,GT
Elliott Kosak  ,malo,GG
Yuette Steinman  ,bueno,GN
Grisel Wines  ,regular,GW
Kathryne Dieguez  ,regular,AE
Donna Raabe  ,malo,GB
Norine Mundt  ,bueno,US
Brittaney Amaro  ,bueno,ES
Penni Husted  ,bueno,ES
Delmer Semon  ,malo,IT
Lennie Dunkerson  ,bueno,CA
Mayra Bobb  ,regular,IT
Altagracia Merced  ,regular,CA
Verda Belgrave  ,malo,GB
Jonnie Urban  ,malo,US
Chung Frankum  ,malo,ES
Vincenzo Samples  ,regular,TT
Dominick Barkan  ,bueno,GU
Carisa Ellingwood  ,bueno,TR
Garret Wess  ,regular,TM
Zoraida Muise  ,bueno,GU
Samantha Cusson  ,bueno,PT
Jenine Greenburg  ,regular,PR
Geri Paddock  ,bueno,QA
Antonia Klosterman  ,regular,RE
Moriah Galey  ,malo,RO
Nyla Eckard  ,malo,GB
Arlean Harries  ,malo,US
Kenyatta Lippold  ,malo,ES
Samuel Knipe  ,malo,MV
Jamison Starner  ,malo,ML
Joel Blye  ,regula

In [6]:
%%writefile mrjob-ejercicio.py
import sys, os, re
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRJoin(MRJob):

  # Realiza la ordenacion secundaria
    SORT_VALUES = True

    """El mapper se encarga de extraer la siglas del pais como clave. Como valor para el caso de los paises
    nos quedamos con su nombre completo y en cuanto a los clientes filtramos los que hayan calificado
    como 'bueno' a un paísy asignamos 1 como contador"""
    def mapper(self, _, line):
        splits = line.rstrip("\n").split(",")

        if len(splits) == 2: # datos de paises
            symbol = 'A' # ordenamos los paises antes que los datos de personas
            country2digit = splits[1]
            yield country2digit, [symbol, splits[0]]
                
        else: #  datos de personas
            if(splits[1]=='bueno'):
                symbol = 'B'
                country2digit = splits[2]
                yield country2digit, [symbol, 1] #contabiliza como 1 voto
                
    
    """El reducer obtiene una lista de clave (siglas del país) y el valor es una lista de listas que contiene
    el nombre completo del país (A) y cuantas veces han votado como bueno (B)"""
    def reducer_count_bueno(self, key, values):
        
        for value in values:
            if value[0] == 'A': #dato del país
                count = 0 # iniciamos el contador
                country_name = value[1] #asigamos el nombre completo
                
            if value[0] == 'B':
                count +=1
                
        # Comprobamos que el pais tenga al menos un  voto. Devolvemos el numero de votos y el nombre completo del país
        if count >0:
            yield None,[count, [country_name]] 
            
    """Buscamos todos los paises con mayor votos calificados como 'bueno' """        
    def reducer_max_countries(self, key, values):
        max_country = 0
        
        for c in reversed(list(values)): #recorre la lista de mayor a menor
            # si el actual es mayor o igual al ultimo mayor se actualiza
            if(c[0]>=max_country):
                max_country = c[0]
                yield c
            
            else: break
        
    """Usamos el método step para indicar el orden de ejecución del mapper y los dos reducers"""    
    def steps(self):
        return [MRStep(mapper=self.mapper,                
                       reducer=self.reducer_count_bueno),
                MRStep(reducer=self.reducer_max_countries)]
            
                

if __name__ == '__main__':
    MRJoin.run()

Overwriting mrjob-ejercicio.py


## Ejecutamos primero en local

In [7]:
! python3 mrjob-ejercicio.py /media/notebooks/mrjob/join/countries.csv  \
/media/notebooks/mrjob/join/clients.csv > ouput3local

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mrjob-ejercicio.root.20211114.122930.551166
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/mrjob-ejercicio.root.20211114.122930.551166/output
Streaming final output from /tmp/mrjob-ejercicio.root.20211114.122930.551166/output...
Removing temp directory /tmp/mrjob-ejercicio.root.20211114.122930.551166...


In [8]:
#Comprobamos que la salida coincida con lo solicitado en el ejercicio
! tail ouput3local

3	["Spain"]
3	["Guam"]


## Ejecutamos en el Clúster

In [10]:
! hdfs dfs -mkdir /tmp/mrjoin #Creamos la ruta temporal para los ejercicios de MapReduce
#Asignamos los fichero a la ruta temp/mrjoin en Hadoop
! hdfs dfs -put /media/notebooks/mrjob/join/countries.csv  /tmp/mrjoin
! hdfs dfs -put /media/notebooks/mrjob/join/clients.csv  /tmp/mrjoin

mkdir: `/tmp/mrjoin': File exists
put: `/tmp/mrjoin/countries.csv': File exists
put: `/tmp/mrjoin/clients.csv': File exists


In [11]:
! hdfs dfs -ls  /tmp/mrjoin #Comprobamos que se haya copiado correctamente

Found 2 items
-rw-r--r--   3 root supergroup       1289 2021-11-14 12:55 /tmp/mrjoin/clients.csv
-rw-r--r--   3 root supergroup       4120 2021-11-14 12:55 /tmp/mrjoin/countries.csv


In [12]:
#Reseteamos los ficheros y el directorio de salida
! hdfs dfs -rm /tmp/carpeta/mrjob-join-output/*
! hdfs dfs -rmdir /tmp/carpeta/mrjob-join-output

Deleted /tmp/carpeta/mrjob-join-output/_SUCCESS
Deleted /tmp/carpeta/mrjob-join-output/part-00000


In [13]:
! python3 mrjob-ejercicio.py hdfs:///tmp/mrjoin/* -r hadoop --output-dir hdfs:///tmp/carpeta/mrjob-join-output

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/mrjob-ejercicio.root.20211114.123240.488604
uploading working dir files to hdfs:///user/root/tmp/mrjob/mrjob-ejercicio.root.20211114.123240.488604/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/mrjob-ejercicio.root.20211114.123240.488604/files/
Running step 1 of 2...
  packageJobJar: [/tmp/hadoop-unjar592259833676980320/] [] /tmp/streamjob3771564844690051844.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.18.0.4:8032
  Connecting to ResourceManager at yarnmaster/172.18.0.4:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.stag

In [14]:
#Comprobamos si se ha procesado correctamente 
! hdfs dfs -ls  /tmp/carpeta/mrjob-join-output

Found 2 items
-rw-r--r--   3 root supergroup          0 2021-11-14 13:40 /tmp/carpeta/mrjob-join-output/_SUCCESS
-rw-r--r--   3 root supergroup         23 2021-11-14 13:40 /tmp/carpeta/mrjob-join-output/part-00000


In [15]:
#Mostramos el contenido del fichero de salida
! hdfs dfs -tail /tmp/carpeta/mrjob-join-output/part-00000

3	["Spain"]
3	["Guam"]
