In [2]:
import re
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,col
from pyspark.sql.types import *
#Connect to the cluster
# New API
spark = SparkSession\
        .builder\
        .master("spark://192.168.2.207:7077") \
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",2)\
        .appName("pa_test1")\
        .getOrCreate()

# Old API (RDD)
sc = spark.sparkContext

sc.setLogLevel("INFO")

In [3]:
nlines = sc.textFile("hdfs://192.168.2.207:9000/user/ubuntu/negative-words.txt")
plines = sc.textFile("hdfs://192.168.2.207:9000/user/ubuntu/positive-words.txt")

In [4]:
df = spark.read.json("hdfs://192.168.2.207:9000/user/ubuntu/sample_data.json")
data_clean = df.select("subreddit", "body", "score", "controversiality", "created_utc")

In [5]:
def compile_regexp(word_list):
    re_string = "[\s\W]("
    for word in word_list:
        re_string += (re.escape(word) + "|")
    re_string = re_string[0:-1] + ")[\s\W]"
    return re.compile(re_string, re.IGNORECASE)

negative = compile_regexp(nlines.collect())
sc.broadcast(negative)
positive = compile_regexp(plines.collect())
sc.broadcast(positive)

<pyspark.broadcast.Broadcast at 0x7f17f0746860>

In [9]:
def match_negative(comment, wc):
    return len(negative.findall(comment))/wc

def match_positive(comment, wc):
    return len(positive.findall(comment))/wc

def count_words(comment):
    return len(comment.split())

udf_match_negative = udf(match_negative, DoubleType())
udf_match_positive = udf(match_positive, DoubleType())
udf_count_words = udf(count_words, IntegerType())

In [22]:
df.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)



In [14]:
w_count = data_clean.withColumn('wordcount', udf_count_words('body'))
negativity = w_count.withColumn('negativity', udf_match_negative('body', 'wordcount'))
n_p_df = negativity.withColumn('positivity', udf_match_positive('body', 'wordcount'))
filtered = n_p_df.select("subreddit", "score", "controversiality", "wordcount", "negativity", "positivity", "created_utc").filter("positivity != 0 and negativity != 0")
filtered.show()
#n_p_df.head()

+---------------+-----+----------------+---------+--------------------+--------------------+-----------+
|      subreddit|score|controversiality|wordcount|          negativity|          positivity|created_utc|
+---------------+-----+----------------+---------+--------------------+--------------------+-----------+
|      EchoArena|    1|               0|       55| 0.03636363636363636| 0.03636363636363636| 1506816000|
|     The_Donald|    2|               0|        9|  0.2222222222222222|  0.1111111111111111| 1506816000|
|       totalwar|    3|               0|      192|0.057291666666666664|             0.03125| 1506816000|
|   tvcrossovers|    1|               0|       49|0.061224489795918366|0.061224489795918366| 1506816000|
|     realmadrid|   10|               0|       13| 0.15384615384615385| 0.07692307692307693| 1506816000|
|            CFB|    1|               0|       15| 0.06666666666666667| 0.13333333333333333| 1506816000|
|     edc_raffle|    1|               0|      221| 0.01

In [13]:
#n_p_df.head()
#find how many comments are totally neutral
neutral_comments_count = n_p_df.select("subreddit", "score", "controversiality", "wordcount", "negativity", "positivity", "created_utc").filter("positivity == 0 and negativity == 0").count()
print(neutral_comments_count)

4516


In [25]:
#Group By subreddit
df1 = filtered.groupBy("created_utc", "subreddit").avg("positivity", "negativity")
#find "happiest" subreddit of the day 
highest_average_positive = df1.agg({"avg(positivity)": "max"}).first()[0]
highest_average_negative = df1.agg({"avg(negativity)": "max"}).first()[0]
#print(highest_average)

In [12]:
#df1.withColumn("NormalizedPositivity", "Normalized" ())

