## Procesamiento de un corpus, de forma secuencial o paralela

Se van a procesar las frase el  corpus Brown de texto  que  es proporcionado por el módulo nltk.corpus

<strong>Módulos que se van a importar</strong>

In [7]:
import pandas as pd
import multiprocessing as mp
import numpy as np
import time
import datetime as DT
#modulo para usar expresiones regulares
import re
from multiprocessing import Pool
#importación del corpus brown
from nltk.corpus import brown


In [5]:
# import nltk
# nltk.download('brown')

[nltk_data] Downloading package brown to
[nltk_data]     C:\Users\U029987\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\brown.zip.


True

<strong>Funciones que se van a definir</strong><br/>
Se definen las funciones que se van a utilizar. Obsérvese que se pasa como parámetro la columna text del DataFrame y por tanto, las funciones que se quieren aplicar deben ejecutarse con apply para que se apliquen  a cada fila.<br/>
    Observa la funcion reemplazar_comillas e implementa las demás usanddo  las funciones que se indican en el comentario

In [8]:
def construye_textos():
    """ 
        Funcion que construye el DataFrame para su procesamiento, on una frase por fila
    Returns:
        Lista de con textos de corpus brown
    """
    return [" ".join(np.random.permutation(sents)) for sents in brown.sents()]

def construye_datafame_texto(nombre_columna="text"):
    """ 
        Funcion que crea un dataframe a partir de una lista de textos y 
        pone como nombre de la columna el pasado por argumentos. En caso de no pasar ninguna
        por default es 'text'.
    """
    lista = construye_textos()
    df = pd.DataFrame(lista, columns= [nombre_columna] )
    return df


def reemplazar_comillas(df, nombre_columna='text'):
    """ Funcion que reemplaza comillas dobles en un dataframe. Pasamos el nombre de la columna
        como parámetro.  En caso de no pasar ninguna por default es 'text'.
    """
    df[nombre_columna] = df[nombre_columna].apply(lambda text: text.replace("``",'"'))
    return df

def a_minusculas(df, nombre_columna = "text"):
    """ Funcion que convierte todas las palabras a minúsculas en un dataframe.
        Pasamos el nombre de la columna como parámetro,  En caso de no pasar ninguna
        por default es 'text'.
    """
    df[nombre_columna] = df[nombre_columna].apply(lambda text: text.lower())
    return df

def contar_palabras(df, nombre_columna='text'):
    """ funcion que cuenta palabras de cada fila del dataframae y añade una columna con el resultado
    Args:
        df: Dataframe
        nombre_columna: Nombre de la columna de la que queremos contar palabras.  En caso de no pasar ninguna
        por default es 'text'.

    Returns:
        dataframe con una columna 'num_palabras' y el numero de palabras
    """
    df['num_palabras'] = df[nombre_columna].apply(lambda text: len(re.split(r"(?:\s+)|(?:,)|(?:\-)",text)))
    return df
    




<strong>Función que se va a aplicar</strong><br/>
Esta función invocará a las definidas y devolverá el dataframe modificado

In [8]:
def procesar_df(df):
  """ Función que se aplicará al df para procesarlo. En este caso se ha decidido hardcodear la columna para que no haya problemas de iteración en el pool.map

  Args:
      df (_type_): Dataframe original
      nombre_columna (_type_): Nombre de la columna de texto. En caso de no pasar ninguna
        por default es 'text'.

  Returns:
     salida_df : 
  """
  # Se hace copia del dataframe para no modificarlo
  salida_df = df.copy()

  # Reemplaza las comillas
  salida_df['text'] = reemplazar_comillas(salida_df, 'text')

  # Pasa el texto a minusculas
  salida_df['text']  = a_minusculas(salida_df, 'text')

  # cuenta el número de palabras y construye columna nueva con nombre num_palabras
  salida_df = contar_palabras(salida_df, 'text')
     

  # Elimina los textos demasiado largos
  texto_largo_para_eliminar = salida_df[salida_df['num_palabras'] > 50]
  salida_df.drop(texto_largo_para_eliminar.index, inplace=True)

  # Elimina los textos demasiado cortos
  texto_corto_para_eliminar = salida_df[salida_df['num_palabras'] < 10]
  salida_df.drop(texto_corto_para_eliminar.index, inplace=True)    

  # Reinicializa los indices
  salida_df.reset_index(drop=True, inplace=True)

  return salida_df



