In [None]:
#! pip install memory-profiler
#! pip install emoji
#! pip install py-spy
#! pip install pyspark
#! pip install apache-beam[gcp]
! pip install -r ../requirements.txt

# Data Engineer Challenge
## Recursos
* Debido a que solo cuento con el computador de la empresa, no puedo instalar elementos adicionales sin levantar un ticket. Por esta razón, utilicé Google Colab para resolver el desafío.
* Actualmente no tengo acceso a una nube con créditos para resolver el desafío y verificar los resultados. Sin embargo, mencionaré las herramientas de la nube que habría utilizado en cada caso.


In [None]:
import os
file_path = 'farmers-protest-tweets-2021-2-4.json'
os.environ['FILE_PATH'] = file_path

## Q1
### Aclaraciones
* Este caso lo abordé como un problema de BI donde no se requiere un tiempo de respuesta en tiempo real. Por lo tanto, utilicé Spark para optimizar tanto el uso de memoria como el tiempo de procesamiento.

* En este ejercicio donde tuve recursos limitados se comprueba que las soluciones optimizando tiempo se aprovecharían mas en ambientes distribuidos, y que la optimización de memoria funciona mejor en ambientes con recursos limitados, tal como DataProc

* No modifique el formato de la funcion que se me indicó, pero me hubiera gustado agregar un parametro para pasar la session de spark como parametro y que esta se hiciera fuera de la funcion.

* En un servicio en la nube hubiera cargado los datos a bucket de cloud storage, luego ingestado a un tabla de BQ, aplicar filtros de lipienza y calidad del dato segun corresponda y finalmente relizado una consulta que me diera el resultado, esta se podria pasar a DBT para realizar el respectivo versionado y ejecucion periodica.

In [None]:
import q1_time

#### q1_time

* Se puso en cache el df para evitar ser recalculado, aunque la escritura en memoria afecta gravente el tiempo de ejecucion
* Se hace un una reduccion por llaves para minimizar shuffles e intentar con esto reducir el tiempo, sin embargo el proceso es costo y requiere mas tiempo del que se ahorra al momento de ejecutar el filtro isin
* se hizo broadcast a la columna fecha del df top_dates para minizar shuffles

*   La logica utilizada fue la misma para los dos casos:
 1. Lectura del archivo como Json
 2. Transformar la columna date a tipo timestamp y luego a tipo date
 3. Contar tweets por fecha y orfanizar de forma descendente
 4. Tomar los 10 primeros
 5. Filtrar solo los dias con mas tweets
 6. Contar agrupando por dia y nombre de usuario
 7. Agrega una columna con la funcion de ventana particionada por fecha y ordena por el conteo previo
 8. seleccionar columnas necesarias
 9. Capturar y retornar cada linea como una tupla

 ##### Explicación codigo

``` py
    # Iniciar la sesion de Spark
    spark = SparkSession.builder.appName("TweetAnalysis").getOrCreate()

    # Leer el archivo como Json
    dfRaw = spark.read.json(file_path)

    # Lectura y preprocesamiento del dataframe
    df = dfRaw.select(

      # Seleccionar la columna del nombre de usuario dentro de usuario
      col("user.username").alias("username"),

      # Transforma la columna 'date', a timestamp y luego a date
      to_date(to_timestamp(col("date"), "yyyy-MM-dd'T'HH:mm:ssXXX")).alias("date")
      )
    
    # Cachear df para evitar recalcularlo
    df.cache()
    
    # Top 10 días con más tweets (usando reduceByKey para minimizar shuffles)
    top_dates = (

      # Convertir a RDD
      df.rdd

        # Mapear cada fila a una tupla
        .map(lambda row: (row.date, 1))

        # Agrupar por fecha y sumar los conteos de tweets para cada fecha
        .reduceByKey(lambda a, b: a + b)

        # Ordenar las fechas por conteo de tweets en orden descendente
        .sortBy(lambda item: item[1], ascending=False)

        #Tomar los primeros 10
        .take(10)
        )
    
    # Broadcast de top_dates para evitar shuffles en el filtro
    top_dates_broadcast = spark.sparkContext.broadcast([date for date, _ in top_dates])

    # Filtrar por top días y calcular el usuario con más tweets
    day_user = (

      #Filtra las fechas con respecto a las fechas que se tomaron antes
      df.filter(col("date").isin(top_dates_broadcast.value))

        #Agrupa por fecha y usuario
        .groupBy("date", "username")

        #Hace conteo
        .count()

        #Agrega una columna, con la funcion de ventana particionada por fecha y ordenada por el conteo previo
        .withColumn("rn", row_number().over(Window.partitionBy("date").orderBy(desc("count"))))

        #Filtra el registro con mas tweets
        .filter(col("rn") == 1)

        #Elimina la columna antes creada
        .drop("rn")

        #Selecciona solo fecha y usuario
        .select("date", "username")
        )

    # Unpersist de df para liberar la memoria
    df.unpersist()
    
    # Se capura el resultado y se transforma en una lista de tuplas.
    [tuple(row) for row in day_user.collect()]
```



