In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, concat_ws, split, hour
import re
from utils import get_session

spark = get_session()

df = spark.read.csv('hdfs:///tweets/debate-tweets-001.tsv', sep='\t').select('_c1', '_c7', '_c8')
df.show()

Setting default log level to "

22/12/08 23:06:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+--------------------+--------------------+----------+
|                 _c1|                 _c7|       _c8|
+--------------------+--------------------+----------+
|@anacddd verdade,...|Wed Oct 15 14:31:...|2014-10-15|
|              Que ñ*|Wed Oct 15 14:31:...|2014-10-15|
| Vou quebrar a Bruna|Wed Oct 15 14:31:...|2014-10-15|
|agora vou p segun...|Wed Oct 15 14:31:...|2014-10-15|
|Me sinto tão bem ...|Wed Oct 15 14:31:...|2014-10-15|
|Eu estou aqui, de...|Wed Oct 15 14:31:...|2014-10-15|
|Quando vai embora...|Wed Oct 15 14:31:...|2014-10-15|
|@paynecaralhudo k...|Wed Oct 15 14:31:...|2014-10-15|
|Conceição da Barr...|Wed Oct 15 14:31:...|2014-10-15|
| @Maniavato te amo ♥|Wed Oct 15 14:31:...|2014-10-15|
|Alg me curtindo rs ♡|Wed Oct 15 14:31:...|2014-10-15|
|@MiiluAA No, porq...|Wed Oct 15 14:31:...|2014-10-15|
|#EMABiggestFansJu...|Wed Oct 15

Primeiramente, processamos as colunas com os horários para obter o horário de cada tweet.

In [2]:
df = df.withColumn("tweet_hour", hour(to_timestamp(concat_ws(' ',df._c8, split(df._c7, ' ')[3]))))
df.show()

+--------------------+--------------------+----------+----------+
|                 _c1|                 _c7|       _c8|tweet_hour|
+--------------------+--------------------+----------+----------+
|@anacddd verdade,...|Wed Oct 15 14:31:...|2014-10-15|        14|
|              Que ñ*|Wed Oct 15 14:31:...|2014-10-15|        14|
| Vou quebrar a Bruna|Wed Oct 15 14:31:...|2014-10-15|        14|
|agora vou p segun...|Wed Oct 15 14:31:...|2014-10-15|        14|
|Me sinto tão bem ...|Wed Oct 15 14:31:...|2014-10-15|        14|
|Eu estou aqui, de...|Wed Oct 15 14:31:...|2014-10-15|        14|
|Quando vai embora...|Wed Oct 15 14:31:...|2014-10-15|        14|
|@paynecaralhudo k...|Wed Oct 15 14:31:...|2014-10-15|        14|
|Conceição da Barr...|Wed Oct 15 14:31:...|2014-10-15|        14|
| @Maniavato te amo ♥|Wed Oct 15 14:31:...|2014-10-15|        14|
|Alg me curtindo rs ♡|Wed Oct 15 14:31:...|2014-10-15|        14|
|@MiiluAA No, porq...|Wed Oct 15 14:31:...|2014-10-15|        14|
|#EMABigge

Com os horários de cada tweet, podemos particionar os dados em manhã, tarde e noite. Consideramos manhã como até 12H, tarde para tweets entre 12H e 19H e noite para tweets após as 19H.

In [3]:
morning_rows = df.filter(df.tweet_hour < 12)
afternoon_rows = df.filter((df.tweet_hour >= 12) & (df.tweet_hour <= 19 ))
night_rows = df.filter(df.tweet_hour > 19)

A função `persist_hashtags` salva os dados processados de volta no HDFS. A célula abaixo deleta resultados anteriores desses processamentos que já tenham sido salvos.

In [4]:
!hdfs dfs -rmr /user

rmr: DEPRECATED: Please use '-rm -r' instead.
Deleted /user


Para extrair as hashtags de cada tweet, aplicamos uma expressão regular para capturar as hashtags e seus conteúdos.
Após isso, aplicamos um flatMap para transformar cada resultado em uma linha separada do dataset, por fim, aplicamos um reduce para somar a frequência de cada tweet.

A etapa abaixo poderia ter sido feita de outras formas, mas optamos por converter o Dataframe para usar a API de RDD que fornece flatMap e reduce.

Por fim, salvamos os dados no HDFS.

In [5]:
def persist_hashtags(df, csv_name):
    rows_with_hashtags = df.filter(df._c1.contains("#"))

    hastags_only_rdd = rows_with_hashtags.rdd.flatMap(lambda r: map(lambda hashtag: (hashtag, 1), re.findall(r"(#[A-Za-z\d]+)", r._c1)))
    hashtags_with_count_rdd = hastags_only_rdd.reduceByKey(lambda x, y: x + y)

    df_new = spark.createDataFrame(hashtags_with_count_rdd, ["hashtag", "freq"])
    df_new = df_new.orderBy(df_new.freq.desc())
    df_new.write.csv(csv_name)
    return df_new

In [6]:
persist_hashtags(morning_rows, "morning-hashtags").show(40, truncate=False)

+------------------------------+------+
|hashtag                       |freq  |
+------------------------------+------+
|#EMABiggestFansJustinBieber   |100221|
|#EMABiggestFans1D             |91176 |
|#camilasayshi                 |10517 |
|#TheVoiceBrasil               |2799  |
|#DebateNaRecord               |2429  |
|#trndnl                       |2020  |
|#QueroDilmaTreze              |1983  |
|#Lan                          |1940  |
|#debatenarecord               |1854  |
|#LinkinParkNoMultishow        |1854  |
|#E                            |1763  |
|#MasterChefBR                 |1324  |
|#TwOff                        |1285  |
|#AFazenda                     |1273  |
|#LuaBlancoNoAgoraETarde       |1168  |
|#askdirectioner               |1153  |
|#KCAArgentina                 |1141  |
|#AustinMahone                 |1084  |
|#AustinMahoneChile            |1074  |
|#askjadelittlemix             |1070  |
|#QueroNoTVZ                   |1034  |
|#CongratsOn1MChris            |988   |


In [7]:
persist_hashtags(afternoon_rows, "afternoon-hashtags").show(40, truncate=False)

+----------------------------------+-----+
|hashtag                           |freq |
+----------------------------------+-----+
|#EMABiggestFans1D                 |80229|
|#EMABiggestFansJustinBieber       |69887|
|#StealMyGirl                      |6296 |
|#QueroNoTVZ                       |5167 |
|#bigpaynodanceoff                 |4310 |
|#LuanSantanaNaHoraDoFaro          |2180 |
|#AustinMahoneChile                |1536 |
|#AustinMahone                     |1435 |
|#demiyourstorydoesntdefineyou     |1301 |
|#trndnl                           |1181 |
|#HottieOfTheWeek                  |1042 |
|#E                                |1040 |
|#KCAArgentina                     |970  |
|#EMABiggestFansArianaGrande       |954  |
|#EMABiggestFans5SOS               |913  |
|#Vote5HEMA                        |910  |
|#VamosLaU                         |858  |
|#AMAs                             |857  |
|#UnlockMockingjay                 |834  |
|#DomingoPregui                    |811  |
|#WeWantZay

In [None]:
persist_hashtags(night_rows, "night-hashtags").show(40, truncate=False)

In [None]:
spark.stop()