## MIDS UC Berkeley, Machine Learning at Scale 
 
__W261-1__ Summer 2016    
__Week 7__: SSSP    

__Name__   
name@ischool.berkeley.edu  

July 1, 2016   

***

<h1 style="color:#021353;">General Description</h1>
<div style="margin:10px;border-left:5px solid #eee;">
<pre style="font-family:sans-serif;background-color:transparent">
In this assignment you will explore networks and develop MRJob code for 
finding shortest path graph distances. To build up to large data 
you will develop your code on some very simple, toy networks.
After this you will take your developed code forward and modify it and 
apply it to two larger datasets (performing EDA along the way).

<h3>Undirected toy network dataset</h3>


In an undirected network all links are symmetric, 
i.e., for a pair of nodes 'A' and 'B,' both of the links:

A -> B and B -> A

will exist. 

The toy data are available in a sparse (stripes) representation:

(node) \t (dictionary of links)

on AWS/Dropbox via the url:

s3://ucb-mids-mls-networks/undirected_toy.txt
Or under the Data Subfolder for HW7 on Dropbox with the same file name
(the Data folder is at: https://db.tt/Kxu48mL1).

In the dictionary, target nodes are keys, link weights are values 
(here, all weights are 1, i.e., the network is unweighted).


<h3>Directed toy network dataset</h3>

In a directed network, links are not necessarily symmetric, 
i.e., for a pair of nodes 'A' and 'B,' it is possible for only one of:

A -> B or B -> A

to exist. 

These toy data are available in a sparse (stripes) representation:

(node) \t (dictionary of links)

on AWS/Dropbox via the url:

s3://ucb-mids-mls-networks/directed_toy.txt
Or under the Data Subfolder for HW7 on Dropbox with the same file name
(On Dropbox https://www.dropbox.com/sh/2c0k5adwz36lkcw/AAAAKsjQfF9uHfv-X9mCqr9wa?dl=0)

In the dictionary, target nodes are keys, link weights are values 
(here, all weights are 1, i.e., the network is unweighted).
</pre>
</div>
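
As a minimal sketch of the stripes format described above, the snippet below parses `(node) \t (dictionary of links)` records and checks whether a graph is symmetric, i.e., undirected. The helper names `parse_stripe` and `is_symmetric` and the three sample records are hypothetical illustrations, not the contents of the course files.

```python
import ast

def parse_stripe(line):
    """Split one tab-separated stripes record into (node, links)."""
    node, links = line.strip().split("\t")
    return node.strip('"'), ast.literal_eval(links)

def is_symmetric(graph):
    """Return True if every link A -> B has a matching link B -> A."""
    return all(src in graph.get(dst, {})
               for src, links in graph.items()
               for dst in links)

# Hypothetical three-node records (assumed for illustration only).
sample_lines = [
    "1\t{'2': 1, '3': 1}",
    "2\t{'1': 1}",
    "3\t{'1': 1}",
]
graph = dict(parse_stripe(line) for line in sample_lines)
```

Running `is_symmetric` on a parsed file is a quick sanity check that an input really is undirected before treating it that way.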

<h1 style="color:#021353;">HW 7.0: Shortest path graph distances (toy networks)</h1>
<div style="margin:10px;border-left:5px solid #eee;">
<pre style="font-family:sans-serif;background-color:transparent">
In this part of your assignment you will develop the base of your code for the week.

Write MRJob classes to find shortest path graph distances, as described in the lectures. In addition to finding the distances, your code should also output a distance-minimizing path between the source and target.
Work locally for this part of the assignment, and use both of the undirected and directed toy networks.

To prove your code's function, run the following jobs:

- shortest path in the undirected network from node 1 to node 4
Solution: 1,5,4. NOTE: There is another shortest path also (HINT: 1->5->4)! Either will suffice (you will find this also in the remaining problems, e.g., 7.2 and 7.4).
 

- shortest path in the directed network from node 1 to node 5
Solution: 1,2,4,5

and report your output---make sure it is correct!

<h3>Main dataset 1: NLTK synonyms</h3>

In the next part of this assignment you will explore a network derived from the NLTK synonym database used for evaluation in HW 5. At a high level, this network is undirected, defined so that there exists a link between two nodes/words if the pair of words are synonyms. These data may be found at the location:

<a href="s3://ucb-mids-mls-networks/synNet/synNet.txt">s3://ucb-mids-mls-networks/synNet/synNet.txt</a>
<a href="s3://ucb-mids-mls-networks/synNet/indices.txt">s3://ucb-mids-mls-networks/synNet/indices.txt</a>
Or under the Data Subfolder for HW7 on Dropbox with the same file names

where synNet.txt contains a sparse representation of the network:

(index) \t (dictionary of links)

in indexed form, and indices.txt contains a lookup list

(word) \t (index)

of indices and words. This network is small enough for you to explore and run
scripts locally, but will also be good for a systems test (for later) on AWS.

In the dictionary, target nodes are keys, link weights are values 
(here, all weights are 1, i.e., the network is unweighted).
</pre>
</div>
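
When checking the MRJob output on small inputs, a plain in-memory breadth-first search can serve as a reference. The sketch below assumes an unweighted graph already loaded as a dictionary of dictionaries; the function name `bfs_shortest_path` and the four-node graph `toy` are hypothetical and are not the course's toy files.

```python
from collections import deque

def bfs_shortest_path(graph, source, target):
    """Return (distance, path) for one shortest path, or (None, []) if unreachable."""
    parents = {source: None}          # also serves as the visited set
    queue = deque([source])
    while queue:
        node = queue.popleft()
        if node == target:
            break
        for neighbor in graph.get(node, {}):
            if neighbor not in parents:
                parents[neighbor] = node
                queue.append(neighbor)
    if target not in parents:
        return None, []
    # Walk parent pointers back from the target to rebuild the path.
    path = []
    node = target
    while node is not None:
        path.append(node)
        node = parents[node]
    path.reverse()
    return len(path) - 1, path

# Hypothetical undirected graph with two equally short A-to-D paths.
toy = {
    "A": {"B": 1, "C": 1},
    "B": {"A": 1, "D": 1},
    "C": {"A": 1, "D": 1},
    "D": {"B": 1, "C": 1},
}
distance, path = bfs_shortest_path(toy, "A", "D")  # distance is 2
```

As in the assignment, more than one shortest path may exist; which one is returned depends on the order in which neighbors are visited.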

In [114]:
%%writefile identity.py
#!/Users/AnthonySpalvieriKruse/anaconda/bin/python

from mrjob.job import MRJob
from mrjob.step import MRStep
import ast

class Identity(MRJob):
    
    def mapper(self, _, line):
        key, value = line.strip().split('\t')
        yield key, ast.literal_eval(value)
    def steps(self):
        return [MRStep(mapper=self.mapper)] 
    
if __name__ == '__main__':
    Identity.run()

Overwriting identity.py


In [214]:
%%writefile parseGraph.py
#!/Users/AnthonySpalvieriKruse/anaconda/bin/python

from mrjob.job import MRJob
from mrjob.step import MRStep
import ast

class ParseGraph(MRJob):

    def configure_options(self):
        super(ParseGraph, self).configure_options()
        self.add_passthrough_option('--originNode', type = "str")
    
    def __init__(self, *args, **kwargs):
        super(ParseGraph, self).__init__(*args, **kwargs)
        self.originNode = self.options.originNode
        
    def mapper(self, key, line):
        node, neighbors = line.strip().split("\t")
        if node == self.originNode:
            yield node, (ast.literal_eval(neighbors), 0, self.originNode, "V")
        else:
            yield node, (ast.literal_eval(neighbors), 9999999999, '', "U")
            
    def reducer(self, key, line): 
        #neighbors, distance, path, state = next(line)
        #yield key, (neighbors, distance, path, state)
        yield key, next(line)
            
    def steps(self):
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer)] 
    
if __name__ == '__main__':
    ParseGraph.run()

Overwriting parseGraph.py


In [56]:
%%writefile ShortestPathToyz.py
#!/Users/AnthonySpalvieriKruse/anaconda/bin/python

from mrjob.job import MRJob
from mrjob.step import MRStep
import ast

class ShortestPathToys(MRJob):

    def configure_options(self):
        super(ShortestPathToys, self).configure_options()
        self.add_passthrough_option('--isFirstPass', type = "int", default=0)
        self.add_passthrough_option('--originNode', type = "str", default = "1")
    
    def __init__(self, *args, **kwargs):
        super(ShortestPathToys, self).__init__(*args, **kwargs)
        self.isFirstPass = self.options.isFirstPass
        self.originNode = self.options.originNode
    
    def mapper_init(self):
        try:
            # Read the comma-separated list of visited nodes written by the runner
            with open('/Users/AnthonySpalvieriKruse/VirtualBoxShared/HW7/visited.txt', 'r') as f:
                self.visited = set(f.readlines()[0].split(','))
        except IndexError:
            self.visited = set()
        
    def mapper(self, _, line):
        node, value = line.strip().split('\t')
        node = node.strip('"') #just in case
        if self.isFirstPass:
            neighbors = value  # on the first pass the value is the raw adjacency dictionary
            if node == self.originNode:
                yield node, (ast.literal_eval(neighbors), 0, self.originNode, "V")
            else:
                yield node, (ast.literal_eval(neighbors), 9999999999, '', "U")
        else:
            frontier, distance, path, state = ast.literal_eval(value)
            if state == "V":
                for neighbor in frontier:
                    if neighbor not in self.visited:
                        if path:
                            yield neighbor, (None, distance+1, path+"-"+neighbor, "Q")
                        else:
                            yield neighbor, (None, distance+1, neighbor, "Q")      
            yield node, (frontier, distance, path, state)
            
    def reducer(self, key, line): 
        if self.isFirstPass:
            yield int(key), next(line)
        else:
            frontiers=[]
            distances=[]
            states=[]
            truePath=''

            for frontier, distance, path, state in line:
                frontiers.append(frontier)
                distances.append(distance)
                states.append(state)
                if path!='':
                    truePath = path

            if "Q" in states:
                if len([frontier for frontier in frontiers if frontier!=None])==0:
                    yield key, ({}, min(distances), truePath, "V")
                else:
                    yield key, ([frontier for frontier in frontiers if frontier!=None][0], min(distances), truePath, "V") 
            else:
                # No "Q" record arrived, so the loop saw only this node's own record: re-emit it
                yield key, (frontier, distance, truePath, state)
            
    def steps(self):
        return [MRStep(mapper_init=self.mapper_init, mapper=self.mapper,reducer=self.reducer)] 
    
if __name__ == '__main__':
    ShortestPathToys.run()

Overwriting ShortestPathToyz.py


In [118]:
%%writefile shortestPathToyRunner.py
#!/Users/AnthonySpalvieriKruse/anaconda/bin/python

import sys

from ShortestPathToyz import ShortestPathToys

mr_job = ShortestPathToys(args=[sys.argv[3],"--originNode",sys.argv[1], "--isFirstPass", '1', "--output-dir", "output/"])
visited = set()
firstPass = True
i=0
visitedLength = 0 

#Clear visited file from previous runs
with open('visited.txt', 'w+') as f:
        f.writelines(','.join(str(j) for j in visited))
        
while(1):
    print "iteration ="+str(i)+"  visited =", str(len(visited))
    allVisited = True
    output = {}
    
    with mr_job.make_runner() as runner: 
        runner.run()
        
        # stream_output: get access of the output 
        for line in runner.stream_output():
            # each output record is node -> (frontier, distance, path, state)
            node, value =  mr_job.parse_output_line(line)
            #print node, value
            frontier, distance, path, state = value
            output[node]=value
            
            if state == "V":
                visited.add(node)
            else:
                allVisited = False
    
    if firstPass:
        mr_job = ShortestPathToys(args=['output/', "--file", "indices.txt", "--file", "visited.txt","--originNode", sys.argv[1], "--isFirstPass", '0', "--output-dir", "output/"])
        firstPass = False
        
    i = i + 1
    
    with open('visited.txt', 'w+') as f:
        f.writelines(','.join(str(j) for j in visited))
        
    if allVisited or len(visited)-visitedLength == 0:
        break

    visitedLength = len(visited)
    
print "iteration ="+str(i)+"  visited =", str(len(visited))

print output[sys.argv[2]]

Overwriting shortestPathToyRunner.py


<h1 style="color:#021353;">HW 7.1: Exploratory data analysis (NLTK synonyms)</h1>
<div style="margin:10px;border-left:5px solid #eee;">
<pre style="font-family:sans-serif;background-color:transparent">
Using MRJob, explore the synonyms network data.
Consider plotting the degree distribution (does it follow a power law?),
and determine some of the key features, like:

number of nodes, 
number of links,
or the average degree (i.e., the average number of links per node),
etc...

As you develop your code, please be sure to run it locally first (though on the whole dataset). 
Once you have gotten your code to run locally, deploy it on AWS as a systems test
in preparation for our next dataset (which will require AWS).
</pre>
</div>
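
The features listed above can be prototyped in memory before writing the MRJob version. This is a minimal sketch, assuming an undirected graph in which every link is stored twice (once per endpoint), so the link count is half the total degree. The function name `degree_summary` and the star-shaped sample graph are hypothetical.

```python
from collections import Counter

def degree_summary(graph):
    """Compute node count, link count, average degree, and a degree histogram."""
    degrees = [len(links) for links in graph.values()]
    num_nodes = len(degrees)
    total_degree = sum(degrees)
    return {
        "num_nodes": num_nodes,
        # Assumption: undirected input, so each link appears in two stripes.
        "num_links": total_degree // 2,
        "average_degree": float(total_degree) / num_nodes,
        # Maps degree -> number of nodes with that degree.
        "degree_histogram": dict(Counter(degrees)),
    }

# Hypothetical star graph: one hub linked to three leaves.
star = {
    "hub": {"a": 1, "b": 1, "c": 1},
    "a": {"hub": 1},
    "b": {"hub": 1},
    "c": {"hub": 1},
}
summary = degree_summary(star)
```

Plotting the histogram on log-log axes is one common way to judge by eye whether the degree distribution looks like a power law.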


In [116]:
%%writefile nltkEDA.py
#!/Users/AnthonySpalvieriKruse/anaconda/bin/python

from mrjob.job import MRJob
from mrjob.step import MRStep
import ast
import itertools

class EDA(MRJob):
    
    def configure_options(self):
        super(EDA, self).configure_options()
        self.add_passthrough_option('--feature', type = "str")
        
    def __init__(self, *args, **kwargs):
        super(EDA, self).__init__(*args, **kwargs)
        self.feature = self.options.feature
        self.frequency = 0 
        self.numLinks = 0
        self.minDegree = 0
        self.maxDegree = 0 
        self.options.jobconf = {"mapred.reduce.tasks":1 }
        
    def mapper(self, key, line):
        node, neighbors = line.strip().split("\t")
        neighbors = ast.literal_eval(neighbors)
        numLinks = len(neighbors)
        
        if self.feature == "numNodes":
            yield node, 1
        elif self.feature == "numLinks":
            yield node, numLinks
        elif self.feature == "averageDegree":
            yield node, numLinks
        elif self.feature == "minMaxDegree":
            yield None, numLinks
        else:
            print "Acceptable features: numNodes, numLinks, averageDegree, minMaxDegree"
            exit()
            
    def combiner(self, key, values):
        # NOTE: this combiner is not registered in steps(), so it never runs as written.
        if self.feature == "numNodes":
            yield key, 1
        elif self.feature == "numLinks":
            yield key, sum(values)
        elif self.feature == "averageDegree":
            yield key, sum(values)
        elif self.feature == "minMaxDegree":
            values, valuesCp = itertools.tee(values)
            yield None, (min(values), max(valuesCp))
        
    def reducer(self, key, values):
        if self.feature == "numNodes":
            self.frequency += 1
        elif self.feature == "numLinks":
            self.numLinks += sum(values)
        elif self.feature == "averageDegree":  
            self.frequency +=1
            self.numLinks += sum(values)
        elif self.feature == "minMaxDegree":
            values, valuesCp = itertools.tee(values)
            self.maxDegree = max(values)
            self.minDegree = min(valuesCp)
    
    def reducer_final(self):
        if self.feature == "numNodes":
            yield "Number of Nodes: ", self.frequency
        elif self.feature == "numLinks":
            yield "Number of Links", self.numLinks
        elif self.feature == "averageDegree":
            yield "Average Degree: ", float(self.numLinks)/self.frequency
        elif self.feature == "minMaxDegree":
            yield "Min/Max Degree: ", (self.minDegree, self.maxDegree)
        
    
    def steps(self):
        return [MRStep(mapper=self.mapper,reducer=self.reducer,
                      reducer_final=self.reducer_final)] 
    
if __name__ == '__main__':
    EDA.run()

Overwriting nltkEDA.py


In [117]:
!./nltkEDA.py synNet.txt --feature "numNodes" 
!./nltkEDA.py synNet.txt --feature "numLinks" 
!./nltkEDA.py synNet.txt --feature "averageDegree" 
!./nltkEDA.py synNet.txt --feature "minMaxDegree" 


No configs found; falling back on auto-configuration
Creating temp directory /var/folders/tl/kz2xr4bd3dvf07wcdjmvpl680000gn/T/nltkEDA.AnthonySpalvieriKruse.20161031.203554.390850
Running step 1 of 1...
Streaming final output from /var/folders/tl/kz2xr4bd3dvf07wcdjmvpl680000gn/T/nltkEDA.AnthonySpalvieriKruse.20161031.203554.390850/output...
"Min/Max Degree: "	[1, 196]
Removing temp directory /var/folders/tl/kz2xr4bd3dvf07wcdjmvpl680000gn/T/nltkEDA.AnthonySpalvieriKruse.20161031.203554.390850...


<h1 style="color:#021353;">HW 7.2: Shortest path graph distances (NLTK synonyms)</h1>
<div style="margin:10px;border-left:5px solid #eee;">
<pre style="font-family:sans-serif;background-color:transparent">
Write (reuse your code from 7.0) an MRJob class to find shortest path graph distances, 
and apply it to the NLTK synonyms network dataset. 

Prove your code's function by running the job:

- shortest path starting at "walk" (index=7827) and ending at "make" (index=536),

and showing your code's output. Once again, your output should include the path and the distance.

As you develop your code, please be sure to run it locally first (though on the whole dataset). 
Once you have gotten your code to run locally, deploy it on AWS as a systems test
in preparation for our next dataset (which will require AWS).

=====================================
<strong>NOTE: Dataset 2: English Wikipedia hyperlink network data</strong>
The dataset is available via Dropbox at:

https://www.dropbox.com/sh/2c0k5adwz36lkcw/AAAAKsjQfF9uHfv-X9mCqr9wa?dl=0

on S3 at 
<a href="s3://ucb-mids-mls-networks/wikipedia/">s3://ucb-mids-mls-networks/wikipedia/</a>
<a href="s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt">s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt</a> # Graph
<a href="s3://ucb-mids-mls-networks/wikipedia/indices.txt">s3://ucb-mids-mls-networks/wikipedia/indices.txt</a> # Page titles and page Ids

For the remainder of this assignment you will explore the English Wikipedia hyperlink network.

The dataset is built from the Sept. 2015 XML snapshot of English Wikipedia.
For this directed network, a link between articles: 

A -> B

is defined by the existence of a hyperlink in A pointing to B.
This network also exists in the indexed format:

Data: <a href="s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt">s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt</a>
Data: <a href="s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-in.txt">s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-in.txt</a>
Data: <a href="s3://ucb-mids-mls-networks/wikipedia/indices.txt">s3://ucb-mids-mls-networks/wikipedia/indices.txt</a>

but has an index with more detailed data:

(article name) \t (index) \t (in degree) \t (out degree)

In the dictionary, target nodes are keys, link weights are values.
Here, a weight indicates the number of times a page links to another.
However, for the sake of this assignment, treat this as an unweighted network,
and set all weights to 1 upon data input.

</pre>
</div>

In [None]:
!./shortestPathToyRunner.py 7827 536 synNet.txt

<h1 style="color:#021353;">HW 7.3: Exploratory data analysis (Wikipedia)</h1>
<div style="margin:10px;border-left:5px solid #eee;">
<pre style="font-family:sans-serif;background-color:transparent">
Using MRJob, explore the Wikipedia network data on the AWS cloud. Reuse your code from HW 7.1---does it scale well? 

Be cautioned that Wikipedia is a directed network, where links are not symmetric. 
So, even though a node may be linked to, it will not appear as a primary record itself if it has no out-links. 

This means that you may have to ADJUST your code (depending on its design). 

To be sure of your code's functionality in this context, run a systems test on the directed_toy.txt network.
</pre>
</div>
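
The caution above about nodes without out-links can be illustrated in memory: counting only primary records undercounts the nodes of a directed graph. This is a minimal sketch under that reading; the helper names `all_nodes` and `dangling_nodes` and the sample graph are hypothetical.

```python
def all_nodes(graph):
    """Collect every node that appears as a primary record or as a link target."""
    nodes = set(graph)
    for links in graph.values():
        nodes.update(links)
    return nodes

def dangling_nodes(graph):
    """Return nodes with no out-links, whether or not they have a primary record."""
    return set(node for node in all_nodes(graph) if not graph.get(node))

# Hypothetical directed graph: node "3" is linked to but has no record of its own.
directed = {
    "1": {"2": 1, "3": 1},
    "2": {"3": 1},
}
node_count = len(all_nodes(directed))   # 3, although only 2 primary records exist
sinks = dangling_nodes(directed)        # {"3"}
```

In a MapReduce setting the same idea is usually expressed by having the mapper emit each link target as a key as well, so that the reducer sees nodes that never appear as primary records.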

In [None]:
!./nltkEDA.py synNet.txt --feature "numNodes" 
!./nltkEDA.py synNet.txt --feature "numLinks" 
!./nltkEDA.py synNet.txt --feature "averageDegree" 
!./nltkEDA.py synNet.txt --feature "minMaxDegree" 

<h1 style="color:#021353;">HW 7.4: Shortest path graph distances (Wikipedia)</h1>
<div style="margin:10px;border-left:5px solid #eee;">
<pre style="font-family:sans-serif;background-color:transparent">
Using MRJob, find shortest path graph distances in the Wikipedia network on the AWS cloud.
Reuse your code from 7.2, but once again be warned of Wikipedia being a directed network.
To be sure of your code's functionality in this context, run a systems test on the directed_toy.txt network.

When running your code on the Wikipedia network, prove its function by running the job:

- shortest path from "Ireland" (index=6176135) to "University of California, Berkeley" (index=13466359),

and show your code's output. Show the shortest path in terms of just page IDs but also in terms of the names of the pages (show off your MapReduce join skills!!)

Once your code is running, find some other shortest paths and report your results.
</pre>
</div>
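
For translating a path of page IDs into page names, the sketch below does a simple in-memory lookup against index rows in the `(article name) \t (index) \t (in degree) \t (out degree)` format described earlier. It is not the MapReduce join the assignment asks for, only a way to check one. The function names `load_titles` and `label_path`, and the index rows and IDs, are hypothetical.

```python
def load_titles(index_lines, wanted_ids):
    """Scan index rows and keep only the titles for the requested page IDs."""
    wanted = set(wanted_ids)
    titles = {}
    for line in index_lines:
        name, index, _in_degree, _out_degree = line.rstrip("\n").split("\t")
        if index in wanted:
            titles[index] = name
    return titles

def label_path(path_ids, titles):
    """Replace each page ID in a path with its title, if known."""
    return [titles.get(page_id, "<unknown>") for page_id in path_ids]

# Hypothetical index rows and path (assumed values, not taken from indices.txt).
index_rows = [
    "Page A\t1\t10\t5\n",
    "Page B\t2\t3\t7\n",
    "Page C\t3\t0\t2\n",
]
path_ids = ["1", "3"]
named_path = label_path(path_ids, load_titles(index_rows, path_ids))
```

Filtering by the wanted IDs while scanning keeps memory use proportional to the path length rather than to the size of the index.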

<h1 style="color:#021353;">HW 7.5: Conceptual exercise: Largest single-source network distances</h1>
<div style="margin:10px;border-left:5px solid #eee;">
<pre style="font-family:sans-serif;background-color:transparent">
Suppose you wanted to find the largest network distance from a single source,
i.e., a node that is the furthest (but still reachable) from a single source.

How would you implement this task? 
How is this different from finding the shortest path graph distances?

Is this task more difficult to implement than the shortest path distance?

As you respond, please comment on program structure, runtimes, iterations, general system requirements, etc...
</pre>
</div>
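
One way to make the question concrete on a small graph is a breadth-first search that records every reachable node's distance and then takes the maximum. This is a sketch for an unweighted graph held in memory, not a MapReduce design; the function name `farthest_nodes` and the sample graph are hypothetical.

```python
from collections import deque

def farthest_nodes(graph, source):
    """Return (max distance, sorted nodes at that distance) among reachable nodes."""
    distances = {source: 0}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for neighbor in graph.get(node, {}):
            if neighbor not in distances:
                distances[neighbor] = distances[node] + 1
                queue.append(neighbor)
    max_distance = max(distances.values())
    farthest = sorted(n for n, d in distances.items() if d == max_distance)
    return max_distance, farthest

# Hypothetical directed graph; node "5" cannot be reached from "1".
sample = {
    "1": {"2": 1},
    "2": {"3": 1, "4": 1},
    "3": {},
    "4": {},
    "5": {"1": 1},
}
max_distance, farthest = farthest_nodes(sample, "1")
```

Unlike a single source-to-target query, this search cannot stop early: the whole reachable component has to be explored before the maximum is known.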

<h1 style="color:#021353;">HW 7.5.1: </h1>
<div style="margin:10px;border-left:5px solid #eee;">
<pre style="font-family:sans-serif;background-color:transparent">
Can we utilize combiners in the HW 7 shortest path implementation?
Does order inversion help with the HW 7 shortest path implementation?
</pre>
</div>

<h1 style="color:#021353;">HW 7.5.2: OPTIONAL </h1>
<div style="margin:10px;border-left:5px solid #eee;">
<pre style="font-family:sans-serif;background-color:transparent">
Implement combiners in the context of HW 7.5 and contrast the performance of this implementation versus the implementation with no combiners. 

Please report the cluster configuration and runtimes in tabular format for both experiments and comment on your findings.
</pre>
</div>

<h1 style="color:#021353;">HW 7.6: Computational exercise: Largest single-source network distances: OPTIONAL </h1>
<div style="margin:10px;border-left:5px solid #eee;">
<pre style="font-family:sans-serif;background-color:transparent">
Using MRJob, write code to find the largest graph distance and distance-maximizing nodes from a single source.
Test your code first on the toy networks and synonyms network to prove its function.
</pre>
</div>

==================END HW 7==================