# Homework 3: PySpark - II
### Due: Tuesday April 25, 2017, 11:59 PM
### Note: **This homework is to be done individually!  Do not modify any existing method signatures.**
### **This is the second of two .ipynb files in this homework.

In [1]:
## On some computers it may be possible to run this lab 
## locally by using this script; you will need to run
## this each time you start the notebook.


import sys
import os
import zipfile

if not os.path.exists("cs186_spark.zip"):
    print "Downloading Spark"
    !wget http://sist.shanghaitech.edu.cn/faculty/hexm/cs186_spark.zip

if not (os.path.isdir("cs186_spark") and os.path.exists("cs186_spark")):
    print "Extracting Spark"
    with zipfile.ZipFile("cs186_spark.zip","r") as zip_ref:
        zip_ref.extractall("./")

sys.path.append(os.path.join(os.getcwd(), 'cs186_spark', 'python', 'lib', 'pyspark.zip'))
sys.path.append(os.path.join(os.getcwd(), 'cs186_spark', 'python', 'lib', 'py4j-0.9-src.zip'))
os.environ["SPARK_HOME"] = os.path.join(os.getcwd(), 'cs186_spark')
import getpass
if getpass.getuser()=='jane':
    os.environ["JAVA_HOME"] = "/usr/lib/java/jre1.8.0_91/"

In [2]:
import pyspark
from utils import SparkContext as sc

In [3]:
from utils import CleanRDD
from utils import tests

# Part 3: CacheMap

In this part, we'll construct an rdd that is backed by a `ClockMap` and will behave like `rdd.map(func)`.  
First, implement the `ClockMap` class so that it maintains a cache (of limited `cacheSize`) using the clock replacement policy.

### * BEGIN STUDENT CODE *

In [4]:
class ClockMap:
    
    def __init__(self, cacheSize, func):
        """
        Do not change existing variables.
        [Optional] You are free to add additional items and methods.
        """
        self.cacheSize = cacheSize
        self.fn = func
        self._p = 0 # pointer
        self._increments = 0
        self._miss_count = 0
        self.buffers = [[None, 0] for x in range(cacheSize)]
        self.items_to_index = {}
        
    def _increment(self):
        """
        Do not change this method.
        Updates the clock pointer. The modulo maintains the clock nature.
        """
        self._increments += 1
        self._p = (self._p + 1) % self.cacheSize

    def __getitem__(self, k):
        """
        Returns func(k) using the buffer to cache limited results.
        
        :param k: Value to be evaluated
        
        >>> clock = ClockMap(4, lambda x: x ** 2)
        >>> clock[4]
        16
        >>> clock[3]
        9
        >>> clock._p
        2
        """
        #check if the input key is already in the cache
        if k in self.items_to_index.keys():
            to_find_index = self.items_to_index[k]
            #set bit to 1
            self.buffers[to_find_index][1] = 1
            #return the result
            return self.fn(k)
        #otherwise,cache miss
        else:
            self._miss_count = self._miss_count + 1
            #now we need to move the pointer to find a correct place yo replace
            while self.buffers[self._p][1] != 0:
                self.buffers[self._p][1] = 0
                self._increment()
            #now we find a place to replace,change the information
            #delete the original one
            if self.buffers[self._p][0] != None:
                to_delete_index = self.buffers[self._p][0]
                del self.items_to_index[to_delete_index]
            #replace and move the poiter
            self.buffers[self._p] = [k,1]
            self.items_to_index[k] =self._p
            self._increment()
            #return the result
            return self.fn(k)
        #pass #TODO

In [5]:
def cacheMap(rdd, cacheSize, func):
    """
    Returns an RDD that behaves like rdd.map(func) but
    is implemented using the ClockMap.
    
    :param rdd: Given RDD
    :param cacheSize: Number of cache/buffer pages in the ClockMap
    :param func: Function to map with
    """
    def cache_generator(index,iterator):
        cache_map = ClockMap(cacheSize,func)
        for it in iterator:
            yield cache_map[it]
    return rdd.mapPartitionsWithIndex(cache_generator)
    #pass #TODO

### * END STUDENT CODE *

Free test for you!

In [6]:
clock = ClockMap(4, lambda x: x ** 2)
print clock[4], clock[3]
print clock._p

16 9
2


Output should be 
```
16, 9
2
```

# Part 4: External Algorithms

