### Setup

In [1]:
from pyspark.sql import SparkSession
from operator import add

import json

spark_session = SparkSession\
        .builder\
        .master("spark://de1:7077") \
        .appName("simon_pettersson_de1_p")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-03-14 11:53:08,330 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-03-14 11:53:08,630 WARN util.Utils: Service 'sparkDriver' could not bind on port 9998. Attempting port 9999.
2022-03-14 11:53:08,864 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2022-03-14 11:53:09,174 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10005. Attempting port 10006.
2022-03-14 11:53:09,269 WARN spark.ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


In [2]:
import json

posts = spark_context.textFile("hdfs://127.0.0.1:9000/RC_2005-12.bz2").map(lambda x: json.loads(x))

### Print a sample from the loaded JSON file

In [12]:
print('Sample JSON entry from read file:')
print(str(posts.take(1)))

Sample JSON entry from read file:
[{'controversiality': 0, 'body': 'A look at Vietnam and Mexico exposes the myth of market liberalisation.', 'subreddit_id': 't5_6', 'link_id': 't3_17863', 'stickied': False, 'subreddit': 'reddit.com', 'score': 2, 'ups': 2, 'author_flair_css_class': None, 'created_utc': 1134365188, 'author_flair_text': None, 'author': 'frjo', 'id': 'c13', 'edited': False, 'parent_id': 't3_17863', 'gilded': 0, 'distinguished': None, 'retrieved_on': 1473738411}]


### Define functions used to map posts to lists of words and controversiality scores

In [4]:
import re

def get_words(post):
    content = post.get('body')
    
    if content is None:
        return None
    
    return [ re.sub(r'[^a-zA-Z0-9]', '', word).lower() for word in content.split(" ") ]

def get_words_controversiality(post):
    words = get_words(post)
    controversiality = post.get('controversiality')
    
    if words is None or controversiality is None:
        return None
    
    return [ (word, controversiality) for word in words ]

### Map posts to its words along with the controversiality of the post

In [5]:
words = posts.flatMap(lambda post: get_words_controversiality(post)).filter(lambda entry: entry is not None)

### Append a count to each word specifying its number of occurances

In [6]:
words_with_counts = words.map(lambda entry: (entry[0], (entry[1], 1)))

### Reduce words to combine their controversiality and count

In [7]:
reduced = words_with_counts.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) 

### Remove words occuring less than N times, to filter out words with insufficient data

In [8]:
min_occurances = 100
filtered_words = reduced.filter(lambda word: word[1][1] >= min_occurances)

### Calculate percentage of how many of the words that occur in a controversial post, and sort them in descending order

In [9]:
word_percentages = filtered_words\
    .map(lambda word: (word[0], word[1][0] / word[1][1]))\
    .sortBy(lambda word: word[1], ascending=False)

### Print the results

In [10]:
# Number of words to print
num = 100
print(f'Printing top {num} most controversial words')

# Get subset of length num from start of sorted word list
top_words = word_percentages.take(num)

# Find the length of the longest word to make formatting prettier
max_length = max([ len(word[0]) for word in top_words ])

# Define a format for printing words, where words are left-padded with spaces 
# to become max_length long
print_format = '{:>' + str(max_length) + '}: {:<8}'

# Iterate over all top words and print them
for entry in top_words:
    # Find the word from the tuple
    word = entry[0]
    
    # Multiply by 100 and round to two decimal points
    percentage = round(entry[1]*100, 2)
    
    # Print the word and percentage using the previously defined format
    print(print_format.format(\
                entry[0], \
                str(percentage) + '%'))

Printing top 100 most controversial words
because: 12.0%   
    his: 11.21%  
     he: 9.46%   
    who: 9.24%   
     im: 9.02%   
   some: 8.97%   
     me: 8.45%   
 people: 8.24%   
 really: 7.96%   
   only: 7.84%   
    was: 7.76%   
    how: 7.56%   
   like: 7.37%   
      i: 7.24%   
    are: 7.23%   
  about: 7.17%   
   know: 6.8%    
   what: 6.31%   
  which: 6.19%   
  think: 6.15%   
    you: 6.05%   
     is: 5.82%   
   just: 5.78%   
deleted: 5.56%   
  their: 5.51%   
     do: 5.46%   
   with: 5.42%   
   this: 5.36%   
    not: 5.35%   
       : 5.31%   
     to: 5.2%    
  would: 5.16%   
     on: 5.05%   
    but: 5.03%   
article: 4.9%    
   that: 4.81%   
  there: 4.8%    
    can: 4.73%   
     my: 4.73%   
     it: 4.72%   
    its: 4.66%   
   they: 4.65%   
      a: 4.6%    
     at: 4.59%   
     no: 4.59%   
    one: 4.58%   
   good: 4.55%   
     so: 4.5%    
     an: 4.42%   
    and: 4.29%   
     of: 4.24%   
     in: 4.18%   
    the: 4.17%   
   d