# 5. Résultats: Requêtes Spark SQL

In [1]:
# import des librairies
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import explode

In [2]:
# Initialisation de spark
sc = SparkContext()
spark = SparkSession.builder.config("spark.sql.broadcastTimeout", "36000").getOrCreate()

### 5.1. Observations de batchView: 

In [3]:
# Chargement des données dans une dataframe spark
df_batchView = spark.read.format("mongo").option("uri","mongodb://127.0.0.1/twitter.batchView").load()
df_batchView.persist()
df_batchView.printSchema()
df_batchView.createOrReplaceTempView("dfbatch")

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- heureDebut: string (nullable = true)
 |-- heureFin: string (nullable = true)



In [4]:
# C'est plus pratique d'utiliser une fonction qui affichera les résultats une par une
def afficherTopTenHtagFromBatch(batchDataframe, date, heure_debut):
    """
        Fonction affichant le classement des hashtags les plus populaire
        à partir du dataframeBatch
        La fonction prend en paramètre:
            - date: String comme par exemple : 2020-09-03
            - heure_debut: int entre 0 et 23
    """
    #date = "2020-09-03"
    #heure_debut = 16
    df2 = spark.sql("""SELECT hashtags, count FROM dfbatch df1 WHERE df1.date == '{0}' AND df1.heureDebut == '{1}h:00m:00s'"""\
                     .format(date, heure_debut )).orderBy("count", ascending=False)

    print("***********************************************************************************************")
    print("Le {0} entre {1}:00:00 et {2}:00:00, ci-dessous les hashtags les plus utilisés: ".format(date, heure_debut, heure_debut +1 ))
    df2.show()


In [5]:
date = "2020-09-05"
# Afficher les résultat par heure du 5 septembre
for x in range(25):
    afficherTopTenHtagFromBatch(df_batchView, date, x)

***********************************************************************************************
Le 2020-09-05 entre 0:00:00 et 1:00:00, ci-dessous les hashtags les plus utilisés: 
+--------------------+-----+
|            hashtags|count|
+--------------------+-----+
|   ArtistoftheSummer|  764|
|       WeRespectVets|  125|
|               HiKai|   81|
|             BBNaija|   72|
|                 BTS|   65|
|قفا_الموستكار_بيط...|   56|
|TrumpHatesOurMili...|   52|
|            IceCream|   36|
|        ยังคั่นกูEP4|   34|
|   SayangAdminShopee|   32|
+--------------------+-----+

***********************************************************************************************
Le 2020-09-05 entre 1:00:00 et 2:00:00, ci-dessous les hashtags les plus utilisés: 
+-----------------+-----+
|         hashtags|count|
+-----------------+-----+
|ArtistoftheSummer|  537|
|   DraftDodgerDon|  132|
|    WeRespectVets|  108|
|          GoStars|   86|
|              BTS|   58|
|          BBNaija|   52|

***********************************************************************************************
Le 2020-09-05 entre 15:00:00 et 16:00:00, ci-dessous les hashtags les plus utilisés: 
+--------------------+-----+
|            hashtags|count|
+--------------------+-----+
|23YrsofSENSATiONA...|  987|
|      SooraraiPottru|  906|
|          5बजे5मिनिट|  568|
|        RRBExamDates|  422|
|             speakup|  260|
|       5Baje5Minutes|  188|
|   SarkaruVaariPaata|  188|
|        5Baje5Minute|  125|
|   ArtistoftheSummer|  111|
|RamCharan13YrsTre...|   97|
+--------------------+-----+

***********************************************************************************************
Le 2020-09-05 entre 16:00:00 et 17:00:00, ci-dessous les hashtags les plus utilisés: 
+--------------------+-----+
|            hashtags|count|
+--------------------+-----+
|23YrsofSENSATiONA...|  455|
|      SooraraiPottru|  372|
|          5बजे5मिनिट|  334|
|        RRBExamDates|  255|
|   ArtistoftheSummer|  20

In [6]:
# Liberer la mémoire
df_batchView.unpersist()

DataFrame[_id: struct<oid:string>, count: bigint, date: string, hashtags: string, heureDebut: string, heureFin: string]

### 5.2. Observations de speedView: 

In [7]:
df_speedView = spark.read.format("mongo").option("uri","mongodb://127.0.0.1/twitter.speedView").load()
df_speedView.persist()
df_speedView.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- date_debut: timestamp (nullable = true)
 |-- date_fin: timestamp (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- hashtagUsed: string (nullable = true)
 |    |    |-- quantite: integer (nullable = true)
 |-- nbTotalMessagesPosted: integer (nullable = true)



In [8]:
# C'est plus pratique d'utiliser une fonction qui affichera les résultats une par une
def afficherTopTenInTime(dataframe, date, heure_debut_int, heure_fin_int):
    """
        Cette fonction filtre les heures dans le dataframe provenant de mongoDB
        et affiches les classement des HASHTAGS dans l'ordre
        Cette fonction prend en paramètre:
            - df : dataframe provenant de mongoDB
            - date est au format 2020-08-24
            -heure de début : int entre 0, et 23
            -heure de début : int entre 1, et 23
    """
    df1 = dataframe.\
        filter("date_debut > timestamp'{0} {1}:00:00' AND date_fin < timestamp'{0} {2}:00:00'".format(date, heure_debut_int, heure_fin_int))\
        .select("hashtags")

    df1.persist()
    #print(df1.count())

    df_exploded = df1.select(explode(df1.hashtags))
    df_exploded2 = df_exploded\
        .select(df_exploded.col.hashtagUsed, df_exploded.col.quantite)\
        .withColumnRenamed('col.hashtagUsed', 'hashtagUsed')\
        .withColumnRenamed('col.quantite', 'quantitee')

    result_df = df_exploded2.groupBy('hashtagUsed').sum().orderBy("sum(quantitee)", ascending=False).limit(10)
    print("***********************************************************************************************")
    print("Le {0} entre {1}:00:00 et {2}:00:00, ci-dessous les hashtags les plus utilisés: ".format(date, heure_debut_int, heure_fin_int))

    result_df.show()
    df1.unpersist()

In [9]:
# On parcours toutes les heure depuis minuit à midi
date = "2020-09-06"
for x in range(12):
    afficherTopTenInTime(df_speedView, date, x, x+1)

***********************************************************************************************
Le 2020-09-06 entre 0:00:00 et 1:00:00, ci-dessous les hashtags les plus utilisés: 
+--------------------+--------------+
|         hashtagUsed|sum(quantitee)|
+--------------------+--------------+
|   ArtistoftheSummer|           782|
|             BBNaija|           246|
|             BBNajia|           126|
|    TurnUpWithLaycon|            88|
|                 BTS|            74|
|             bbnaija|            69|
|   SayangAdminShopee|            31|
|23YrsofSENSATiONA...|            29|
|      BBNaijaShallWe|            27|
|            Dumbkirk|            25|
+--------------------+--------------+

***********************************************************************************************
Le 2020-09-06 entre 1:00:00 et 2:00:00, ci-dessous les hashtags les plus utilisés: 
+-----------------+--------------+
|      hashtagUsed|sum(quantitee)|
+-----------------+--------------+
|A

In [10]:
# Liberer la mémoire
df_speedView.unpersist()

DataFrame[_id: struct<oid:string>, date_debut: timestamp, date_fin: timestamp, hashtags: array<struct<hashtagUsed:string,quantite:int>>, nbTotalMessagesPosted: int]