You'll need an understanding of the partitioning step of external hashing, and the divide step of external sorting (recall the lecture on external algorithms).

In [7]:
from utils import *
import itertools
import bisect
import os

The following are some tools you may want to use (examples use cases included). You should Google the unfamiliar ones!

In [8]:
# itertools.islice
generator = (y for y in range(100))
test1 = itertools.islice(generator, 5)
next(test1)

0

In [9]:
# heapq.merge
generator1 = (odd for odd in range(100) if odd % 2)
generator2 = (even for even in range(100)[::2])
key = lambda x: x
test2 = heapq.merge([generator1, generator2], key=key, reverse=False)
next(test2)

0

In [10]:
# bisect.bisect_left
buckets = [2, 4, 4]
print "If we insert 3, it goes to %d" % bisect.bisect_left(buckets, 3)
print "If we insert 1, it goes to %d" % bisect.bisect_left(buckets, 1)
print "If we insert 4, it goes to %d" % bisect.bisect_left(buckets, 4)

If we insert 3, it goes to 1
If we insert 1, it goes to 0
If we insert 4, it goes to 1


In [11]:
# RDD.sample
rdd = sc.parallelize(range(100))
fraction = 0.1
rdd.sample(False, fraction).collect()

[2, 22, 28, 38, 40, 43, 44, 65, 67, 71, 77, 84, 93, 94]

In [12]:
# Serializer and os.unlink (Serializer is provided via utils.GeneralTools)
generator1 = (odd for odd in range(100) if odd % 2)
filename = "temp"
with open(filename, "w") as f:
    serializer.dump_stream(generator1, f)

with open(filename, "r") as f:
    stream = serializer.load_stream(f)
    print next(stream)

os.unlink(filename)

1


In [13]:
# get_used_memory - returns an int in MB
get_used_memory()

55

No need to modify the following function - it should come in handy!

In [14]:
def get_sort_dir(partId, n):
    """
    Returns a path for temporary file.

    :param n: Unique identification for file
    """
    d = "tmp/sort/" + str(partId) + "/"
    if not os.path.exists(d):
        os.makedirs(d)
    return os.path.join(d, str(n))

### * BEGIN STUDENT CODE *

In [15]:
def externalSortStream(iterator, partId=0, reverse=False, keyfunc=None, serial=serializer, limit=10, batch=100):
    """
    Given an iterator, returns an iterator of sorted elements (according to parameters). 
    
    :param iterator: iterator. Expects (Key, Value).
    :param keyfunc: function applied on the keykey.
    :param reverse: Reverse default ordering if true. (default is ascending; reverse is descending) 
    :param serializer: See README.
    :param limit: memory limit.
    :param batch: Number of elements to read at a time.
    """
    
    all_runs = [] # can be used to hold a list of iterators
    run = [] # used to hold the current run of elements
    
    def load(fileobj):
        """
        Returns a generator object that outputs elements 
        from a serialized (saved) stream. Closes the file when done.
        
        :param fileobj: python object file
        """
        for _ in serial.load_stream(fileobj):
            yield _
        fileobj.close()
   
    # TODO everywhere below 
    run_count = 0
    while True:
        c = list(itertools.islice(iterator, batch))
        #if the list c is empty,then break
        length_of_c = len(c)
        if length_of_c == 0:
            break
        #else, add c to the run
        run = run + c
        #if the momery limit is exceeded,sort the run,write to the disk
        #run_count = len(all_runs)
        if get_used_memory() > limit:
            run = sorted(run,key=lambda x: keyfunc(x[0]),reverse=reverse)
            file_tmp_path = get_sort_dir(partId,run_count)
            with open(file_tmp_path,"w+") as file_w:
                serializer.dump_stream(run,file_w)
            #file_w.close()
            # TODO some cleanup stuff
            del run[:]
            run_count = run_count + 1
    #reopen the file we just write,store it in all_runs
    i=0
    while i < run_count:
        tmp_path = get_sort_dir(partId,i)
        #with open(tmp_path,"r") as file_r:
        file_r = open(tmp_path,"r")
        all_runs.append(load(file_r))
        i = i + 1
    return heapq.merge(all_runs, key=lambda x: keyfunc(x[0]), reverse=reverse)

In [16]:
# Remember to run the import box above.

