# Actividad 2: Procesamiento del corpus Brown

En esta segunda tarea cargamos el corpus *Brown* que proporciona *NLTK* para realizar ciertos procesamientos sobre el mismo y comparar el rendimiento de un procesado secuencial frente a un procesado en paralelo con un *Pool* de procesos. Vamos a explicar una a una las diferentes funciones implementadas para realizar dicho procesamiento.

In [None]:
import pandas as pd
import multiprocessing
import nltk
import numpy as np
import re
import time
from multiprocessing import pool
from nltk.corpus import brown

La función `construye_textos()` simplemente toma las frases contenidas en el corpus *Brown* y genera una permutación aleatoria sobre sus palabras. Los elementos de dicha permutación se unen con espacios mediante `" ".join()` y se almacenan en una lista.  

In [None]:
# Esta función toma cada frase del corpus "brown" y genera una permutación aletoria
# de sus palabras generando como resultado una frase en otro orden distinto al original
# Las frases generadas se devuelven en una lista.
def construye_textos():
    return [" ".join(np.random.permutation(sents)) for sents in brown.sents()]

La función `reemplazar_comillas(texto)` recibe como argumento una columna del DataFrame Pandas que contien el texto permutado y reemplaza las comillas `` por una comilla doble ". Para ello, se hace uso de la función `apply` y una función `lambda` de tal forma que, para cada línea se aplica dicha transformación.

In [None]:
# Esta función reemplaza las comillas `` por ""
def reemplazar_comillas(texto):
    return texto.apply(lambda text: text.replace("``", '"'))

La función `to_lowercase(texto)` recibe como argumento una columa del DataFrame Pandas que contiene el texto permutado y pasa todas las palabras a minúsculas. Para ello, de nuevo, hacemos uso de `apply` y una función `lambda` que invoca la función `lower` sobre cada una de las filas. 

In [None]:
# Esta función convierte todas las palabras a minúsculas
def to_lowercase(texto):
    return texto.apply(lambda text: text.lower())

La función `contar_palabras(texto)` recibe como argumento una columna del DataFrame Pandas que contiene el texto permutado y hace un conteo de todas las palabras que contiene cada fila. Para ello generamos en la variable ``pattern`` un patrón (expresión regular) que aplicamos para particionar cada una de las líneas de dicha columna mediante ``re.split``. Esta última función devuelve una lista sobre la que invocamos `len` para obtener su longitud o lo que es lo mismo, el número de palabras. 

In [None]:
# Esta función cuenta las palabras de cada fila en el Dataframe
# Para ello, utiliza la librería re de expresiones regulares y la función split
# para separar cada línea en base a cierto patrón. La función re.split() devuelve
# una lista, para obtener el número de palabras usaremos len() sobre dicha lista
def contar_palabras(texto):
    pattern = r"(?:\s+)|(?:,)|(?:\-)"
    return texto.apply(lambda x: len(re.split(pattern, x)))

El procesado completo del DataFrame se realiza desde la función `process_df(df)` que recibe dicho DataFrame como argumento de entrada y devuelve otro DataFrame distinto sobre el que se han realizado todas las transformaciones previamente descritas. Como puede observarse, la función comienza realizando una copia del DataFrame recibido como arugmento para no modificarlo. Sobre la copia generada se reemplazan las comillas, se pasan sus palabras a minúsculas, y se le añade una nueva columna `words` que contiene un conteo de las palabras de cada fila.

Finalmente realizamos un procesado adicional en el que eliminamos aquellas filas que contengan texto con un número excesivo o demasiado bajo de palabras. En concreto, eliminaremos aquellas filas que tengan más de 50 o menos de 10 palabras. Para ello, usamos una búsqueda condicional (`res_df[res_df['words'] > 50]` y `res_df[res_df['words'] < 10`) y la función `drop` para eliminaras del nuevo DataFrame. 

In [None]:
# Esta función se encarga de procesar el Dataframe en base a las funciones definidas
# previamente
def process_df(df):
    # Generamos una copia del DataFrame para no modificarlo
    res_df = df.copy(deep = True)
    # Reemplazamos las comillas usando reemplazar_comillas
    res_df['text'] = reemplazar_comillas(res_df['text'])
    # Pasamos el texto a minúsculas
    res_df['text'] = to_lowercase(res_df['text'])
    # Construimos una nueva columna en el Dataframe contando el número de palabras
    # de cada línea
    res_df['words'] = contar_palabras(res_df['text'])
    # Vamos a realizar un procesado adicional para textos demasiado largos o demasiado cortos
    # Vamos a elminar los textos demasiado largos > 50 palabras
    long_text = res_df[res_df['words'] > 50]
    res_df.drop(long_text.index, inplace = True)
    # Por otra parte vamos a eliminar los textos demasiado cortos < 50 palabras
    short_text = res_df[res_df['words'] < 10]
    res_df.drop(short_text.index, inplace = True)
    # Hemos terminado el procesado y devolvemos el DatFrame
    return res_df

En la parte del código que se muestra a continuación realizamos la comparación de los tiempos de ejecución secuenciales con los paralelos. Comenzamos generando un DataFrame con una columna `text` que se obtiene llamando cuatro veces a la función `construye_textos()` y concatenando las listas que devuelve cada una de las llamadas. Una vez generado el Dataframe se realiza una llamada a `process_df()` de forma secuencial (es decir, la función será ejecutada por un solo proceso) y se mide el tiempo de ejecución.