In [None]:
q1_time.q1_time(file_path)

Filename: /content/q1_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     5    309.2 MiB    309.2 MiB           1   @memory_profiler.profile
     6                                         def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
     7                                           """
     8                                           Obtiene los usuarios con más tweets por día para los 10 días con mayor actividad.
     9                                           
    10                                           Esta función lee un archivo JSON de tweets, identifica los 10 días con 
    11                                           mayor cantidad de tweets y determina el usuario con más tweets para cada 
    12                                           uno de esos días. La función está optimizada para el tiempo de procesamiento 
    13                                           utilizando Spark para el análisis distribuido de datos.
    14        

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 19), 'Preetm91'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria')]

In [None]:
!py-spy record -o q1_time.svg -- python -c "import os; import q1_time; q1_time.q1_time(os.environ.get('FILE_PATH'))"

[32m[1mpy-spy[0m[2m>[0m Sampling process 100 times a second. Press Control-C to exit.

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/13 06:42:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[32m[1mpy-spy[0m[2m>[0m 1.03s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.34s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.72s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.33s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.01s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.16s behind in sampling, results may be 

In [None]:
import q1_memory

### q1_memory
* Se intenta minimizar las variables utilizadas
* Se intenta ser lo mas concreto en las acciones para evitar reprocesos
*   La logica utilizada fue la misma para los dos casos:
 1. Lectura del archivo como Json
 2. Transformar la columna date a tipo timestamp y luego a tipo date
 3. Contar tweets por fecha y orfanizar de forma descendente
 4. Tomar los 10 primeros
 5. Filtrar solo los dias con mas tweets
 6. Contar agrupando por dia y nombre de usuario
 7. Agrega una columna con la funcion de ventana particionada por fecha y ordena por el conteo previo
 8. seleccionar columnas necesarias
 9. Capturar y retornar cada linea como una tupla
 #### Explicación código



``` py
    # Iniciar sesion de spark
    spark = SparkSession.builder.appName("TweetAnalysis").getOrCreate()

    #Leer archivo json
    dfRaw = spark.read.json(file_path)

    # Se extrae la fecha y el usuario, convirtiendo la fecha al formato correcto.
    df = dfRaw.select(

        # Seleccionar la columna del nombre de usuario dentro de usuario  
        col("user.username").alias("username"),

        # Transformar la columna 'date', a timestamp y luego a date
        to_date(to_timestamp(col("date"), "yyyy-MM-dd'T'HH:mm:ssXXX")).alias("date"),
      )

    # Se obtienen los top N días con más tweets.
    top_dates = (

        # agrupar por fecha
        df.groupBy("date")
        
        # Contar agrupando por fecha
        .count()

        #Ordenar por el conteo descendente
        .orderBy(desc("count"))

        #Tomar solo la fecha
        .select("date")

        #tomar los 10 primeros resultados
        .limit(10)

        # Capturarlos y colectarlos
        .rdd.flatMap(lambda x: x)
        .collect()
      )

    # Se define la ventana para obtener el usuario con más tweets por día.
    window_spec = Window.partitionBy("date").orderBy(desc("count"))

    # Se filtra por los top N días, se agrupa por fecha y usuario, se cuenta
    # y se selecciona el usuario con más tweets (rn = 1).
    day_user = (

      # Filtrar solo las fechas con mas tweets, calculado previamente
      df.filter(col("date").isin(top_dates))

      #Agrupar por fecha y usuario
      .groupBy("date", "username")

      # Hacer conteo agrupando por fecha y usuario
      .count()

      # Aplicar la funcion de ventana para organizar de mayor a menor los usuarios que mas con mas tweets ese dia
      .select(col("date"), col("username"), row_number().over(window_spec).alias("rn"))

      # Filtrar el usuario con mas tweets
      .filter(col("rn") == 1)

      # Elimiar la columna utilizada
      .drop("rn")
    )

    # Se capura el resultado y se transforma en una lista de tuplas.
    return [tuple(row) for row in day_user.collect()]
```



In [None]:
q1_memory.q1_memory(file_path)

Filename: /content/q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     5    309.2 MiB    309.2 MiB           1   @memory_profiler.profile
     6                                         def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
     7                                           """
     8                                           Obtiene los usuarios con más tweets por día para los 10 días con mayor actividad.
     9                                           
    10                                           Esta función lee un archivo JSON de tweets, identifica los 10 días con 
    11                                           mayor cantidad de tweets y determina el usuario con más tweets para cada 
    12                                           uno de esos días. La función está optimizada para la memoria usada en el  
    13                                           procesamiento utilizando Spark para el análisis distribuido de datos.

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 19), 'Preetm91'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria')]

