## Un flujo de tareas para procesar vectores

El código siguiente simplemente suma los elementos de los vectores que tiene en la entrada. Fíjate que ahora la entrada está distribuida en varios directorios y ficheros. Además, la salida la ponemos en un directorio concreto. Observa además que estamos usando el formato JSON en todas las etapas.

In [59]:
%%writefile files/vectsum.py
from mrjob.job import MRJob, JSONProtocol
import json

class Vectsum(MRJob):
    OUTPUT_PROTOCOL   = JSONProtocol
    INPUT_PROTOCOL    = JSONProtocol
    INTERNAL_PROTOCOL = JSONProtocol

    def mapper(self, id, line):
        index, val = line
        yield id, val
        
    def reducer(self, key, values):
        yield key, sum(values)
        
if __name__ == '__main__':
    Vectsum.run()

Overwriting files/vectsum.py


In [60]:
%%script /opt/anaconda/bin/python files/vectsum.py -q --output-dir data/vectors/R data/vectors/V1 data/vectors/V2
--

"v1"	49
"v2"	29


Ahora calculamos el promedio de las sumas de los elementos de todos los vectores. Para ello, usamos la salida de la tarea anterior.

In [67]:
%%writefile files/avg.py
from mrjob.job import MRJob, JSONProtocol
import json
import numpy as np

class Avg(MRJob):
    OUTPUT_PROTOCOL   = JSONProtocol
    INPUT_PROTOCOL    = JSONProtocol
    INTERNAL_PROTOCOL = JSONProtocol

    def mapper(self, id, val):
        yield "mean", val
        
    def reducer(self, key, values):
        yield key, np.mean([i for i in values])

if __name__ == '__main__':
    Avg.run()

Overwriting files/avg.py


In [68]:
%%script /opt/anaconda/bin/python files/avg.py -q --output-dir data/vectors/mean data/vectors/R 
--

"mean"	39.0


Finalmente, dividimos todos los elementos de los vectores iniciales por la media obtenida. Fíjate en dos detalles:
- Cómo pasamos opciones desde la línea de comando a la tarea map reduce
- Cómo usamos el comando `awk` de Unix para extraer la información del fichero de salida anterior.

In [63]:
%%writefile files/norm.py
from mrjob.job import MRJob, JSONProtocol
import json
import numpy as np

class Norm(MRJob):
    OUTPUT_PROTOCOL   = JSONProtocol
    INPUT_PROTOCOL    = JSONProtocol
    INTERNAL_PROTOCOL = JSONProtocol

    def configure_options(self):
        super(Norm, self).configure_options()
        self.add_file_option('--total')
        
    def mapper(self, id, line):
        index, val = line
        yield id, [index, val*1./float(self.options.total)]
        
    def reducer(self, key, values):
        for i in values:
            yield key, i
        
if __name__ == '__main__':
    Norm.run()

Overwriting files/norm.py


In [66]:
!/opt/anaconda/bin/python files/norm.py -q --total `awk '{print $2;}' data/vectors/mean/*` data/vectors/V1 data/vectors/V2

"v1"	[0, 0.10256410256410256]
"v1"	[10, 0.05128205128205128]
"v1"	[2, 0.15384615384615385]
"v1"	[4, 0.20512820512820512]
"v1"	[5, 0.23076923076923078]
"v1"	[6, 0.48717948717948717]
"v1"	[7, 0.02564102564102564]
"v2"	[0, 0.10256410256410256]
"v2"	[10, 0.02564102564102564]
"v2"	[2, 0.15384615384615385]
"v2"	[3, 0.3076923076923077]
"v2"	[4, 0.05128205128205128]
"v2"	[5, 0.05128205128205128]
"v2"	[7, 0.05128205128205128]
