In [36]:
from pyspark.sql import SparkSession
import timeit 


ss_experiment_1 = {'cores': 1, 'dataset': "RC_2009-11.json"}
ss_experiment_2 = {'cores': 2, 'dataset': "RC_2009-11.json"}
ss_experiment_3 = {'cores': 4, 'dataset': "RC_2009-11.json"}
ss_experiment_4 = {'cores': 6, 'dataset': "RC_2009-11.json"}
ss_experiment_5 = {'cores': 8, 'dataset': "RC_2009-11.json"}

ws_experiment_1 = {'cores': 1, 'dataset': "RC_2008-08.json"}
ws_experiment_2 = {'cores': 2, 'dataset': "RC_2009-05.json"}
ws_experiment_3 = {'cores': 4, 'dataset': "RC_2009-11.json"}
ws_experiment_4 = {'cores': 6, 'dataset': "RC_2010-06.json"}
ws_experiment_5 = {'cores': 8, 'dataset': "RC_2010-09.json"}


config = ws_experiment_1
# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.131:7077") \
        .appName("Team7_code.ipynb")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.cores.max",config['cores'])\
        .config("spark.worker.instances",4)\
        .config("spark.executor.cores",1)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

#        .config("spark.executor.memory", "2g")\
#\#)\
# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("INFO")
print("Started Session")

Started Session


############## DATA LOADING ############

In [37]:
def create_spark_dataframe(filename):
    df = spark_session.read.json('hdfs://192.168.2.131:9000/user/ubuntu/{}'.format(filename))
    return df

start_time_0 = timeit.default_timer() 

df = create_spark_dataframe(config['dataset'])
                            
print("execution time {}s".format(timeit.default_timer()-start_time_0))

execution time 28.64332375099184s


Author comments count

In [38]:
from pyspark.sql.functions import desc, col
start_time_0 = timeit.default_timer() 

author_comment_counts = df.groupBy('author').count()
author_comment_counts.cache()
author_comment_counts.sort(desc("count")).show()
print("execution time {}s".format(timeit.default_timer()-start_time_0))

+---------------+------+
|         author| count|
+---------------+------+
|      [deleted]|198037|
|           Ra__|  1357|
|NoMoreNicksLeft|  1344|
|   uteunawaytay|  1246|
|         matts2|  1151|
|       duskglow|  1131|
|     malcontent|  1029|
|        fingers|   967|
|     mexicodoug|   924|
|     dirtymoney|   873|
|      glengyron|   824|
|         h0dg3s|   822|
|    Pikajabroni|   819|
|      Poromenos|   817|
|       mutatron|   802|
|      moogle516|   738|
|     braindrane|   681|
|      otakucode|   649|
|   nixonrichard|   644|
|         Deacon|   610|
+---------------+------+
only showing top 20 rows

execution time 11.628074069973081s


Word count and score

In [39]:
import re
def pre_process_text(text):
    text = text.lower()
    text = text.replace('\r', '')
    text = text.replace('\n', ' ')
    text = text.strip()
    text=text.split(' ')
    text = map(lambda x: re.sub(r'[^a-zA-Z0-9]+', '', x), text)
    text = filter(lambda x: len(x) > 0, text)
    return text

total_time = timeit.default_timer() 

############ WORD COUNTING ###################
start_time_0 = timeit.default_timer() 
df_tmp = df.select(['body','score']).rdd.map(tuple).map(lambda x: (pre_process_text(x[0]), x[1])).flatMap(lambda x: [(y, x[1]) for y in x[0]]).cache()
word_counts =df_tmp.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
print(word_counts.sortBy(lambda x: x[1], ascending=False).take(10))
print("execution time {}s".format(timeit.default_timer()-start_time_0))

################ WORD SCORE ################
start_time_0 = timeit.default_timer() 
word_scores = df_tmp.reduceByKey(lambda x,y: x+y)
print(word_scores.sortBy(lambda x: x[1], ascending=False).take(10))
print("execution time {}s".format(timeit.default_timer()-start_time_0))

################# WORD AVERAGE SCORE ###################
start_time_0 = timeit.default_timer() 
word_avg_score = word_counts.filter(lambda x: x[1]>5).join(word_scores).map(lambda x: (x[0], x[1][1]/x[1][0]))
print(word_avg_score.sortBy(lambda x: x[1], ascending=False).take(100))
print("execution time {}s".format(timeit.default_timer()-start_time_0))

print("Total execution time {}s".format(timeit.default_timer()-total_time))

[('the', 805563), ('to', 491652), ('a', 433181), ('of', 367834), ('and', 362032), ('i', 319635), ('that', 293473), ('is', 290676), ('you', 250066), ('in', 242733)]
execution time 72.39900828100508s
[('the', 3161368), ('to', 1909824), ('a', 1743123), ('and', 1462728), ('of', 1435201), ('i', 1280625), ('that', 1074843), ('is', 1055296), ('in', 962635), ('it', 917759)]
execution time 30.920393443026114s
[('checkitout', 201.0), ('saybro', 166.45454545454547), ('frequenting', 165.0), ('parlour', 158.16666666666666), ('roseburg', 142.85714285714286), ('schwarzschild', 114.25), ('streetview', 109.75), ('vj', 109.0), ('praetorius', 107.17647058823529), ('dialed', 99.3076923076923), ('theatrics', 99.3), ('courted', 98.2), ('fleegman', 94.0), ('lala', 89.0), ('timelocaltime', 86.66666666666667), ('bert', 85.28571428571429), ('nonslip', 81.66666666666667), ('farout', 81.42857142857143), ('piedmont', 80.16666666666667), ('plotdriven', 79.16666666666667), ('gtguy', 78.5), ('hitherto', 76.4615384615

In [33]:
# release the cores for another application!
spark_context.stop()
print("Ended Session")

Ended Session
