Para resolver este challenge hice uso de principalmente de dos herramientas para el procesamiento de datos: Pandas y PySpark. Ambas, ampliamente utilizadas en el ámbito de la ingeniería de datos.

Pandas por su parte es ideal para conjuntos de datos de tamaño moderado que caben cómodamente en la memoria de un solo nodo. Si bien su tiempo de ejecución es más lento que Pyspark, tiene una gran capacidad para trabajar con datos estructurados de forma intuitiva y realizar tareas de limpieza, transformación y preparación de datos.

Por otro lado, PySpark es ideal para trabajar con conjuntos de datos de gran escala. Gracias a su arquitectura distribuida aprovecha todos los nodos y si bien esto hace que consuma más memoria, el tiempo de ejecución es significativamente más bajo que herramientas como Pandas. Permite escalar horizontalmente y tiene gran tolerancia a fallos

Por todo esto decidí usar Pandas cuando el tamaño de los datos era manejable en memoria y las operaciones requeridas podían realizarse eficientemente en un solo nodo. Recurrí a PySpark cuando necesitábamos procesar los datos de manera distribuida para obtener resultados en un tiempo mucho menor.

In [None]:
file_path = "farmers-protest-tweets-2021-2-4.json"

In [None]:
import os
from q1_memory import *
from q2_memory import *
from q3_memory import *
from q1_time import *
from q2_time import *
from q3_time import *

In [None]:
base_path = os.getcwd().replace('\\', '/')
file_path = "input/farmers-protest-tweets-2021-2-4.json"

El primer paso que creí necesario para llevar adelante este desarrollo fue desarrollar un primer release, que contenga los desarrollos de las seis funciones y cuyo output sea por supuesto identico entre funciones análogas.

In [None]:
r_q1_memory = q1_memory(f'{base_path}/{file_path}')
print(r_q1_memory)

In [None]:
r_q1_time = q1_time(f'{base_path}/{file_path}')
print(r_q1_time)

In [None]:
r_q2_memory = q2_memory(f'{base_path}/{file_path}')
print(r_q2_memory)

In [None]:
r_q2_time = q2_time(f'{base_path}/{file_path}')
print(r_q2_time)

In [None]:
r_q3_memory = q3_memory(f'{base_path}/{file_path}')
print(r_q3_memory)

In [None]:
r_q3_time = q3_time(f'{base_path}/{file_path}')
print(r_q3_time)

FIN DEL PRIMER RELEASE 
->
Teniendo las seis funciones trabajando correctamente, voy a enfocarme ahora en reducir el uso de memoria de aquellas que lo requieran. Usando la librería memory_profiler pude trackear lo que supe que iba a ocurrir. Hasta este punto los dataframe de pandas se creaban con el siguiente comando: json_df = pd.read_json(file_path, lines=True). El problema con el mismo es que guarda en memoria toda la información contenida en el JSON de una sola vez, aumentando el uso de la misma. Este es el mayor problema de este grupo de funciones. Para resolverlo voy a trabajar con 2 versiones diferentes de cada qn_memory() y analizaré el uso de memoria cargando los datos en chuncks de diferentes tamaños vs el uso de memoria de la funcion previamente escrita.

INICIO DEL AUMENTO EN PERFORMANCE DE MEMORIA ->

In [None]:
from typing import List, Tuple
from datetime import datetime
import pandas as pd
from memory_profiler import profile
import os

base_path = os.getcwd().replace('\\', '/')
file_path = "input/farmers-protest-tweets-2021-2-4.json"


