# Advanced RDD Operations


In [1]:
import os

import findspark

# Locate the Spark installation: honour the SPARK_HOME environment variable
# when it is set, and fall back to the original hard-coded Windows path so the
# notebook keeps working on the machine it was written on.
findspark.init(os.environ.get('SPARK_HOME', 'C:/spark'))

In [2]:
from pyspark import SparkContext

In [3]:
# Start a local SparkContext; 'local[*]' is passed as the master URL
sc = SparkContext(master='local[*]')

In [4]:
# Small RDD of (key, value) tuples; key 3 appears twice
rdd = sc.parallelize(
    [(1, 2), (3, 4), (3, 6), (4, 5)]
)

## countByKey

In [5]:
# Count how many tuples share each key; the result is a dict-like object
# (a defaultdict, per the cell output below), not an RDD
total = rdd.countByKey()

In [6]:
# Show the key -> count mapping
total

defaultdict(int, {1: 1, 3: 2, 4: 1})

In [7]:
# Report how many tuples were seen for each key
for key, n_pairs in total.items():
    print(f"key {key} has {n_pairs} counts")

key 1 has 1 counts
key 3 has 2 counts
key 4 has 1 counts


## Creating a base RDD and transforming it

In [8]:
# Path to the input text, relative to the notebook's working directory
file_path = '../data/Complete_Shakespeare.txt'

# Base RDD built from the text file
baseRDD = sc.textFile(name=file_path)

In [9]:
# Break every line on whitespace and flatten the pieces into one RDD of words
splitRDD = baseRDD.flatMap(lambda line: line.split())

In [10]:
# Count the words first, then report the number
total_words = splitRDD.count()
print(f'Total number of words in splitRDD: {total_words}')

Total number of words in splitRDD: 961441


In [11]:
# Lowercase English stop words used to filter the word RDD.
# Group labels are only a reading aid; element order matches the original list.
stop_words = [
    # pronouns
    'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves',
    'you', 'your', 'yours', 'yourself', 'yourselves',
    'he', 'him', 'his', 'himself', 'she', 'her', 'hers', 'herself',
    'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves',
    # interrogatives and demonstratives
    'what', 'which', 'who', 'whom', 'this', 'that', 'these', 'those',
    # auxiliary verbs
    'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
    'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing',
    # articles and conjunctions
    'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while',
    # prepositions
    'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into',
    'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from',
    'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under',
    # adverbs, quantifiers and the remainder
    'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where',
    'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most',
    'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same',
    'so', 'than', 'too', 'very', 'can', 'will', 'just', 'don', 'should', 'now',
]

In [12]:
# Remove stop words. Each word is lowercased only for the lookup, so the match
# is case-insensitive, but the words kept in the RDD retain their original
# case and punctuation (nothing is rewritten by this filter).
# A frozenset gives constant-time membership tests instead of scanning the
# stop_words list once for every word.
stop_words_lookup = frozenset(stop_words)
splitRDD_no_stop = splitRDD.filter(lambda w: w.lower() not in stop_words_lookup)

In [14]:
# Pair every remaining word with an initial count of 1
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda word: (word, 1))

# Add up the 1s per word to get the number of occurrences of each word
resultRDD = splitRDD_no_stop_words.reduceByKey(
    lambda left, right: left + right
)

In [15]:
# Peek at 10 (word, count) pairs; resultRDD has not been sorted yet, so these
# are simply the first ten returned, not the most frequent words
for pair in resultRDD.take(10):
    print(pair)

('Project', 79)
('Shakespeare', 5)
('use', 289)
('anyone', 8)
('anywhere', 5)
('United', 15)
('States', 8)
('world', 376)
('restrictions', 2)
('whatsoever.', 3)


In [16]:
# Flip (word, count) into (count, word) so the count becomes the key
resultRDD_swap = resultRDD.map(lambda pair: (pair[1], pair[0]))

In [17]:
# Order the (count, word) pairs by count, largest first
resultRDD_swap_sort = resultRDD_swap.sortByKey(
    ascending=False,
)

In [18]:
# Print the 10 most frequent words as "word,count" lines; each element of the
# sorted RDD is a (count, word) tuple
for count, word in resultRDD_swap_sort.take(10):
    print(f"{word},{count}")

thou,4513
thy,3916
shall,3248
good,2169
would,2131
Enter,2023
thee,1894
hath,1719
like,1643
you,,1580
