**Recuerde no agregar o quitar celdas en este notebook, ni modificar su tipo. Si lo hace, el sistema automaticamente lo calificará con cero punto cero (0.0)**

Ordene el archivo por letra y valor (3ra columna).

In [None]:
%%writefile input.txt
B   1999-08-28   14
E   1999-12-06   12
E   1993-07-21   9
C   1991-02-12   13
E   1995-04-25   2
A   1992-08-22   14
B   1999-06-11   121
E   1993-01-27   9
E   1999-09-10   11
E   1990-05-03   16
E   1994-02-14   10
A   1988-04-27   121
A   1990-10-06   10
E   1985-02-12   16
E   1998-09-14   2
B   1994-08-30   17
A   1997-12-15   13
B   1995-08-23   1
B   1998-11-22   131
B   1997-04-09   14
E   1993-12-27   18
E   1999-01-14   15
A   1992-09-19   8
B   1993-03-02   14
B   1999-10-21   13
A   1990-08-31   12
C   1994-01-25   10
E   1990-02-09   18
A   1990-09-26   8
A   1993-05-08   16
B   1995-09-06   14
E   1991-02-18   141
A   1993-01-11   14
A   1990-07-22   0
C   1994-09-09   15
C   1994-07-27   104
D   1990-10-10   15
A   1990-09-05   11
B   1991-10-01   9
A   1994-10-25   13

## Mapper

In [None]:
%%writefile mapper.py
#! /usr/bin/env python

##
## Esta es la función que mapea la entrada a parejas (clave, valor)
##
import sys
import re


##
## Se usa una clase iterable para implementar el mapper.
##

class Mapper:
    
    def __init__(self, stream):
        ## 
        ## almacena el flujo de entrada como una
        ## variable del objeto
        ##
        self.stream = stream
    
    def emit(self, key, value):
        ##
        ## escribe al flujo estándar de salida
        ##
        sys.stdout.write("{}\t{}\n".format(key, value))
            
    def status(self, message):
        ##
        ## imprime un reporte en el flujo de error
        ## no se debe usar el stdout, ya que en este 
        ## unicamente deben ir las parejas (key, value)
        ##
        sys.stderr.write('reporter:status:{}\n'.format(message))
        
    def map(self):

        word_counter = 0
        
        for word in self:
            
            ##
            ## imprime un mensaje indicando la palabra procesada
            ##
            #self.status('procesando ' + word[1])
            
            ##
            ## cuenta la cantidad de palabras procesadas
            ##
            ##
            ## por cada palabra del flujo de datos
            ## emite la pareja (word, 1)
            ##
            self.emit(key=word[0], value=word[1])
            
    def __iter__(self):
        ##
        ## itera sobre cada linea de código recibida
        ## a través del flujo de entrada
        ##
        for line in self.stream:
            ##
            ## itera sobre cada palabra de la línea
            ## (en los ciclos for, retorna las palabras
            ## una a una)
            ##
            #yield line[0]
            #yield line
            groups = re.search("([A-Z]{1}).*[0-9]{2}   ([0-9]*)$",line)
            #groups = re.search("(.{2}\..{1})$",line)
            letra = groups.group(1)
            #letra = line[0]
            valor1 = groups.group(2)
            #valor2 = groups.group(2)
            vector =  [letra,valor1]
            #print(vector)
            yield vector
            #for word in line.split():
                
                #fecha = word[1]
                #yield fecha
                #for content in fecha.split('-'):
                    #mes=content[1]
                    #yield mes
                ##
                ## retorna la palabra siguiente en el
                ## ciclo for
                ##
                #yield word
    

if __name__ == "__main__": 
    ##
    ## inicializa el objeto con el flujo de entrada
    ##
    mapper = Mapper(sys.stdin)
    
    ##
    ## ejecuta el mapper
    ##
    mapper.map()

## Reducer

In [None]:
%%writefile reducer.py
#!/usr/bin/env python

import sys
import itertools
from operator import itemgetter

class Reducer:
    
    def __init__(self, stream):
        self.stream = stream
        
    def emit(self, key, value):
        sys.stdout.write("{}\t{}\n".format(key, value)) 

    def reduce(self):
        ##
        ## Esta función reduce los elementos que 
        ## tienen la misma clave
        ##     
        for key, group in itertools.groupby( sorted(self, key=itemgetter(0,1), reverse=False), lambda x: x):
        
            for _, val in group:
                valor=int(val)
                
            #claves.append(key)
            #valores.append(valor)
            self.emit(key=_, value=valor)

    def __iter__(self):
        
        for line in self.stream:
            ##
            ## Lee el stream de datos y lo parte 
            ## en (clave, valor)
            ##
            key, val = line.split("\t") 
            #array = [float(val[0]),float(val[1])]
            out = int(val)
            ##
            ## retorna la tupla (clave, valor)
            ## como el siguiente elemento del ciclo for
            ##
            yield (key, out)


if __name__ == '__main__': 
  
    reducer = Reducer(sys.stdin)
    reducer.reduce()

## Ejecución

In [None]:
%%bash
rm -rf output
STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar
chmod +x mapper.py
chmod +x reducer.py
hadoop jar $STREAM -input input.txt -output output  -mapper mapper.py -reducer reducer.py
cat output/part-00000

In [None]:
!rm -rf mapper.py reducer.py output input.txt

---

Para realizar la evaluación automática de este libro:

* Abra un Terminal.
* Asegurece que esat en la misma carpeta que contiene este notebook.
* Salve el notebook.
* Ejecute el siguiente comando:

      ./gradetool 08-Taller.ipynb

---