# Homework 3: PySpark - II
### CS186, UC Berkeley, Spring 2016
### Due: Thursday Feb 25, 2016, 11:59 PM
### Note: **This homework is to be done individually!  Do not modify any existing method signatures.**
### **This is the second of two .ipynb files in this homework.

In [2]:
## On some computers it may be possible to run this lab 
## locally by using this script; you will need to run
## this each time you start the notebook.
## You do not need to run this on inst machines.

# from local_install import setup_environment
# setup_environment()

In [3]:
import pyspark
from utils import SparkContext as sc

In [4]:
from utils import CleanRDD
from utils import tests

# Part 3: CacheMap

In this part, we'll construct an rdd that is backed by a `ClockMap` and will behave like `rdd.map(func)`.  
First, implement the `ClockMap` class so that it maintains a cache (of limited `cacheSize`) using the clock replacement policy.

### * BEGIN STUDENT CODE *

In [5]:
class ClockMap:
    
    def __init__(self, cacheSize, func):
        """
        Do not change existing variables.
        [Optional] You are free to add additional items and methods.
        """
        self.cacheSize = cacheSize
        self.fn = func
        self._p = 0 # pointer
        self._increments = 0
        self._miss_count = 0
        self.buffers = [[None, 0] for x in range(cacheSize)]
        self.items_to_index = {}
        
    def _increment(self):
        """
        Do not change this method.
        Updates the clock pointer. The modulo maintains the clock nature.
        """
        self._increments += 1
        self._p = (self._p + 1) % self.cacheSize

    def __getitem__(self, k):
        """
        Returns func(k) using the buffer to cache limited results.
        
        :param k: Value to be evaluated
        
        >>> clock = ClockMap(4, lambda x: x ** 2)
        >>> clock[4]
        16
        >>> clock[3]
        9
        >>> clock._p
        2
        """
        # pass #TODO
        
        # Do it like in lecture??
        # First check if in cache
        for i in range(len(self.buffers)):
            if self.buffers[i][0] == k:
                # Check it
                self.items_to_index[i] = True
                return self.buffers[i][1]
            
        # Else, cache miss
        self._miss_count += 1
        
        # Move pointer around, to find where to replace
        while self._p in self.items_to_index and self.items_to_index.get(self._p):
            # Second chance! Uncheck and move on
            self.items_to_index[self._p] = False
            self._increment()
        
        # Found where to replace, now replace. And don't forget to check it. 
        result = self.fn(k)
        self.buffers[self._p] = [k, result]
        self.items_to_index[self._p] = True
        # I guess, according to test, also have to increment after writing.
        self._increment()
        return result


        
        

Now implement `cacheMap`, which will return an rdd.

In [6]:
def cacheMap(rdd, cacheSize, func):
    """
    Returns an RDD that behaves like rdd.map(func) but
    is implemented using the ClockMap.
    
    :param rdd: Given RDD
    :param cacheSize: Number of cache/buffer pages in the ClockMap
    :param func: Function to map with
    """
    # pass #TODO
    
    # Just use mapPartitionsWithIndex again? LOL
    def iterate(y, itr):
        clock = ClockMap(cacheSize, func)
        for x in itr:
            yield clock[x]
            
    
    return rdd.mapPartitionsWithIndex(iterate)

### * END STUDENT CODE *

Free test for you!

In [7]:
clock = ClockMap(4, lambda x: x ** 2)
print clock[4], clock[3]
print clock._p

16 9
2


Output should be 
```
16, 9
2
```

# Part 4: External Algorithms

You'll need an understanding of the partitioning step of external hashing, and the divide step of external sorting (recall the lecture on external algorithms).

In [8]:
from utils import *
import itertools
import bisect
import os

The following are some tools you may want to use (examples use cases included). You should Google the unfamiliar ones!

In [9]:
# itertools.islice
generator = (y for y in range(100))
test1 = itertools.islice(generator, 5)
print next(test1)
print next(test1)
test2 = itertools.islice(generator, 5)
print next(generator)
print next(test2)

0
1
2
3


In [10]:
# heapq.merge
generator1 = (odd for odd in range(100) if odd % 2)
generator2 = (even for even in range(100)[::2])
key = lambda x: x
test2 = heapq.merge([generator1, generator2], key=key, reverse=False)
next(test2)

0

