# Homework 3: PySpark - I
### CS186, UC Berkeley, Spring 2016
### Due: Thursday Feb 25, 2016, 11:59 PM
### Note: This homework is to be done individually!  Do not modify any existing method signatures.
### **This is the first of two .ipynb files in this homework.**

In [None]:
## On some computers it may be possible to run this lab 
## locally by using this script; you will need to run
## this each time you start the notebook.
## You do not need to run this on inst machines.

# from local_install import setup_environment
# setup_environment()

In [None]:
import os
import math
import pyspark
from utils import SparkContext as sc

In [None]:
from utils import CleanRDD
from utils import tests

# Part 1: Word Count in PySpark


Using RDD transformations, find the top 20 words in the given text file, sorted in descending order by the number of times they occur. Part of the code has been implemented for you - fill out the rest!
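To make the target pipeline concrete, here is a minimal plain-Python sketch of the same three stages (split into words, pair each word with 1, merge counts per key) with no Spark involved. The helper name `top_words`, the lowercase/whitespace tokenization, and the alphabetical tie-break are assumptions made for illustration; the tokenization the autograder expects may differ.

```python
def top_words(lines, n=20):
    """Plain-Python analogue of a word-count pipeline (illustrative only)."""
    # flatMap-like step: every line contributes zero or more words
    words = [word for line in lines for word in line.lower().split()]
    # map-like step: pair each word with an initial count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey-like step: add up the counts that share a key
    counts = {}
    for word, one in pairs:
        counts[word] = counts.get(word, 0) + one
    # sort by count descending; ties broken alphabetically (an assumption)
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return ranked[:n]

sample_lines = ["the cat and the hat", "and the bat"]
top_two = top_words(sample_lines, n=2)
print(top_two)  # [('the', 3), ('and', 2)]
```

In the real assignment each of these list operations corresponds to an RDD transformation, and nothing is computed until `take(20)` is called.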

### * BEGIN STUDENT CODE *

In [None]:
def countWords(input_text):
    """
    Returns a python list of the top 20 words in input_text, 
    sorted in descending order by the number of times they occur.
    
    :param input_text: Path to text file
    
    >>> countWords("asyoulikeit.txt")[0:3]
    [('the', 692), ('and', 671), ('i', 638)]
    
    """
    rdd = sc.textFile(input_text) # Create an rdd containing lines from the file
    
    result_rdd = rdd # rdd.DO_SOME_THING_AMAZING
    
    python_list = result_rdd.take(20) # Note that all computation will happen here
    return python_list

### * END STUDENT CODE *

The following is a quick debugging test:

In [None]:
countWords("asyoulikeit.txt")[0:3]

The output should be:
```
[('the', 692), ('and', 671), ('i', 638)]
```

# Part 2: PySpark Fundamentals

Let's now explore how an RDD is constructed. Specifically, let's implement our own `textFile` method, which will support custom line delimiters. We've already implemented some of the logic in `textFile` for you, but you will need to complete `readlinesInPartition`, which should yield the lines that the given partition is responsible for. Recall that RDDs have multiple partitions, so you will need to partition the file appropriately.
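To see why partitioning a file by size needs care, the sketch below cuts an in-memory string into raw chunks using the same `partitionSize` formula as the skeleton. The half-open slicing convention and the helper name `raw_chunks` are assumptions for illustration, not the required solution; the point is only that chunk boundaries usually fall in the middle of a line.

```python
import math

def raw_chunks(text, totalPartitions):
    """Cut text into fixed-size pieces, ignoring line boundaries (illustrative)."""
    # Same formula as the skeleton: ceiling division of size by partition count
    partitionSize = int(math.ceil(len(text) / float(totalPartitions)))
    # Half-open ranges [start, start + partitionSize) are an assumed convention
    return [text[start:start + partitionSize]
            for start in range(0, len(text), partitionSize)]

text = "aa\nbbbb\ncc\n"
chunks = raw_chunks(text, 3)
print(chunks)  # ['aa\nb', 'bbb\n', 'cc\n']
```

The line `bbbb` is split across the first two chunks, so a correct reader has to decide which partition owns a line that straddles a boundary and make sure no line is emitted twice or dropped.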

We've also written the following function for you to make life just a little easier.

In [None]:
# This function takes an input file handle and returns a
# "line" of characters, up to but not including the 
# delimiter. We've implemented this for you :-).
def readToDelimiter(fHandle, delimiter):
    s = ""
    while True:
        c = fHandle.read(1)
        if c == "" or c == delimiter:
            return s
        s += c
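As a quick illustration of how `readToDelimiter` behaves, the sketch below runs it against an in-memory `io.StringIO` handle instead of a real file. The function body is copied from the cell above so that the block runs on its own; the sample string and the `|` delimiter are made up for the example.

```python
import io

def readToDelimiter(fHandle, delimiter):
    # Copied from the helper above so this block is self-contained
    s = ""
    while True:
        c = fHandle.read(1)
        if c == "" or c == delimiter:
            return s
        s += c

handle = io.StringIO("alpha|beta|gamma")
first = readToDelimiter(handle, "|")   # stops at the first "|"
second = readToDelimiter(handle, "|")  # continues from where the handle left off
third = readToDelimiter(handle, "|")   # stops at end of input
at_eof = readToDelimiter(handle, "|")  # nothing left to read
print([first, second, third, at_eof])  # ['alpha', 'beta', 'gamma', '']
```

Note that an empty string is returned both at end of input and for an empty element between two adjacent delimiters, so the return value alone cannot tell those two cases apart; the handle's position (`fHandle.tell()`) can.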

### * BEGIN STUDENT CODE *

In [None]:
def readlinesInPartition(totalPartitions, partitionId, filename, delimiter):
    """
    Return an *iterator* over the "valid" lines in this partition of the file.
    
    :param totalPartitions: Total number of partitions in the RDD
    :param partitionId: The index of this current partition - (0 indexed)
    :param filename: The path to the file.
    :param delimiter: Character used to signal stop of element chunk.
    """
    
    filesize = os.path.getsize(filename)
    partitionSize = int(math.ceil(filesize / float(totalPartitions)))
    
    beginPartition = None # TODO: you do the math (it's pretty easy)
    endPartition = None # TODO: you do the math (+1 that)
    
    with open(filename, "r") as fHandle:
        # TODO: yield great things!
        # Hint 1: The very first line is special
        # Hint 2: We might find ourselves overlapping a bit
        pass

### * END STUDENT CODE *

In [None]:
def textFile(filename, delimiter="\n", numPartitions=2):
    """
    This function should take a file and return an RDD of the lines 
    in that file.
    
    :param filename: The path to the file.
    :param delimiter: Character used to signal stop of element chunk.
    :param numPartitions: Number of partitions in the resulting RDD.
    """
    # create a collection of partitionIds from 0 to numPartitions - 1
    partitionIds = sc.parallelize(range(numPartitions), numPartitions)
    
    readIteratorWithIndex = lambda idx, iterator: readlinesInPartition(numPartitions, idx, filename, delimiter)
    
    ## Execute your readlines code on each partition
    return partitionIds.mapPartitionsWithIndex(readIteratorWithIndex)

Here's a mini test to help debug your line reader:

In [None]:
numPartitions = 5
partitionId = 0
filename = "urls.txt"
delimiter = "\n"
list(readlinesInPartition(numPartitions, partitionId, filename, delimiter))[0:2]

The output should look like:
```
['http://jmhdb.cs.berkeley.edu:8000/pages/page_511.html',
 'http://jmhdb.cs.berkeley.edu:8000/pages/page_87.html']
```

## PySpark Transformations

Implement `filter()`, `reduceByKey()`, `flatMap()`, and `join()` using (primarily)
 - `mapPartitionsWithIndex`
 - `partitionBy`
 - `collect`
 - `zipPartitions`
 
Methods with the `[OPTIONAL]` tag are _optional_ - they are there for guidance and/or practice and can potentially make your life easier.
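Since most of the work goes through `mapPartitionsWithIndex`, it may help to see its calling convention modeled in plain Python: the supplied function receives a partition index and an iterator over that partition's elements, and returns an iterator of results. The stand-in `map_partitions_with_index` below, which treats an RDD as a list of lists, is hypothetical and exists only for this illustration.

```python
def map_partitions_with_index(partitions, func):
    """Hypothetical stand-in for RDD.mapPartitionsWithIndex on a list of lists."""
    # func is called once per partition with (index, iterator) and must
    # return an iterable of output elements for that partition
    return [list(func(index, iter(part)))
            for index, part in enumerate(partitions)]

def tag_with_index(index, iterator):
    # A generator is a convenient way to return an iterator lazily
    for element in iterator:
        yield (index, element)

partitions = [[1, 2], [3], [4, 5]]
tagged = map_partitions_with_index(partitions, tag_with_index)
print(tagged)  # [[(0, 1), (0, 2)], [(1, 3)], [(2, 4), (2, 5)]]
```

Because the function sees a whole partition at once, it can drop elements, emit several outputs per input, or build per-partition state such as a dictionary.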

### * BEGIN STUDENT CODE *

In [None]:
class CS186RDD(CleanRDD):
    
    def __init__(self, rdd):
        """
        You should not modify this method.
        """
        CleanRDD.__init__(self, rdd)
        
    def filter(self, func):
        """
        Return a new RDD containing only the elements that satisfy a predicate.
        
        :param func: Function returning output that can be evaluated as True or False.
        
        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """ 
        return self # TODO
    
    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.
        
        :param func: Function given elements a, b returns one output.
        :param numPartitions: Number of partitions expected on final RDD.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self # TODO
    
    def flatMap(self, func):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        :param func: Function given an element returns an iterable.
        
        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        return self # TODO
    
    def join(self, other):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.
        
        :param other: Another RDD. Expects (K,V) elements.
        
        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return self # TODO
    
    ## ## ## ## ## ## ## OPTIONAL BELOW ## ## ## ## ## ## ## ## ## 
    
    def mapPartitions(self, func):
        """
        [OPTIONAL]. Return a new RDD by applying a function to each partition 
        of this RDD.
        """
        pass
    
    def map(self, func):
        """
        [OPTIONAL]. Return a new RDD by applying a function to each element 
        of this RDD.
        """
        pass
    
def _combine(function, iterator):
    """
    [OPTIONAL]. Can be used in reduceByKey to combine the values of 
    (key, value) pairs using the given function.
    """
    pass

def _probe(iter1, iter2):
    """
    [OPTIONAL]. Probes and returns the result of joins between two partitions. 
    """
    pass

### * END STUDENT CODE *

Here is a simple test for `filter`. Feel free to add to this test!

In [None]:
numPartitions = 5
testcollection = range(300)
testRDD = CS186RDD(sc.parallelize(testcollection, numPartitions))
testRDD.filter(lambda x: x % 3 == 0).collect()[:10]

Output should be as follows:
```
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
```

Yet another simple test for `reduceByKey` - feel free to modify:

In [None]:
testRDD = CS186RDD(sc.parallelize([("a", 1), ("b", 4), ("a", 2), ("b", 6)]))
sorted(testRDD.reduceByKey(lambda value1, value2: value1 + value2).collect())

Output should be: 
```
[('a', 3), ('b', 10)]
```
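The "combiner" behavior described in the `reduceByKey` docstring can be sketched in plain Python: values are first merged within each partition, and only the partial results are merged across partitions. The way the four pairs are split into two partitions and the helper names `combine_locally` and `reduce_by_key` are assumptions for illustration; Spark decides the actual layout.

```python
from operator import add

def combine_locally(pairs, func):
    """Merge values that share a key within one collection of (key, value) pairs."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

def reduce_by_key(partitions, func):
    # Step 1: combine inside each partition before anything is moved
    combined = [combine_locally(part, func) for part in partitions]
    # Step 2: bring the partial results together and merge them again
    shuffled = [pair for part in combined for pair in part]
    return combine_locally(shuffled, func)

partitions = [[("a", 1), ("b", 4)], [("a", 2), ("b", 6)]]  # assumed layout
result = sorted(reduce_by_key(partitions, add))
print(result)  # [('a', 3), ('b', 10)]
```

Applying the function in two stages gives the same answer as a single pass only because it is associative, which is why the docstring requires an associative reduce function.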

And a test for `flatMap`:

In [None]:
numPartitions = 5
testcollection = range(300)
testRDD = CS186RDD(sc.parallelize(testcollection, numPartitions))
sorted(testRDD.flatMap(lambda x: range(1, x)).collect())[-10:]

Output should be:
```
[295, 295, 295, 295, 296, 296, 296, 297, 297, 298]
```
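The expected output above can be cross-checked without Spark. The sketch below applies the same function to the same input using `itertools.chain`; the helper name `flat_map` is hypothetical and only models the "apply, then flatten" semantics on a plain list.

```python
from itertools import chain

def flat_map(elements, func):
    """Apply func to every element and flatten the resulting iterables."""
    return list(chain.from_iterable(func(x) for x in elements))

flattened = flat_map(range(300), lambda x: range(1, x))
tail = sorted(flattened)[-10:]
print(tail)  # [295, 295, 295, 295, 296, 296, 296, 297, 297, 298]
```

The value 298 appears once because only `x = 299` produces it, 297 appears twice (from 298 and 299), and so on, which explains the staircase pattern at the end of the list.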

Another test for you to play with - `join`:

In [None]:
testRDD = CS186RDD(sc.parallelize([("a", 1), ("b", 4)]))
testRDD2 = CS186RDD(sc.parallelize([("a", 2), ("a", 3), ("b", "c"), ("b", "1")]))
sorted(testRDD.join(testRDD2).collect())

Output should be:
```
[('a', (1, 2)), ('a', (1, 3)), ('b', (4, '1')), ('b', (4, 'c'))]
```
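For intuition about the hash join mentioned in the `join` docstring, here is a minimal single-machine sketch of the build-and-probe idea on plain lists. It is not the distributed implementation the assignment asks for, and the function name `hash_join` is hypothetical: one side is loaded into a dictionary keyed by `k`, and the other side is scanned and matched against it.

```python
from collections import defaultdict

def hash_join(left, right):
    """Join two lists of (key, value) pairs with a build phase and a probe phase."""
    # Build phase: index every left-hand value by its key
    table = defaultdict(list)
    for key, value in left:
        table[key].append(value)
    # Probe phase: look up each right-hand key and emit one pair per match
    joined = []
    for key, right_value in right:
        for left_value in table.get(key, []):
            joined.append((key, (left_value, right_value)))
    return joined

left = [("a", 1), ("b", 4)]
right = [("a", 2), ("a", 3), ("b", "c"), ("b", "1")]
result = sorted(hash_join(left, right))
print(result)  # [('a', (1, 2)), ('a', (1, 3)), ('b', (4, '1')), ('b', (4, 'c'))]
```

In a distributed setting, matching keys must first end up in the same partition on both sides, which is one reason `partitionBy` and `zipPartitions` appear in the list of allowed primitives.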

# Testing

In [None]:
tests.test1(countWords)
tests.test2TextFile(textFile)
tests.test2Filter(CS186RDD)
tests.test2Reduce(CS186RDD)
tests.test2FlatMap(CS186RDD)
tests.test2Join(CS186RDD)