@profile
def q1_memory_old(file_path: str) -> List[Tuple[datetime.date, str]]:

    # Crea un dataframe de pandas a partir del json.
    json_df = pd.read_json(file_path, lines=True)

    user_df = json_df['user'].apply(pd.Series)
    json_df = pd.concat([json_df, user_df], axis=1)

    # Convierte el formato de fecha a YYYY-MM-DD
    json_df['date'] = pd.to_datetime(json_df['date']).dt.date

    # Agrupa por fecha y filtra todas excepto aquellas con mayor cantidad de tweets / dia.
    date_df = json_df.groupby(['date']).size().reset_index(name='count').sort_values(by='count').tail(10)

    # Remueve del dataframe original todos los registros donde la fecha sea diferente a aquellas donde hubo más actividad.
    json_df = json_df[json_df.date.isin(date_df['date'])]

    # Agrupa por fecha y usuario, y cuenta el número de tweets por fecha y usuario
    grouped_df = json_df.groupby(['date', 'username']).size().reset_index(name='count')

    # Encuentra el usuario con más tweets por fecha
    max_tweets_df = grouped_df.loc[grouped_df.groupby('date')['count'].idxmax()]

    # Ordena los resultados por fecha filtrando aquellas donde hayan menos actividad.
    max_tweets_df = max_tweets_df.sort_values(by='date').tail(10)

    # Recopila los resultados
    return list(zip(max_tweets_df['date'], max_tweets_df['username']))


@profile
def q1_memory_dev(file_path: str) -> List[Tuple[datetime.date, str]]:

    df_concat = pd.DataFrame(columns=['date', 'username'])
    for json_df in pd.read_json(file_path, lines=True, chunksize=100):
        # Se extraen los key/value del json anidado y se guarda en un nuevo df.
        user_df = json_df['user'].apply(pd.Series)

        # Se remueven todas las columnas excepto date para reducir la carga en memoria de la variable.
        json_df = json_df['date']

        # Se remueven todas las columnas excepto username para reducir la carga en memoria de la variable.
        user_df = user_df['username']

        # Ambos dataframes se unen nuevamente.
        json_df = pd.concat([json_df, user_df], axis=1)

        df_concat = pd.concat([df_concat, json_df])

    json_df = df_concat

    # Se elimina el df para liberar espacio en memoria.
    del user_df
    del df_concat

    # Convierte el formato de fecha a YYYY-MM-DD
    json_df['date'] = pd.to_datetime(json_df['date']).dt.date

    # Agrupa por fecha y filtra todas excepto aquellas con mayor cantidad de tweets / dia.
    date_df = json_df.groupby(['date']).size().reset_index(name='count').sort_values(by='count').tail(10)

    # Remueve del dataframe original todos los registros donde la fecha sea diferente a aquellas donde hubo más actividad.
    json_df = json_df[json_df.date.isin(date_df['date'])]

    # Se elimina el df para liberar espacio en memoria.
    del date_df

    # Agrupa por fecha y usuario, y cuenta el número de tweets por fecha y usuario
    json_df = json_df.groupby(['date', 'username']).size().reset_index(name='count')

    # Encuentra el usuario con más tweets por fecha
    json_df = json_df.loc[json_df.groupby('date')['count'].idxmax()]

    # Ordena los resultados por fecha filtrando aquellas donde hayan menos actividad.
    json_df = json_df.sort_values(by='date').tail(10)

    # Recopila los resultados
    return list(zip(json_df['date'], json_df['username']))

In [None]:
t = q1_memory_old(f'{base_path}/{file_path}')
print(t)

