
# **Contar palabras con Map Reduce**

El siguiente código implementa un ejemplo de contar palabras utilizando Map Reduce, una librería de multiprocesamiento para Python, Google Drive, y un conjunto de datos de la librería pública de Seattle.

Este código fue originalmente publicado en https://github.com/ChemaSarmiento/Big_Data_UP

El conjunto de datos utilizado está disponible en https://data.seattle.gov/Community/Checkouts-by-Title/tmmm-ytt6

In [None]:
'''
Primeramente importamos las librerías necesarias para poder ejecutar el código
'''

# Paralelismo, concurrencia y aprovechar al maximo todos los procesadores en 
#   una maquina
import multiprocessing as mp
# Usar funcionalidades dependientes del sistema operativo
import os
# Acceso aleatoria a lineas de texto; permite obtener cualquier línea de un 
#   archivo fuente Python
import linecache
# Permite que al llamar a un objeto este se comporte como funcion
from functools import partial

In [None]:
'''
Antes de comenzar, revisamos el número de procesadores a los que tenemos acceso
'''
print("No. de Procesadores: {}".format(mp.cpu_count()))

No. de Procesadores: 2


**Primeramente hay que hacer que Google Colab pueda acceder a Google Drive**

Para ello, hay que seguir los siguiente pasos:

1. Hacer clic en la parte izquierda de la interfaz de Google Colab, en el símbolo de carpeta para desplegar el menú de Archivos

2. Hacer clic en el icono de la carpeta con el símbolo de Google Drive

3. Permitir a Google Colab acceder a Google Drive

4. Verificar que tenemos acceso a Google Drive al corroborar que podemos visualizar nuestros archivos





**Recomendable**

Se recomienda generar una carpeta dedicada para la práctica

Pasos a seguir:

1. Ubicar la carpeta denominada MyDrive

2. Colocando el cursor sobre la carpeta, hacer clic sobre los tres puntos que se visualizan en la parte derecha

3. Dar clic sobre nueva carpeta

4. Dar nombre a la nueva carpeta, tal como "BigDataExercices"

5. En esta carpeta, almacenaremos el conjunto de datos que hemos descargado de Seattle

In [None]:
'''
Ahora, tenemos que hacer disponible el conjunto de datos con los que vamos a 
trabajar.

Para ello, copiaremos el conjunto de datos desde donde los tenemos almacenados
a un repositorio temporal para que Google Colab los pueda acceder

Si no pudimos descargar los datos desde la página principal, podemos 
clonar/copiar el conjunto de datos desde el siguiente enlace:

https://drive.google.com/drive/folders/1JKe-FxNJqM2RabYRvm8p6Q_Vi7geBBuT?usp=share_link

'''

# este comando nos ayuda a copiar el archivo a la máquina local de Colab
#   Importante, necesitamos iniciar el comando con el signo de exclamacion y 
#     finalizarlo con un espacio y el punto final

!cp /content/drive/MyDrive/BigDataExercices/checkouts_seattle.zip .

In [None]:
'''
Si copiamos/clonamos los datos desde el enlace de Google Drive, entonces tenemos
que descomprimir el archivo para obtener el CSV
'''

!unzip /content/checkouts_seattle.zip

Archive:  /content/checkouts_seattle.zip
  inflating: checkouts-by-title.csv  
  inflating: __MACOSX/._checkouts-by-title.csv  


In [None]:
'''
Verificamos que el conjunto de datos se encuentre en la maquina local de Colab
'''

# the ls command shows a list of all the files in a directory
# -l is an option to show the list in large format
# -h is an option to show the sizes of the files in 'human readable' format
!ls -lh 

total 12G
-rw-r--r-- 1 root root 8.9G Apr 23 19:31 checkouts-by-title.csv
-rw------- 1 root root 2.8G Apr 29 02:21 checkouts_seattle.zip
drwx------ 7 root root 4.0K Apr 29 02:16 drive
drwxr-xr-x 2 root root 4.0K Apr 29 02:26 __MACOSX
drwxr-xr-x 1 root root 4.0K Apr 27 13:35 sample_data


In [None]:
# echamos un vistazo a la parte final del conjunto de datos
!tail checkouts-by-title.csv

