In [1]:
# Connect to the Spark standalone master and build (or reuse, via getOrCreate) the
# SparkSession used by this notebook.
# Dynamic allocation and the external shuffle service are disabled and each executor
# gets 2 cores — presumably so the executor layout stays fixed across the
# scalability runs (NOTE(review): confirm intent).
from pyspark.sql import SparkSession

session_builder = (
    SparkSession.builder
    .master("spark://192.168.2.118:7077")
    .appName("hadoop_example")
    .config("spark.dynamicAllocation.enabled", False)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.executor.cores", 2)
)
spark_session = session_builder.getOrCreate()

# Low-level entry point, needed for the RDD / Hadoop input APIs used below.
spark_context = spark_session.sparkContext

In [2]:
# Load the Reddit comment dump from the HDFS cluster.
HDFS_INPUT_PATH = 'hdfs://192.168.2.118:9000/unzip_input/RC_2011-09'


def load_comments():
    """Read HDFS_INPUT_PATH with Hadoop's new-API TextInputFormat.

    Returns:
        An RDD of (key, value) pairs whose keys come from LongWritable and whose
        values come from Text (TextInputFormat: byte offset, line of text).
    """
    return spark_context.newAPIHadoopFile(
        HDFS_INPUT_PATH,
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
    )


# For the weak-scalability test, the same input is read (no_duplicates + 1) times so
# the data volume grows with the number of nodes. Otherwise keep no_duplicates = 0.
#   no_duplicates = 0 -> one node
#   no_duplicates = 1 -> two nodes
#   no_duplicates = 2 -> three nodes
#   no_duplicates = 3 -> four nodes
no_duplicates = 0
rdd = load_comments()
for _ in range(no_duplicates):
    # Same operation as the original `rdd += ...` (RDD.__add__ is union).
    rdd = rdd.union(load_comments())

In [3]:
from operator import add
import json
import re

# Parse the text value of every (key, line) pair as one JSON comment object and keep
# only the comments posted in the AskReddit subreddit.
rdd_askReddit = (
    rdd
    .map(lambda record: json.loads(record[1]))
    .filter(lambda comment: comment["subreddit"] == "AskReddit")
)

# Word count over the comment bodies:
#   1. split each body on every character that is not an ASCII letter, digit or
#      apostrophe, flattening all comments into a single stream of words
#      (re.split already returns a list, so one flatMap is enough);
#   2. drop the empty strings that re.split produces for adjacent or edge separators;
#   3. lower-case each word and sum its occurrences with reduceByKey.
non_word_chars = re.compile(r"[^a-zA-Z0-9']")
rdd_word_count = (
    rdd_askReddit
    .flatMap(lambda comment: non_word_chars.split(comment["body"]))
    .filter(lambda word: word != '')
    .map(lambda word: (word.lower(), 1))
    .reduceByKey(add)
)

In [4]:
# Measure how long the word-count job takes and show the most common words in the
# AskReddit subreddit. The RDD transformations above are lazy, so the timed
# takeOrdered() action is what actually runs the whole pipeline.
import time

top_n = 100  # number of most frequent words to report

# perf_counter() is monotonic and high-resolution; time.time() is a wall clock
# that can jump (e.g. NTP adjustments) and is not meant for measuring durations.
start = time.perf_counter()
top_words = rdd_word_count.takeOrdered(top_n, key=lambda pair: -pair[1])
end = time.perf_counter()
runtime = end - start  # seconds spent in the Spark job only (printing excluded)

print(top_words)

[('the', 1845776), ('i', 1449640), ('to', 1394168), ('a', 1368250), ('and', 1191115), ('you', 882045), ('of', 881621), ('it', 763788), ('that', 720639), ('in', 686286), ('is', 599635), ('for', 458880), ('my', 450864), ('was', 398039), ('with', 355510), ('but', 354209), ('on', 353653), ('have', 350931), ('not', 333752), ('this', 325213), ('be', 324019), ('if', 301716), ('are', 295334), ('me', 272701), ('your', 270539), ('just', 260878), ('as', 253776), ('they', 251298), ('or', 242290), ('like', 238776), ('so', 228956), ('at', 227886), ('do', 206153), ('about', 200973), ('out', 200529), ("don't", 199893), ('he', 198363), ("it's", 196166), ('what', 193391), ('all', 190255), ('up', 186517), ('people', 182483), ('get', 180860), ('would', 180676), ('when', 178846), ("i'm", 178807), ('one', 177993), ('can', 168212), ('an', 156360), ('from', 154484), ('we', 151916), ('her', 151517), ('no', 151152), ('there', 147261), ('deleted', 144910), ('them', 142506), ('she', 141771), ('had', 134432), ('th

In [5]:
# Report the measured runtime (in seconds) so it can be compared across the
# scalability runs.
print(f"{runtime}")

104.96110558509827


In [6]:
# Shut down the session created by getOrCreate(). SparkSession.stop() stops the
# underlying SparkContext and also clears the cached default/active session, so
# re-running the setup cell in the same kernel builds a fresh session.
spark_session.stop()