In [0]:
import pyspark.sql.functions as f

In [0]:
## CHALLENGE 1

In [0]:
#Lectura de archivo JSON
dfTweets = spark.read.format("json").option("multiLine", False).load("dbfs:///FileStore/_latam/farmers_protest_tweets_2021_2_4.json")

In [0]:
dfTweets.show(20, False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+-------------------------+-------------------+----+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------

In [0]:
#Mostramos el esquema de metadatos
dfTweets.printSchema()

root
 |-- content: string (nullable = true)
 |-- conversationId: long (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- likeCount: long (nullable = true)
 |-- media: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- duration: double (nullable = true)
 |    |    |-- fullUrl: string (nullable = true)
 |    |    |-- previewUrl: string (nullable = true)
 |    |    |-- thumbnailUrl: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- variants: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |-- mentionedUsers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |   

In [0]:
#Selecciono los campos necesarios
dfTweetsColumns = dfTweets.select(
  f.date_format(dfTweets["date"], "yyyy-MM-dd").alias("date"),
  dfTweets["user.username"]
)

#Mostrar data
dfTweetsColumns.show(20, False)

+----------+---------------+
|date      |username       |
+----------+---------------+
|2021-02-24|ArjunSinghPanam|
|2021-02-24|PrdeepNain     |
|2021-02-24|parmarmaninder |
|2021-02-24|anmoldhaliwal  |
|2021-02-24|KotiaPreet     |
|2021-02-24|babli_708      |
|2021-02-24|Varinde17354019|
|2021-02-24|BitnamSingh    |
|2021-02-24|anmoldhaliwal  |
|2021-02-24|SatThiara      |
|2021-02-24|PasumaiVikatan |
|2021-02-24|anmoldhaliwal  |
|2021-02-24|ShariaActivist |
|2021-02-24|babli_708      |
|2021-02-24|Dallehal       |
|2021-02-24|KaurAma57668156|
|2021-02-24|KaurDosanjh1979|
|2021-02-24|ArjunSinghPanam|
|2021-02-24|anmoldhaliwal  |
|2021-02-24|BitnamSingh    |
+----------+---------------+
only showing top 20 rows



In [0]:
dfTweetsColumns.printSchema()

root
 |-- date: string (nullable = true)
 |-- username: string (nullable = true)



In [0]:
# Agrupar por fecha y contar el número de tweets por fecha
dfTweetsQuantityDateUser = dfTweetsColumns.groupBy("date","username").agg(f.count("*").alias("TweetsQuantity"))

In [0]:
dfTweetsQuantityDateUser.show()

+----------+---------------+--------------+
|      date|       username|TweetsQuantity|
+----------+---------------+--------------+
|2021-02-24|     ksgoraya23|             3|
|2021-02-24|HarteerathSingh|             1|
|2021-02-24| rajbir_chattha|             1|
|2021-02-24|     aman_82011|             3|
|2021-02-24|   LovableHindu|             1|
|2021-02-24|Navdeepchauhan0|             1|
|2021-02-23|     shergil190|             5|
|2021-02-23|     royermattw|             1|
|2021-02-23|   PrasadHastag|             1|
|2021-02-23|       aartic02|             1|
|2021-02-23|MohitYa00062314|             3|
|2021-02-23|      arise_ias|             1|
|2021-02-23|   _jaggicheema|             7|
|2021-02-23|BhanuSi65771227|             1|
|2021-02-23|Rupinde78638612|             1|
|2021-02-23|      GGrewal87|             1|
|2021-02-23|Preetdh41842533|             2|
|2021-02-23|Ranjodh72739297|             1|
|2021-02-23|MangalS93055765|             1|
|2021-02-23| ramanjitsandhu|    

In [0]:
# Encontrar las 10 primeras fechas donde se dieron más ventas
dfTop10Dates = dfTweetsQuantityDateUser.groupBy("date").agg(f.sum("TweetsQuantity").alias("TweetsQuantity")).orderBy(f.col("TweetsQuantity").desc()).limit(10)

In [0]:
dfTop10Dates.show()

+----------+--------------+
|      date|TweetsQuantity|
+----------+--------------+
|2021-02-12|         12347|
|2021-02-13|         11296|
|2021-02-17|         11087|
|2021-02-16|         10443|
|2021-02-14|         10249|
|2021-02-18|          9625|
|2021-02-15|          9197|
|2021-02-20|          8502|
|2021-02-23|          8417|
|2021-02-19|          8204|
+----------+--------------+



In [0]:
dfTweetsTop10DateUser = dfTop10Dates.join(
    dfTweetsQuantityDateUser,
    dfTop10Dates["date"] == dfTweetsQuantityDateUser["date"],
    "inner"
).groupBy(dfTop10Dates["date"]).agg(
    f.first("username").alias("username"),  
    f.max(dfTweetsQuantityDateUser["TweetsQuantity"]).alias("TweetsQuantity")
)

In [0]:
dfTweetsTop10DateUser.show()

+----------+---------------+--------------+
|      date|       username|TweetsQuantity|
+----------+---------------+--------------+
|2021-02-12|  Hardhaliwal99|           176|
|2021-02-13|     saltlamp11|           178|
|2021-02-14|     yashu_1717|           119|
|2021-02-15|    manpreetj13|           134|
|2021-02-16|      jdhillonA|           133|
|2021-02-17|       bot_shiv|           185|
|2021-02-18|   Nick13947057|           195|
|2021-02-19|chouhan_jasmeet|           267|
|2021-02-20|     AJ92097171|           108|
|2021-02-23|     shergil190|           135|
+----------+---------------+--------------+



In [0]:
###########################################################################################################
###########################################################################################################
###########################################################################################################
###########################################################################################################
###########################################################################################################
###########################################################################################################
###########################################################################################################
###########################################################################################################
###########################################################################################################
###########################################################################################################
###########################################################################################################
###########################################################################################################

In [0]:
from typing import List, Tuple
from datetime import datetime
import pyspark.sql.functions as f

def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    
    # Lectura de archivo JSON
    dfTweets = spark.read.format("json").option("multiLine", False).load(file_path)

    # Selecciono los campos necesarios, para el ejercicio 1, solo necesitamos date y username
    dfTweetsColumns = dfTweets.select(
        f.to_date(dfTweets["date"]).alias("date"),
        dfTweets["user.username"].alias("username")
    )

    # Contar el Numero de Tweets, agrupando fecha y usuario
    dfTweetsQuantityDateUser = dfTweetsColumns.groupBy("date", "username").agg(f.count("*").alias("TweetsQuantity"))

    # Top 10 de las fechas donde se tiene mas tweets
    dfTop10Dates = dfTweetsQuantityDateUser.groupBy("date").agg(f.sum("TweetsQuantity").alias("TweetsQuantity")).orderBy(f.col("TweetsQuantity").desc()).limit(10)

    # Unimos el top 10 de las fechas con el usuario que realizó mas tweets dentro de esas fechas.
    dfTweetsTop10DateUser = dfTop10Dates.join(
        dfTweetsQuantityDateUser,
        dfTop10Dates["date"] == dfTweetsQuantityDateUser["date"],
        "inner"
    ).groupBy(dfTop10Dates["date"]).agg(
        f.first("username").alias("username"),
        f.max(dfTweetsQuantityDateUser["TweetsQuantity"]).alias("TweetsQuantity")
    )

    # Recolectamos los resultados y los devolvemos como una lista de tuplas
    result = [(row["date"], row["username"]) for row in dfTweetsTop10DateUser.collect()]
   

    return result

In [0]:
file_path = "dbfs:///FileStore/_latam/farmers_protest_tweets_2021_2_4.json"

In [0]:
result = q1_time(file_path)
print(result)

[(datetime.date(2021, 2, 12), 'loyal90901246'), (datetime.date(2021, 2, 13), 'Surjeet26238794'), (datetime.date(2021, 2, 14), 'gargiprasad'), (datetime.date(2021, 2, 15), 'mohit_1904'), (datetime.date(2021, 2, 16), 'oneworl02477933'), (datetime.date(2021, 2, 17), 'SinghisKing908'), (datetime.date(2021, 2, 18), 'the_jethwani'), (datetime.date(2021, 2, 19), 'subeeron'), (datetime.date(2021, 2, 20), 'RSandhu2017'), (datetime.date(2021, 2, 23), 'TechieKisaan')]


In [0]:
######################

In [0]:
from typing import List, Tuple
from datetime import datetime
import pyspark.sql.functions as f

def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    
    # Lectura de archivo JSON
    dfTweets = spark.read.format("json").option("multiLine", False).load(file_path)

    # Selecciono los campos necesarios, para el ejercicio 1, solo necesitamos date y username
    dfTweetsColumns = dfTweets.select(
        f.to_date(dfTweets["date"]).alias("date"),
        dfTweets["user.username"].alias("username")
    )

    # Contar el Numero de Tweets, agrupando fecha y usuario
    dfTweetsQuantityDateUser = dfTweetsColumns.groupBy("date", "username").agg(f.count("*").alias("TweetsQuantity"))

    # Top 10 de las fechas donde se tiene mas tweets
    dfTop10Dates = dfTweetsQuantityDateUser.groupBy("date").agg(f.sum("TweetsQuantity").alias("TweetsQuantity")).orderBy(f.col("TweetsQuantity").desc()).limit(10)

    # Unimos el top 10 de las fechas con el usuario que realizó mas tweets dentro de esas fechas.
    dfTweetsTop10DateUser = dfTop10Dates.join(
        dfTweetsQuantityDateUser,
        dfTop10Dates["date"] == dfTweetsQuantityDateUser["date"],
        "inner"
    ).groupBy(dfTop10Dates["date"]).agg(
        f.first("username").alias("username"),
        f.max(dfTweetsQuantityDateUser["TweetsQuantity"]).alias("TweetsQuantity")
    )

    # Recolectamos los resultados y los devolvemos como una lista de tuplas
    result = [(row["date"], row["username"]) for row in dfTweetsTop10DateUser.toLocalIterator()]
   

    return result

In [0]:
result = q1_memory(file_path)
print(result)



[(datetime.date(2021, 2, 12), 'loyal90901246'), (datetime.date(2021, 2, 13), 'Surjeet26238794'), (datetime.date(2021, 2, 14), 'gargiprasad'), (datetime.date(2021, 2, 15), 'mohit_1904'), (datetime.date(2021, 2, 16), 'oneworl02477933'), (datetime.date(2021, 2, 17), 'SinghisKing908'), (datetime.date(2021, 2, 18), 'the_jethwani'), (datetime.date(2021, 2, 19), 'subeeron'), (datetime.date(2021, 2, 20), 'RSandhu2017'), (datetime.date(2021, 2, 23), 'TechieKisaan')]