Digital,OverDrive,AUDIOBOOK,2016,3,1,"Malice Domestic, Volume 4: An Anthology of Original Traditional Mystery Stories (unabridged)",,Carolyn G. Hart,"Fiction, Mystery, Short Stories","Phoenix Books, Inc.",2009
Physical,Horizon,BOOK,2016,3,2,Minifred goes to school / Mordicai Gerstein.,,"Gerstein, Mordicai","Cats Juvenile fiction, Animals Infancy Juvenile fiction, Obedience Juvenile fiction, Schools Fiction","HarperCollins,",2009.
Physical,Horizon,VIDEODISC,2016,3,19,"Alléluia [videorecording] / Doppelganger Releasing ; produit par Panique, Radar Films, Savage Film ; coproduit par Versus Production, One-Eyed ; scenario, Fabrice Du Welz, Vincent Tavier ; adaptation & dialogues, Romain Protat ; un film de Fabrice Du Welz.",,,"Murder Drama, Womanizers Drama, Seduction Drama, Man woman relationships Drama, Thrillers Motion pictures, Feature films, Fiction films, Horror films, Motion pictures French","Doppelganger Releasing,",[2015]
Physical,Horizon,BOOK,2016,3,1,No-fear home improvement.,,,

In [None]:
# generamos un ssubconjunto de datos con el primer 1 millon de datos
!head -n 1000000 checkouts-by-title.csv > checkouts1M.csv

In [None]:
%%time 

'''
Verificamos el tamaño total del conjunto de datos original
'''

# wc <- word count, para hacer conteo en general de caracteres
# -l hace que cuente las líneas
!wc -l checkouts-by-title.csv

42474213 checkouts-by-title.csv
CPU times: user 215 ms, sys: 38.9 ms, total: 254 ms
Wall time: 40.1 s


In [None]:
%%time

'''
Verificamos el tamaño total del sub-conjunto de datos
'''

!wc -l checkouts1M.csv

1000000 checkouts1M.csv
CPU times: user 11.7 ms, sys: 0 ns, total: 11.7 ms
Wall time: 1.11 s


In [None]:
'''
Empezaremos a trabajar con el sub-conjunto de datos generado
'''

# asignamos una variable que contenga nuestro sub-conjunto de datos
filename = "checkouts1M.csv"

# asignamos una variable para el numero de lineas que queremos procesar
lines = 1000000

## **Codificacion en forma Secuencial**

In [None]:
%%time
wordCount = {}
counter = 0

for i in range(1,lines):
  ln = linecache.getline(filename, i)
  try:    
    words = ln.split(',')[6].split(" ")  
    for w in words:
      if w.upper() not in wordCount.keys():
        wordCount[w.upper()] = 0
      wordCount[w.upper()] += 1
  except Exception as e:
    print("null title {} ".format(str(e)))      
  counter += 1
  if counter % 1000000 == 0:
    print("From {start} in line {counter}".format(start=0, counter=counter))

CPU times: user 6.99 s, sys: 539 ms, total: 7.53 s
Wall time: 7.59 s


# **Codificacion en Paralelo**

In [None]:
'''
Primero definimos los parametros requeridos
'''

# se define el numero de threads a ocupar, puede ser por el numero disponible
#   dictado por la maquina o se pueden definir manualmente
threads = mp.cpu_count()
threads = 4
print('No. de threads: {}'.format(threads))

# definimos el numero de partes en que dividiremos el sub-conjunto
split_size = lines // threads if lines // threads > 0 else 1
print('Tamaño de datos por cada parte: {}'.format(split_size))

# obtenemos las partes con el tamaño requerido para ser procesados de manera 
#   independiente por cada procesador
#   lines dicta donde va a iniciar cada parte del sub-conjunto de datos
parts = [x for x in range(0, lines, split_size)]
print('Partes: {}'.format(parts))

No. de threads: 4
Tamaño de datos por cada parte: 250000
Partes: [0, 250000, 500000, 750000]


In [None]:
'''
Definimos la funcion que nos permite procesar el conteo de palabras
'''
def countWordsInTitles(filename, chunksize, start):
  wordCount = {}
  counter = 0
  print('Comenzando desde: {}'.format(start))
  for current_line in range(start, (start+chunksize)+1):
    ln = linecache.getline(filename, current_line)
    try:
      words = ln.split(",")[6].split(" ")
      #print('Palabras encontradas en la linea: {}'.format(words))
      for w in words:
        if w.upper() not in wordCount.keys():
          wordCount[w.upper()] = 0
        wordCount[w.upper()] += 1
    except Exception as e:
      print('Lista vacia: {} '.format(str(e)))      
    counter += 1
    if counter % 1000000 == 0:
      print('Desde {} en la linea {}'.format(start, counter))
  return wordCount

In [None]:
%%time

'''
En esta parte definimos el Map
'''

# distribuimos los datos de entrada a traves de procesos
pool = mp.Pool(processes=threads)

# asignamos nuestra funcion como recurrente para su procesado que tomara el
#   sub-conjunto de datos y el numero en que debe iniciar cada parte
func = partial(countWordsInTitles, filename, split_size)