In [11]:
# bisect.bisect_left
buckets = [2, 4, 4]
print "If we insert 3, it goes to %d" % bisect.bisect_left(buckets, 3)
print "If we insert 1, it goes to %d" % bisect.bisect_left(buckets, 1)
print "If we insert 4, it goes to %d" % bisect.bisect_left(buckets, 4)

buckets2 = [(1, 2), (3, 4), (5, 6)]
print bisect.bisect_left(buckets2, (0, 0))
print bisect.bisect_left(buckets2, (5, 7))

If we insert 3, it goes to 1
If we insert 1, it goes to 0
If we insert 4, it goes to 1
0
3


In [12]:
# RDD.sample
rdd = sc.parallelize(range(100))
fraction = 0.1
rdd.sample(False, fraction).collect()

[14, 16, 27, 28, 30, 59, 94, 97]

In [13]:
# Serializer and os.unlink (Serializer is provided via utils.GeneralTools)
generator1 = (odd for odd in range(100) if odd % 2)
filename = "temp"
with open(filename, "w") as f:
    serializer.dump_stream(generator1, f)

with open(filename, "r") as f:
    stream = serializer.load_stream(f)
    print next(stream)

os.unlink(filename)

1


In [14]:
# get_used_memory - returns an int in MB
get_used_memory()

70

No need to modify the following function - it should come in handy!

In [15]:
def get_sort_dir(partId, n):
    """
    Returns a path for temporary file.

    :param n: Unique identification for file
    """
    d = "tmp/sort/" + str(partId) + "/"
    if not os.path.exists(d):
        os.makedirs(d)
    return os.path.join(d, str(n))

### * BEGIN STUDENT CODE *

In [16]:
def externalSortStream(iterator, partId=0, reverse=False, keyfunc=None, serial=serializer, limit=10, batch=100):
    """
    Given an iterator, returns an iterator of sorted elements (according to parameters). 
    
    :param iterator: iterator. Expects (Key, Value).
    :param keyfunc: function applied on the keykey.
    :param reverse: Reverse default ordering if true. (default is ascending; reverse is descending) 
    :param serializer: See README.
    :param limit: memory limit.
    :param batch: Number of elements to read at a time.
    """
    
    all_runs = [] # can be used to hold a list of iterators
    all_runs_paths = []
    run = [] # used to hold the current run of elements
    length_c = 0
    
    def load(fileobj):
        """
        Returns a generator object that outputs elements 
        from a serialized (saved) stream. Closes the file when done.
        
        :param fileobj: python object file
        """
        for _ in serial.load_stream(fileobj):
            yield _
        fileobj.close()
   
    # TODO everywhere below 
    
    
    # Load up until hit soft limit
    # Then sort the run
    # Save run into a stream
    # And then use heap merge to merge them
    
    # sorted() seems to take in a list, output list, okay
    
    
    # Get all runs into all_runs
    while True:
        
        
        
        # Fill up a single run
        while True:
            
            
            c = list(itertools.islice(iterator, batch))
            length_c = len(c)
            # print(next(iterator))
            
#             for i in range(len(c)):
#                 print(next(iterator))
        
        # TODO
        
#         if "base case goes here":
#             break
        
        # Load up
            run = run + c
            #print(run)
        
        
        # End case: hit memory limit, or nothing left to stream
            if get_used_memory() > limit or length_c < batch: # TODO
                break
        
        # Filled up a run
        # Put it into a lambda function
#         sorted_run = sorted(run, lambda x: keyfunc(x[0]))


        sorted_run = sorted(run, key=lambda x: keyfunc(x[0]), reverse=reverse)
        srun_path = get_sort_dir(partId, len(all_runs))
        # with open(srun_path, "w") as f:
        f = open(srun_path, "w")
        serializer.dump_stream(sorted_run, f)
        # Link into all_runs
    
    
        # With ___ as ____ apparently closes the file
        #with open(srun_path, "r") as f:
            # f is the file obj
        f = open(srun_path, "r")
        all_runs.append(load(f))
#             stream = serializer.load_stream(f)
        
        
        all_runs_paths.append(srun_path)
        
        #os.unlink(srun_path)
        
        
        # Keep making more runs, until nothing left to stream
        if length_c < batch:
            #print(True)
            break;
    
        #print(False)
        #print(length_c)
        
        # OH GOTTA CLEAR OUT THE RUN 
        run = []
        
        
    # TODO some cleanup stuff
    
    # Delete all the runs! unlink
    for path in all_runs_paths:
        os.unlink(path)
    
    # Did all the runs! Now merge them
    return heapq.merge(all_runs, key=lambda x: keyfunc(x[0]), reverse=reverse)

    

