### Only two workers

In [1]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from operator import add
import nltk
import json
from time import time
#nltk.download('punkt')
#nltk.download('averaged_perceptron_tagger')

In [2]:
# LIFO stack of start timestamps, so tic()/toc() pairs can be nested.
_tstart_stack = []


def tic():
    """Start a timer by pushing the current wall-clock time onto the stack."""
    started_at = time()
    _tstart_stack.append(started_at)


def toc(fmt="Elapsed: %s s"):
    """Stop the most recently started timer and print the elapsed seconds.

    Args:
        fmt: %-style format string with one placeholder for the elapsed time.

    Raises:
        IndexError: if there is no matching tic() left on the stack.
    """
    now = time()
    elapsed = now - _tstart_stack.pop()
    print(fmt % elapsed)

In [3]:
# Maps each coarse grammatical group to the part-of-speech tags it covers.
# check_gram_grp() looks tags up in these lists.
gram_groups = {
    'Adjectives': ['JJ', 'JJR', 'JJS'],
    'Nouns': ['NN', 'NNS', 'NNP', 'NNPS'],
    'Pronouns': ['PRP', 'PRP$'],
    'Adverbs': ['RB', 'RBR', 'RBS'],
    'Verb': ['VB', 'VBG', 'VBD', 'VBN', 'VBP', 'VBZ'],
}

def get_split_body(rdd):
    """Split the comment text of every record into whitespace-separated tokens.

    Args:
        rdd: RDD-like object exposing ``map``. Each element is an indexable
            record whose item at index 1 is a JSON string with a "body" key.

    Returns:
        The mapped RDD, holding one list of tokens per input record.

    Raises:
        KeyError: from the mapping function, for a record whose JSON has no
            "body" key.
    """
    def split_body(record):
        # str.split() with no separator already ignores leading and trailing
        # whitespace, so no separate strip() is needed.
        return json.loads(record[1])['body'].split()

    return rdd.map(split_body)

def check_gram_grp(tag_tuple):
    """Return the grammatical group of a (word, tag) pair.

    Args:
        tag_tuple: two-item iterable of (word, part-of-speech tag).

    Returns:
        The first key of ``gram_groups`` whose tag list contains the tag,
        or None when no group lists it.
    """
    _word, pos_tag = tag_tuple
    matching_groups = (
        group for group, tags in gram_groups.items() if pos_tag in tags
    )
    return next(matching_groups, None)

def categorize_words(split_rdd):
    """Tag every token and keep one (group, 1) pair per recognised token.

    Args:
        split_rdd: RDD whose elements are lists of tokens, one list per
            record, as produced by get_split_body().

    Returns:
        RDD of (grammatical group, 1) pairs. Tokens whose tag belongs to no
        group in ``gram_groups`` are dropped.
    """
    # pos_tag works on a whole token list, so each record is tagged in one
    # call and flatMap flattens the resulting (word, tag) pairs.
    tagged = split_rdd.flatMap(lambda tokens: nltk.pos_tag(tokens))
    labelled = tagged.map(lambda pair: (check_gram_grp(pair), 1))
    # check_gram_grp returns None for tags outside every group.
    return labelled.filter(lambda item: item[0] is not None)

def final_test(rdd):
    """Run the whole pipeline and return the word count per grammatical group.

    Args:
        rdd: RDD of records accepted by get_split_body().

    Returns:
        List of (group, count) pairs collected to the driver.
    """
    labelled_words = categorize_words(get_split_body(rdd))
    return labelled_words.reduceByKey(add).collect()

In [4]:
# (8 cores, 16gb per machine) x 5 = 40 cores
# New API: build (or reuse) the session on the standalone master.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.119:7077")
    .appName("ScalibilityTest_")
    .config("spark.executor.memory", "2g")
    # Optional settings, currently disabled:
    #   .config("spark.dynamicAllocation.enabled", True)
    #   .config("spark.shuffle.service.enabled", True)
    .getOrCreate()
)

# Old API (RDD): the context that belongs to the session above.
spark_context = spark_session.sparkContext

In [5]:
#conf = SparkConf()
#conf.setMaster('spark://192.168.2.119:7077')
#conf.setAppName('Strong_Scalibility')
#spark_context = SparkContext(conf=conf)
#spark_context.setLogLevel("DEBUG")

In [6]:
# Load the file as (LongWritable key, Text value) pairs, one pair per line.
# NOTE(review): the URI points at 'localhost' while the Spark master is a
# remote address - confirm every executor can reach this namenode.
rdd = spark_context.newAPIHadoopFile(
    path='hdfs://localhost:9000/user/ubuntu/RC_2006-02',
    inputFormatClass='org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    keyClass='org.apache.hadoop.io.LongWritable',
    valueClass='org.apache.hadoop.io.Text',
)

In [7]:
# Peek at the first record: a (key, JSON string) pair.
rdd.take(1)

[(0,
  '{"created_utc":1138752114,"author_flair_css_class":null,"score":0,"ups":0,"subreddit":"reddit.com","stickied":false,"link_id":"t3_15xh","subreddit_id":"t5_6","body":"THAN the title suggests.  Whoops.","controversiality":1,"retrieved_on":1473820870,"distinguished":null,"gilded":0,"id":"c166b","edited":false,"parent_id":"t3_15xh","author":"gmcg","author_flair_text":null}')]

#### RDD info

In [8]:
# Number of records in the loaded RDD.
n = rdd.count()
print(n)

9095


In [9]:
# Number of partitions of the RDD as loaded.
# NOTE(review): `np` is the conventional alias for numpy; a more specific
# name would avoid a clash if numpy is ever imported in this notebook.
np = rdd.getNumPartitions()
print(np)

1


In [10]:
# Redistribute the data over 6 partitions and display the new partition count.
rdd6 = rdd.repartition(6)
rdd6.getNumPartitions()

6

In [11]:
# Record count of the repartitioned RDD.
rdd6.count()

9095

In [12]:
#x = rdd.take(1)[0][1]
#json.loads(x)

#### Create an RDD with two-thirds of the data

In [13]:
# Sample of rdd6 with fraction 2/3; the first argument (False) disables
# replacement. No seed is passed.
rdd_2_3 = rdd6.sample(False, 2/3)

#### Test with full data

In [14]:
# Time the whole pipeline (tokenize, tag, count per group) on the
# 6-partition RDD; the returned counts are discarded.
tic()
final_test(rdd6)
toc()

Elapsed: 23.275262594223022 s


#### Test with two-thirds of the data

In [15]:
# Time the same pipeline on the sampled RDD (rdd_2_3); the returned counts
# are discarded.
tic()
final_test(rdd_2_3)
toc()

Elapsed: 14.650781869888306 s
