En este archivo puedes escribir lo que estimes conveniente. Te recomendamos detallar tu solución y todas las suposiciones que estás considerando. Aquí puedes ejecutar las funciones que definiste en los otros archivos de la carpeta src, medir el tiempo, memoria, etc.

# Pregunta 1

## 1. Usando Pandas

### 1.1. Desarrollo

In [None]:
# Imports
import pandas as pd

# Lectura archivo JSON
file_path = "farmers-protest-tweets-2021-2-4.json"
df = pd.read_json(file_path, lines=True)

In [None]:
# Se revisa el tipo de datos de algunas columnas
df.info()

In [None]:
# Se revisa formato de columna "date"
print(df["date"])

In [None]:
# Se crea funcion para aplanar columnas
def aplanar_columnas(df, columnas):
    for col in columnas:
        col_df = pd.json_normalize(df[col])
        # Se renombra columnas
        col_df.columns = [f"{col}_{subcol}" for subcol in col_df.columns] 
        # Concatenacion con DF original
        df = pd.concat([df.drop(col, axis=1), col_df], axis=1)
    return df

columnas = ['user']
df = aplanar_columnas(df, columnas)

In [None]:
# Se verifica que se haya creado las columnas correctamente
df.info()

In [None]:
# Se revisa columna a utilizar para analisis
print(df['user_username'])

In [None]:
# Transformación de formato fecha
df['date'] = df['date'].dt.date

In [None]:
# Se obtiene el top 10 de días con más registros
top_days= df.groupby('date').size().sort_values(ascending=False).head(10)

In [None]:
# Se crea lista para resultado final
top_usernames_per_day = []

# Itera sobre cada uno de los top_days
for day in top_days.index:
    df_day = df[df['date'] == day]
    # Cantidad de username repetidos para cada dia
    username_counts = df_day['user_username'].value_counts()
    # Encuentra el máximo valor
    max_count = username_counts.max()
    most_frequent_username = username_counts[username_counts == max_count].index[0]    
    # Guarda el dia y el username en una lista
    top_usernames_per_day.append((day, most_frequent_username))

print(top_usernames_per_day)

### 1.2. Solución

In [1]:
# Imports
import pandas as pd
from typing import List, Tuple
from datetime import datetime

file_path = "farmers-protest-tweets-2021-2-4.json"

def q1_pandas(file_path: str) -> List[Tuple[datetime.date, str]]:
    df = pd.read_json(file_path, lines=True)
    # Se crea funcion para aplanar columnas
    def aplanar_columnas(df, columnas):
        for col in columnas:
            col_df = pd.json_normalize(df[col])
            # Se renombra columnas
            col_df.columns = [f"{col}_{subcol}" for subcol in col_df.columns] 
            # Concatenacion con DF original
            df = pd.concat([df.drop(col, axis=1), col_df], axis=1)
        return df

    columnas = ['user']
    df = aplanar_columnas(df, columnas)
    # Transformación de formato fecha
    df['date'] = df['date'].dt.date
    # Se obtiene el top 10 de días con más registros
    top_days= df.groupby('date').size().sort_values(ascending=False).head(10)
    # Se crea lista para resultado final
    top_usernames_per_day = []

    # Itera sobre cada uno de los top_days
    for day in top_days.index:
        df_day = df[df['date'] == day]
        # Cantidad de username repetidos para cada dia
        username_counts = df_day['user_username'].value_counts()
        # Encuentra el máximo valor
        max_count = username_counts.max()
        most_frequent_username = username_counts[username_counts == max_count].index[0]    
        # Guarda el dia y el username en una lista
        top_usernames_per_day.append((day, most_frequent_username))

    # Imprime la lista de resultados
    return(top_usernames_per_day)    

# q1_pandas(file_path)

## 2. Usando PySpark

### 2.1. Desarrollo

In [None]:
# Imports
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, row_number
from pyspark.sql.window import Window

In [None]:
# Iniciar pyspark
spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

file_path = "farmers-protest-tweets-2021-2-4.json"
# Leer el archivo JSON
df = spark.read.json(file_path)


In [None]:
# Se revisa columna user
df.select("user").show(truncate=False)

In [None]:
# Se aplana columna user.username
df = df.withColumn("user_username", col("user.username"))

In [None]:
# Se revisa columna date
df.select("date").show(truncate=False)

In [None]:
# Se transforma para ver solo el dia
df = df.withColumn("date", to_date(col("date")))

In [None]:
df.printSchema()

In [None]:
from pyspark.sql.functions import rank, count, desc, max

# Se agrupa por fecha y se cuentan los registros para cada dia
date_counts = df.groupBy("date").count()

# Se buscan los top 10 dias con mas registros
top_days = date_counts.orderBy(desc("count")).limit(10)

# Se une el dataFrame original con los top días para filtrar solo esos días
df_top_days = df.join(top_days, "date")