In [None]:
!py-spy record -o q1_memory.svg -- python -c "import os;  q1_memory; q1_memory.q1_memory(os.environ.get('FILE_PATH'))"

[32m[1mpy-spy[0m[2m>[0m Sampling process 100 times a second. Press Control-C to exit.

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/13 06:50:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[32m[1mpy-spy[0m[2m>[0m 1.00s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.38s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.87s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.85s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 2.11s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.65s behind in sampling, results may be 

## Resultados q1

* la creacion de la session de spak agrega un tiempo considerable a cada funcion
* la lectura del json en cada caso es la operacion mas costosa
* La version **optimizada para tiempo es mas lenta** debido a la escritura del cache y la funcion de reduceByKey
* Las dos versiones tienen un consumo de memoria parecido
* Se deberia analizar los recursos utilizados dentro del framework de pyspark para tener mas detalle sobre las transformaciones mas costosas
* Se deberia ver la duracion de las funciones con un dataset muchisimo mas grande para ver la difencia que hace la aplicacion del cache en el df
* los resultados de *py-spy* estan en la carpeta *resultados_py-spy*



## q2

### Consideraciones
* En un entorno de BigData pueden llegar grandes cantidades de datos en Streaming.
* Se utilizo Beam y PySpark para comparar los recursos utilizados, sin embargo al momento de agregar el decorador @memory_profiler.profile a la solucion con Beam el monitoreo de recursos impedia la correcta ejecucion del pipeline por lo que no se agrego
* Los dos soportan grandes cantidades de datos y son escalables
* Beam puede ser facilmente implementado en Data Flow
* PySpark puede ser facilmente implementado en DataProc


In [None]:
import q2_time

#### q2_time

* Se crea un pipeline en beam que escriba el resultado en un archivo de texto que luego sera leido e interpretado como tupla
*   La logica utilizada fue:
 1. capturar linea por linea del contenido del tweet
 2. comprar cada palabra para saber si es emoji
 3. Agrupar por elemento para contarlos en un contador
 4. Tomar los 10 primeros
 5. Escribir el resultado en archivo plano (no se puede transformar directo a tupla)
 6. Leer el archivo y dar la respuesta

 #### Explicación código



``` py

      # Crear el pipeline para no tener que agregar variable para esperar el status de la ejecucion
      with beam.Pipeline() as pipeline:
          
          # 1. Leer los tweets desde el archivo JSON
          tweets = (
              pipeline
              # Leer archivo txt
              | "LeerTweets" >> beam.io.ReadFromText(file_path)
              # Leer cada linea como Json
              | "ParsearJSON" >> beam.Map(lambda line: json.loads(line))
          )

          # 2. Extraer los emojis de cada tweet
          emojis = (
              tweets

              # Recorrer el contenido del tweet palabra por palabra, si una de esas palabras es un emoji entonces devolverla
              | "ExtraerEmojis" >> beam.FlatMap(lambda tweet: [char for char in tweet.get('content', '') if emoji.is_emoji(char)])
          )

          # 3. Contar la frecuencia de cada emoji
          emoji_counts = (
              emojis

              # Contar cuantas veces aparece cada emoji, un conteo por cada elemento
              | "ContarEmojis" >> beam.combiners.Count.PerElement()
              | "FormatearSalida" >> beam.Map(lambda element: element)
          )
          
          # 4. Obtener los 10 emojis más frecuentes
          top_10_emojis = (
              emoji_counts

              # Devuelve los 10 primeros elementos con mas conteos, el conteo esta en la segunda "columna" por eso se requiere agregar lambda para que sepa la funcion para buscar
              | "OrdenarEmojis" >> beam.transforms.combiners.Top.Of(10, key=lambda element: element[1])
          )

          # 5. Escribir los resultados a un archivo de texto
          top_10_emojis | 'EscribirResultados' >> beam.io.WriteToText(f'{folder_path}output.txt', num_shards=1, shard_name_template="")

      # 6. Leer el archivo de texto y evaluarlo literalmente    
      with open(f'{folder_path}output.txt', 'r') as f:
            text_field_content = f.read()
      list_of_tuples = ast.literal_eval(text_field_content)

      # 7. Eliminar el archivo
      shutil.rmtree(folder_path)
      return list_of_tuples

```



In [None]:
q2_time.q2_time(file_path)

In [None]:
!py-spy record -o q2_time.svg -- python -c "import os;  q2_time; q2_time.q2_time(os.environ.get('FILE_PATH'))"

[32m[1mpy-spy[0m[2m>[0m Sampling process 100 times a second. Press Control-C to exit.


[32m[1mpy-spy[0m[2m>[0m Stopped sampling because process exited
[32m[1mpy-spy[0m[2m>[0m Wrote flamegraph data to 'q2_time.svg'. Samples: 3995 Errors: 1


In [None]:
import q2_memory

#### q2_memory

* Se implementan transforamciones usando pyspark con una logica muy parecida a la implementada en beam.
* Se creo una *udf* para hacer la comparacion palabra por palabra y saber si es o no un emoji
*   La logica utilizada fue:
 1. Crear la sesion de spark
 2. Leer el archivo Json
 3. Aplicar a la columna content la UDF que compara palabra por palabra.
 4. Aplicar un explode a lo anterior para obtener una unica columna con todos los emojis
 5. Agrupar por emoji y contarlos
 6. Ordernalos en forma descendente
 7. Tomar los 10 primeros
 8. Colectarlos
 9. converitr fila por fila en una tupla y luego en lista

 #### Explicación código


``` py
      # Crear sesion de spark
      spark = SparkSession.builder.appName("TweetAnalysis").getOrCreate()

      #Leer archivo Json
      dfRaw = spark.read.json(file_path)

      # Define una UDF para extraer los emojis del texto

      # Se crea la funcion que validara los emojis
      def extract_emojis(text):
        
        # Si el texto contiene algo entonces:
          if text is not None:

              # revisa palabra por palabra del texto y valida si es emoji, si es emoji devuelve la palabra
              return [char for char in text if emoji.is_emoji(char)]
          return []

      # Se crea la UDF con la funcion anterior
      extract_emojis_udf = udf(extract_emojis, ArrayType(StringType()))

      # Aplica UDF para extraer emojis y explode para transformarlos en columna
      emoji_counts = (
          dfRaw

          # Se aplica la UDF a la columna content, luego explode para que quede una sola columna con los resultados
          .select(explode(extract_emojis_udf(col("content"))).alias("emoji"))
          
          # Se cuenta agrupando por emoji
          .groupBy("emoji")
          .count()
          
          # Se ordena de forma descendente
          .orderBy(col("count").desc())
      )

      # Obtiene los top 10 y lo convierte a una lista de tuplas
      # Se toman las 10 primeras, se colecta y se transforma fila por fila en una tupla
      top_10_emojis = [tuple(row) for row in emoji_counts.limit(10).collect()]
```



In [None]:
q2_memory.q2_memory(file_path)

Filename: /content/q2_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     7    308.5 MiB    308.5 MiB           1   @memory_profiler.profile
     8                                         def q2_memory(file_path):
     9                                             """
    10                                             Procesa un archivo JSON de tweets, extrae emojis y retorna los 10 mas usados.
    11                                         
    12                                             Args:
    13                                                 file_path (str): La ruta al archivo JSON.
    14                                         
    15                                             Returns:
    16                                                 List[Tuple[str, int]]: Una lista de tuplas (emoji, conteo) representando
    17                                                       los 10 emojis mas usados.
    18                                             

[('🙏', 7286),
 ('😂', 3072),
 ('🚜', 2972),
 ('✊', 2411),
 ('🌾', 2363),
 ('🏻', 2080),
 ('❤', 1779),
 ('🤣', 1668),
 ('🏽', 1218),
 ('👇', 1108)]

In [None]:
!py-spy record -o q2_memory.svg -- python -c "import os;  q2_memory; q2_memory.q2_memory(os.environ.get('FILE_PATH'))"

[32m[1mpy-spy[0m[2m>[0m Sampling process 100 times a second. Press Control-C to exit.

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/13 05:33:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[32m[1mpy-spy[0m[2m>[0m 1.09s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.66s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.75s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.40s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.01s behind in sampling, results may be inaccurate. Try reducing the sampling rate
[32m[1mpy-spy[0m[2m>[0m 1.10s behind in sampling, results may be 

## Resultados q2
* En este caso Beam fue mucho mas rapido que Spark.
* Spark estuvo mas de la mitad del tiempo trabajando con hilos (_bootstrap_inner (threading.py:1045))
* Se deberia incrementar la carga para ver como se comporta Beam ante una gran cantidad de datos en Batch
* Se necesitan herramientas puntuales para valorar el consumo de memoria en ambos casos
* los resultados de *py-spy* estan en la carpeta *resultados_py-spy*


## q3

### Consideraciones
* Se utilizo Beam y Librerias propias de Python


In [None]:
import q3_time

#### q3_time

* Se implementa librerias propias de Python
* La lectura se hace linea por linea con el fin de mejorar los tiempos de respuesta y el consumo de memoria
* Se prodria trabajar con hilos dentro de python para mejorar aun mas el rendimiento deacuerdo a los recursos disponibles
*   La logica utilizada fue:
 1. Abrir el archivo
 2. Recorrer el archivo linea
 3. Convertir cada linea en Json
 4. Verificar si el tweet tiene usuarios mencionados
 5. Recorrer los usuarios mencionados
 6. Capturar el nombre de usuario
 7. Agregarlo al contador
 8. Mostrar los 10 mas comunes

 #### Explicación código:


``` py
    # Abrir el archivo
    with open(file_path, 'r', encoding='utf-8') as file:
      
      # Recorer linea por linea
      for line in file:

          # Cargar la linea como Json
          tweet = json.loads(line)

          # Verifica si hay mentionedUsers en el tweet
          # Verifica si es una lista, es decir que no sea null
          if 'mentionedUsers' in tweet and isinstance(tweet['mentionedUsers'], list):
            
            # Recorre los usuarios mencionados en cada Tweet
            for user in tweet['mentionedUsers']:

              # Verifica si el usuario tiene un nombre de usuario
              if 'username' in user:
                  
                  #Lo agrega al contador
                  user_mentions[user['username']] += 1
    user_mentions.most_common(10)
```



In [None]:
q3_time.q3_time(file_path)

Filename: /content/q3_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     6    307.5 MiB    307.5 MiB           1   @memory_profiler.profile
     7                                         def q3_time(file_path: str) -> List[Tuple[str, int]]:
     8                                             """
     9                                             Encuentra los 10 usuarios más mencionados en un archivo JSON de tweets.
    10                                         
    11                                             Args:
    12                                                 file_path: La ruta al archivo JSON.
    13                                         
    14                                             Returns:
    15                                                 Una lista de tuplas, donde cada tupla contiene el nombre de usuario y el número de menciones.
    16                                             """
    17    307.5 MiB      0.0 MiB           1    

[('narendramodi', 2265),
 ('Kisanektamorcha', 1840),
 ('RakeshTikaitBKU', 1644),
 ('PMOIndia', 1427),
 ('RahulGandhi', 1146),
 ('GretaThunberg', 1048),
 ('RaviSinghKA', 1019),
 ('rihanna', 986),
 ('UNHumanRights', 962),
 ('meenaharris', 926)]

In [None]:
!py-spy record -o q3_time.svg -- python -c "import os;  q3_time; q3_time.q3_time(os.environ.get('FILE_PATH'))"

[32m[1mpy-spy[0m[2m>[0m Sampling process 100 times a second. Press Control-C to exit.


[32m[1mpy-spy[0m[2m>[0m Stopped sampling because process exited
[32m[1mpy-spy[0m[2m>[0m Wrote flamegraph data to 'q3_time.svg'. Samples: 809 Errors: 0


In [None]:
import q3_memory

#### q3_memory

* Se implementa un pipeline de Beam con una logica muy parecida a la que se utilizo para el caso *q3_time*
* Se escribe un archivo de salida para obtener el resultado del pipeline
*   La logica utilizada fue:
 1. Leer el archivo
 2. Cargar cada linea como Json
 3. Filtrar los solo los tweets que tienen usuarios mencionados
 4. Extraer los nombres de usuarios de la lista de usuarios mencionados
 5. Hacer un conteo por elemento
 6. Tomar los 10 primeros elementos con mas conteos (x[1])
 7. Escribir los resultados en un archivo
 8. Leer el archivo e interpretarlo literalmente
 9. Eliminar el archivo creado

 #### Explicación código:



``` py
    # Crear el pipeline
    with beam.Pipeline() as pipeline:

      # Lee y analiza los tweets, extrae las menciones de usuarios, toma las 10 y escribe el resultado
      (
        pipeline
        | 'ReadTweets' >> beam.io.ReadFromText(file_pattern=file_path)
        | 'ParseTweets' >> beam.Map(lambda line: json.loads(line))
        | 'FilterTweets' >> beam.Filter(lambda tweet: tweet.get('mentionedUsers') is not None)
        | 'ExtractMentions' >> beam.FlatMap(lambda tweet: [user['username'] for user in tweet.get('mentionedUsers')])
        | 'CountMentions' >> beam.combiners.Count.PerElement()
        | 'SortMentions' >> beam.combiners.Top.Of(10, key=lambda x: x[1])
        | 'WriteResults' >> beam.io.WriteToText(f'{folder_path}output.txt', num_shards=1, shard_name_template="")
      )

    # Leer el archivo de texto y evaluarlo literalmente
    with open(f'{folder_path}output.txt', 'r') as f:
        text_field_content = f.read()
    list_of_tuples = ast.literal_eval(text_field_content)

    # Eliminar el archivo
    shutil.rmtree(folder_path)

    return list_of_tuples
```



In [None]:
q3_memory.q3_memory(file_path)



Filename: /content/q3_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     8    307.5 MiB    307.5 MiB           1   @memory_profiler.profile
     9                                         def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    10                                             """
    11                                             Encuentra los 10 usuarios más mencionados en un archivo JSON de tweets usando Apache Beam.
    12                                         
    13                                             Args:
    14                                                 file_path: La ruta al archivo JSON.
    15                                         
    16                                             Returns:
    17                                                 Una lista de tuplas, donde cada tupla contiene el nombre de usuario y el número de menciones.
    18                                             """
    19    307.5 MiB      

[('narendramodi', 2265),
 ('Kisanektamorcha', 1840),
 ('RakeshTikaitBKU', 1644),
 ('PMOIndia', 1427),
 ('RahulGandhi', 1146),
 ('GretaThunberg', 1048),
 ('RaviSinghKA', 1019),
 ('rihanna', 986),
 ('UNHumanRights', 962),
 ('meenaharris', 926)]

In [None]:
!py-spy record -o q3_memory.svg -- python -c "import os;  q3_memory; q3_memory.q3_memory(os.environ.get('FILE_PATH'))"

[32m[1mpy-spy[0m[2m>[0m Sampling process 100 times a second. Press Control-C to exit.


[32m[1mpy-spy[0m[2m>[0m Stopped sampling because process exited
[32m[1mpy-spy[0m[2m>[0m Wrote flamegraph data to 'q3_memory.svg'. Samples: 3347 Errors: 3


## Resultados q3
* A *q3_time* le toma 805 samples para completar, siendo en su mayoria el tiempo de apertura del archivo y la lectura por linea, mientras que a *q3_memory* le toma 3200 con una parte dedicada las librerias requeridas para el funcionamiento, siendo 2500 para el procesamiento como tal
* buena parte del tiempo de *q3_memory* en la lectura del archivo Json y otra en la escritura del archivo de texto
* los resultados de *py-spy* estan en la carpeta *resultados_py-spy*


# Mejoras

* Agregar pruebas unitaras.
* Agregar sistemas de monitoreo, sistema de log.
* Documentar de forma estructurada las 3 funciones (entradas, salidas objetivos).
* Automatizar proceso de despligue CI/CD.
* Validar seguridad.