In [31]:
from pyspark.sql import SparkSession
# Define a sparksession and connect it to the spark cluster
spark_session = SparkSession\
        .builder\
        .master("spark://hadoop-master:7077") \
        .appName("DE-1-4-SparkSession")\
        .config("spark.dynamicAllocation.enabled", False)\
        .config("spark.executor.instances", 7) \
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.cores.max", 14) \
        .config("spark.cores.min", 14) \
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

In [32]:
# read the data and print the schema for understanding
reddit_df = spark_session.read.json("hdfs://hadoop-master:9000/user/hadoop/input/corpus-webis-tldr-17.json")
reddit_df.printSchema()



root
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- content: string (nullable = true)
 |-- content_len: long (nullable = true)
 |-- id: string (nullable = true)
 |-- normalizedBody: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- summary_len: long (nullable = true)
 |-- title: string (nullable = true)



                                                                                

In [33]:
# define paths to positive and negative word lists
path_pos = "./opinion-lexicon-English/positive-words.txt"
path_neg = "./opinion-lexicon-English/negative-words.txt"

# create sets (hash tables) to store the positive and negative words
positive_words = set()
negative_words = set()

# read the content of the word files
with open(path_pos, 'r', encoding='utf-8') as file:
    for line in file:
        positive_words.add(line.strip())
with open(path_neg, 'r', encoding='utf-8') as file:
    for line in file:
        negative_words.add(line.strip())

In [34]:
# broadcast the word sets so all nodes can access the data
broadcast_positive_words = spark_session.sparkContext.broadcast(positive_words)
broadcast_negative_words = spark_session.sparkContext.broadcast(negative_words)

In [35]:
import re
# function to use to define the udf
def pre_process(summary):
    # convert to lowercase
    line = summary.lower()
    
    # tokenize the summaries and remove dots and commas
    line = re.sub(r"[.,]",'',line).split(" ")
    
    # count the number positive and negative words
    positive_count = sum([word in broadcast_positive_words.value for word in line])
    negative_count = sum([word in broadcast_negative_words.value for word in line])

    # if more positive words return positive sentiment (1)
    if positive_count > negative_count:
        return (1,positive_count,negative_count, 1)
        
    # if more negative words return negative sentiment (-1)
    elif negative_count > positive_count:
        return (-1,positive_count,negative_count, 1)
        
    # else neutral (0)
    else:
        return (0, positive_count, negative_count, 1)
    

In [36]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType, StructType, StructField

# define a structure for the new column
schema = StructType([
    StructField("sentiment", IntegerType(), False),
    StructField("positive_count", IntegerType(), False),
    StructField("negative_count", IntegerType(), False),
    StructField("number_of_tweets", IntegerType(), False)
])

In [38]:
from operator import add
import time as time 
# clear cache for more accurate timings
spark_session.catalog.clearCache()
start_time = time.time()

# read the data into dataframe
reddit_df = spark_session.read.json("hdfs://hadoop-master:9000/user/hadoop/input/corpus-webis-tldr-17.json")

# drop the columns that are not needed
reddit_df = reddit_df.drop("author", "body", "normalizedBody", "id", "subreddit_id", "title", "content", "content_len", "summary_len")

# filter out comments where the subreddit is NULL
sampled_reddit_df = reddit_df.filter(reddit_df["subreddit"] != "NULL")

# define udf
conotation_udf = udf(pre_process, schema)

# apply udf and create new column
reddit_df_conotation = sampled_reddit_df.withColumn("conotation", conotation_udf("summary"))

# map the subreddit of the comment to its sentiment and a counter keeping track of the number of total comments in subreddit
grouped = reddit_df_conotation.rdd.map(lambda x: (x["subreddit"], (x["conotation"]["sentiment"], x["conotation"]["number_of_tweets"])))

# reduce by the key, sentiments add up to a total sentiment of the subreddit
# the counter adds up to keep track of the total comments on a subreddit
grouped = grouped.reduceByKey(lambda a,b: (a[0] + b[0], a[1] + b[1]))

# take(1) to ensure not only lazy operations are made
grouped.take(1)
end_time = time.time()
reddit_df.unpersist()
print(f"Total time: {end_time-start_time} seconds")

[Stage 23:>                                                         (0 + 1) / 1]

Total time: 226.8636417388916 seconds


                                                                                

In [39]:
# sort to view the most positive subreddits
grouped = grouped.sortBy(lambda x: x[1], ascending = False)
grouped.take(10)

                                                                                

[('leagueoflegends', (5234, 109307)),
 ('buildapc', (3695, 14817)),
 ('summonerschool', (2532, 13806)),
 ('DotA2', (1657, 22405)),
 ('Guildwars2', (1398, 10948)),
 ('magicTCG', (1384, 10624)),
 ('DestinyTheGame', (1325, 19878)),
 ('seduction', (1316, 8784)),
 ('personalfinance', (1312, 14403)),
 ('photography', (1243, 5157))]

In [40]:
# same for negative subreddits
grouped = grouped.sortBy(lambda x: x[1], ascending = True)
grouped.take(10)

[('AskReddit', (-121714, 589947)),
 ('relationships', (-75276, 352049)),
 ('tifu', (-17853, 52219)),
 ('relationship_advice', (-7130, 50416)),
 ('funny', (-6758, 40171)),
 ('WTF', (-6555, 25781)),
 ('trees', (-6271, 47286)),
 ('AdviceAnimals', (-6257, 40783)),
 ('politics', (-6144, 36518)),
 ('offmychest', (-5298, 17175))]