Ejecucion de la funcion q1_memory. Version del primer release.

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    11   1437.8 MiB   1437.8 MiB           1   @profile
    12                                         def q1_memory_old(file_path: str) -> List[Tuple[datetime.date, str]]:
    13                                         
    14                                             # Crea un dataframe de pandas a partir del json.
    15   2530.1 MiB   1092.2 MiB           1       json_df = pd.read_json(file_path, lines=True)
    16                                         
    17   2650.9 MiB    120.8 MiB           1       user_df = json_df['user'].apply(pd.Series)
    18   2667.8 MiB     16.9 MiB           1       json_df = pd.concat([json_df, user_df], axis=1)
    19                                         
    20                                             # Convierte el formato de fecha a YYYY-MM-DD
    21   2668.0 MiB      0.2 MiB           1       json_df['date'] = pd.to_datetime(json_df['date']).dt.date
    22                                         
    23                                             # Agrupa por fecha y filtra todas excepto aquellas con mayor cantidad de tweets / dia.
    24   2668.0 MiB      0.0 MiB           1       date_df = json_df.groupby(['date']).size().reset_index(name='count').sort_values(by='count').tail(10)
    25                                         
    26                                             # Remueve del dataframe original todos los registros donde la fecha sea diferente a aquellas donde hubo más actividad.
    27   2664.8 MiB     -3.2 MiB           1       json_df = json_df[json_df.date.isin(date_df['date'])]
    28                                         
    29                                             # Agrupa por fecha y usuario, y cuenta el número de tweets por fecha y usuario
    30   2668.1 MiB      3.3 MiB           1       grouped_df = json_df.groupby(['date', 'username']).size().reset_index(name='count')
    31                                         
...
    39   2668.1 MiB      0.0 MiB           1       return list(zip(max_tweets_df['date'], max_tweets_df['username']))

In [None]:
t = q1_memory_dev(f'{base_path}/{file_path}')
print(t)

Ejecucion de la funcion q1_memory. Version de desarrollo para aumentar la performance.

Chuncksize = 100

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    42    146.7 MiB    146.7 MiB           1   @profile
    43                                         def q1_memory_dev(file_path: str) -> List[Tuple[datetime.date, str]]:
    44                                         
    45    146.7 MiB      0.0 MiB           1       df_concat = pd.DataFrame(columns=['date', 'username'])
    46    162.1 MiB   -397.7 MiB        1176       for json_df in pd.read_json(file_path, lines=True, chunksize=100):
    47                                                 # Crea un dataframe de pandas a partir del json.
    48                                         
    49                                                 # Se extraen los key/value del json anidado y se guarda en un nuevo df.
    50    162.1 MiB   -418.3 MiB        1175           user_df = json_df['user'].apply(pd.Series)
    51                                         
    52                                                 # Se remueven todas las columnas excepto date para reducir la carga en memoria de la variable.
    53    162.1 MiB   -421.6 MiB        1175           json_df = json_df['date']
    54                                         
    55                                                 # Se remueven todas las columnas excepto username para reducir la carga en memoria de la variable.
    56    162.1 MiB   -423.2 MiB        1175           user_df = user_df['username']
    57                                         
    58                                                 # Ambos dataframes se unen nuevamente.
    59    162.1 MiB   -423.2 MiB        1175           json_df = pd.concat([json_df, user_df], axis=1)
    60                                         
    61    162.1 MiB   -421.9 MiB        1175           df_concat = pd.concat([df_concat, json_df])
    62                                         
...
    91    159.4 MiB      0.0 MiB           1       return list(zip(json_df['date'], json_df['username']))



Chuncksize = 1000

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    42    158.4 MiB    158.4 MiB           1   @profile
    43                                         def q1_memory_dev(file_path: str) -> List[Tuple[datetime.date, str]]:
    44                                         
    45    158.4 MiB      0.0 MiB           1       df_concat = pd.DataFrame(columns=['date', 'username'])
    46    182.1 MiB   -152.9 MiB         119       for json_df in pd.read_json(file_path, lines=True, chunksize=1000):
    47                                                 # Crea un dataframe de pandas a partir del json.
    48                                         
    49                                                 # Se extraen los key/value del json anidado y se guarda en un nuevo df.
    50    182.2 MiB    -12.1 MiB         118           user_df = json_df['user'].apply(pd.Series)
    51                                         
    52                                                 # Se remueven todas las columnas excepto date para reducir la carga en memoria de la variable.
    53    182.2 MiB    -81.6 MiB         118           json_df = json_df['date']
    54                                         
    55                                                 # Se remueven todas las columnas excepto username para reducir la carga en memoria de la variable.
    56    182.2 MiB    -80.1 MiB         118           user_df = user_df['username']
    57                                         
    58                                                 # Ambos dataframes se unen nuevamente.
    59    182.2 MiB    -60.8 MiB         118           json_df = pd.concat([json_df, user_df], axis=1)
    60                                         
    61    183.1 MiB      0.2 MiB         118           df_concat = pd.concat([df_concat, json_df])
    62                                         
