En este archivo puedes escribir lo que estimes conveniente. Te recomendamos detallar tu solución y todas las suposiciones que estás considerando. Aquí puedes ejecutar las funciones que definiste en los otros archivos de la carpeta src, medir el tiempo de ejecución, el uso de memoria, etc.

In [None]:
# Install the packages used by this notebook. Only memory-profiler is pinned.
# NOTE(review): pyspark, emoji and findspark are unpinned, so re-runs may pick
# up different versions — consider pinning them for reproducibility.
!pip install memory-profiler==0.61.0 pyspark emoji findspark

Collecting memory-profiler==0.61.0
  Downloading memory_profiler-0.61.0-py3-none-any.whl.metadata (20 kB)
Collecting emoji
  Downloading emoji-2.14.0-py3-none-any.whl.metadata (5.7 kB)
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading memory_profiler-0.61.0-py3-none-any.whl (31 kB)
Downloading emoji-2.14.0-py3-none-any.whl (586 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m586.9/586.9 kB[0m [31m10.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark, memory-profiler, emoji
Successfully installed emoji-2.14.0 findspark-2.0.1 memory-profiler-0.61.0


In [None]:
# Initialise findspark before any pyspark usage.
# getSparkInstance() also calls findspark.init(); this earlier call presumably
# exists so the next cell can inspect $SPARK_HOME — confirm.
import findspark
findspark.init()

In [None]:
# Print SPARK_HOME to check which Spark installation will be used
# (presumably set by findspark.init() in the previous cell).
!echo $SPARK_HOME

/usr/local/lib/python3.10/dist-packages/pyspark


In [None]:
# Download the zipped dataset from Google Drive into data/.
#   -L             follow redirects issued by the download endpoint
#   -f             fail on HTTP errors instead of saving the error page as the zip
#   --create-dirs  create the data/ directory if it does not exist yet
!curl -L -f --create-dirs "https://drive.usercontent.google.com/download?id=1ig2ngoXFTxP5Pa8muXo02mDTFexZzsis&confirm=xxx" -o "data/tweets.json.zip"

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 57.5M  100 57.5M    0     0  25.7M      0  0:00:02  0:00:02 --:--:-- 25.7M


In [None]:
# Extract the JSON into data/. -o overwrites existing files without asking, so
# re-running this cell does not hang on unzip's interactive "replace?" prompt.
!unzip -o data/tweets.json.zip -d data/

Archive:  data/tweets.json.zip
  inflating: data/farmers-protest-tweets-2021-2-4.json  


In [None]:
import os

# Path of the JSON extracted by the unzip cell. It is built from the same
# relative "data/" directory that curl/unzip write to and then made absolute,
# so Spark gets an unambiguous path. When the working directory is /content
# (what the previous hard-coded path assumed) the value is unchanged.
file_path = os.path.abspath(os.path.join("data", "farmers-protest-tweets-2021-2-4.json"))

In [None]:
from pyspark.sql import SparkSession
import zipfile
import findspark
import os
from pyspark.sql.functions import desc, to_date, col, sum, rank, when, col, regexp_replace, lower
from pyspark.sql.window import Window

def getSparkInstance(name_app="test_app") -> SparkSession:
    """Build (or reuse) the SparkSession used by this notebook.

    ``findspark.init()`` is called first. A session named ``name_app`` is then
    obtained through ``getOrCreate()``, configured with 2g of driver memory
    and ``--illegal-access=permit`` as extra executor Java options. The Spark
    log level is lowered to ``ERROR`` to keep cell output readable.

    NOTE(review): ``--illegal-access`` is ignored by JDK 17+; confirm whether
    it is still needed for the target runtime.

    Args:
        name_app: Application name handed to the session builder.

    Returns:
        The active SparkSession.
    """
    findspark.init()

    builder = SparkSession.builder.appName(name_app)
    settings = (
        ("spark.driver.memory", "2g"),
        ("spark.executor.extraJavaOptions", "--illegal-access=permit"),
    )
    for key, value in settings:
        builder = builder.config(key, value)

    session = builder.getOrCreate()
    session.sparkContext.setLogLevel("ERROR")
    return session

In [None]:
# Build a DataFrame with only the columns we need: the `date` column (a
# datetime) is converted to a plain date, and `username` is cleaned (spaces
# removed, lower-cased) so tweets from the same user are counted together.
try:
    # PySpark >= 3.4 exposes its exceptions in pyspark.errors.
    from pyspark.errors import AnalysisException
except ImportError:
    # Older PySpark releases only provide it in pyspark.sql.utils.
    from pyspark.sql.utils import AnalysisException

spark = getSparkInstance(name_app="test_app")
try:
    # spark.read.json reports a missing path with AnalysisException
    # ("Path does not exist"), so catching only FileNotFoundError/IOError
    # (both OSError) would miss the very case this handler is for.
    raw = spark.read.json(file_path)
except (OSError, AnalysisException) as e:
    print(f'Error archivo no existe en el path: {file_path}, error -> {e}')
    # Re-raise: every following cell needs `data`, so carrying on would only
    # resurface later as a confusing NameError.
    raise

# lower()/regexp_replace() already return null for a null input, so no
# explicit isNotNull() guard is needed to keep null usernames as null.
data = raw.select('date', 'user.username') \
    .withColumn("date", to_date("date")) \
    .withColumn("username", regexp_replace(lower(col("username")), ' ', ''))


In [None]:
# Preview the first 10 rows of the cleaned DataFrame.
data.show(n=10)

+----------+---------------+
|      date|       username|
+----------+---------------+
|2021-02-24|arjunsinghpanam|
|2021-02-24|     prdeepnain|
|2021-02-24| parmarmaninder|
|2021-02-24|  anmoldhaliwal|
|2021-02-24|     kotiapreet|
|2021-02-24|      babli_708|
|2021-02-24|varinde17354019|
|2021-02-24|    bitnamsingh|
|2021-02-24|  anmoldhaliwal|
|2021-02-24|      satthiara|
+----------+---------------+
only showing top 10 rows



In [None]:
# Print the schema (column names and data types) of `data`.
data.printSchema()

root
 |-- date: date (nullable = true)
 |-- username: string (nullable = true)



In [None]:
# Count the tweets of each user on each date; the count is stored in `tweets`.
df_conteo_tweets = (
    data.groupBy('date', 'username')
    .count()
    .select('date', 'username', col('count').alias('tweets'))
)
df_conteo_tweets.orderBy(col("tweets").desc()).show(n=5)

+----------+---------------+------+
|      date|       username|tweets|
+----------+---------------+------+
|2021-02-19|       preetm91|   267|
|2021-02-18|neetuanjle_nitu|   195|
|2021-02-17| raajvinderkaur|   185|
|2021-02-13|maandee08215437|   178|
|2021-02-12|ranbirs00614606|   176|
+----------+---------------+------+
only showing top 5 rows



In [None]:
# For each date keep exactly one user: the one with the most tweets that day.
# row_number() is used instead of rank() so that a tie for the maximum does not
# produce several rows for the same date (which would duplicate dates in the
# final top 10). Ties are broken by username so the choice is deterministic.
from pyspark.sql.functions import row_number

window = Window.partitionBy('date').orderBy(col('tweets').desc(), col('username').asc())
top_twitters_df = df_conteo_tweets.withColumn('top_twitters', row_number().over(window)).filter(col('top_twitters') == 1)
top_twitters_df.orderBy(col('date').asc(), col('top_twitters').asc()).show()

+----------+---------------+------+------------+
|      date|       username|tweets|top_twitters|
+----------+---------------+------+------------+
|2021-02-12|ranbirs00614606|   176|           1|
|2021-02-13|maandee08215437|   178|           1|
|2021-02-14|  rebelpacifist|   119|           1|
|2021-02-15|         jot__b|   134|           1|
|2021-02-16|         jot__b|   133|           1|
|2021-02-17| raajvinderkaur|   185|           1|
|2021-02-18|neetuanjle_nitu|   195|           1|
|2021-02-19|       preetm91|   267|           1|
|2021-02-20|mangalj23056160|   108|           1|
|2021-02-21|     surrypuria|   161|           1|
|2021-02-22| preetysaini321|   110|           1|
|2021-02-23|     surrypuria|   135|           1|
|2021-02-24| preetysaini321|   107|           1|
+----------+---------------+------+------------+



In [None]:
# Total number of tweets per date: the sum of the per-user counts, stored in
# `sum_tweets`.
max_tweets_data = (
    df_conteo_tweets
    .groupBy("date")
    .sum("tweets")
    .withColumnRenamed("sum(tweets)", "sum_tweets")
)
max_tweets_data.sort(max_tweets_data.sum_tweets.desc()).show(n=100)

+----------+----------+
|      date|sum_tweets|
+----------+----------+
|2021-02-12|     12347|
|2021-02-13|     11296|
|2021-02-17|     11087|
|2021-02-16|     10443|
|2021-02-14|     10249|
|2021-02-18|      9625|
|2021-02-15|      9197|
|2021-02-20|      8502|
|2021-02-23|      8417|
|2021-02-19|      8204|
|2021-02-21|      7532|
|2021-02-22|      7071|
|2021-02-24|      3437|
+----------+----------+



In [None]:
# Join each date's top user with that date's tweet total, then sort the dates
# by total tweets (descending). `date` is a secondary sort key so dates with
# equal totals come out in a stable order, and the later limit(10) always
# selects the same rows.
df_final = (
    top_twitters_df
    .join(max_tweets_data, "date", "inner")
    .orderBy(col('sum_tweets').desc(), col('date').asc())
)
df_final.show(100)

+----------+---------------+------+------------+----------+
|      date|       username|tweets|top_twitters|sum_tweets|
+----------+---------------+------+------------+----------+
|2021-02-12|ranbirs00614606|   176|           1|     12347|
|2021-02-13|maandee08215437|   178|           1|     11296|
|2021-02-17| raajvinderkaur|   185|           1|     11087|
|2021-02-16|         jot__b|   133|           1|     10443|
|2021-02-14|  rebelpacifist|   119|           1|     10249|
|2021-02-18|neetuanjle_nitu|   195|           1|      9625|
|2021-02-15|         jot__b|   134|           1|      9197|
|2021-02-20|mangalj23056160|   108|           1|      8502|
|2021-02-23|     surrypuria|   135|           1|      8417|
|2021-02-19|       preetm91|   267|           1|      8204|
|2021-02-21|     surrypuria|   161|           1|      7532|
|2021-02-22| preetysaini321|   110|           1|      7071|
|2021-02-24| preetysaini321|   107|           1|      3437|
+----------+---------------+------+------------+----------+

In [None]:
# df_final is sorted by total tweets per date (descending), so its first 10
# rows are the 10 dates with the most tweets, each paired with the user who
# posted the most tweets on that date.
result_list = [(r.date, r.username) for r in df_final.take(10)]
result_list

[(datetime.date(2021, 2, 12), 'ranbirs00614606'),
 (datetime.date(2021, 2, 13), 'maandee08215437'),
 (datetime.date(2021, 2, 17), 'raajvinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'mangalj23056160'),
 (datetime.date(2021, 2, 23), 'surrypuria'),
 (datetime.date(2021, 2, 19), 'preetm91')]