A continuación, se repite la tarea pero usando un `Pool` de procesos. Para ello, empezamos creando la variable `NUM_CORES` que contendrá el número de procesos que se asignarán al `Pool`. En este contexto podemos proceder de dos maneras diferentes. Por una parte, podemos hacer uso de la función `cpu_count()` del módulo `multiprocessing` que nos devolvería el número de procesadores lógicos a los que tiene acceso el intérprete de Python. Sin embargo, es posible que este número de procesadores lógicos no se corresponda con el número de procesadores físicos que tiene el sistema en el que se ejecute el código. Podemos optar por tanto, por fijar manualmente la variable `NUM_CORES` al número de procesadores físicos. En el caso que exponemos a continuación se ha optado por esta última opción y se ha fijado `NUM_CORES = 8`. 

Una vez elegido el número de procesos, se utiliza la función `np.array_split(brown_df, NUM_CORES)` para particionar nuestro DataFrame en tantos *chunks* o segmentos como procesos vayamos a utilizar. De esta forma, garantizamos que a cada proceso se le asigne una parte del DataFrama además de tratar de balancear la carga de trabajo realizando un particionado con el mismo número de filas por segmento. Finalmente, se genera el `Pool`de procesos y se hace uso de la función `pool.map` para que cada proceso aplique la función `process_df()` sobre el segmento que le corresponde. Se hace uso finalmente de la función `concat` de `pandas` para concatenar en un único DataFrame los DataFrames procesados que devuelve cada proceso. 

El código que aquí se adjunta no puede ser ejecutado en formato *notebook*. Sin embargo, a nivel local se lanzó la ejecución de este script, repiendo un total de 100 veces las tomas de tiempo tanto para el procesado secuencial como para el paralelo. Una vez realizadas todas las medidas, se calculó la media para obtener los siguientes tiempos:

- Tiempo medio de ejecución en secuencial: 2.6212 s
- Tiempo medio de ejecución en paralelo: 0.76166 s

En este caso, podemos observar como obtenemos un tiempo menor utilizando un `Pool` de ocho procesos. Podemos calcular la aceleración la siguiente expresión:

$$ Aceleracion = \frac{T_{ej\_secuencial}}{T_{ej\_paralela}} = \frac{2.6212}{0.76166} = 3.4414 $$

Vemos como obtenemos una aceleración del 3.4414 es decir, el proceso paralelizado es 3.4414 veces más rápido que el proceso secuencial. Si bien estamos lejos de obtener una aceleración $n$, es decir, una aceleración igual al número de procesos del `Pool`, sí que obtenemos una aceleración lo suficientemente significativa como para suponer una ventaja frente al procesado en secuencial. De todo esto también puede deducirse que, para el procesamiento de largos conjuntos de datos que contienen lenguaje natural, el paralelizado de dichas tareas puede acelerar significativamente la velocidad de procesado. En nuestro caso, hemos trabajado con un DataFrame de 229360 filas y con 8 procesos. Para DataFrames más grandes (con un número mayor de filas) y con computadores con mayor capadidad de proceso (entendida en este contexto como un número mayor de unidades de proceso) se podría llegar a obtener una aceleración todavía más significativa. 

In [None]:
# Comenzamos construyendo el DataFrame con el corpus "BROWN"
# El DataFrame tendrá en principio una única columna 'text' en la que
# cada fila será una frase del corpus brown permutada aleatoriamente
print("Generando nuevo DataFrame...\n\n")
brown_df = pd.DataFrame({'text': construye_textos() + construye_textos() + construye_textos() + construye_textos()})
print(f"Generado el nuevo Dataframe -> \n {brown_df.head()}\n\n")
# Comenzamos planteando el procesado del Dataframe en secuencial
t0 = time.time()
processed_df = process_df(brown_df)
print(f"Tiempo en secuencial -> {time.time() - t0}")
print(f"{processed_df.head()}\n\n")
# Seguimos planteando el procesado del DataFrame de forma paralelizada
# Para empezar dividimos el DataFrame en tantos "chunks" como procesadores
# tenga nuestra máquina. La función multiprocessing.cpu_countS() devuelve el 
# número de "procesadores lógicos" a los que tiene acceso el intérprete de Python.
# La máquina en la que se desarrolló este código se contaba con 8 procesadores físicos
# por lo que tomaremos este número para NUM_CORES con el objetivo de mejorar el 
# rendimiento del código paralelizado.
NUM_CORES =  8 #multiprocessing.cpu_count()
print(f"Iniciando procesado en paralelo usando un Pool de {NUM_CORES} procesos\n\n")
df_chunks = np.array_split(brown_df, NUM_CORES)
t0 = time.time()
# Cada proceso del Pool procesa una de las partes en las que se ha dividido el DataFrame original.
# Finalmente se usa pd.concat() para ir acumulando los resultados que produce cada proceso
with multiprocessing.Pool(NUM_CORES) as pool:
    processed_df_pool = pd.concat(pool.map(process_df, df_chunks), ignore_index = True)
print(f"Tiempo en paralelo -> {time.time() - t0}\n")
print(f"{processed_df_pool.head()}\n\n")

En este último fragmento de código se muestran algunas de las frases seleccionadas aleatoriamente tanto del DataFrame obtenido de forma secuencial como del obtenido de forma paralela. 

In [None]:
# Mostramos algunas frases extraidas aleatoriamente del procesado secuencial
print("----------------EJEMPLOS DEL PROCESADO SECUENCIAL----------------")
for i in range(5):
    print(f"{processed_df.iloc[np.random.randint(0, len(processed_df))]['text']}\n")
# Mostramos algunas frases extraidas aleatoriamente del procesado paralelo
print("----------------EJEMPLOS DEL PROCESADO PARALELO----------------")
for i in range(5):
    print(f"{processed_df_pool.iloc[np.random.randint(0, len(processed_df_pool))]['text']}\n")