...
    91    179.4 MiB      0.0 MiB           1       return list(zip(json_df['date'], json_df['username']))

In [None]:
from typing import List, Tuple
import pandas as pd
from memory_profiler import profile
import os

base_path = os.getcwd().replace('\\', '/')
file_path = "input/farmers-protest-tweets-2021-2-4.json"


@profile
def q2_memory_old(file_path: str) -> List[Tuple[str, int]]:
    # Lee los datos JSON en un DataFrame de Pandas
    json_df = pd.read_json(file_path, lines=True, encoding='utf-8')

    # Se filtran del dataframe todas las columnas excepto la deseada para obtener los datos.
    json_df = json_df['content'].to_frame() 

    # Esta expresion regular hace match con todos los codigos Unicode existentes para emojis.
    emoji_pattern = r'[\U0001F000-\U0001FFFF]'

    # Aplica la regex para extraer emojis y crea una nueva columna 'emojis'
    json_df['content'] = json_df['content'].str.findall(emoji_pattern)

    # Se eliminan los registros cuyo lenght == 0, es decir, aquellos que no contenian emojis.
    json_df = json_df[json_df['content'].apply(len) > 0]

    # Se aplica un flatten a las listas de registros para que cada registro corresponda a un emoji.
    json_df = json_df.explode('content')

    # Se agrupan los registros por emoji, contando la frecuencia de cada uno.
    json_df = json_df.groupby('content').size().reset_index(name='frequency')

    # Ordena los resultados por frecuencia y selecciona los primeros 15 resultados
    json_df = json_df.sort_values(by='frequency', ascending=False).head(15)

    # Ya que dentro de los codigos Unicode matcheados existían Emoji's Modifiers (modificadores de color por ejemplo) los cuales no son propiamente emojis, se filtran antes de retornar los valores correspondientes.
    json_df = json_df[json_df['content'].apply(lambda x: ord(x) not in (45, 65039, 127995, 127997))][0:10]

    # Se aplica el formato de salida deseado al dataframe.
    return list(zip(json_df['content'], json_df['frequency']))


@profile
def q2_memory_dev(file_path: str) -> List[Tuple[str, int]]:
    df_concat = pd.DataFrame(columns=['content'])

    for json_df in pd.read_json(file_path, lines=True, encoding='utf-8', chunksize=100):
        # Se filtran del dataframe todas las columnas excepto la deseada para obtener los datos.
        json_df = json_df['content'].to_frame() 
        df_concat = pd.concat([df_concat, json_df])

    json_df = df_concat

    # Se elimina el df para liberar ese espacio en memoria.
    del df_concat

    # Esta expresion regular hace match con todos los codigos Unicode existentes para emojis.
    emoji_pattern = r'[\U0001F000-\U0001FFFF]'

    # Aplica la regex para extraer emojis y crea una nueva columna 'emojis'
    json_df['content'] = json_df['content'].str.findall(emoji_pattern)

    # Se eliminan los registros cuyo lenght == 0, es decir, aquellos que no contenian emojis.
    json_df = json_df[json_df['content'].apply(len) > 0]

    # Se aplica un flatten a las listas de registros para que cada registro corresponda a un emoji.
    json_df = json_df.explode('content')

    # Se agrupan los registros por emoji, contando la frecuencia de cada uno.
    json_df = json_df.groupby('content').size().reset_index(name='frequency')

    # Ordena los resultados por frecuencia y selecciona los primeros 15 resultados
    json_df = json_df.sort_values(by='frequency', ascending=False).head(15)

    # Ya que dentro de los codigos Unicode matcheados existían Emoji's Modifiers (modificadores de color por ejemplo) los cuales no son propiamente emojis, se filtran antes de retornar los valores correspondientes.
    json_df = json_df[json_df['content'].apply(lambda x: ord(x) not in (45, 65039, 127995, 127997))][0:10]

    # Se aplica el formato de salida deseado al dataframe.
    return list(zip(json_df['content'], json_df['frequency']))