In [32]:
# Remember to run the import box above.

def partitionByKey(rdd, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """        
    Uses sampling to partitions the elements by the return value of 
    keyfunc.

    :param ascending: Smallest first.
    :param numPartitions: Number of partitions of the returning RDD.
    :param keyfunc: function to be applied to the key.
    """
    # Base cases done.

    if numPartitions is None:
        numPartitions = rdd.getNumPartitions()

    if numPartitions == 1:
        if rdd.getNumPartitions() > 1:
            rdd = rdd.coalesce(1)
        return rdd
    
    
    # TODO
    
    # What does getBuckets have to do with this though? 
    # Use these samples (maybe collect it?) to find (a list of) boundaries for each bucket.
    # 
    
    boundaries = getBuckets(rdd, ascending, numPartitions, keyfunc)
    
    print("boundaries = " + str(boundaries))
    
    # You want to write a function that will bucket your data (think coarse partitioning) 
        # - given that you've calculated your buckets
        # Just bisect.bisect_left will do? Should return index of insertion. 
    # Okay, so grab the boundary values, throw into list
        # And then use bisect.bisect_left to figure out which bucket it should go into. Should be okay.
        
    
    def bi(x):
        print ("x = " + str(x))
        print ("index = " + str(bisect.bisect_left(boundaries, x)))
        return bisect.bisect_left(boundaries, x)
        
    #balanceLoad = lambda x: bisect.bisect_left(boundaries, x) # I dunno it didn't like it when I did x[0] so
    balanceLoad = bi
    
    # Okay I think this is like
        # balanceLoad tells you what...index? to go to
        # So then hash by, everyone with same index into same partition. 
        
    
    # Omg let's just test out bisect
    def iterate(y, itr):
        for item in itr:
            print("item = " + str(item))
            print(bisect.bisect_left(boundaries, item))
            yield bisect.bisect_left(boundaries, item)

    print("bisect results: " + str(rdd.mapPartitionsWithIndex(iterate).collect()))
    # Okay it looks like bisect.bisect_left is working right
    # Just the rdd isn't PARTITIONING it right
    #return rdd.partitionBy(numPartitions, func=bi)
    print("right before partitioning, numPartitions = " + str(numPartitions))


    
    thing = rdd.partitionBy(numPartitions, lambda x: bisect.bisect_left(boundaries, x))
    
    print("partitioned, has " + str(thing.getNumPartitions()) + " partitions")
    
    def iterateAgain(y, itr):
        for item in itr:
            print(str(item))
            yield item
    print("results: " + str(thing.mapPartitionsWithIndex(iterateAgain).collect()))

    
    return thing


def getBuckets(rdd, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """        
    [Optional] Returns a list of bucket boundaries of length (numPartitions - 1),
    in an order as specfied by the given parameters: ascending, keyfunc. 
    Bucket boundaries are determined by sampling as specified in the README.

    :param ascending: Smallest first.
    :param numPartitions: Number of partitions of the returning RDD.
    :param keyfunc: function to be applied to the key.
    """
    # Base cases done.
    
    
    
    
    # Try sampling about 10 per partition (expected value).
    print("numPartitions = " + str(numPartitions))
    fraction = (10 * numPartitions) / rdd.count()
        # You can do this using the given sample function (without replacement).
    samples = rdd.sample(False, fraction).collect()
        
    # Use these samples (maybe collect it?) to find (a list of) boundaries for each bucket.
    # But you have like 10x more than you want. 
    # So maybe sample, and then sort it, and then like divide by numPartitions, to grab the boundary values?
#     sorted_samples = sorted(samples, key=keyfunc, reverse=(not ascending)) # sort by key
    sorted_samples = sorted(samples, key=keyfunc)
    print("sorted_samples length = " + str(len(sorted_samples)))
    # 2 partitions, want 1 bound value. So want numPartitions - 1 bound values. 
    
    
    
    # OOHHHHH okay you only have 10x more than you want IF your rdd is big enough
    # So gonna have to divide, actually. Not just skip by 10. 
    skip_by = len(sorted_samples) / numPartitions
    if skip_by < 1:
        skip_by = 1
          
    list = [sorted_samples[i][0] for i in range(skip_by - 1, len(sorted_samples) - 1, skip_by)]
    
    # list = [sorted_samples[i] for i in range(9, len(sorted_samples) - 1, 10)]
    
    
    return list # I hope this does numPartitions - 1
    


In [18]:
def sortByKey(rdd, ascending=True, numPartitions=None, keyfunc=lambda x: x):
    """
    Returns an RDD after executing an external sort using 
    functions partitionByKey and externalSortStream. 

    :param ascending: Smallest first.
    :param numPartitions: Number of partitions of the returning RDD.
    :param keyFunc: function to be applied to the key.
    
    """
    #pass # TODO
    
    def iterate(y, itr):
        return externalSortStream(itr, y, not ascending, keyfunc)

    
    
    return partitionByKey(rdd).mapPartitionsWithIndex(iterate)
    #return rdd.mapPartitionsWithIndex(iterate)

### * END STUDENT CODE *

Here are tests for `partitionByKey` and `externalSortStream`:

In [19]:
test_stream = ((i, i) for i in range(100))
list(externalSortStream(test_stream, keyfunc=(lambda x: abs(50 - (x ** 2)))))[:10]

[(7, 7),
 (6, 6),
 (8, 8),
 (5, 5),
 (9, 9),
 (4, 4),
 (3, 3),
 (2, 2),
 (1, 1),
 (0, 0)]

Your output should be:
```
[(7, 7),
 (6, 6),
 (8, 8),
 (5, 5),
 (9, 9),
 (4, 4),
 (3, 3),
 (2, 2),
 (1, 1),
 (0, 0)]
```

In [36]:
rdd = CleanRDD(sc.parallelize(range(20), 4).map(lambda x: (x * 37 % 6, x ** 3 % 34)))
print(str(rdd.count()))
#partitionByKey(rdd)

def iterate(y, itr):
    for item in itr:
        yield (y, item)
            
    
#return rdd.mapPartitionsWithIndex(iterate)
# thing = partitionByKey(rdd, ascending=True, numPartitions=5)
# print thing.getNumPartitions()
# thing.mapPartitionsWithIndex(iterate).collect()



# FOR KICKS
newRdd = partitionByKey(rdd, ascending=True)
def counterFunction(y, iterator):
    count = 0
    for item in iterator:
        count += 1
    yield count
newRdd.mapPartitionsWithIndex(counterFunction).collect()


# newRdd = partitionByKey(rdd, ascending=True)
# def counterFunction(y, iterator):
#     count = 0
#     for item in iterator:
#         yield (y, item)
# newRdd.mapPartitionsWithIndex(counterFunction).collect()








#partitionByKey(rdd, ascending=True, numPartitions=5).collect()

#partitionByKey(rdd, True, 5).collect()
#print(partitionByKey(rdd).collect())

20
numPartitions = 4
sorted_samples length = 20
boundaries = [1, 2, 4]
bisect results: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
right before partitioning, numPartitions = 4
partitioned, has 4 partitions
results: [(0, 0), (1, 1), (0, 12), (1, 3), (0, 28), (1, 21), (0, 18), (1, 25), (2, 8), (2, 2), (2, 24), (3, 27), (4, 30), (3, 15), (4, 14), (3, 9), (4, 16), (5, 23), (5, 5), (5, 17)]


[8, 3, 6, 3]

Your output should look rather well-distributed. Try forcing a skewed distribution and observe how effective the partitioning is.

Here's a test for `sortByKey`:

In [30]:
rdd = CleanRDD(sc.parallelize(range(100), 4).map(lambda x: (x *((-1) ** x) , x)))
sortByKey(rdd, keyfunc=lambda key: key, ascending=False).collect()[-10:]

[(-81, 81),
 (-83, 83),
 (-85, 85),
 (-87, 87),
 (-89, 89),
 (-91, 91),
 (-93, 93),
 (-95, 95),
 (-97, 97),
 (-99, 99)]

Your output should be:
```
[(-81, 81),
 (-83, 83),
 (-85, 85),
 (-87, 87),
 (-89, 89),
 (-91, 91),
 (-93, 93),
 (-95, 95),
 (-97, 97),
 (-99, 99)]
```

# Testing

In [31]:
tests.test3ClockMap(ClockMap)
tests.test3CacheMap(cacheMap)
tests.test4(sortByKey)

Task 3: PASS - task3ClockMap.txt matched reference output.
Task 3: PASS - task3CacheMap.txt matched reference output.
Task 4: PASS - task4.txt matched reference output.
