**Recuerde no agregar o quitar celdas en este notebook, ni modificar su tipo. Si lo hace, el sistema automaticamente lo calificará con cero punto cero (0.0)**

Obtenga las letras asociadas a cada clave para el siguiente archivo.

In [7]:
%%writefile input.txt
0 	 C,F,A,B,D,I,H
1 	 A,H,C,I,F,D,B
2 	 B,H,I,F,G
3 	 C,B,D
4 	 D,C,I,G,H
5 	 B,D,C,H,A
6 	 H,D,C,B,G,F,D
7 	 F,A,G,C,B,D,H,I
8 	 F,A,I,B,D
9 	 F,A,B,D,C,D,G,I
10 	 D,B,A,C,H
11 	 G,D,B,H,I,C,F
12 	 D,D,C,F,B,A,H,G
13 	 F,A,D
14 	 D,A,C,G
15 	 H,A,F,D,B,C,G,I
16 	 A,I,B,D
17 	 C,B,G,A,D,H,F
18 	 I,B,A,H,D,F
19 	 B,D,F,D,I
20 	 C,B,H,F,I,G,D,D
21 	 F,A,B,C,G,D
22 	 I,G,F,C,H,B
23 	 H,F,C,B,D,D,A
24 	 F,D,G,A,H
25 	 G,H,B,C,A,F,I
26 	 G,F,B,A,H,D,D,I
27 	 B,A,H,I,D,G,F
28 	 A,H,D,F,C
29 	 C,D,A,F,G,B,H,D

Writing input.txt


## Mapper

In [8]:
%%writefile mapper.py
#! /usr/bin/env python

##
## Esta es la función que mapea la entrada a parejas (clave, valor)
##
import sys


##
## Se usa una clase iterable para implementar el mapper.
##

class Mapper:
    
    def __init__(self, stream):
        ## 
        ## almacena el flujo de entrada como una
        ## variable del objeto
        ##
        self.stream = stream
    
    def emit(self, key, value):
        ##
        ## escribe al flujo estándar de salida
        ##
        sys.stdout.write("{}\t{}\n".format(key, value))
        
        
    def status(self, message):
        ##
        ## imprime un reporte en el flujo de error
        ## no se debe usar el stdout, ya que en este 
        ## unicamente deben ir las parejas (key, value)
        ##
        sys.stderr.write('reporter:status:{}\n'.format(message))

        
    def counter(self, counter, amount=1, group="ApplicationCounter"):
        ## 
        ## imprime el valor del contador
        ##
        sys.stderr.write('reporter:counter:{},{},{}\n'.format(group, counter, amount))
        
    def map(self):
        import re
        word_counter = 0
        ##
        ## imprime un mensaje a la entrada
        ##
        self.status('Iniciando procesamiento ')
            
        for word in self:
            ##
            ## cuenta la cantidad de palabras procesadas
            ##
            word_counter += 1
            try:
                valor = int(word)
            except ValueError:
                list_letras = word.split(',')
                for letra in list_letras:
                    self.emit(key=letra, value= valor)
                

            ##
            ## por cada palabra del flujo de datos
            ## emite la pareja (word, 1)
            ##
            

        ##
        ## imprime un mensaje a la salida
        ##
        self.counter('num_words', amount=word_counter)
        self.status('Finalizadno procesamiento ')

 
            
            
    def __iter__(self):
        ##
        ## itera sobre cada linea de código recibida
        ## a través del flujo de entrada
        ##
        for line in self.stream:
            ##
            ## itera sobre cada palabra de la línea
            ## (en los ciclos for, retorna las palabras
            ## una a una)
            ##
            for word in line.split():
                ##
                ## retorna la palabra siguiente en el
                ## ciclo for
                ##
                yield word
    

if __name__ == "__main__": 
    ##
    ## inicializa el objeto con el flujo de entrada
    ##
    mapper = Mapper(sys.stdin)
    
    ##
    ## ejecuta el mapper
    ##
    mapper.map()

Writing mapper.py


## Reducer

In [9]:
%%writefile reducer.py
#!/usr/bin/env python

import sys
import itertools

class Reducer:
    
    def __init__(self, stream):
        self.stream = stream
        
    def emit(self, key, value):
        sys.stdout.write("{}\t{}\n".format(key, value)) 

    def reduce(self):
        ##
        ## Esta función reduce los elementos que 
        ## tienen la misma clave
        ##
        for key, group in itertools.groupby(self, lambda x: x[0]):
            list_keys = []
            is_first_num = True
            for _, val in group:
                if is_first_num is True:
                    is_first_num = False
                    list_keys.append(val)
                else:
                    if set([val]).intersection(list_keys):
                        pass
                    else:
                        list_keys.append(val)
                list_keys.sort()
                tuples = tuple(list_keys)
            
            self.emit(key=key, value=','.join(map(str, tuples)))
                
                
                
                        
                

    def __iter__(self):
        
        for line in self.stream:
            ##
            ## Lee el stream de datos y lo parte 
            ## en (clave, valor)
            ##
            key, val = line.split("\t") 
            val = int(val)
            ##
            ## retorna la tupla (clave, valor)
            ## como el siguiente elemento del ciclo for
            ##
            yield (key, val)


if __name__ == '__main__': 
  
    reducer = Reducer(sys.stdin)
    reducer.reduce()

Overwriting reducer.py


## Ejecución

In [10]:
%%bash
rm -rf output
STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar
chmod +x mapper.py
chmod +x reducer.py
hadoop jar $STREAM -input input.txt -output output  -mapper mapper.py -reducer reducer.py
cat output/part-00000

A	0,1,5,7,8,9,10,12,13,14,15,16,17,18,21,23,24,25,26,27,28,29
B	0,1,2,3,5,6,7,8,9,10,11,12,15,16,17,18,19,20,21,22,23,25,26,27,29
C	0,1,3,4,5,6,7,9,10,11,12,14,15,17,20,21,22,23,25,28,29
D	0,1,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,23,24,26,27,28,29
F	0,1,2,6,7,8,9,11,12,13,15,17,18,19,20,21,22,23,24,25,26,27,28,29
G	2,4,6,7,9,11,12,14,15,17,20,21,22,24,25,26,27,29
H	0,1,2,4,5,6,7,10,11,12,15,17,18,20,22,23,24,25,26,27,28,29
I	0,1,2,4,7,8,9,11,15,16,18,19,20,22,25,26,27


In [11]:
!rm -rf mapper.py reducer.py output input.txt

---

Para realizar la evaluación automática de este libro:

* Abra un Terminal.
* Asegurece que esat en la misma carpeta que contiene este notebook.
* Salve el notebook.
* Ejecute el siguiente comando:

      ./gradetool 11-Taller.ipynb

---