In [None]:
t = q2_memory_old(f'{base_path}/{file_path}')
print(t)

Ejecucion de la funcion q2_memory. Version del primer release.

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    10   3611.9 MiB   3611.9 MiB           1   @profile
    11                                         def q2_memory_old(file_path: str) -> List[Tuple[str, int]]:
    12                                             # Lee los datos JSON en un DataFrame de Pandas
    13   4691.5 MiB   1079.6 MiB           1       json_df = pd.read_json(file_path, lines=True, encoding='utf-8')
    14                                         
    15                                             # Se filtran del dataframe todas las columnas excepto la deseada para obtener los datos.
    16   4685.3 MiB     -6.3 MiB           1       json_df = json_df['content'].to_frame() 
    17                                         
    18                                             # Esta expresion regular hace match con todos los codigos Unicode existentes para emojis.
    19   4685.3 MiB      0.0 MiB           1       emoji_pattern = r'[\U0001F000-\U0001FFFF]'
    20                                         
    21                                             # Aplica la regex para extraer emojis y crea una nueva columna 'emojis'
    22   4456.3 MiB   -229.0 MiB           1       json_df['content'] = json_df['content'].str.findall(emoji_pattern)
    23                                         
    24                                             # Se eliminan los registros cuyo lenght == 0, es decir, aquellos que no contenian emojis.
    25   4460.1 MiB      3.8 MiB           1       json_df = json_df[json_df['content'].apply(len) > 0]
    26                                         
    27                                             # Se aplica un flatten a las listas de registros para que cada registro corresponda a un emoji.
    28   4462.8 MiB      2.7 MiB           1       json_df = json_df.explode('content')
    29                                         
    30                                             # Se agrupan los registros por emoji, contando la frecuencia de cada uno.
...
    40   4463.1 MiB      0.0 MiB           1       return list(zip(json_df['content'], json_df['frequency']))

In [None]:
t = q2_memory_dev(f'{base_path}/{file_path}')
print(t)

Ejecucion de la funcion q2_memory. Version de desarrollo para aumentar la performance.


Resultados con un chuncksize = 100:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    43   1375.8 MiB   1375.8 MiB           1   @profile
    44                                         def q2_memory_dev(file_path: str) -> List[Tuple[str, int]]:
    45   1375.8 MiB      0.0 MiB           1       df_concat = pd.DataFrame(columns=['content'])
    46                                         
    47   1376.4 MiB -156771.2 MiB        1176       for json_df in pd.read_json(file_path, lines=True, encoding='utf-8', chunksize=100):
    48                                                 # Se filtran del dataframe todas las columnas excepto la deseada para obtener los datos.
    49   1376.4 MiB -156676.4 MiB        1175           json_df = json_df['content'].to_frame() 
    50   1376.4 MiB -156615.9 MiB        1175           df_concat = pd.concat([df_concat, json_df])
    51                                         
    52   1281.0 MiB    -95.4 MiB           1       json_df = df_concat
    53                                         
    54                                             # Se elimina el df para liberar ese espacio en memoria.
    55   1281.0 MiB      0.0 MiB           1       del df_concat
    56                                         
    57                                             # Esta expresion regular hace match con todos los codigos Unicode existentes para emojis.
    58   1281.0 MiB      0.0 MiB           1       emoji_pattern = r'[\U0001F000-\U0001FFFF]'
    59                                         
    60                                             # Aplica la regex para extraer emojis y crea una nueva columna 'emojis'
    61   1281.9 MiB      0.9 MiB           1       json_df['content'] = json_df['content'].str.findall(emoji_pattern)
    62                                         
    63                                             # Se eliminan los registros cuyo lenght == 0, es decir, aquellos que no contenian emojis.