+-------------------+--------------------+--------------------+
|          subreddit|     avg(positivity)|     avg(negativity)|
+-------------------+--------------------+--------------------+
|              anime| 0.06870865426840965| 0.03835851305971725|
|         NHLStreams|0.011494252873563218| 0.04597701149425287|
|       SaltLakeCity|0.024793388429752067|0.024793388429752067|
|         MLBTheShow|0.020833333333333332|0.041666666666666664|
|         QuotesPorn| 0.08571428571428572| 0.02857142857142857|
|         WahoosTipi|              0.0625|              0.0625|
|       JennyPoussin| 0.15384615384615385| 0.07692307692307693|
|         television| 0.06637761487573753|0.047329995828118476|
|          JUSTNOMIL| 0.03747990229938484|  0.0476114772726827|
|         rapbattles|0.045454545454545456|0.045454545454545456|
| Anarcho_Capitalism| 0.04560617292270369| 0.07484219511647236|
|         LinkinPark| 0.02631578947368421| 0.05263157894736842|
|          BABYMETAL|0.04511278195488721

In [26]:
#df2 = fd1.withColumn("Happines Ratio", df1.avg(positivity) / df1.avg(negativity))
df2 = df1.withColumn("Happines Ratio", df1["avg(positivity)"] / df1["avg(negativity)"])

In [27]:
df2.show()

+-----------+---------------+--------------------+--------------------+-------------------+
|created_utc|      subreddit|     avg(positivity)|     avg(negativity)|     Happines Ratio|
+-----------+---------------+--------------------+--------------------+-------------------+
| 1506816001|    techtheatre| 0.09090909090909091|0.030303030303030304|                3.0|
| 1506816011|  linuxhardware| 0.03571428571428571| 0.08928571428571429|0.39999999999999997|
| 1506816018|            WTF| 0.23076923076923078| 0.07692307692307693|                3.0|
| 1506816041|           news| 0.05263157894736842| 0.05263157894736842|                1.0|
| 1506816047|      starbucks|0.030303030303030304| 0.06060606060606061|                0.5|
| 1506816049|  todayilearned| 0.04040404040404041| 0.06060606060606061| 0.6666666666666667|
| 1506816061|         hockey| 0.08333333333333333| 0.08333333333333333|                1.0|
| 1506816094|          vegan|0.043478260869565216|0.043478260869565216|         

In [60]:
df2.agg({"created_utc": "max"}).first()[0]

1506816312

In [44]:
#from pyspark.sql.functions import to_timestamp
#df3 = df2.withColumn('created_utc', to_timestamp(df2.created_utc, 'yyyy-MM-dd HH:mm:ss'))
#Convert timestamp to unix timestamp
from pyspark.sql.functions import *
df3 = df2.withColumn("created_utc", (from_unixtime(col("created_utc"),"MM-dd-yyyy HH:mm:ss").alias("created_utc")))
 

In [61]:
df3.agg({"created_utc": "max"}).first()[0]

'10-01-2017 00:05:12'

In [45]:
df3.take(10)

[Row(created_utc='10-01-2017 00:00:01', subreddit='techtheatre', avg(positivity)=0.09090909090909091, avg(negativity)=0.030303030303030304, Happines Ratio=3.0),
 Row(created_utc='10-01-2017 00:00:11', subreddit='linuxhardware', avg(positivity)=0.03571428571428571, avg(negativity)=0.08928571428571429, Happines Ratio=0.39999999999999997),
 Row(created_utc='10-01-2017 00:00:18', subreddit='WTF', avg(positivity)=0.23076923076923078, avg(negativity)=0.07692307692307693, Happines Ratio=3.0),
 Row(created_utc='10-01-2017 00:00:41', subreddit='news', avg(positivity)=0.05263157894736842, avg(negativity)=0.05263157894736842, Happines Ratio=1.0),
 Row(created_utc='10-01-2017 00:00:47', subreddit='starbucks', avg(positivity)=0.030303030303030304, avg(negativity)=0.06060606060606061, Happines Ratio=0.5),
 Row(created_utc='10-01-2017 00:00:49', subreddit='todayilearned', avg(positivity)=0.04040404040404041, avg(negativity)=0.06060606060606061, Happines Ratio=0.6666666666666667),
 Row(created_utc='10

In [56]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *

df4 = df3.withColumn("Hour", hour(F.to_timestamp("created_utc","MM-dd-yyyy HH:mm:ss")))

In [57]:
#df1 = filtered.groupBy("created_utc", "subreddit").avg("positivity", "negativity")
df5 = df4.groupBy("Hour").avg("Happines Ratio")

In [59]:
df5.show()

+----+-------------------+
|Hour|avg(Happines Ratio)|
+----+-------------------+
|   0| 1.4285350419065044|
+----+-------------------+