<strong>Programa principal</strong><br/>
Se crea el dataframe a partir del corpus y se invoca a la función sobre todo el dataframe según modo secuencial.<br/>
En paralelo, se divide en trozos el dataframe para que sea procesado por distintos hilos.\
Para poder utilizar el pool con el kernel de Jupyter, necesitamos que la función a iterar esté en un archivo python externo, así que guardamos las funciones en funciones_pool.py y las llamamos de ahí.

In [10]:
from funciones_pool import procesar_df
from funciones_pool import construye_textos

if __name__=="__main__":
    
    #Creamos el dataframe con el doble de textos que en el caso inicial, para que los procesadores trabajen un poco más. 
    dataframe_brown = pd.DataFrame({
    'text': construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos()
    })
    # print(dataframe_brown)
    print("================================================================\n")
    ####################################################
    # Se empieza el contador y la ejecución secuencial
    ###################################################
    comienzo = DT.datetime.now()
    df_procesado = procesar_df(dataframe_brown)
    acabo = DT.datetime.now()
    
    ####################################################
    # Se imprime el dataframe y el tiempo tomado
    ####################################################
    #print(df_procesado)
    print("================================================================\n")
    tiempo_total = acabo-comienzo
    print('El tiempo total secuencial ha sido de %s' %tiempo_total)
    print("================================================================\n")
    
    # Muestro ahora la fila 1
    #print(df_procesado.loc()[0])
    print("================================================================\n")
    
    ####################################################
    # Imprimimos por pantalla el número de nucleos para comparar
    ####################################################
    print(mp.cpu_count())
    
    ####################################################
    # Empezamos la ejecución paralela.
    # Haremos distintas pruebas con el número de dataframes y el número de nodos en el pool. 
    ####################################################
    trozos_df_2 = np.array_split(dataframe_brown, 2)
    trozos_df_4 = np.array_split(dataframe_brown, 4)
    trozos_df_8 = np.array_split(dataframe_brown, 8)
    trozos_df_12 = np.array_split(dataframe_brown, 12)
    trozos_df_16 = np.array_split(dataframe_brown, 16)
    trozos_df_24 = np.array_split(dataframe_brown, 24)
    
    ####################################################
    # Haremos pruebas dividiendo el df en 2,4,8,12,16 y 24 splits
    # y tomando de 2 a 8 cores
    ####################################################
    print("================================================================\n")  
    for i in range (2,9):
        for j in [2,4,8,12,16,24]:
            comienzo = DT.datetime.now()
            # Comienzo el pool
            pool = Pool(i)
            trozos_df = np.array_split(dataframe_brown, j)
            # Genero el df
            df_final = pd.concat(pool.map(procesar_df, trozos_df), ignore_index=True)
            acabo = DT.datetime.now()
            # Cierro el pool
            pool.close()
            pool.join()
            # Calculo el tiempo 
            tiempo_total = acabo-comienzo
            print('El tiempo total con {0} cores y {1} splits ha sido de {2}'.format(i,j,tiempo_total))
            print("================================================================\n")
    
    # print(df_procesado)
    # print(df_procesado.loc()[0])



El tiempo total secuencial ha sido de 0:00:03.918996


8

El tiempo total con 2 cores y 2 splits ha sido de 0:00:06.281770

El tiempo total con 2 cores y 4 splits ha sido de 0:00:05.455031