...
    79   1284.5 MiB      0.0 MiB           1       return list(zip(json_df['content'], json_df['frequency']))



Resultados con un chuncksize = 1000:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    43   1372.7 MiB   1372.7 MiB           1   @profile
    44                                         def q2_memory_dev(file_path: str) -> List[Tuple[str, int]]:
    45   1372.7 MiB      0.0 MiB           1       df_concat = pd.DataFrame(columns=['content'])
    46                                         
    47   1401.0 MiB   -322.2 MiB         119       for json_df in pd.read_json(file_path, lines=True, encoding='utf-8', chunksize=1000):
    48                                                 # Se filtran del dataframe todas las columnas excepto la deseada para obtener los datos.
    49   1401.0 MiB   -326.8 MiB         118           json_df = json_df['content'].to_frame() 
    50   1401.0 MiB   -292.8 MiB         118           df_concat = pd.concat([df_concat, json_df])
    51                                         
    52   1401.0 MiB      0.0 MiB           1       json_df = df_concat
    53                                         
    54                                             # Se elimina el df para liberar ese espacio en memoria.
    55   1401.0 MiB      0.0 MiB           1       del df_concat
    56                                         
    57                                             # Esta expresion regular hace match con todos los codigos Unicode existentes para emojis.
    58   1401.0 MiB      0.0 MiB           1       emoji_pattern = r'[\U0001F000-\U0001FFFF]'
    59                                         
    60                                             # Aplica la regex para extraer emojis y crea una nueva columna 'emojis'
    61   1401.9 MiB      0.9 MiB           1       json_df['content'] = json_df['content'].str.findall(emoji_pattern)
    62                                         
    63                                             # Se eliminan los registros cuyo lenght == 0, es decir, aquellos que no contenian emojis.
...
    79   1405.3 MiB      0.0 MiB           1       return list(zip(json_df['content'], json_df['frequency']))

In [None]:

import pandas as pd
from typing import List, Tuple
from memory_profiler import profile
import os

base_path = os.getcwd().replace('\\', '/')
file_path = "input/farmers-protest-tweets-2021-2-4.json"

# Funcion original

@profile
def q3_memory_old(file_path: str) -> List[Tuple[str, int]]:
    # Lee los datos JSON y los carga en un DataFrame de Pandas.
    json_df = pd.read_json(file_path, lines=True)

    # Se filtran todas las columnas menos la que contiene la información deseada a la vez que se eliminan los valores nulos de la misma.
    json_df = json_df['mentionedUsers'].dropna()

    # Se realiza un flatten a la lista de jsons contenida en la columna mentionedUsers y se extraen de la misma solo los username.
    json_df = json_df.explode('mentionedUsers').apply(lambda x: x['username']).to_frame() 

    # Agrupando por nombre de usuario, se realiza un count para saber la cantidad de menciones que cada usuario tuvo en el set de datos.
    grouped_df = json_df.groupby(['mentionedUsers']).size().reset_index(name='count')

    # Se ordenan los valores en orden descendente y se toman sólo los primeros diez registros, que corresponden a los usuarios más citados.
    grouped_df = grouped_df.sort_values(by='count', ascending=False).head(10)

    # Se ordenan los resultados según el formato requerido.
    return list(zip(grouped_df['mentionedUsers'], grouped_df['count']))