# Cuenta la cantidad de cada username por día
username_counts = df_top_days.groupBy("date", "user_username").count()

# Busca el username con mayor cantidad de registros para cada dia
max_username_count = username_counts.groupBy("date").agg(max("count").alias("max_count"))

# Genera un dataframe con la informacion requerida
top_dates_usernames = username_counts.join(
    max_username_count,
    (username_counts["date"] == max_username_count["date"]) & 
    (username_counts["count"] == max_username_count["max_count"]),
    "right"
)

# Join con toda la informacion que ordenar de mayor a menor cantidad de registros 
top_usernames_per_day= top_days.join(top_dates_usernames, "date", "left")

# Recolecta los resultados como una lista de tuplas
top_usernames_per_day_list = [(row['date'], row['user_username']) for row in top_usernames_per_day.collect()]

print(top_usernames_per_day_list)

### 2.2. Solución

In [9]:
# Imports
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, rank, count, desc, max
from pyspark.sql.window import Window

file_path = "farmers-protest-tweets-2021-2-4.json"

def q1_pyspark(file_path: str) -> List[Tuple[datetime.date, str]]:
    # Iniciar pyspark
    spark = SparkSession.builder \
        .appName("MyApp") \
        .getOrCreate()
    # Leer el archivo JSON
    df = spark.read.json(file_path)
    # Se aplana columna user.username
    df = df.withColumn("user_username", col("user.username"))

    # Se transforma para ver solo el dia
    df = df.withColumn("date", to_date(col("date")))

    # Se agrupa por fecha y se cuentan los registros para cada dia
    date_counts = df.groupBy("date").count()

    # Se buscan los top 10 dias con mas registros
    top_days = date_counts.orderBy(desc("count")).limit(10)

    # Se une el dataFrame original con los top días para filtrar solo esos días
    df_top_days = df.join(top_days, "date")

    # Cuenta la cantidad de cada username por día
    username_counts = df_top_days.groupBy("date", "user_username").count()

    # Busca el username con mayor cantidad de registros para cada dia
    max_username_count = username_counts.groupBy("date").agg(max("count").alias("max_count"))

    # Genera un dataframe con la informacion requerida
    top_dates_usernames = username_counts.join(
        max_username_count,
        (username_counts["date"] == max_username_count["date"]) & 
        (username_counts["count"] == max_username_count["max_count"]),
        "right"
    )

    # Join con toda la informacion que ordenar de mayor a menor cantidad de registros 
    top_usernames_per_day= top_days.join(top_dates_usernames, "date", "left")

    # Recolecta los resultados como una lista de tuplas
    top_usernames_per_day_list = [(row['date'], row['user_username']) for row in top_usernames_per_day.collect()]

    return top_usernames_per_day_list

# q1_pyspark(file_path)

## 3. Tiempo de ejecución

### 3.1. Pandas

In [3]:
import time

start_time = time.time()

q1_pandas(file_path)

end_time = time.time()
print(f"Tiempo de ejecución: {end_time - start_time} segundos")

Tiempo de ejecución: 9.113893270492554 segundos


### 3.2. PySpark

In [4]:
import time

start_time = time.time()

q1_pyspark(file_path)

end_time = time.time()
print(f"Tiempo de ejecución: {end_time - start_time} segundos")

Tiempo de ejecución: 11.244032382965088 segundos


Se observa que usando Pandas se obtiene un menor tiempo de ejecución (Pandas: 9.1 segundos, Pyspark: 11.24 segundos). Probablemente el tiempo de ejecución usando pandas podria disminuir mas si se optimiza el codigo

## 4. Uso de memoria

In [5]:
!pip install memory_profiler



In [6]:
%load_ext memory_profiler

### 4.1. Pandas

In [7]:
%memit -r 1 q1_pandas(file_path)

peak memory: 3009.22 MiB, increment: 2845.61 MiB


### 4.2. PySpark

In [8]:
%memit -r 1 q1_pyspark(file_path)

peak memory: 172.51 MiB, increment: 0.13 MiB


Memoria Pandas: 3009.22 MiB, Memoria Pyspark: 172.51 MiB

## 5. Conclusion

Se obtuvo que usando la libreria Pandas el tiempo de ejecución del código fue menor que usando la libreria PySpark, mientras que la memoria en uso fue menor utilizando PySpark.


La librería Pandas almacena la información en la memoria ram de la maquina, mientras que la libreria PySpark puede utilizar tanto la memoria ram como el almacenamiento en disco para procesar datos, por lo que al usar Pandas hay mas memoria en uso.

Con respecto al tiempo de ejecución, pandas es mas eficiente que pyspark al trabajar con volumenes de datos que caben en la memoria de una maquina, lo cual se ve reflejado en una disminucion en el tiempo de ejecución.

