In [3]:
from pyspark import SparkContext
sc = SparkContext(master="local[2]")

In [4]:
text_file = sc.textFile("Moby-Dick.txt")
text_file

Moby-Dick.txt MapPartitionsRDD[1] at textFile at <unknown>:0

In [6]:
word_list = text_file.flatMap(lambda line: line.split(" "))
word_list.toDebugString().decode().split("\n")

['(2) PythonRDD[3] at RDD at PythonRDD.scala:53 []',
 ' |  Moby-Dick.txt MapPartitionsRDD[1] at textFile at <unknown>:0 []',
 ' |  Moby-Dick.txt HadoopRDD[0] at textFile at <unknown>:0 []']

In [7]:
not_empty = word_list.filter(lambda ne: ne != "")
not_empty.toDebugString().decode().split("\n")

['(2) PythonRDD[4] at RDD at PythonRDD.scala:53 []',
 ' |  Moby-Dick.txt MapPartitionsRDD[1] at textFile at <unknown>:0 []',
 ' |  Moby-Dick.txt HadoopRDD[0] at textFile at <unknown>:0 []']

In [9]:
key_values = not_empty.map(lambda word: (word, 1))
key_values.toDebugString().decode().split("\n")

['(2) PythonRDD[5] at RDD at PythonRDD.scala:53 []',
 ' |  Moby-Dick.txt MapPartitionsRDD[1] at textFile at <unknown>:0 []',
 ' |  Moby-Dick.txt HadoopRDD[0] at textFile at <unknown>:0 []']

In [10]:
counts = key_values.reduceByKey(lambda a,b: a+b)
counts.toDebugString().decode().split("\n")

['(2) PythonRDD[10] at RDD at PythonRDD.scala:53 []',
 ' |  MapPartitionsRDD[9] at mapPartitions at PythonRDD.scala:133 []',
 ' |  ShuffledRDD[8] at partitionBy at <unknown>:0 []',
 ' +-(2) PairwiseRDD[7] at reduceByKey at <ipython-input-10-ca778e8096d1>:1 []',
 '    |  PythonRDD[6] at reduceByKey at <ipython-input-10-ca778e8096d1>:1 []',
 '    |  Moby-Dick.txt MapPartitionsRDD[1] at textFile at <unknown>:0 []',
 '    |  Moby-Dick.txt HadoopRDD[0] at textFile at <unknown>:0 []']

In [11]:
total = counts.count()
print("Different words : {}".format(total))

Different words : 33781


In [12]:
words_sum = counts.map(lambda x: x[1]).reduce(lambda x,y: x+y)
print("Total words in the file : {}".format(words_sum))

Total words in the file : 215133


In [13]:
mean_occ = words_sum / total
print("Mean occurence of words : {}".format(mean_occ))

Mean occurence of words : 6.368461561232645


In [14]:
# Finding most common words
list_of_words = counts.collect()

In [20]:
# We are not using parallelism in this approach.
list_of_words.sort(key=lambda x:x[1])
print("Most common words \n" + "\n".join(["{} : {}".format(x[0], x[1]) for x in reversed(list_of_words[-5:])]))

Most common words 
the : 13766
of : 6587
and : 5951
a : 4533
to : 4510


In [21]:
# Collect top 5 elements using purely spark
reverse_count = counts.map(lambda x: (x[1], x[0]))
reverse_count.toDebugString().decode().split("\n")

['(2) PythonRDD[13] at RDD at PythonRDD.scala:53 []',
 ' |  MapPartitionsRDD[9] at mapPartitions at PythonRDD.scala:133 []',
 ' |  ShuffledRDD[8] at partitionBy at <unknown>:0 []',
 ' +-(2) PairwiseRDD[7] at reduceByKey at <ipython-input-10-ca778e8096d1>:1 []',
 '    |  PythonRDD[6] at reduceByKey at <ipython-input-10-ca778e8096d1>:1 []',
 '    |  Moby-Dick.txt MapPartitionsRDD[1] at textFile at <unknown>:0 []',
 '    |  Moby-Dick.txt HadoopRDD[0] at textFile at <unknown>:0 []']

In [22]:
sort_keys = reverse_count.sortByKey(ascending=False)
sort_keys.toDebugString().decode().split("\n")

['(2) PythonRDD[20] at RDD at PythonRDD.scala:53 []',
 ' |  MapPartitionsRDD[19] at mapPartitions at PythonRDD.scala:133 []',
 ' |  ShuffledRDD[18] at partitionBy at <unknown>:0 []',
 ' +-(2) PairwiseRDD[17] at sortByKey at <ipython-input-22-add8bf707f50>:1 []',
 '    |  PythonRDD[16] at sortByKey at <ipython-input-22-add8bf707f50>:1 []',
 '    |  MapPartitionsRDD[9] at mapPartitions at PythonRDD.scala:133 []',
 '    |  ShuffledRDD[8] at partitionBy at <unknown>:0 []',
 '    +-(2) PairwiseRDD[7] at reduceByKey at <ipython-input-10-ca778e8096d1>:1 []',
 '       |  PythonRDD[6] at reduceByKey at <ipython-input-10-ca778e8096d1>:1 []',
 '       |  Moby-Dick.txt MapPartitionsRDD[1] at textFile at <unknown>:0 []',
 '       |  Moby-Dick.txt HadoopRDD[0] at textFile at <unknown>:0 []']

In [23]:
result = sort_keys.take(5)
print("Most common words \n" + "\n".join(["{} : {}".format(x[0], x[1]) for x in result]))

Most common words 
13766 : the
6587 : of
5951 : and
4533 : a
4510 : to


## Result
1. Collecting and performing operations at the head node does not scale.
2. Using RDDs to the end increases efficiency of the operations being performed.