In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import regexp_replace

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.option("encoding", "UTF-8").option("decode", "UTF-8").csv('../dataset/debate-tweets.tsv', sep='\t', inferSchema=True)

In [245]:
df.show(vertical=True)

-RECORD 0--------------------
 _c0  | 522394422710136832   
 _c1  | @anacddd verdade,... 
 _c2  | -27.516566           
 _c3  | -48.646082           
 _c4  | False                
 _c5  | 522394422710136832   
 _c6  | 522394422710136832   
 _c7  | Wed Oct 15 14:31:... 
 _c8  | 2014-10-15           
 _c9  | 3.0342583E8          
 _c10 | pt                   
 _c11 | Biguaçu              
 _c12 | 77c15e08a456c529     
 _c13 | 0.0                  
 _c14 | 0.0                  
 _c15 | 0.0                  
 _c16 | 0.0                  
 _c17 | 0.0                  
 _c18 | 0.0                  
 _c19 | 0.0                  
 _c20 | 0.0                  
 _c21 | null                 
 _c22 | null                 
 _c23 | null                 
 _c24 | null                 
 _c25 | Geovana Nunes        
 _c26 | 295414968            
 _c27 | 295414968            
 _c28 | null                 
 _c29 | 316                  
 _c30 | Mon May 09 00:12:... 
 _c31 | geovanannunes        
-RECORD 1-

In [285]:
df.printSchema()