# Nueva version
@profile
def q3_memory_dev(file_path: str) -> List[Tuple[str, int]]:

    df_concat = pd.DataFrame(columns=['mentionedUsers'])
    for json_df in pd.read_json(file_path, lines=True, chunksize=100):

        #Se filtran todas las columnas menos la que contiene la información deseada a la vez que se eliminan los valores nulos de la misma.
        json_df = json_df['mentionedUsers'].dropna()

        # Se realiza un flatten a la lista de jsons contenida en la columna mentionedUsers y se extraen de la misma solo los username.
        json_df = json_df.explode('mentionedUsers').apply(lambda x: x['username']).to_frame() 

        df_concat = pd.concat([df_concat, json_df])

    # Agrupando por nombre de usuario, se realiza un count para saber la cantidad de menciones que cada usuario tuvo en el set de datos.
    grouped_df = df_concat.groupby(['mentionedUsers']).size().reset_index(name='count')

    # Se ordenan los valores en orden descendente y se toman sólo los primeros diez registros, que corresponden a los usuarios más citados.
    grouped_df = grouped_df.sort_values(by='count', ascending=False).head(10)

    # Se ordenan los resultados según el formato requerido.
    return list(zip(grouped_df['mentionedUsers'], grouped_df['count']))

In [None]:
r_q3_memory = q3_memory_old(f'{base_path}/{file_path}')
print(r_q3_memory)

Ejecucion de la funcion q3_memory. Version del primer release.

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    11    167.8 MiB    167.8 MiB           1   @profile
    12                                         def q3_memory_old(file_path: str) -> List[Tuple[str, int]]:
    13                                             # Lee los datos JSON y los carga en un DataFrame de Pandas.
    14   1406.0 MiB   1238.1 MiB           1       json_df = pd.read_json(file_path, lines=True)
    15                                         
    16                                             # Se filtran todas las columnas menos la que contiene la información deseada a la vez que se eliminan los valores nulos de la misma.
    17   1319.8 MiB    -86.2 MiB           1       json_df = json_df['mentionedUsers'].dropna()
    18                                         
    19                                             # Se realiza un flatten a la lista de jsons contenida en la columna mentionedUsers y se extraen de la misma solo los username.
    20   1321.1 MiB      0.2 MiB      206807       json_df = json_df.explode('mentionedUsers').apply(lambda x: x['username']).to_frame() 
    21                                         
    22                                             # Agrupando por nombre de usuario, se realiza un count para saber la cantidad de menciones que cada usuario tuvo en el set de datos.
    23   1323.1 MiB      2.0 MiB           1       grouped_df = json_df.groupby(['mentionedUsers']).size().reset_index(name='count')
    24                                         
    25                                             # Se ordenan los valores en orden descendente y se toman sólo los primeros diez registros, que corresponden a los usuarios más citados.
    26   1323.4 MiB      0.4 MiB           1       grouped_df = grouped_df.sort_values(by='count', ascending=False).head(10)
    27                                         
    28                                             # Se ordenan los resultados según el formato requerido.
    29   1323.4 MiB      0.0 MiB           1       return list(zip(grouped_df['mentionedUsers'], grouped_df['count']))

In [None]:
r_q3_memory = q3_memory_dev(f'{base_path}/{file_path}')
print(r_q3_memory)

Ejecucion de la funcion q3_memory. Version de desarrollo para aumentar la performance.