Para las preguntas 2 y 3, se considerará procesar los datos con Pandas para optimizar el tiempo de ejecución y procesar los datos con PySpark para optimizar la memoria en uso

# Pregunta 2

Esta pregunta puede resolverse usando Pandas para optimizar el tiempo de ejecucion y usando Pyspark para optimizar el uso de memoria

Se debe analizar la columna content para identificar cada emoji. Estos emojis pueden almacenarse en una lista o identificarse mediante expresiones regulares

Usando una lista se tendria que iterar sobre cada fila y contar la cantidad de emojis para cada registro, almacenandolos en una variable

Finalmente, al tener la cantidad para cada emoji, se puede unir con la lista definida inicialmente para obtener el resultado requerido

# Pregunta 3

## q3_memory

Para optimización de memoria se utilizará PySpark

### Desarrollo

In [3]:
# Imports
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, rank, count, desc, max
from pyspark.sql.window import Window

file_path = "farmers-protest-tweets-2021-2-4.json"

In [4]:
spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

In [5]:
df = spark.read.json(file_path)

In [4]:
# Se busca columnas a utilizar
df.printSchema()

root
 |-- content: string (nullable = true)
 |-- conversationId: long (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- likeCount: long (nullable = true)
 |-- media: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- duration: double (nullable = true)
 |    |    |-- fullUrl: string (nullable = true)
 |    |    |-- previewUrl: string (nullable = true)
 |    |    |-- thumbnailUrl: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- variants: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |-- mentionedUsers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |   

In [16]:
# Se analiza formato para columna mentionedUsers.username
df.select("mentionedUsers.username").show(10,truncate=False)

+---------------------------+
|username                   |
+---------------------------+
|[narendramodi, DelhiPolice]|
|[Kisanektamorcha]          |
|null                       |
|[ReallySwara, rohini_sgh]  |
|null                       |
|null                       |
|null                       |
|null                       |
|[mandeeppunia1]            |
|null                       |
+---------------------------+
only showing top 10 rows



In [10]:
# Se aplana columna
df = df.withColumn("mentionedUsers_username", col("mentionedUsers.username"))

In [13]:
# Se filtran valores no nulos
df_filtered = df.filter(df.mentionedUsers_username.isNotNull())

In [17]:
df_filtered.select("mentionedUsers.username").show(10,truncate=False)

+---------------------------+
|username                   |
+---------------------------+
|[narendramodi, DelhiPolice]|
|[Kisanektamorcha]          |
|[ReallySwara, rohini_sgh]  |
|[mandeeppunia1]            |
|[mandeeppunia1]            |
|[akshaykumar]              |
|[taapsee]                  |
|[PetroleumMin, PMOIndia]   |
|[ArmaanMalik22]            |
|[akshaykumar]              |
+---------------------------+
only showing top 10 rows



In [31]:
from pyspark.sql.functions import explode, split, col, regexp_replace, translate, expr

In [None]:

df_cleaned = df_filtered.withColumn("mentionedUsers_username", regexp_replace(col("mentionedUsers_username"), "[", ""))\
                        .withColumn("mentionedUsers_username", regexp_replace(col("mentionedUsers_username"), "]", ""))\


In [None]:
df_exploded = df_filtered.withColumn("mentionedUser", explode(split(col("mentionedUsers_username"), ",\s*")))

In [None]:
df_count = df_exploded.groupBy("mentionedUser").count()
top_users = df_count.orderBy(col("count").desc()).limit(10)

In [None]:
top_users_list = [(row['mentionedUser'], row['count']) for row in top_users.collect()]

### Resultado

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, col, regexp_replace,

file_path = "farmers-protest-tweets-2021-2-4.json"

def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    spark = SparkSession.builder \
        .appName("MyApp") \
        .getOrCreate()
    df = spark.read.json(file_path)
    df = df.withColumn("mentionedUsers_username", col("mentionedUsers.username"))
    df_filtered = df.filter(df.mentionedUsers_username.isNotNull())
    df_cleaned = df_filtered.withColumn("mentionedUsers_username", regexp_replace(col("mentionedUsers_username"), "[", ""))\
                            .withColumn("mentionedUsers_username", regexp_replace(col("mentionedUsers_username"), "]", ""))
    df_exploded = df_cleaned.withColumn("mentionedUser", explode(split(col("mentionedUsers_username"), ",\s*")))
    df_count = df_exploded.groupBy("mentionedUser").count()
    top_users = df_count.orderBy(col("count").desc()).limit(10)
    top_users_list = [(row['mentionedUser'], row['count']) for row in top_users.collect()]
    return top_users_list

## q3_time

Para optimización del tiempo de ejecucion se utilizará Pandas

In [50]:
# Imports
import pandas as pd
import json
# Lectura archivo JSON
file_path = "farmers-protest-tweets-2021-2-4.json"
df = pd.read_json(file_path, lines=True)