root
 |-- _c0: long (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: double (nullable = true)
 |-- _c15: double (nullable = true)
 |-- _c16: double (nullable = true)
 |-- _c17: double (nullable = true)
 |-- _c18: double (nullable = true)
 |-- _c19: double (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: long (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: long (nullable = true)
 |-

## Questão 1:

### a)

In [225]:
df_time = df.withColumn('_c7', to_timestamp(f.split(df._c7, ' ')[3], 'HH:mm:ss'))

In [227]:
wordCountMorning = df_time.withColumn('word', f.explode(f.split(regexp_replace(f.upper(df_time._c1), '[\$,!"”]', ''), ' ')))\
    .where("_c7 between '1970-01-01 06:00:01' AND '1970-01-01 12:00:00'")\
    .where("word like '#%'")\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

In [228]:
wordCountAfternoon = df_time.withColumn('word', f.explode(f.split(regexp_replace(f.upper(df_time._c1), '[\$,!"”]', ''), ' ')))\
    .where("_c7 between '1970-01-01 12:00:01' AND '1970-01-01 18:00:00'")\
    .where("word like '#%'")\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

In [229]:
wordCountNight = df_time.withColumn('word', f.explode(f.split(regexp_replace(f.upper(df_time._c1), '[\$,!"”]', ''), ' ')))\
    .where("_c7 between '1970-01-01 18:00:01' AND '1970-01-01 23:59:59'")\
    .where("word like '#%'")\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

In [235]:
wordCountDawn = df_time.withColumn('word', f.explode(f.split(regexp_replace(f.upper(df_time._c1), '[\$,!"”]', ''), ' ')))\
    .where("_c7 between '1970-01-01 00:00:01' AND '1970-01-01 06:00:00'")\
    .where("word like '#%'")\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

In [156]:
wordCountMorning.show(1)

+-----------------+-----+
|             word|count|
+-----------------+-----+
|#EMABiggestFans1D|14757|
+-----------------+-----+
only showing top 1 row



In [157]:
wordCountAfternoon.show(1)

+-----------------+-----+
|             word|count|
+-----------------+-----+
|#EMABiggestFans1D|58489|
+-----------------+-----+
only showing top 1 row



In [158]:
wordCountNight.show(1)

+-----------------+-----+
|             word|count|
+-----------------+-----+
|#EMABiggestFans1D|64676|
+-----------------+-----+
only showing top 1 row



In [237]:
wordCountDawn.show(1)

+--------------------+-----+
|                word|count|
+--------------------+-----+
|#EMABIGGESTFANSJU...|85159|
+--------------------+-----+
only showing top 1 row



### b)

In [8]:
df_time = df.withColumn('_c8', to_timestamp(df._c8, 'yyyy-MM-dd'))

In [297]:
wordCount = df_time.withColumn('word', f.explode(f.split(regexp_replace(f.upper(df_time._c1), '[\$,!"”]', ''), ' ')))\
    .where("word like '#%'")\
    .groupBy('_c8', 'word')\
    .count()\
    .sort('count', ascending=False)

In [298]:
wordCount.show()

+-------------------+--------------------+-----+
|                _c8|                word|count|
+-------------------+--------------------+-----+
|2014-10-16 00:00:00|   #EMABIGGESTFANS1D|68414|
|2014-10-16 00:00:00|#EMABIGGESTFANSJU...|58811|
|2014-10-17 00:00:00|#EMABIGGESTFANSJU...|49599|
|2014-10-17 00:00:00|   #EMABIGGESTFANS1D|47701|
|2014-10-15 00:00:00|   #EMABIGGESTFANS1D|34366|
|2014-10-19 00:00:00|#EMABIGGESTFANSJU...|33508|
|2014-10-19 00:00:00|   #EMABIGGESTFANS1D|29497|
|2014-10-15 00:00:00|#EMABIGGESTFANSJU...|27998|
|2014-10-18 00:00:00|#EMABIGGESTFANSJU...|27399|
|2014-10-18 00:00:00|   #EMABIGGESTFANS1D|27200|
|2014-10-16 00:00:00|       #CAMILASAYSHI|10635|
|2014-10-20 00:00:00|#EMABIGGESTFANSJU...|10475|
|2014-10-20 00:00:00|   #EMABIGGESTFANS1D| 7399|
|2014-10-15 00:00:00|        #STEALMYGIRL| 6929|
|2014-10-20 00:00:00|     #DEBATENARECORD| 4430|
|2014-10-15 00:00:00|   #BIGPAYNODANCEOFF| 4062|
|2014-10-16 00:00:00|        #DEBATENOSBT| 3692|
|2014-10-16 00:00:00

In [314]:
wordCount.filter('_c8 is not null').groupBy('_c8').agg(f.first("word"), f.max("count")).show();

+-------------------+--------------------+----------+
|                _c8|         first(word)|max(count)|
+-------------------+--------------------+----------+
|2014-10-18 00:00:00|#EMABIGGESTFANSJU...|     27399|
|2014-10-15 00:00:00|   #EMABIGGESTFANS1D|     34366|
|2014-10-17 00:00:00|#EMABIGGESTFANSJU...|     49599|
|2014-10-19 00:00:00|#EMABIGGESTFANSJU...|     33508|
|2014-10-16 00:00:00|   #EMABIGGESTFANS1D|     68414|
|2014-10-20 00:00:00|#EMABIGGESTFANSJU...|     10475|
+-------------------+--------------------+----------+



### c)

In [131]:
df_time = df.withColumn('_c7', f.date_trunc('hour',f.to_timestamp(f.concat(df._c8, f.lit(" "), f.split(df._c7, ' ')[3]), 'yyyy-MM-dd HH:mm:ss')))

In [137]:
df_time.select('_c7').filter('_c7 is not null').groupBy('_c7').count().sort('_c7').show(144)

+-------------------+------+
|                _c7| count|
+-------------------+------+
|2014-10-15 14:00:00| 34378|
|2014-10-15 15:00:00| 79157|
|2014-10-15 16:00:00| 78353|
|2014-10-15 17:00:00| 83950|
|2014-10-15 18:00:00| 77713|
|2014-10-15 19:00:00| 65095|
|2014-10-15 20:00:00| 66813|
|2014-10-15 21:00:00| 79270|
|2014-10-15 22:00:00| 86030|
|2014-10-15 23:00:00| 97574|
|2014-10-16 00:00:00|110232|
|2014-10-16 01:00:00|163338|
|2014-10-16 02:00:00|176211|
|2014-10-16 03:00:00|124599|
|2014-10-16 04:00:00| 77743|
|2014-10-16 05:00:00| 42661|
|2014-10-16 06:00:00| 22228|
|2014-10-16 07:00:00| 10157|
|2014-10-16 08:00:00|  8327|
|2014-10-16 09:00:00| 23616|
|2014-10-16 10:00:00| 37528|
|2014-10-16 11:00:00| 42716|
|2014-10-16 12:00:00| 49156|
|2014-10-16 13:00:00| 55958|
|2014-10-16 14:00:00| 66154|
|2014-10-16 15:00:00| 76891|
|2014-10-16 16:00:00| 79276|
|2014-10-16 17:00:00| 65824|
|2014-10-16 18:00:00| 57098|
|2014-10-16 19:00:00| 58547|
|2014-10-16 20:00:00| 68148|
|2014-10-16 21

In [109]:
dates = df_time.select('_c8').filter('_c7 is not null').distinct().collect()

In [120]:
for date in dates:
    diff_sec = df_time.where("_c8 = '{}'".format(date['_c8']))\
                    .sort('_c7', ascending=False).first()._c7 -\
    df_time.where("_c8 = '{}'".format(date['_c8'])).first()._c7
    
    value = df_time.select('_c8')\
        .filter('_c7 is not null')\
        .where("_c8 = '{}'".format(date['_c8']))\
        .groupBy('_c8').count().select('count').collect()[0]['count']
    
    print(date['_c8'], 3600*(value/diff_sec.seconds))

2014-10-18 49878.36896260374
2014-10-16 70737.2353846688
2014-10-20 77489.27658628736
2014-10-19 62006.34266600308
2014-10-17 63126.98063634995
2014-10-15 79028.39038986184


### d)

In [None]:
spark.stop()