<h1><b>Intro to Cluster Computing</b></h1><br/>
Cluster computing, sometimes loosely called grid computing, is all about getting a set of computers to work together and act like a single system. Each node in the system works on part of the same task, which can make solving certain problems much more feasible and efficient than on a single computer. Cluster computing has a wide range of applications, from small business clusters with only a few nodes to some of the fastest supercomputers in the world, like IBM's Sequoia. At the same time, since cluster computing requires software to be purpose-built for each task, it is not well suited to casual computing situations. In this lab, we will introduce Apache Hadoop and Apache Spark, which are open source cluster computing frameworks, and explore some interesting applications of cluster computing in these settings.
<h1><b>Apache Hadoop</b></h1><br/>
Apache Hadoop is a framework for distributed processing of very large data sets on computer clusters built from commodity hardware. The core of Hadoop consists of a storage part, known as the Hadoop Distributed File System (HDFS), and a processing part called MapReduce, which you will become familiar with throughout this lab. Hadoop splits files into large blocks and distributes them across the nodes in a cluster, so each computer in the cluster can do a fraction of the work, improving efficiency. However, the MapReduce paradigm forces a particular linear dataflow structure on distributed programs, which limits the effectiveness of Hadoop and has led to the creation of various modules to overcome these limitations. Apache Spark was created as part of this effort, and much of Hadoop's current use is in conjunction with Spark. In fact, many would say that Hadoop currently exists only to facilitate the use of Spark in applications.
<h1><b>Apache Spark</b></h1><br/>
Like Hadoop, Apache Spark is an open source cluster computing framework, and it is commonly considered part of Hadoop's "ecosystem." Spark is often installed on top of Hadoop, although it can also run in a standalone local mode, as it does in this lab. In Hadoop alone, MapReduce programs read input data from disk, map a function across the data, reduce the results of the map, and store the reduction results on disk. Spark's RDDs (which will be defined shortly), by contrast, function as a working set for distributed programs, offering a (deliberately) restricted form of distributed shared memory. The availability of RDDs facilitates the implementation of both iterative algorithms and data analysis, making Spark much more versatile in applications. In fact, the latency of such applications (compared to Hadoop alone) may be reduced by several orders of magnitude! In particular, Spark is well suited to training machine learning algorithms, which was one of the main inspirations for its creation. Spark's usefulness in machine learning is our motivation for this lab.
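
To make the MapReduce dataflow described above concrete, here is a minimal single-machine sketch in plain Python (no Hadoop or Spark involved). The function name `map_reduce` and the sample data are hypothetical, chosen only for illustration; a real Hadoop job would read its input from disk, write its output to disk, and run the map step on many nodes.

```python
from functools import reduce

def map_reduce(records, map_fn, reduce_fn):
    """Apply map_fn to every record, then fold the results with reduce_fn."""
    mapped = [map_fn(r) for r in records]   # "map" phase
    return reduce(reduce_fn, mapped)        # "reduce" phase

# Hypothetical input: lines of text standing in for a file on disk.
lines = ["3", "5", "8"]
total = map_reduce(lines, float, lambda a, b: a + b)
print(total)
```

The linear structure is visible here: every job is one map followed by one reduce, which is the restriction that Spark's RDDs are meant to relax.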

Each Spark program first requires instantiating a SparkContext, which represents our connection to the cluster. The first argument is the URL of the cluster (it will be `"local"` for all our examples), and the second argument is simply a name for our application:

In [None]:
from pyspark import SparkContext
sc = SparkContext("local", "My App")

It requires significant setup to run Spark from an IPython notebook. However, if Spark is installed, we can easily run an application on a temporary local cluster using `spark-submit`:

In [None]:
spark-submit --master local[4] sparkApp.py

This runs `sparkApp.py` on a local Spark master using 4 worker threads.

<h1><b>Introduction to RDDs</b></h1><br/>
RDD is short for Resilient Distributed Dataset. When we create an RDD with Spark, it is broken up into chunks that can be stored on multiple devices. We can then perform parallelized operations on the RDD almost as if it were a single local file on one device, without having to worry about the details of multiprocessing. This is the power of Spark, and it results in very elegant, intuitive source code.
<br/><br/>
Spark can easily work with files distributed across several nodes in a network using the Hadoop Distributed File System. However, in this lab we will only be using simple local text files.

For example, we can create a simple RDD with the following code:


In [None]:
from pyspark import SparkContext
sc = SparkContext("local", "My App")
data = sc.textFile("example.txt")
"""example.txt:
hello
this is a text file
another line
more lines
"""
print(data.collect())  # Show the data

The function `SparkContext.textFile` creates an RDD from a text file, automatically splitting it by line into an RDD of strings. Printing `data.collect()` reveals the RDD `data` to be a series of unicode strings, one per line of the file.


Problem 1: Create an RDD from the text file nums.txt and print it, following the example code above. Save your code in a file nums.py, as you will be performing further operations on the dataset nums.txt.

Solution:

In [None]:
from pyspark import SparkContext
sc = SparkContext("local", "My App")
data = sc.textFile("nums.txt")
print(data.collect())

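There are many basic operations that can be performed on RDDs. A few useful ones include `RDD.collect()`, which returns a list of all elements in the RDD; `RDD.count()`, which gives the number of elements; and `RDD.take(x)`, which returns the first `x` elements of the RDD. Another is `RDD.reduce(f)`, which repeatedly combines pairs of elements using the two-argument function `f` until a single value remains. For example,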



In [None]:
data.reduce(lambda a,b:float(a)+float(b))

will add up all elements in the data set (first converting them to float), giving the sum of all elements in the RDD.
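
As a rough mental model, and only as an analogy, these actions behave like the following operations on an ordinary Python list. This sketch does not use Spark; the list `lines` is made-up sample data standing in for the contents of an RDD.

```python
from functools import reduce

lines = ["4", "8", "15", "16", "23", "42"]   # stand-in for data.collect()

count = len(lines)          # analogous to data.count()
first_three = lines[:3]     # analogous to data.take(3)
# analogous to data.reduce(lambda a, b: float(a) + float(b))
total = reduce(lambda a, b: float(a) + float(b), lines)

print(count, first_three, total)
```

The difference in Spark is that the elements may live on many machines, so `collect()` in particular should be used with care on large datasets.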

Problem 2: RDD Actions.
Modify the nums.py file you created in Problem 1 to print the following statistics about the dataset.<br/>
(i) Number of lines in the RDD. <br/>
(ii) The first 5 elements of the dataset.<br/>
(iii) The lexicographically last string in the dataset. (HINT: use reduce with max)<br/>
(iv) The entire dataset, as a list.<br/>
<br/>
Solution:

In [None]:
from pyspark import SparkContext
sc = SparkContext("local", "My App")
data = sc.textFile("nums.txt")
print("Num Lines: %d" % data.count())
print(data.take(5))
print(data.reduce(max))
print(data.collect())

The core functionality of Spark is the MapReduce algorithm, which is used for massively parallelized data processing. In the MapReduce algorithm, a function is first *mapped* over the data on many nodes. In Spark this is done using `RDD.map(f)`, which accepts a function `f` and applies it to each element of the RDD.

Since the data in Spark is distributed among processors, it can be processed in parallel. Eventually, however, we often need to recombine this data into a single output, for example a sum, maximum, or average. This is done by *reducing* the data. In Spark we use `RDD.reduce(f)`, where `f` is a function of two variables with one output; Spark applies this function in a tree-like fashion to all elements in parallel.

For example, the following

In [None]:
fdata = data.map(float)
fdata.reduce(lambda a,b:float(a)+float(b))

will map the function `float` to all elements of the RDD then sum up all elements in parallel, working up in a tree.
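
The tree-like reduction can be pictured with a small plain-Python sketch. This is only an illustration of the idea, not Spark's actual implementation: the helper `tree_reduce` is hypothetical, and it runs sequentially rather than in parallel. It relies on the combining function being associative, which is also why functions passed to `RDD.reduce` should be associative and commutative.

```python
def tree_reduce(values, f):
    """Combine values pairwise, level by level, until one value remains."""
    level = list(values)
    if not level:
        raise ValueError("cannot reduce an empty sequence")
    while len(level) > 1:
        # Combine neighbours; on a cluster, each pair could be handled by a different worker.
        paired = [f(level[i], level[i + 1]) for i in range(0, len(level) - 1, 2)]
        if len(level) % 2 == 1:
            paired.append(level[-1])   # an unpaired last element is carried up unchanged
        level = paired
    return level[0]

# Made-up sample data standing in for the RDD of floats.
fdata_local = [float(s) for s in ["1", "2", "3", "4", "5"]]
print(tree_reduce(fdata_local, lambda a, b: a + b))
```

Each pass through the loop halves the number of values, so a reduction over n elements needs only about log2(n) levels when the pairs at each level are combined in parallel.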

Another function that can be useful is the `RDD.filter` function, which allows you to filter the data set based on a boolean function. For example,

In [None]:
print(fdata.filter(lambda a: a < 50.0).count())

will first keep only the elements under fifty, then give us the number of those elements.

Problem 3: RDD transformations<br/>
Further extend your file nums.py to answer the following questions about your dataset, using RDD operations (filter, reduce, map).<br/>
(i) The number of even numbers (Hint: what map function will put the number in a form that is easy to query for evens?).<br/>
(ii) The sum of the numbers in the dataset.<br/>
(iii) The sum of the squares of the dataset.<br/><br/>
Solutions:

In [None]:
from pyspark import SparkContext

sc = SparkContext("local", "My App")
data = sc.textFile("nums.txt")
print("Num Lines: %d" % data.count())
print("First five elements: %s" % data.take(5))
print("Maximum Element " + data.reduce(max))
print("Total dataset: %s" % data.collect())

intData = data.map(int)
# Keep only the even numbers, then count them
print("Even numbers: %d" % intData.filter(lambda a: a % 2 == 0).count())
print("Sum %d" % intData.reduce(lambda a, b: a + b))
print("Sum Of Squares %d" % intData.map(lambda x: x * x).reduce(lambda a, b: a + b))

## Pair RDDs
Spark also has the ability to create RDDs out of key/value pairs. Using the `reduceByKey` function, we can combine and reduce results from a pair RDD by key.

Creating a pair RDD from a normal RDD is simple: we simply take each element from the original RDD and create a tuple key/value pair from each element using the `map` function. For example, if the file `pairs.txt` contains a list of letter-number pairs, we can load this into a pair RDD using the following:

In [None]:
from pyspark import SparkContext

sc = SparkContext("local", "My App")
data = sc.textFile("pairs.txt")

def mapper(line):
    key,value = line.split()
    return key,int(value)
pairData = data.map(mapper)
print(pairData.collect())

We could then query our pair RDD for the max value *per key* using `RDD.reduceByKey`:

In [None]:
maxes = pairData.reduceByKey(max)
print(maxes.collect())

Note that `maxes` is an RDD with as many elements as there are distinct keys in `pairData`.
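
The per-key reduction performed by `reduceByKey` can be pictured with a plain-Python sketch using a dictionary. The helper `reduce_by_key` and the sample pairs are hypothetical and only illustrate the semantics; Spark performs the same grouping across the nodes of the cluster, and does not guarantee the order of the collected result (the sketch sorts only to make its output predictable).

```python
def reduce_by_key(pairs, f):
    """Group (key, value) pairs by key and fold each group's values with f."""
    result = {}
    for key, value in pairs:
        # First value seen for a key is stored as-is; later ones are combined with f.
        result[key] = f(result[key], value) if key in result else value
    return sorted(result.items())

# Made-up letter-number pairs, standing in for the contents of a pair RDD.
pairs = [("a", 3), ("b", 7), ("a", 10), ("b", 2), ("c", 5)]
maxes_local = reduce_by_key(pairs, max)
print(maxes_local)
```

There is one output pair per distinct key, regardless of how many input pairs shared that key.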

Problem 4: Basic Map-Reduce<br/>
Create a new file, words.py. Create an RDD from words.txt. By following the steps below, use MapReduce to compute the number of words that start with each letter.<br/>
(i) Create a mapper function that maps a word to a key-value pair where the key is the first letter of the word and the value is simply 1. For example, the word "apple" maps to ("a", 1).<br/>
(ii) Create a reducer function that simply sums two values.<br/>
(iii) Using the operations map, reduceByKey, and collect, determine, for each letter, the number of words that start with that letter.<br/><br/>
Solutions

In [None]:
from pyspark import SparkContext

corpus = "words.txt" 
sc = SparkContext("local", "Simple App")
#Create the RDD
data = sc.textFile(corpus).cache()


def mapper(w):
    # Key each word by its first letter
    return (w[0], 1)

def reducer(a, b):
    return a + b

print(data.map(mapper).reduceByKey(reducer).collect())

In some cases we need to split each element of an RDD into smaller elements. For example, our original RDD may contain the lines of a text file as its elements, but we need an RDD with individual words as the elements. This can be accomplished with `map`'s cousin, `flatMap`. `RDD.flatMap` works exactly like `map`, except that if the output of the mapping function is a list, each element of the list is added to the RDD as a separate element. We can convert our RDD of lines into an RDD of words like this:

In [None]:
from pyspark import SparkContext

sc = SparkContext("local", "My App")
data = sc.textFile("text.txt")

words = data.flatMap(lambda line:line.split())
print(data.count())   # number of lines
print(words.count())  # number of words
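
The difference between `map` and `flatMap` can be seen in a plain-Python sketch on an ordinary list. This is only an analogy and does not use Spark; the two sample lines are made up for illustration.

```python
from itertools import chain

lines = ["hello world", "this is a text file"]

# Like map: one output element (a list of words) per input line.
mapped = [line.split() for line in lines]
# Like flatMap: the per-line lists are flattened into one sequence of words.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(len(mapped), len(flat_mapped))
```

With `map` the result has as many elements as there are lines, while with `flatMap` it has as many elements as there are words.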

Problem 5: Advanced Map-Reduce with bigrams.<br/>A bigram is a two-character string, like "aa" or "bj". The purpose of this problem is to determine the frequency of each bigram in the Brown Corpus. Bigram frequencies are used in cryptographic attacks against substitution ciphers.<br/> 
(i) Write a function bigramMapper that takes in a string and returns a list of all two-character strings in the string. For example, for the input "alpha" it should return ["al", "lp", "ph", "ha"].<br/>
(ii) Use your mapper function in conjunction with flatMap to create an RDD with bigrams as keys and 1 as values.<br/>
(iii) Use reduceByKey to find the frequency of each bigram, and then print out the probability of any given bigram occurring. Congratulations! You are now ready to use Spark for evil, brute-force cryptographic purposes!
<br/><br/>
Solution:

In [None]:
from pyspark import SparkContext

corpus = "/home/benjamin/ACME_Spark/words.txt" 
sc = SparkContext("local", "Simple App")
#Create the RDD
data = sc.textFile(corpus).cache()

def bigramMapper(w):
    # All overlapping two-character substrings of w
    return [w[i:i+2] for i in range(len(w) - 1)]

bigrams = data.flatMap(bigramMapper)
numBigrams = float(bigrams.count())
bigram_freqs = bigrams.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b).collect()

for bigram, freq in bigram_freqs:
    print("Probability of bigram " + bigram + " " + str(freq / numBigrams))
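
Before running the job on a full corpus, it can help to check the mapper and the probability calculation on a tiny input without Spark. The sketch below assumes Python 3 and uses `collections.Counter` in place of `reduceByKey`; the two sample words are made up for illustration.

```python
from collections import Counter

def bigramMapper(w):
    # All overlapping two-character substrings of w
    return [w[i:i + 2] for i in range(len(w) - 1)]

words = ["alpha", "halal"]   # hypothetical stand-in for the corpus
counts = Counter(b for w in words for b in bigramMapper(w))
total = sum(counts.values())
probs = {bigram: count / total for bigram, count in counts.items()}

for bigram, p in sorted(probs.items()):
    print(bigram, p)
```

The probabilities should sum to 1, which is a quick sanity check that also applies to the Spark version.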