En este archivo puedes escribir lo que estimes conveniente. Te recomendamos detallar tu solución y todas las suposiciones que estás considerando. Aquí puedes ejecutar las funciones que definiste en los otros archivos de la carpeta src, medir el tiempo, memoria, etc.

## LATAM Data Engineer Challenge
---
Hola! gracias por leer!  
A continuación paso a detallar los pasos realizados y todo lo pensado para este desafio.

> #### Tech usadas  
> - PySpark v3.5.1  
> - Python v3.9.13

### Proceso mental
Bueno, después de revisar la estructura del dataset y revisar la documentación, pensé que lo primero que debía hacer era buscar una manera de sacar todos  
los **quotedTweets** ya que finalmente correpsonden a otro tweet, por ende mi idea era también tenerlos en la data para procesar todo. Y quité los duplicados por **id**.  

Siguiendo, y como quería usar pyspark creé una clase que instanciara a **Spark** para dejar ahí toda la configuración y cualquier método que me fuera útil.  
Caba descatar que todo el proceso lo corrí en local, por ende fui probando distintas configuraciones de **Spark**, según ciertos errores que me iban arrojando las ejecuciones de código.  

Principalmente sufrí con un par de errores del tipo: "_Py4JJavaError: An error occurred while calling {ABC}_". Esto ocurría casi todas las veces que usé UDFs,  
busqué varias soluciones pero no dí con ninguna así que opté por no usar UDFs.  


  
   
referencias
- [Instalar PySpark en Windows](https://medium.com/@dipan.saha/getting-started-with-pyspark-day-1-37e5e6fdc14b)
- [Documentación de PySpark config](https://spark.apache.org/docs/3.0.2/configuration.html)


---

Primero inicializo los nombres de archivos y las rutas, además de preguntar si es que existe el .zip para descomprimirlo en la carpeta **data**

In [1]:
import os
import cProfile
import pstats
from zipfile import ZipFile

# declaro nombres de archivo y rutas
json_file = "farmers-protest-tweets-2021-2-4.json"
parquet_file = "tweets.parquet"
data_folder_path = os.path.abspath(os.path.join(os.getcwd(), '..', 'data'))
json_path = f"{data_folder_path}\\{json_file}"
parquet_path = f"{data_folder_path}\\{parquet_file}"

# checkea si existe la carpeta data, sino, la crea
if not os.path.exists(data_folder_path):
    os.makedirs(data_folder_path)

# checkea si existe el archivo tweets.json.zip, si no, checkea si existe el archivo json, si no, lanza excepción
if not os.path.exists(f"{data_folder_path}\\tweets.json.zip"):
    if not os.path.exists(f"{data_folder_path}\\{json_file}"):
        raise FileNotFoundError("Por favor descargue el archivo 'tweets.json.zip' y colóquelo en la carpeta 'data'")

# si no existe el archivo json, descomprime el archivo zip
if not os.path.exists(f"{data_folder_path}\\{json_file}"):
    with ZipFile(f"{data_folder_path}\\tweets.json.zip", 'r') as zObject: 
        zObject.extractall(path=f"{data_folder_path}")

Luego pregunta si es que existe el archivo parquet, sino, lo crea  

La función **json_to_parquet** toma el json principal (el de 389mb) y lo "desnestea". Esto va a buscar si existe algún elemento **quotedTweet**,  
y si existe une sus datos con el dataframe principal, esto para crear un gran dataset con todos los tweets, luego de ellos lo pasa a formato **parquet**.  

```python
def json_to_parquet(ruta_del_json: str, ruta_del_parquet: str) -> None
```

In [2]:
from helpers.helperFuncs import json_to_parquet
# si no encuentra el archivo parquet, lo crea
if not os.path.exists(f"{parquet_path}"):
    json_to_parquet(json_path, parquet_path)

---  

#### Pregunta 1
Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene por cada uno de esos días. Debe incluir las siguientes funciones:
```python
def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
```
```python
def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
```
```python
Returns: 
[(datetime.date(1999, 11, 15), "LATAM321"), (datetime.date(1999, 7, 15), "LATAM_CHI"), ...]
```

In [3]:
from q1_memory import q1_memory
q1_mem_result = q1_memory(f"{parquet_path}")
q1_mem_result

Filename: f:\GitHub\DataEngineer\latam_de_challenge\src\q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     8     72.9 MiB     72.9 MiB           1   @profile
     9                                         def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    10                                             # Inicializacion de Spark
    11     75.0 MiB      2.1 MiB           1       spark = SparkClass("Q1: Memory")
    12                                             # Carga de datos
    13                                             #json_data = spark.load_json(file_path)
    14                                             #df = extract_all_tweets(json_data, "q1_memo")
    15     75.0 MiB      0.0 MiB           1       df = spark.load_parquet(file_path).select("id", "username", "date")
    16                                         
    17                                             # obtengo las top 10 fechas con mas tweets
    18     75.0 MiB 

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 18), 'rebelpacifist'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91'),
 (datetime.date(2021, 2, 19), 'KaurDosanjh1979')]

In [4]:
from q1_time import q1_time

with cProfile.Profile() as pr:
    q1_time_result = q1_time(f"{parquet_path}")