# aplicamos la funcion para hacer el conteo de manera recurrente
#   aqui obtendremos los diccionarios con cada uno de los conteos de las 
#   palabras por cada procesador
res = pool.map(func, parts)

# liberamos recursos
pool.close()

# esperamos a la ejecucion de todos los procesos
pool.join()

Comenzando desde: 250000Comenzando desde: 0Comenzando desde: 500000
Comenzando desde: 750000

Lista vacia: list index out of range 

CPU times: user 224 ms, sys: 82 ms, total: 306 ms
Wall time: 5.95 s


**¿Hubo mejora?**


In [None]:
print('Total del resultados del Mapper: {}'.format(len(res)))

Total del resultados del Mapper: 4


In [None]:
# revisamos uno de los resultados del mapper
res[0]

{'TITLE': 105,
 '"EMBRACING': 1,
 'LOVE.': 62,
 'VOLUME': 2088,
 '3': 1130,
 '&': 10862,
 '4': 771,
 '/': 134087,
 'STORY': 5634,
 'AND': 42039,
 'ART': 3359,
 'BY': 73402,
 'YOUKA': 1,
 'NITTA': 2,
 ';': 69244,
 '[TRANSLATION': 696,
 'PENNY': 139,
 'HER': 638,
 'DOLL': 69,
 'POSSESSION': 13,
 ':': 52872,
 'A': 42600,
 'ROMANCE': 239,
 'A.S.': 15,
 'BYATT.': 6,
 'BOSTON': 172,
 '"MUY': 2,
 'BUENO': 4,
 'THREE': 1054,
 'GENERATIONS': 24,
 'OF': 50208,
 'AUTHENTIC': 96,
 'MEXICAN': 90,
 'FLAVOR': 52,
 'RECIPES': 1607,
 'QB': 4,
 'BLITZ': 9,
 '[TEXT': 2104,
 'WRITTEN': 6747,
 'STEPHANIE': 312,
 'PETERS].': 1,
 'MY': 3900,
 'LEAD': 81,
 'DOG': 851,
 'WAS': 678,
 'LESBIAN:': 1,
 'MUSHING': 1,
 'ACROSS': 165,
 'ALASKA': 110,
 'IN': 18448,
 'THE': 107438,
 'IDITAROD—THE': 1,
 "WORLD'S": 528,
 'MOST': 917,
 'GRUELING': 1,
 'RACE': 279,
 '"BARRIO': 3,
 'CUBA': 79,
 '[VIDEORECORDING]': 20084,
 'LAGUNA': 15,
 'PRODUCTIONS': 2971,
 'PRESENTS': 932,
 'AN': 5708,
 'ICAIC/FINE': 2,
 '"QUADRIVIUM': 2,

In [None]:
'''
Una vez terminado el Map, definimos el Reduce
'''

# generamos un diccionario que concentrara los resultados reducidos
reduced = {}

# iteramos cada elemento del mapper
for i in res:
  # recorremos a traves de las llaves
  for key in i.keys():
    # si no existe, entonces la creamos con el primer valor que encuentra 
    #   para esa llave
    if key not in reduced.keys():
      reduced[key] = i[key]
    # si existe, entonces sumamos los valores
    else:
      reduced[key] = reduced[key] + i[key]

In [None]:
'''
observamos el total de llaves concentradas en cada parte del mapper
'''
for i in range(len(res)):
  print('Total de llaves en la parte {}: {}'.format(i+1,len(res[i].keys())))

Total de llaves en la parte 1: 153758
Total de llaves en la parte 2: 156178
Total de llaves en la parte 3: 156310
Total de llaves en la parte 4: 155134


In [None]:
# observamos el numero total de llaves reducidas
print('Total de Llaves reducidas: {}'.format(len(reduced.keys())))

Total de Llaves reducidas: 227903


In [None]:
# visualizamos las llaves (diferentes) reducidas
reduced.keys()



In [None]:
'''
Podemos visualizar el conteo de cada llave/palabra tanto en el total, despues
de la reduccion, como en cada una de las partes antes de ser reducida
'''

desired_word = 'AMERICAN'

desired_word_counter_after_reduce = 0

print('La palabra {} aparece un numero total de {}'.format(desired_word,reduced[desired_word]))
for i in range(len(res)):
  print('{} aparece {} veces en la parte {}'.format(desired_word,res[i][desired_word],i+1))
  desired_word_counter_after_reduce += res[i][desired_word]



La palabra AMERICAN aparece un numero total de 9760
AMERICAN aparece 2491 veces en la parte 1
AMERICAN aparece 2390 veces en la parte 2
AMERICAN aparece 2490 veces en la parte 3
AMERICAN aparece 2389 veces en la parte 4
