In [1]:
from pyspark import SparkContext

sc = SparkContext("local", "pyspark")

## Accumulators

When we normally pass functions to Spark, such as a **map()** function or a condition for **filter()**, they can use variables defined outside them in the driver program, but each task running on the cluster gets a new copy of each variable, and updates from these copies are not propagated back to the driver. Spark's shared variables, accumulators and broadcast variables, relax this restriction for two common types of communication patterns: 

1. **Aggregation of results**
2. **Broadcasts**

One of the most common uses of accumulators is to count events that occur during job execution for debugging purposes. For example, say that we are loading a list of all of the call signs for which we want to retrieve logs from a file, but we are also interested in how many lines of the input file were blank.

In [2]:
file = sc.textFile("callsign.txt")

blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines
    if (line == ""):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)

callSigns.first()

u'W8PAL'

In [3]:
print "Blank lines: %d" % blankLines.value

Blank lines: 0


An accumulator's value becomes valid **ONLY** after an action has run on the RDD.
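The count of 0 above can be understood through lazy evaluation: an action such as **first()** only needs to process enough input to produce one element. The following is a minimal pure-Python analogy, not Spark code. The counter `blank_lines`, the helper `extract_call_signs`, and the sample `lines` are all hypothetical stand-ins chosen for illustration.

```python
import itertools

blank_lines = 0  # plain counter standing in for the accumulator

def extract_call_signs(line):
    """Split a line into call signs, counting blank lines as a side effect."""
    global blank_lines
    if line == "":
        blank_lines += 1
    return line.split(" ")

lines = ["W8PAL KK6JKQ", "", "VE2UN", ""]  # hypothetical input

# Lazy pipeline: no line is processed until an element is requested.
call_signs = itertools.chain.from_iterable(
    extract_call_signs(line) for line in lines)

first = next(call_signs)        # processes only the first line
count_after_first = blank_lines

remaining = list(call_signs)    # processes the rest of the input
count_after_all = blank_lines
```

In this sketch the counter only reflects the blank lines once the whole input has been consumed, which mirrors why the accumulator should be read after an action that evaluates the full RDD.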

To use an accumulator in Spark:

1. We create them in the driver by calling the **SparkContext.accumulator**(**initialValue**) method, which produces an accumulator holding an initial value. In the Scala API the return type is an **org.apache.spark.Accumulator[T]** object, where **T** is the type of **initialValue**; in PySpark it is a **pyspark.Accumulator**.
2. Worker code in Spark closures can add to the accumulator with the **+=** operator (or the **add()** method).
3. The driver program can call the **value** property on the accumulator to access its value.

Tasks on worker nodes cannot read the accumulator's **value**. The value of an accumulator is available **only in the driver program**.

**Example 1: ham radio**

In [4]:
import re

validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)

def validateSign(sign):
    global validSignCount, invalidSignCount
    if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
        validSignCount += 1
        return True
    else:
        invalidSignCount += 1
        return False

validSigns = callSigns.filter(validateSign)
# reduceByKey calls the function with two arguments, so the lambda must take x and y separately
contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda x, y: x + y)

contactCount.collect()

[(u'KK6JKQ', 1),
 (u'K2AMH', 1),
 (u'VE2UN', 1),
 (u'OH2TI', 1),
 (u'N7ICE', 1),
 (u'VE2CUA', 1),
 (u'UA1LO', 1),
 (u'W8PAL', 1),
 (u'W6BB', 1),
 (u'GB1MIR', 1),
 (u'VE3UOW', 1)]

In [5]:
validSignCount.value, invalidSignCount.value

(11, 0)

**Example 2:**

In [6]:
accum = sc.accumulator(0)

sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))

accum.value

10

### Accumulators and Fault Tolerance

If an RDD has to be **reevaluated** (for example, because a task failed or a cached partition was evicted):

1. If an accumulator was updated in an **action** on this RDD, Spark applies each task's update only once, so reevaluation would **not** update the value of the accumulator again (**safe**).
2. If an accumulator was updated in a **transformation** on this RDD, reevaluation would update the value of the accumulator again (**unsafe**), so such counts should be used for debugging only.
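The double counting in the transformation case can be illustrated with a small pure-Python toy. This is not Spark code: the `Counter` class and the `tag` function are hypothetical stand-ins, and the second list comprehension simply plays the role of a recomputation.

```python
class Counter(object):
    """Stand-in for an accumulator (not Spark's class)."""
    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n

seen = Counter()

def tag(x):
    """A 'transformation' that updates the counter as a side effect."""
    seen.add(1)
    return x * 2

data = [1, 2, 3, 4]

first_run = [tag(x) for x in data]    # initial evaluation
second_run = [tag(x) for x in data]   # recomputation of the same data
```

Both runs produce the same output, but the counter ends up at twice the number of elements, which is the kind of overcount a reevaluated transformation can cause.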

### Custom Accumulators

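Custom accumulators only work correctly when the combining operation is **associative** and **commutative**, because Spark may merge the per-task results in any order and grouping.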
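To see why these two properties matter, the sketch below merges the same partial results in every possible order. It is plain Python rather than Spark, and the `partials` list and the two merge functions are hypothetical examples: `merge_min` is order-independent, while `merge_sub` (subtraction) is not.

```python
from functools import reduce
import itertools

def merge_min(a, b):
    """Associative and commutative: safe as an accumulator operation."""
    return a if a < b else b

def merge_sub(a, b):
    """Neither associative nor commutative: the result depends on order."""
    return a - b

partials = [3, -9, 2]  # hypothetical per-task partial results

# Collect the distinct results obtained over all merge orders.
min_results = {reduce(merge_min, p) for p in itertools.permutations(partials)}
sub_results = {reduce(merge_sub, p) for p in itertools.permutations(partials)}
```

Only the minimum yields a single, well-defined answer; subtraction produces several different values depending on the order in which the partial results arrive.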

In [7]:
# Find minimum value using custom accumulator

import sys
from pyspark import AccumulatorParam

class MinAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return initialValue
    def addInPlace(self, v1, v2):
        return v1 if v1 < v2 else v2

accum = sc.accumulator(sys.maxint, MinAccumulatorParam())

rdd = sc.parallelize([3, -1, 4, -1, 5, -9, 2, -6])
rdd.foreach(lambda x: accum.add(x))

accum.value

-9

## Broadcast Variables

Spark's second type of shared variable, broadcast variables, allows the program to efficiently send a **large**, **read-only** value to all the worker nodes for use in one or more Spark operations.

In [8]:
br = sc.broadcast({"a": 3, "b": -4, "c": -1, "d": 5, "e": 9})
rdd = sc.parallelize(list("abcde"))

observedSizes = rdd.map(lambda x: br.value[x])

observedSizes.collect()

[3, -4, -1, 5, 9]

The process of using broadcast variables is simple:

1. Create a **Broadcast[T]** by calling **SparkContext.broadcast** on an object of type **T**. Any type works as long as it is serializable (in PySpark, picklable).
2. Access its value with the **value** property.
3. The variable will be sent to each node only once, and should be treated as read-only (updates will not be propagated to other nodes).

### Optimizing Broadcasts

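When broadcasting large values, it is important to choose a data serialization format that is both fast and compact, because the time spent serializing the value and sending it over the network can become a bottleneck.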

## Working on a Per-Partition Basis

Working with data on a per-partition basis allows us to avoid redoing setup work for each data item. Operations like opening a database connection or creating a random-number generator are examples of setup steps that we wish to avoid doing for each element. Spark has per-partition versions of map and foreach to help reduce the cost of these operations by letting you run code only once for each partition of an RDD.
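The shape of a per-partition function can be tried out locally without a cluster. In this minimal sketch, the `add_noise` function, the `setup_calls` list, and the two-partition layout are hypothetical; the function takes an iterator and yields results, which is the form **mapPartitions()** expects.

```python
import random

setup_calls = []  # records each time the setup step runs

def add_noise(partition):
    """Iterator in, iterator out: the shape expected by mapPartitions."""
    rng = random.Random(42)   # setup performed once per partition
    setup_calls.append(1)
    for x in partition:
        yield x + rng.random()

partitions = [[1, 2, 3], [4, 5, 6]]  # hypothetical 2-partition layout

# Apply the function once per partition, as Spark would for each task.
result = [y for part in partitions for y in add_noise(iter(part))]
```

Six elements are processed but the generator is created only twice, once per partition. On a real cluster a global list such as `setup_calls` would not be propagated back to the driver; it is used here only to make the setup count observable.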

In [9]:
import urllib3
import json

def processCallSigns(signs):
    """Lookup call signs using a connection pool"""
    # Create a connection pool
    http = urllib3.PoolManager()
    # the URL associated with each call sign record
    urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
    # issue the requests (each urllib3 call blocks until the response arrives)
    requests = map(lambda x: (x, http.request('GET', x)), urls)
    # fetch the results
    result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
    # remove any empty results and return
    return filter(lambda x: x[1] is not None, result)

def fetchCallSigns(rdd):
    """Fetch call signs"""
    return rdd.mapPartitions(processCallSigns)

contactsContactList = fetchCallSigns(validSigns)

contactsContactList.first()

(u'http://73s.com/qsos/KK6JKQ.json',
 [{u'address': u'330 N. Mathilda ave #204',
   u'age': None,
   u'arrl_sect': None,
   u'band': u'40m',
   u'callsign': u'KK6JLK',
   u'city': u'SUNNYVALE',
   u'comment': u'',
   u'confirmcode': u'ix3kq728g7w2ns6mpe3plb9d',
   u'confirmtime': None,
   u'contactgrid': u'CM87xj',
   u'contactlat': u'37.384733',
   u'contactlong': u'-122.032164',
   u'contacttime': u'2014-02-08T23:51:00Z',
   u'contestid': None,
   u'continent': None,
   u'country': u'United States',
   u'county': u'Santa Clara',
   u'cqzone': None,
   u'created_at': u'2014-08-13T23:52:06Z',
   u'dxcc': u'291',
   u'email': u'',
   u'event': u'',
   u'frequency': u'',
   u'fullname': u'MATTHEW McPherrin',
   u'id': 57779,
   u'image': u'',
   u'iota': None,
   u'ituzone': None,
   u'mode': u'FM',
   u'mygrid': u'CM87ss',
   u'mylat': u'37.7519528215759',
   u'mylong': u'-122.42086887359619',
   u'notes': None,
   u'operator': None,
   u'propmode': None,
   u'qslmessage': None,
   u'qs

In addition to avoiding setup work, we can sometimes use **mapPartitions()** to avoid object creation overhead. Sometimes we need to make an object for aggregating the result that is of a different type.

When we computed the average of a list of numbers, one of the ways we did this was by converting our RDD of numbers to an RDD of tuples so we could track the number of elements processed in our reduce step. 

**Average without mapPartitions() in Python:**

In [10]:
def combineCtrs(c1, c2):
    return (c1[0] + c2[0], c1[1] + c2[1])

nums = sc.parallelize(range(10), 2)
nums.map(lambda num: (num, 1)).reduce(combineCtrs)

(45, 10)

Instead of doing this for each element, we can create the tuple once per partition.

**Average with mapPartitions() in Python:**

In [11]:
def partitionCtr(nums):
    sumCount = [0, 0]
    for num in nums:
        sumCount[0] += num
        sumCount[1] += 1

    return [sumCount]

sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)

sumCount[0] / float(sumCount[1])

4.5

**mapPartitions(f, preservesPartitioning=False)**

Return a new RDD by applying a function to each partition of this RDD.

In [12]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

def f(iterator): yield sum(iterator)
    
rdd.mapPartitions(f).collect()

[15, 40]

## Numeric RDD Operations

Spark provides several descriptive statistics operations on RDDs containing numeric data. 

Spark's numeric operations are implemented with a streaming algorithm that allows for building up our model one element at a time. The descriptive statistics are all computed in a single pass over the data and returned as a StatCounter object by calling **stats()**.
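One well-known way to compute such statistics in a single pass is Welford's online update combined with a merge step for partial results. The sketch below is an illustration of that idea under our own assumptions, not Spark's actual implementation; the `RunningStats` class and its method names are hypothetical.

```python
import math

class RunningStats(object):
    """Single-pass count, mean, and variance (Welford-style sketch)."""

    def __init__(self):
        self.n = 0      # number of elements seen
        self.mu = 0.0   # running mean
        self.m2 = 0.0   # sum of squared deviations from the mean

    def add(self, x):
        """Fold one element into the running statistics."""
        self.n += 1
        delta = x - self.mu
        self.mu += delta / self.n
        self.m2 += delta * (x - self.mu)
        return self

    def merge(self, other):
        """Combine with the statistics of another partition."""
        if other.n == 0:
            return self
        if self.n == 0:
            self.n, self.mu, self.m2 = other.n, other.mu, other.m2
            return self
        n = self.n + other.n
        delta = other.mu - self.mu
        self.m2 += other.m2 + delta * delta * self.n * other.n / n
        self.mu += delta * other.n / n
        self.n = n
        return self

    def mean(self):
        return self.mu

    def stdev(self):
        """Population standard deviation."""
        return math.sqrt(self.m2 / self.n)

# Two hypothetical partitions of the numbers 0..9.
left, right = RunningStats(), RunningStats()
for x in range(0, 5):
    left.add(x)
for x in range(5, 10):
    right.add(x)

total = left.merge(right)
```

Because each partition only keeps three numbers and partial results can be merged, the data never has to be read twice or held in memory at once.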

In [13]:
import numpy as np
import math

distances = sc.parallelize(np.abs(np.random.rand(100) * 10)).persist()

# Compute statistics using actions on the RDD
print "mean: %f" % distances.mean()
print "std: %f" % distances.stdev()

mean: 4.541629
std: 2.932548


In [14]:
# OR use the StatCounter object returned by stats()
stats = distances.stats()
stddev = stats.stdev()
mean = stats.mean()
reasonableDistances = distances.filter(lambda x: math.fabs(x - mean) < 3 * stddev)

reasonableDistances.take(5)

[6.6849729354803262,
 1.0060285724857188,
 5.6319080113919728,
 6.9631208140385663,
 4.3241829286522639]