Resultados con un chuncksize = 100:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    33    120.2 MiB    120.2 MiB           1   @profile
    34                                         def q3_memory_dev(file_path: str) -> List[Tuple[str, int]]:
    35                                         
    36    120.2 MiB      0.0 MiB           1       df_concat = pd.DataFrame(columns=['mentionedUsers'])
    37    132.4 MiB   -372.9 MiB        1176       for json_df in pd.read_json(file_path, lines=True, chunksize=100):
    38                                         
    39                                                 #Se filtran todas las columnas menos la que contiene la información deseada a la vez que se eliminan los valores nulos de la misma.
    40    132.3 MiB   -427.0 MiB        1175           json_df = json_df['mentionedUsers'].dropna()
    41                                         
    42                                                 # Se realiza un flatten a la lista de jsons contenida en la columna mentionedUsers y se extraen de la misma solo los username.
    43    132.3 MiB -78074.3 MiB      207981           json_df = json_df.explode('mentionedUsers').apply(lambda x: x['username']).to_frame() 
    44                                         
    45    132.4 MiB   -452.1 MiB        1175           df_concat = pd.concat([df_concat, json_df])
    46                                         
    47                                             # Agrupando por nombre de usuario, se realiza un count para saber la cantidad de menciones que cada usuario tuvo en el set de datos.
    48    133.2 MiB      0.8 MiB           1       grouped_df = df_concat.groupby(['mentionedUsers']).size().reset_index(name='count')
    49                                         
    50                                             # Se ordenan los valores en orden descendente y se toman sólo los primeros diez registros, que corresponden a los usuarios más citados.
    51    133.2 MiB      0.1 MiB           1       grouped_df = grouped_df.sort_values(by='count', ascending=False).head(10)
    52                                         
    53                                             # Se ordenan los resultados según el formato requerido.
    54    133.2 MiB      0.0 MiB           1       return list(zip(grouped_df['mentionedUsers'], grouped_df['count']))



Resultados con un chuncksize = 1000:


Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    33    139.0 MiB    139.0 MiB           1   @profile
    34                                         def q3_memory_dev(file_path: str) -> List[Tuple[str, int]]:
    35                                         
    36    139.0 MiB      0.0 MiB           1       df_concat = pd.DataFrame(columns=['mentionedUsers'])
    37    150.5 MiB   -148.0 MiB         119       for json_df in pd.read_json(file_path, lines=True, chunksize=1000):
    38                                         
    39                                                 #Se filtran todas las columnas menos la que contiene la información deseada a la vez que se eliminan los valores nulos de la misma.
    40    150.5 MiB   -242.1 MiB         118           json_df = json_df['mentionedUsers'].dropna()
    41                                         
    42                                                 # Se realiza un flatten a la lista de jsons contenida en la columna mentionedUsers y se extraen de la misma solo los username.
    43    150.5 MiB -383061.2 MiB      206924           json_df = json_df.explode('mentionedUsers').apply(lambda x: x['username']).to_frame() 
    44                                         
    45    150.5 MiB   -195.2 MiB         118           df_concat = pd.concat([df_concat, json_df])
    46                                         
    47                                             # Agrupando por nombre de usuario, se realiza un count para saber la cantidad de menciones que cada usuario tuvo en el set de datos.
    48    149.0 MiB     -1.5 MiB           1       grouped_df = df_concat.groupby(['mentionedUsers']).size().reset_index(name='count')
    49                                         
    50                                             # Se ordenan los valores en orden descendente y se toman sólo los primeros diez registros, que corresponden a los usuarios más citados.
    51    149.0 MiB      0.0 MiB           1       grouped_df = grouped_df.sort_values(by='count', ascending=False).head(10)
    52                                         
    53                                             # Se ordenan los resultados según el formato requerido.
    54    149.0 MiB      0.0 MiB           1       return list(zip(grouped_df['mentionedUsers'], grouped_df['count']))

Con la nueva version del codigo, se logró un importante decremento en el uso de memoria para las tres funciones. Este decremento podría aumentar testeando con diferentes chuncksize evaluando para cada caso cual es el chuck adecuado para cada funcion. Por otra parte, es posible que con algunos ajustes más sobre la estrategia abordada para filtrar y devolver los datos requeridos pueda lograrse un decremento mayor, aunque ya probablemente no tan significativo como lo aquí representado.
En cuanto a las funciones que utilizan pyspark me siento satisfecho con la performance alcanzada, procesando 100.000+ registros en un promedio que va entre los 3-5 segundos.