def partitionByKey(rdd, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """        
    Uses sampling to partitions the elements by the return value of 
    keyfunc.

    :param ascending: Smallest first.
    :param numPartitions: Number of partitions of the returning RDD.
    :param keyfunc: function to be applied to the key.
    """
    # Base cases done.

    if numPartitions is None:
        numPartitions = rdd.getNumPartitions()

    if numPartitions == 1:
        if rdd.getNumPartitions() > 1:
            rdd = rdd.coalesce(1)
        return rdd
    
    
    # TODO
    bucket_boundary = getBuckets(rdd,ascending,numPartitions,keyfunc)
    if not ascending:
        balanceLoad = lambda x:numPartitions-1-bisect.bisect_left(buckets,x)
    else:
        balanceLoad = lambda x:bisect.bisect_left(buckets,x)
    return rdd.partitionBy(numPartitions, balanceLoad)


def getBuckets(rdd, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """        
    [Optional] Returns a list of bucket boundaries of length (numPartitions - 1),
    in an order as specfied by the given parameters: ascending, keyfunc. 
    Bucket boundaries are determined by sampling as specified in the README.

    :param ascending: Smallest first.
    :param numPartitions: Number of partitions of the returning RDD.
    :param keyfunc: function to be applied to the key.
    """
    # Base cases done.
    
    part = (10.0 * numPartitions) / rdd.count()
    fraction = rdd.sample(False, part).collect()
    after_sort_fraction = sorted(fraction, key=keyfunc)
    
    result_buckets = []
    #traverse the list to find put what fraction in the result list
    i = 0
    skip_length = len(fraction)/numPartitions
    while i<numPartitions:
        index = i*skip_length
        if i<len(fraction):
            result_buckets.append(fraction[index][0])
        else:
            continue
        i = i + 1
    return result_buckets

In [17]:
def sortByKey(rdd, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """
    Returns an RDD after executing an external sort using 
    functions partitionByKey and externalSortStream. 

    :param ascending: Smallest first.
    :param numPartitions: Number of partitions of the returning RDD.
    :param keyFunc: function to be applied to the key.
    """
    def sort_generator(index,iterator):
        return externalSortStream(iterator,index,not ascending,keyfunc)
    return partitionByKey(rdd,ascending,numPartitions,keyfunc).mapPartitionsWithIndex(sort_generator)
    #pass # TODO

### * END STUDENT CODE *

Here are tests for `partitionByKey` and `externalSortStream`:

In [18]:
test_stream = ((i, i) for i in range(100))
list(externalSortStream(test_stream, keyfunc=(lambda x: abs(50 - (x ** 2)))))[:10]

[(7, 7),
 (6, 6),
 (8, 8),
 (5, 5),
 (9, 9),
 (4, 4),
 (3, 3),
 (2, 2),
 (1, 1),
 (0, 0)]

Your output should be:
```
[(7, 7),
 (6, 6),
 (8, 8),
 (5, 5),
 (9, 9),
 (4, 4),
 (3, 3),
 (2, 2),
 (1, 1),
 (0, 0)]
```

In [19]:
rdd = CleanRDD(sc.parallelize(range(20), 4).map(lambda x: (x * 37 % 6, x ** 3 % 34)))
partitionByKey(rdd)

<utils.CleanRDD.CleanRDD at 0x7f25740b0650>

Your output should look rather well-distributed. Try forcing a skewed distribution and observe how effective the partitioning is.

Here's a test for `sortByKey`:

In [20]:
rdd = CleanRDD(sc.parallelize(range(100), 4).map(lambda x: (x *((-1) ** x) , x)))
sortByKey(rdd, keyfunc=lambda key: key, ascending=False).collect()[-10:]

[(-81, 81),
 (-83, 83),
 (-85, 85),
 (-87, 87),
 (-89, 89),
 (-91, 91),
 (-93, 93),
 (-95, 95),
 (-97, 97),
 (-99, 99)]

Your output should be:
```
[(-81, 81),
 (-83, 83),
 (-85, 85),
 (-87, 87),
 (-89, 89),
 (-91, 91),
 (-93, 93),
 (-95, 95),
 (-97, 97),
 (-99, 99)]
```

# Testing

In [21]:
tests.test3ClockMap(ClockMap)
tests.test3CacheMap(cacheMap)
tests.test4(sortByKey)

Task 3: PASS - task3ClockMap.txt matched reference output.
Task 3: PASS - task3CacheMap.txt matched reference output.
Task 4: PASS - task4.txt matched reference output.