El tiempo total con 2 cores y 8 splits ha sido de 0:00:04.809310

El tiempo total con 2 cores y 12 splits ha sido de 0:00:04.992543

El tiempo total con 2 cores y 16 splits ha sido de 0:00:05.199355

El tiempo total con 2 cores y 24 splits ha sido de 0:00:04.951072

El tiempo total con 3 cores y 2 splits ha sido de 0:00:05.631107

El tiempo total con 3 cores y 4 splits ha sido de 0:00:04.966463

El tiempo total con 3 cores y 8 splits ha sido de 0:00:06.679256

El tiempo total con 3 cores y 12 splits ha sido de 0:00:06.242478

El tiempo total con 3 cores y 16 splits ha sido de 0:00:06.998689

El tiempo total con 3 cores y 24 splits ha sido de 0:00:06.143559

El tiempo total con 4 cores y 2 splits ha sido de 0:00:06.540276

El tiempo total con 4 cores y 4 splits ha sido de 0:00:05.160298

El tiempo 

Pese a que en un principio podemos esperar que el multiprocessing sea siempre más rápido que la ejecución secuencial, no tiene por qué. Hemos de optimizar el código que utilizamos para que el multiprocessing tenga sentido. 

Por mucho que indiques que vas a utilizar 8 cores, si tienes que procesar 2 splits, solo estarán trabajando dos de ellos, que no es lo más adecuado.
Hemos de encontrar un equlibrio entre los cores y los iteradores que utilizaremos, además, los pools tienen un coste computacional también, si los datos sobre los que queremos trabajar no son los suficientemente extensos, será mayor el tiempo que tardemos en arrancar los pools, que el tiempo que necesitamos para recorrerlo de forma secuencial.
En este caso, el mejor tiempo se ha obtenido con 6 cores y 4 splits, y ha sido de 4 segundos 50 milésimas, y el peor tiempo, se ha obtenido con 8 cores y 8 splits, pese a que parecería lo más óptimo.
Se adjunta además un muy buen post de StackOverflow donde se discute sobre el buen uso del Pool:
https://stackoverflow.com/questions/20727375/multiprocessing-pool-slower-than-just-using-ordinary-functions

Para ver un ejemplo donde 8 cores sí mejoran el proceso, vamos a crear un dataset mucho mayor.


In [13]:
from funciones_pool import procesar_df
from funciones_pool import construye_textos

if __name__=="__main__":
     
     dataframe_brown = pd.DataFrame({
     'text': construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() +\
          construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() +\
          construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos() + construye_textos()
     })

     ####################################################
     # Se empieza el contador y la ejecución secuencial
     ###################################################
     comienzo = DT.datetime.now()
     df_procesado = procesar_df(dataframe_brown)
     acabo = DT.datetime.now()

     ####################################################
     # Se imprime el dataframe y el tiempo tomado
     ####################################################
     #print(df_procesado)
     print("================================================================\n")
     tiempo_total = acabo-comienzo
     print('El tiempo total secuencial ha sido de %s' %tiempo_total)
     print("================================================================\n")

     comienzo = DT.datetime.now()
     # Comienzo el pool
     pool = Pool(8)
     trozos_df = np.array_split(dataframe_brown, 16)
     # Genero el df
     df_final = pd.concat(pool.map(procesar_df, trozos_df), ignore_index=True)
     acabo = DT.datetime.now()
     # Cierro el pool
     pool.close()
     pool.join()
     # Calculo el tiempo 
     tiempo_total = acabo-comienzo
     print('El tiempo total con 8 cores y 16 splits ha sido de {0}'.format(tiempo_total))
     print("================================================================\n")



El tiempo total secuencial ha sido de 0:00:16.910998

El tiempo total con 8 cores y 16 splits ha sido de 0:00:15.769876



Donde ya se puede empezar a apreciar una ligera mejora respecto a la secuencial´.