## Word Count
Counting the number of occurrences of each word in a text is one of the most popular first exercises when learning MapReduce programming. It is the equivalent of `Hello World!` in regular programming.

We will do it in two ways: a simpler way, where sorting is done after the RDD is collected, and a more Spark-native way, where the sorting is also done using an RDD.
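For reference, when the whole text fits in memory on one machine, the same computation takes a few lines of plain Python. The sketch below uses `collections.Counter` and splits on single spaces, like the Spark cells that follow. The function name `word_count` and the sample lines are hypothetical and only for illustration.

```python
from collections import Counter

def word_count(lines):
    """Count word occurrences, splitting each line on single spaces
    (the same rule as line.split(" ") in the Spark cells)."""
    counts = Counter()
    for line in lines:
        counts.update(line.split(" "))
    return counts

# Hypothetical sample; note the double space in the second line,
# which produces an empty-string "word".
sample_counts = word_count(["the whale and the sea", "the  end"])
print(sample_counts.most_common(3))
```

The Spark versions below compute the same mapping, but spread the splitting and counting across partitions.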

In [1]:
# First, check that the text file is where we expect it to be
%ls ../Data/Moby-Dick.txt

../Data/Moby-Dick.txt


### Read the text file into an RDD
Note that, because execution is lazy, this does not necessarily mean that the file content has actually been read.
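A rough plain-Python analogy for this laziness (not Spark itself) is a generator: creating it only describes how lines would be produced, and nothing is read until a value is pulled. The names `lazy_lines` and `reads` are hypothetical.

```python
reads = []  # records which lines were actually "read"

def lazy_lines(source):
    """Yield lines one at a time, recording each read as a side effect."""
    for i, line in enumerate(source):
        reads.append(i)  # happens only when a consumer asks for this line
        yield line

lazy_plan = lazy_lines(["line one", "line two", "line three"])
print(len(reads))  # → 0: creating the generator read nothing
first = next(lazy_plan)
print(len(reads))  # → 1: only the first line has been read
```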

In [2]:
%%time
text_file = sc.textFile(u'../Data/Moby-Dick.txt')
type(text_file)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 555 ms


### Count the words
Next, we count the number of times each word occurs in the text. Again, this only sets up the execution plan; no data is processed yet.
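To make the three steps concrete, here is a single-machine sketch of what `flatMap`, `map` and `reduceByKey` compute, written with plain Python iterators. The helper `reduce_by_key` is hypothetical; Spark's real `reduceByKey` also combines values within each partition before shuffling, which this sketch does not model.

```python
from itertools import chain

lines = ["the whale", "the sea"]

# flatMap: each line becomes zero or more words, flattened into one sequence
words = chain.from_iterable(line.split(" ") for line in lines)

# map: each word becomes a (word, 1) pair
pairs = ((word, 1) for word in words)

def reduce_by_key(func, kv_pairs):
    """Combine the values of pairs sharing a key using func (hypothetical helper)."""
    result = {}
    for key, value in kv_pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# reduceByKey(lambda a, b: a + b): sum the 1s for each word
counts_local = reduce_by_key(lambda a, b: a + b, pairs)
print(sorted(counts_local.items()))
```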

In [3]:
%%time
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
type(counts)

CPU times: user 12 ms, sys: 4 ms, total: 16 ms
Wall time: 219 ms


### Have a look at the execution plan
Note that the earliest node in the dependency graph is the file `../Data/Moby-Dick.txt`. It is possible that not even the first line of that file has been read yet!
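The debug string is a lineage: each RDD remembers its parent and the operation that derives it from that parent. The toy class below is a hypothetical sketch of that idea only; Spark's actual RDDs also track partitions, shuffles and stages. Data is computed only when `collect()` is called, and is recomputed on every call because nothing is cached.

```python
class MiniRDD:
    """Hypothetical toy RDD: stores a parent, a name, and a compute function."""

    def __init__(self, name, compute, parent=None):
        self.name = name
        self._compute = compute  # maps the parent's iterator to a new iterator
        self.parent = parent

    @classmethod
    def from_list(cls, name, data):
        # Source node: ignores its (absent) upstream and iterates the data
        return cls(name, lambda _: iter(data))

    def map(self, f):
        return MiniRDD("map", lambda it: (f(x) for x in it), self)

    def flat_map(self, f):
        return MiniRDD("flatMap", lambda it: (y for x in it for y in f(x)), self)

    def _iterate(self):
        upstream = self.parent._iterate() if self.parent else None
        return self._compute(upstream)

    def collect(self):
        # Only here does any data flow through the chain of nodes
        return list(self._iterate())

    def debug_string(self):
        # Walk back to the source, indenting one level per ancestor
        out, node, depth = [], self, 0
        while node is not None:
            out.append("  " * depth + node.name)
            node, depth = node.parent, depth + 1
        return "\n".join(out)


source = MiniRDD.from_list("source", ["a b", "c"])
toy_plan = source.flat_map(lambda s: s.split(" ")).map(lambda w: (w, 1))
print(toy_plan.debug_string())
print(toy_plan.collect())
```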

In [4]:
print counts.toDebugString()

(2) PythonRDD[6] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(2) PairwiseRDD[3] at reduceByKey at <timed exec>:1 []
    |  PythonRDD[2] at reduceByKey at <timed exec>:1 []
    |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
    |  ../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []


### Count!
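Finally, we run actions on the `counts` RDD: `count()` returns the number of distinct words, and the `map`/`reduce` pair adds up the per-word counts to get the total number of words. Their ratio is the mean number of occurrences per word.
Note that it is only in this cell that the lazy execution model finally performs some actual work.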
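The arithmetic behind the next cell is simple; with the notebook's own numbers the mean is 219480 / 33782 ≈ 6.497. A plain-Python sketch of the same three quantities, on a small hypothetical list of `(word, count)` pairs:

```python
from functools import reduce

word_counts = [("the", 3), ("whale", 1), ("sea", 2)]  # hypothetical data

distinct_words = len(word_counts)                  # like counts.count()
total_words = reduce(lambda x, y: x + y,
                     (n for _, n in word_counts))  # like counts.map(...).reduce(...)
mean = total_words / distinct_words                # mean occurrences per word
print(distinct_words, total_words, mean)
```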

In [5]:
%%time
Count=counts.count()
Sum=counts.map(lambda (w,i): i).reduce(lambda x,y:x+y)
print 'count=%f, sum=%f, mean=%f'%(Count,Sum,float(Sum)/Count)

count=33782.000000, sum=219480.000000, mean=6.496951
CPU times: user 16 ms, sys: 0 ns, total: 16 ms
Wall time: 4.65 s


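Running the same computation a second time, with a named function in place of the lambda, takes 219 ms instead of 4.65 s. The speed-up is most likely not due to the named function: Spark can reuse the shuffle output that `reduceByKey` produced in the previous cell, so the file does not need to be read and split again.

In [6]: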
%%time
def SUM(x,y):
    return x+y
Count=counts.count()
Sum=counts.map(lambda (w,i): i).reduce(SUM)
print 'count=%f, sum=%f, mean=%f'%(Count,Sum,float(Sum)/Count)

count=33782.000000, sum=219480.000000, mean=6.496951
CPU times: user 12 ms, sys: 8 ms, total: 20 ms
Wall time: 219 ms


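### Collect the `counts` RDD into the driver node
This also triggers actual work: `collect()` is an action that runs a job and transfers every (word, count) pair to the driver, so it is only appropriate when the result fits in the driver's memory.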

In [7]:
%%time
C=counts.collect()
type(C)

CPU times: user 12 ms, sys: 8 ms, total: 20 ms
Wall time: 145 ms


### Sort
Now that we have collected the `counts` RDD into the driver node, we no longer rely on Spark. The following two cells are plain Python.
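The next cell sorts the full list, which costs O(n log n). When only the most common words are needed, `heapq.nlargest` from the standard library selects the top k in O(n log k). A small sketch on hypothetical data:

```python
import heapq

C_local = [("the", 13), ("of", 6), ("whale", 2), ("sea", 2), ("ahab", 1)]

# Full sort, ascending by count (sorted() is stable, so ties keep their order)
by_count = sorted(C_local, key=lambda wc: wc[1])
least_common = by_count[:2]
most_common = by_count[-2:]

# Top k only, largest first, without sorting the whole list
top2 = heapq.nlargest(2, C_local, key=lambda wc: wc[1])
print(top2)
```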

In [8]:
C.sort(key=lambda x:x[1])
print 'most common words',C[-10:]
print 'Least common words',C[:10]

most common words [(u'I', 1724), (u'his', 2415), (u'that', 2693), (u'in', 3878), (u'', 4347), (u'to', 4510), (u'a', 4533), (u'and', 5951), (u'of', 6587), (u'the', 13766)]
Least common words [(u'funereal', 1), (u'unscientific', 1), (u'lime-stone,', 1), (u'shouted,', 1), (u'pitch-pot,', 1), (u'cod-liver', 1), (u'prices', 1), (u'prefix', 1), (u'boots."', 1), (u'slew.', 1)]


### Compute the mean number of occurrences per word

In [9]:
Count2=len(C)
Sum2=sum([i for w,i in C])
print 'count2=%f, sum2=%f, mean2=%f'%(Count2,Sum2,float(Sum2)/Count2)


count2=33782.000000, sum2=219480.000000, mean2=6.496951


## Word Count in Pure Spark
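We now perform the word count, including the sort, entirely with RDD operations, so that only the top 10 words are returned to the driver node. Unlike the first version, this pipeline also filters out the empty strings that `split(' ')` produces for consecutive spaces and empty lines; that is why `''` (4347 occurrences above) is missing from the final top 10 and `with` takes its place.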
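The pipeline below (filter, count, swap to `(count, word)`, sort descending, take the first k) can be sketched on one machine in plain Python. This is only an illustration of what each step computes on hypothetical data; Spark distributes the sort across partitions, and the order of words with equal counts is not something to rely on there.

```python
from collections import defaultdict

lines = ["the whale  the sea", "", "the end"]  # hypothetical data

# split(' ') yields '' for repeated spaces and for empty lines
print("a  b".split(" "))  # ['a', '', 'b']

# flatMap + filter(lambda x: x != '') + map(lambda word: (word, 1))
pairs = [(w, 1) for line in lines for w in line.split(" ") if w != ""]

# reduceByKey(lambda x, y: x + y)
totals = defaultdict(int)
for w, n in pairs:
    totals[w] += n

# map((word, count) -> (count, word)), then sortByKey(False) and take(2)
swapped = [(n, w) for w, n in totals.items()]
top = sorted(swapped, key=lambda cw: cw[0], reverse=True)[:2]
print(top)
```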

In [10]:
%%time
RDD=text_file.flatMap(lambda x: x.split(' '))\
    .filter(lambda x: x!='')\
    .map(lambda word: (word,1))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 66 µs


In [11]:
%%time
RDD1=RDD.reduceByKey(lambda x,y:x+y)

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 21.2 ms


In [12]:
%%time
RDD2=RDD1.map(lambda (c,v):(v,c))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 26 µs


In [13]:
%%time
RDD3=RDD2.sortByKey(False)

CPU times: user 16 ms, sys: 4 ms, total: 20 ms
Wall time: 750 ms


In [14]:
print RDD3.toDebugString()

(2) PythonRDD[21] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[20] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[19] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(2) PairwiseRDD[18] at sortByKey at <timed exec>:1 []
    |  PythonRDD[17] at sortByKey at <timed exec>:1 []
    |  MapPartitionsRDD[14] at mapPartitions at PythonRDD.scala:374 []
    |  ShuffledRDD[13] at partitionBy at NativeMethodAccessorImpl.java:-2 []
    +-(2) PairwiseRDD[12] at reduceByKey at <timed exec>:1 []
       |  PythonRDD[11] at reduceByKey at <timed exec>:1 []
       |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
       |  ../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []


In [15]:
%%time
RDD3.take(10)

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 229 ms


[(13766, u'the'),
 (6587, u'of'),
 (5951, u'and'),
 (4533, u'a'),
 (4510, u'to'),
 (3878, u'in'),
 (2693, u'that'),
 (2415, u'his'),
 (1724, u'I'),
 (1692, u'with')]