# Working with RDDs

Note: this notebook is modified from the Spark Fundamentals I - PythonRDD.

In this lesson, we work through some RDD transformations that the previous lesson did not cover.

## Joining RDDs

Next, you are going to create RDDs for the README file and the POM file.

In [None]:
readmeFile = sc.textFile("/resources/LabData/README.md")
pomFile = sc.textFile("/resources/LabData/pom.xml")

How many lines in each file contain the keyword "Spark"?

In [None]:
# Count the lines that mention "Spark" in each file.
print(readmeFile.filter(lambda line: "Spark" in line).count())
print(pomFile.filter(lambda line: "Spark" in line).count())

Now do a word count on each RDD so that the results are (K, V) pairs of (word, count).

In [None]:
readmeCount = (readmeFile
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b))

pomCount = (pomFile
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b))
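
To make the three steps concrete, here is a minimal plain-Python sketch of what flatMap, map, and reduceByKey compute. It runs without Spark; the two sample lines and the names `lines`, `words`, `pairs`, and `word_counts` are made up for illustration and are not part of the lab data.

```python
from collections import defaultdict

# Two made-up lines standing in for the contents of a text file.
lines = ["Apache Spark is fast", "Spark runs on a cluster"]

# flatMap: split every line and flatten the pieces into one list of words.
words = [word for line in lines for word in line.split(" ")]

# map: pair each word with the count 1.
pairs = [(word, 1) for word in words]

# reduceByKey: add up the counts that share the same key (word).
counts = defaultdict(int)
for word, n in pairs:
    counts[word] += n
word_counts = dict(counts)

print(word_counts["Spark"])  # "Spark" appears once in each sample line
```

Spark performs the same computation, but spreads the work over the partitions of the RDD instead of a single Python list.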

To see the contents of either RDD as a list, call the collect function on it.

In [None]:
print("Readme Count\n")
print(readmeCount.collect())

In [None]:
print("Pom Count\n")
print(pomCount.collect())

The join function combines two datasets of (K, V) and (K, W) pairs into one dataset of (K, (V, W)) pairs, keeping only the keys that appear in both. Let's join these two counts together.

In [None]:
joined = readmeCount.join(pomCount)
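
As a rough illustration of what join returns, the plain-Python sketch below joins two small lists of (word, count) pairs and then adds the two counts per word. The sample counts and the names `readme_counts`, `pom_counts`, `joined_local`, and `summed_local` are invented for this example. It assumes each key occurs at most once per list, which holds for the output of reduceByKey.

```python
# Made-up (word, count) pairs standing in for the two word-count RDDs.
readme_counts = [("Spark", 18), ("the", 21), ("README", 1)]
pom_counts = [("Spark", 2), ("the", 3), ("artifactId", 5)]

# Index one side by key so matching keys can be looked up directly.
pom_lookup = dict(pom_counts)

# Inner join: keep only keys present on both sides, as (K, (V, W)).
joined_local = [(k, (v, pom_lookup[k]))
                for k, v in readme_counts if k in pom_lookup]

# Add the two counts of each joined pair to get one total per key.
summed_local = [(k, v + w) for k, (v, w) in joined_local]

print(joined_local)
print(summed_local)
```

Words that occur in only one of the two inputs ("README" and "artifactId" here) do not appear in the result. Spark does not guarantee the order of the joined pairs.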

Print the joined pairs to the console:

In [None]:
joined.collect()

Let's add the two counts for each word together to get the total count.

In [None]:
joinedSum = joined.map(lambda k: (k[0], (k[1][0]+k[1][1])))

To check that it is correct, print the first five elements of the joined and the joinedSum RDDs.

In [None]:
print("Joined Individual\n")
print(joined.take(5))

print("\n\nJoined Sum\n")
print(joinedSum.take(5))

## Shared variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
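
The copy semantics described above can be imitated without a cluster. The sketch below is only a simulation: it uses pickle to stand in for shipping a variable to a worker, and the names `counter` and `run_task_on_copy` are hypothetical. Each simulated task updates its own deserialized copy, so the variable on the "driver" side never changes.

```python
import pickle

# A driver-side variable that a task would like to update.
counter = {"seen": 0}

def run_task_on_copy(state_bytes, record):
    """Simulate one remote task: it receives a serialized copy of the
    driver's variable and can only modify that private copy."""
    local = pickle.loads(state_bytes)
    local["seen"] += record
    return local["seen"]

# "Ship" the same serialized state to four simulated tasks.
shipped = pickle.dumps(counter)
worker_results = [run_task_on_copy(shipped, x) for x in [1, 2, 3, 4]]

print(worker_results)   # what each task saw in its own copy
print(counter["seen"])  # the driver's variable is unchanged
```

This is why an ordinary variable cannot serve as a cluster-wide counter, and why Spark offers accumulators for that purpose.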

### Broadcast variables

Broadcast variables are useful when you have a large dataset that you want to use across all the worker nodes. A read-only copy of the variable is cached on each machine rather than shipped with every task. Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage.


Read more here: [http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables](http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables)

Create a broadcast variable. Type in:

In [None]:
broadcastVar = sc.broadcast([1,2,3])

To get the value, type in:

In [None]:
broadcastVar.value
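
A typical use of a broadcast variable is a lookup table that a mapping function reads through the value attribute. The sketch below shows only that access pattern and runs without Spark: `LocalBroadcast` is a hypothetical stand-in written for this example (it is not part of PySpark), and the lookup table and the `expand` function are invented.

```python
class LocalBroadcast(object):
    """Hypothetical stand-in for the object returned by sc.broadcast;
    it only mimics the read-only `value` attribute."""
    def __init__(self, value):
        self.value = value

# With Spark this would be: country_names = sc.broadcast({...})
country_names = LocalBroadcast({"us": "United States", "de": "Germany"})

def expand(code):
    # Tasks read the broadcast data through .value and never modify it.
    return country_names.value.get(code, "unknown")

codes = ["us", "de", "fr"]

# With Spark this would be: sc.parallelize(codes).map(expand).collect()
expanded = [expand(code) for code in codes]
print(expanded)
```

In a real job, the benefit is that the lookup table is sent to each machine once instead of being shipped along with every task.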

### Accumulators

Accumulators are variables that can only be added to through an associative operation. They are used to implement counters and sums efficiently in parallel. Spark natively supports numeric type accumulators and standard mutable collections. Programmers can extend these for new types. Only the driver can read the value of an accumulator. The workers can only add to it.

Create the accumulator variable. Type in:

In [None]:
accum = sc.accumulator(0)

Next, parallelize an array of four integers and define a function that adds an integer value to the accumulator variable. Type in:

In [None]:
rdd = sc.parallelize([1,2,3,4])
def f(x):
    global accum
    accum += x

Next, iterate through each element of the rdd and apply the function f on it:

In [None]:
rdd.foreach(f)

To get the current value of the accumulator variable, type in:

In [None]:
accum.value

You should get a value of 10.

Reading the value of the accumulator can only be done on the driver side. The worker nodes can only add to the accumulator.
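
The reason the operation must be associative can be sketched in plain Python: each partition can be summed independently, and the driver then merges the partial results. The partition layouts and the names `partitions`, `partial_sums`, and `total` below are assumptions chosen for illustration; Spark decides the real layout itself.

```python
# One possible split of [1, 2, 3, 4] into two partitions (made up).
partitions = [[1, 2], [3, 4]]

# Each worker adds up the elements of its own partition...
partial_sums = [sum(part) for part in partitions]

# ...and the driver merges the partial sums into the final value.
total = sum(partial_sums)
print(total)

# A different split gives the same total, because addition does not
# depend on how the elements are grouped.
other_partitions = [[1], [2, 3, 4]]
other_total = sum(sum(part) for part in other_partitions)
print(other_total)
```

Both totals match the accumulator value obtained above.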


## Key-value pairs

You have already seen a bit about key-value pairs in the Joining RDDs section.

Create a key-value pair of two characters. Type in:

In [None]:
pair = ('a', 'b')

To access the first element of the pair, index it with [0]. Use [1] for the second element.

In [None]:
print(pair[0])

print(pair[1])
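
Besides indexing, a pair can be unpacked into two names in a single assignment, which often reads more clearly. This small plain-Python sketch uses made-up sample pairs; the names `key`, `value`, `pairs`, `keys`, and `values` are chosen for illustration only.

```python
pair = ('a', 'b')

# Tuple unpacking: equivalent to key = pair[0] and value = pair[1].
key, value = pair
print(key)
print(value)

# The same idea works when looping over a list of key-value pairs.
pairs = [('a', 1), ('b', 2), ('a', 3)]
keys = [k for k, _ in pairs]
values = [v for _, v in pairs]
print(keys)
print(values)
```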

#### Your Turn