q1_stats = pstats.Stats(pr).strip_dirs().sort_stats('time')
q1_stats.dump_stats('./stats/q1_time')
q1_stats.print_stats()
q1_time_result

         45670 function calls (45222 primitive calls) in 2.196 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      569    1.765    0.003    1.765    0.003 {method 'recv_into' of '_socket.socket' objects}
        8    0.355    0.044    0.355    0.044 {method 'acquire' of '_thread.lock' objects}
      555    0.018    0.000    0.018    0.000 {method 'sendall' of '_socket.socket' objects}
      555    0.005    0.000    1.794    0.003 clientserver.py:499(send_command)
     1517    0.003    0.000    0.003    0.000 protocol.py:214(smart_decode)
       47    0.002    0.000    0.002    0.000 {built-in method nt.stat}
      556    0.002    0.000    1.767    0.003 {method 'readline' of '_io.BufferedReader' objects}
     1057    0.002    0.000    0.002    0.000 inspect.py:1593(_shadowed_dict)
      206    0.002    0.000    0.004    0.000 java_gateway.py:1336(__init__)
      555    0.002    0.000    1.798    0.003 java_gateway.py:1015

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 18), 'rebelpacifist'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91'),
 (datetime.date(2021, 2, 19), 'KaurDosanjh1979')]

#### Pregunta 2
Los top 10 emojis más usados con su respectivo conteo. Debe incluir las siguientes funciones:
```python
def q2_time(file_path: str) -> List[Tuple[str, int]]:
```
```python
def q2_memory(file_path: str) -> List[Tuple[str, int]]:
```
```python
Returns: 
[("✈️", 6856), ("❤️", 5876), ...]
```

In [5]:
'''from q2_memory import q2_memory
q2_mem_result = q2_memory(f"{parquet_path}")
q2_mem_result'''

'from q2_memory import q2_memory\nq2_mem_result = q2_memory(f"{parquet_path}")\nq2_mem_result'

In [6]:
'''from q2_time import q2_time

with cProfile.Profile() as pr:
    q2_time_result = q2_time(f"{parquet_path}")

q2_stats = pstats.Stats(pr).strip_dirs().sort_stats('time')
q2_stats.dump_stats('./stats/q2_time')
q2_stats.print_stats()
q2_time_result'''

'from q2_time import q2_time\n\nwith cProfile.Profile() as pr:\n    q2_time_result = q2_time(f"{parquet_path}")\n\nq2_stats = pstats.Stats(pr).strip_dirs().sort_stats(\'time\')\nq2_stats.dump_stats(\'./stats/q2_time\')\nq2_stats.print_stats()\nq2_time_result'

#### Pregunta 3
El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos. Debe incluir las siguientes funciones:
```python
def q3_time(file_path: str) -> List[Tuple[str, int]]:
```
```python
def q3_memory(file_path: str) -> List[Tuple[str, int]]:
```
```python
Returns: 
[("LATAM321", 387), ("LATAM_CHI", 129), ...]
```

In [7]:
from q3_memory import q3_memory
q3_mem_result = q3_memory(f"{parquet_path}")
q3_mem_result

Filename: f:\GitHub\DataEngineer\latam_de_challenge\src\q3_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     6     76.7 MiB     76.7 MiB           1   @profile
     7                                         def q3_memory(file_path: str) -> List[Tuple[str, int]]:
     8                                             # Inicializacion de Spark
     9     76.8 MiB      0.1 MiB           1       spark = SparkClass("Q3: Memory")
    10                                             # Carga de datos
    11                                             #json_data = spark.load_json(file_path)
    12                                             #df = extract_all_tweets(json_data, "q3_memo")
    13     76.8 MiB      0.0 MiB           1       df = spark.load_parquet(file_path).select("id", "mentionUser")
    14                                         
    15                                             # hago un explode de los mentionedUsers para abrir el array y luego tomo solo 

[('narendramodi', 2415),
 ('Kisanektamorcha', 1942),
 ('RakeshTikaitBKU', 1723),
 ('PMOIndia', 1489),
 ('RahulGandhi', 1218),
 ('GretaThunberg', 1105),
 ('RaviSinghKA', 1062),
 ('rihanna', 1037),
 ('UNHumanRights', 989),
 ('meenaharris', 975)]

In [8]:
from q3_time import q3_time

with cProfile.Profile() as pr:
    q3_time_result = q3_time(f"{parquet_path}")

q3_stats = pstats.Stats(pr).strip_dirs().sort_stats('time')
q3_stats.dump_stats('./stats/q3_time')
q3_stats.print_stats()
q3_time_result

         32796 function calls (32451 primitive calls) in 1.173 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      397    0.898    0.002    0.898    0.002 {method 'recv_into' of '_socket.socket' objects}
        8    0.216    0.027    0.216    0.027 {method 'acquire' of '_thread.lock' objects}
      390    0.015    0.000    0.015    0.000 {method 'sendall' of '_socket.socket' objects}
      390    0.004    0.000    0.922    0.002 clientserver.py:499(send_command)
     1079    0.002    0.000    0.003    0.000 protocol.py:214(smart_decode)
      391    0.002    0.000    0.900    0.002 {method 'readline' of '_io.BufferedReader' objects}
      813    0.002    0.000    0.002    0.000 inspect.py:1593(_shadowed_dict)
       32    0.002    0.000    0.002    0.000 {built-in method nt.stat}
      390    0.001    0.000    0.926    0.002 java_gateway.py:1015(send_command)
     5741    0.001    0.000    0.002    0.000 {built-in method

[('narendramodi', 2415),
 ('Kisanektamorcha', 1942),
 ('RakeshTikaitBKU', 1723),
 ('PMOIndia', 1489),
 ('RahulGandhi', 1218),
 ('GretaThunberg', 1105),
 ('RaviSinghKA', 1062),
 ('rihanna', 1037),
 ('UNHumanRights', 989),
 ('meenaharris', 975)]