In [1]:
#Reload changes -> always run this
%load_ext autoreload
%autoreload 2

## Homework 7
#### Ron Cordell, Ted Dunmire, Filip Krunic

*In this assignment, you will explore networks and develop MRJob code for finding shortest path graph distances. To build up to large datasets, you will develop your code on some very simple toy networks. After this, you will take your developed code and modify it, applying it to two larger datasets (performing EDA along the way).*

-----

### Question 7.0 

*Write MRJob classes to find the shortest path graph distances as described in the lectures. In addition to finding the distances, your code should also output a distance-minimizing path between the source and target. Work locally for this part of the assignment, and use both of the undirected and directed toy networks.*

*Make sure your output is correct!*

#### Solution: 

To accomplish this task, we write two separate MRJob classes. The first, `initGraphJob`, initializes the graph and marks the start node as queued. The second, `ShortestPathJob`, finds the shortest path between the chosen start and end nodes. 

##### Driver

Below is the driver file for the job. This iterates over both of the jobs specified above, and prints the shortest path output for both toy datasets. 

In [1]:
%%writefile driver.py

#Driver for iterations
import os
from init_data import initGraphJob
from shortest_path import ShortestPathJob

def findShortestPath(filename, startNode, endNode):
    
    mr_job_init = initGraphJob(args = [filename, '--startNode', startNode]) #insert text file here to change 

    with open('working-graph.txt', 'w+') as myfile:
        
        with mr_job_init.make_runner() as runner:
            runner.run()
            
            for line in runner.stream_output():
                myfile.write(line)


    while True:
        with open('newFile.txt', 'w+') as myfile:
            mr_job = ShortestPathJob(args = ['working-graph.txt'])
            
            with mr_job.make_runner() as runner:
                runner.run()
                
                for line in runner.stream_output():
                    output = mr_job.parse_output_line(line)
                    myfile.write(line)
                
                    # Stop as soon as the target node has been marked visited
                    if output[0] == endNode and output[1][2] == "V":
                        return (output[1][3], output[1][1]) # (path, distance)
            
        os.rename('newFile.txt', 'working-graph.txt')

print 'Shortest path in undirected Graph:'
results = findShortestPath('undirected_toy.txt', '1', '4')
print 'Path: ' + str(results[0]) + ', with distance ' + str(results[1])

print 'Shortest path in directed Graph:'
results = findShortestPath('directed_toy.txt', '1', '5')
print 'Path: ' + str(results[0]) + ', with distance ' + str(results[1])

# Clean 
try: 
    os.remove('newFile.txt')
    os.remove('working-graph.txt')

except OSError: 
    print 'Unable to clean files.'

Overwriting driver.py


##### Initialize

Below is the class that initializes the graph search. It reads in the graph data and outputs each node along with its neighbors, an initial distance, a status label (`Q` for the queued start node, `U` for unvisited nodes), and a path. This lets us 'hold' the graph in the stream for the `ShortestPathJob` to execute on. 

In [2]:
%%writefile init_data.py

from sys import maxint
from mrjob.job import MRJob
from mrjob.step import MRStep

class initGraphJob(MRJob):
    
    def configure_options(self):
        super(initGraphJob, self).configure_options()
        self.add_passthrough_option('--startNode', default = '1')
    
    def mapper(self, _, node):
        nodeID, links = node.split('\t') #split on input tab
        links = eval(links) #make a dictionary
        
        if nodeID == self.options.startNode: 
            yield nodeID, (links.keys(), 0, 'Q', [nodeID]) #sets up start node
        else:
            yield nodeID, (links.keys(), maxint, 'U', [])
            
    def steps(self):
        return [MRStep(mapper = self.mapper)]

if __name__ == "__main__":
    initGraphJob.run()        

Overwriting init_data.py
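
To make the record layout concrete, the sketch below reproduces the mapper's logic on a single line, outside of MRJob. The sample line is hypothetical (the toy files are not reproduced here), `init_record` and `INFINITY` are illustrative names, and `literal_eval` is used in place of `eval` on the assumption that the links field is a plain dictionary literal.

```python
from ast import literal_eval
import sys

# Stand-in for Python 2's sys.maxint so the sketch also runs on Python 3.
INFINITY = getattr(sys, 'maxint', sys.maxsize)

def init_record(line, start_node):
    """Turn one adjacency-list line into (nodeID, [neighbors, distance, label, path])."""
    node_id, links = line.strip().split('\t')
    neighbors = sorted(literal_eval(links).keys())
    if node_id == start_node:
        # The start node is queued at distance 0 with a path containing only itself.
        return node_id, [neighbors, 0, 'Q', [node_id]]
    # Every other node starts unvisited, at "infinite" distance, with an empty path.
    return node_id, [neighbors, INFINITY, 'U', []]

# Hypothetical input line: node '1' linked to nodes '2' and '5'.
sample = "1\t{'2': 1, '5': 1}"
print(init_record(sample, '1'))
print(init_record(sample, '3'))
```

The same four-field value is what `ShortestPathJob` reads back on every iteration, so the two jobs only need to agree on this layout.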


##### Find Path 

Below is the class that finds the shortest path. The mapper expands each queued (`Q`) node: it emits every neighbor as queued, with the distance incremented by one and the path extended, and re-emits the node itself as visited (`V`). All other nodes pass through with their current status. The reducer then merges the records for each node, which updates the queue for the next iteration. 

In [3]:
%%writefile shortest_path.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import sys

class ShortestPathJob(MRJob):
    
    def mapper(self, _, line):
        newline = line.strip().split('\t')
        
        node = eval(newline[0])
        
        data = eval(newline[1])
        neighbors = (data[0])
        distance = int(data[1])
        label = data[2]
        path = data[3]
        
        if label == 'Q':
            for neighbor in neighbors:
                newPath = list(path)
                newPath.append(neighbor)
                yield neighbor, [None, distance + 1, 'Q', newPath]
            yield node, [neighbors, distance, 'V', path]
        else:
            yield node, [neighbors, distance, label, path]
     

    
    def reducer(self, key, values):
        #By default assume a node is unvisited with an empty list of neighbors, makes updating below easier
        neighbors = [] 
        distance = sys.maxint
        label = 'U'
        path = []
        
        for value in values:
            
            temp_neighbors = value[0]
            temp_distance = value[1]
            temp_label = value[2]
            temp_path = value[3]
            
            if temp_label == 'V':
                neighbors = temp_neighbors
                distance = temp_distance
                label = temp_label
                path = temp_path
                break
            
            elif temp_label == 'Q':
                label = temp_label
                distance = temp_distance
                path = temp_path
                
            elif temp_label == 'U':
                neighbors = temp_neighbors
                
        yield key, [neighbors, distance, label, path]
        
        
            
if __name__ == '__main__':
    ShortestPathJob.run()

Overwriting shortest_path.py


##### Execute 

We run the driver code and check the output. 

In [4]:
!python driver.py

Shortest path in undirected Graph:
No handlers could be found for logger "mrjob.runner"
Path: [u'1', u'5', u'4'], with distance 2
Shortest path in directed Graph:
Path: [u'1', u'2', u'4', u'5'], with distance 3
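
One way to check output like this is to run the same `U`/`Q`/`V` expansion in memory and compare the results. The sketch below is such a check under stated assumptions: `frontier_search` is an illustrative helper, not part of the jobs above, and the small directed graph is hypothetical rather than the contents of `directed_toy.txt`. It also returns `None` once no queued nodes remain, a case the driver loop above does not test for.

```python
import sys

INFINITY = getattr(sys, 'maxint', sys.maxsize)

def frontier_search(graph, start, end):
    """Iterate the U/Q/V expansion in memory; return (path, distance) or None."""
    # Collect every node, including ones that only appear as link targets.
    nodes = set(graph)
    for targets in graph.values():
        nodes.update(targets)

    # state[node] = [neighbors, distance, label, path], as in the MRJob records.
    state = {n: [list(graph.get(n, [])), INFINITY, 'U', []] for n in nodes}
    state[start][1:] = [0, 'Q', [start]]

    while True:
        queued = [n for n in state if state[n][2] == 'Q']
        if not queued:
            return None  # frontier exhausted: end is unreachable from start
        for node in queued:
            neighbors, distance, _, path = state[node]
            for neighbor in neighbors:
                # Only unvisited nodes join the queue; Q and V nodes keep their distance.
                if state[neighbor][2] == 'U':
                    state[neighbor][1:] = [distance + 1, 'Q', path + [neighbor]]
            state[node][2] = 'V'
        # Same stopping rule as the driver: the target must be marked visited.
        if state[end][2] == 'V':
            return state[end][3], state[end][1]

# Hypothetical directed graph given as out-link lists.
toy = {'1': ['2', '3'], '2': ['4'], '3': [], '4': ['5'], '5': []}
print(frontier_search(toy, '1', '5'))
print(frontier_search(toy, '5', '1'))
```

Because every link has weight one, the first time a node is queued already gives its shortest distance, which is why the sketch never needs to revisit a node.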


-----

### Question 7.1

*Using MRJob, explore the synonyms network data. Consider plotting the degree distribution (does it follow a power law?), and determine some of the key features, like:*

* *number of nodes,* 
* *number of links,*
* *average degree (i.e., the average number of links per node),*


*As you develop your code, please be sure to run it locally first (though on the whole dataset). Once you have gotten your code to run locally, deploy it on AWS as a systems test in preparation for our next dataset (which will require AWS).*

#### Solution 

To accomplish this task, we create three jobs. The first, `number_of_nodes.py`, computes the number of nodes. The second, `number_of_links.py`, computes the total number of links. The third, `average_degree.py`, computes the average degree across the dataset. In addition, we provide a driver file to manage this process. 

##### Driver 

The driver file reads in the synonym NLTK data and runs the node, link, and degree jobs to determine the total number of nodes, the total number of links, and the average degree, respectively. 

In [5]:
%%writefile synDriver.py

from number_of_nodes import numberOfNodes
from number_of_links import numberOfLinks
from average_degree import averageDegree

filenames = ['synNet/synNet.txt']

for filename in filenames:
    
    # Nodes 
    mr_job = numberOfNodes(args = [filename])
    print "Number of Nodes in " + filename
    with mr_job.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            print mr_job.parse_output_line(line)
    
    # Links 
    mr_job2 = numberOfLinks(args = [filename])
    print "Number of Links in " + filename
    with mr_job2.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            print mr_job2.parse_output_line(line)
            
    # Degree 
    mr_job3 = averageDegree(args = [filename])
    print "Average degree in " + filename
    with mr_job3.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            print mr_job3.parse_output_line(line)            

Writing synDriver.py


##### Number of Nodes

This file will compute the number of nodes. This is relatively straightforward, as we simply output the value '1' for each line in our data, which represents the listing of each node. 

In [6]:
%%writefile number_of_nodes.py

from mrjob.job import MRJob

class numberOfNodes(MRJob):
    
    def mapper(self, _, line):
        yield None, 1
        
    def reducer(self, _, counts): 
        yield None, sum(counts)
        
if __name__ == '__main__':
    numberOfNodes.run()

Writing number_of_nodes.py


##### Number of Links 

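This class will compute the number of links. This is slightly more subtle, because in an undirected network each link is listed under both of its endpoints, so we may see the same link more than once. What we really want to count is the distinct node pairs, one per link. We therefore sort each pair so that both listings produce the same key, and count the distinct keys. 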

In [7]:
%%writefile number_of_links.py

from mrjob.job import MRJob
from mrjob.step import MRStep 

from ast import literal_eval as load 

class numberOfLinks(MRJob):
    
    def mapper(self, _, line):
        
        # Parse 
        line = line.strip().split('\t')
        node = line[0]
        neighbors = load(line[1])
        
        for neighbor in neighbors: 
            
            # Sort 
            pair = [node, neighbor]
            pair.sort()
            
            yield pair, None
            
    
    def reducer_dedupe(self, pair, _):
        
        # Flatten 
        yield None, pair
        
        
    def reducer_agg(self, _, pairs):
        
        tally = 0
        
        # Aggregate
        for pair in pairs: 
            tally += 1 
            
        yield None, tally
    
        
    def steps(self):
        
        return [MRStep(mapper=self.mapper, 
                          reducer=self.reducer_dedupe), 
                   
                MRStep(reducer=self.reducer_agg)]
        
        
if __name__ == '__main__':
    numberOfLinks.run()

Writing number_of_links.py
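
The deduplication idea can be seen on a small example without MRJob. This is a minimal sketch, assuming an undirected adjacency list in which each link is listed from both ends; `count_undirected_links` and the toy graph are illustrative and not part of the assignment data.

```python
def count_undirected_links(adjacency):
    """Count each undirected link once by collapsing (a, b) and (b, a)."""
    pairs = set()
    for node, neighbors in adjacency.items():
        for neighbor in neighbors:
            # Sorting gives both directions of a link the same key,
            # which is the role of the shuffle between mapper and reducer_dedupe.
            pairs.add(tuple(sorted((node, neighbor))))
    return len(pairs)

# Hypothetical graph: a triangle a-b-c plus a pendant node d attached to c.
toy = {'a': ['b', 'c'], 'b': ['a', 'c'], 'c': ['a', 'b', 'd'], 'd': ['c']}
print(count_undirected_links(toy))
```

The adjacency lists hold eight entries in total, but only four distinct pairs survive, which is the count we want.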


##### Average Degree

Here, we implement something similar to the number of nodes job, except that the mapper emits each node's degree (the length of its neighbor list) and the reducer divides the summed degrees by the number of nodes to get an average. 

In [8]:
%%writefile average_degree.py 

from __future__ import division 
from mrjob.job import MRJob 

from ast import literal_eval as load 

class averageDegree(MRJob):
    
    def mapper(self, _, line): 
        
        # Parse 
        line = line.strip().split('\t')
        node = line[0]
        neighbors = load(line[1])
        
        yield None, len(neighbors)
        
    
    def reducer(self, _, lengths):
        
        tally = 0 
        count = 0
        
        # Aggregate 
        for value in lengths: 
            count += 1
            tally += value 
            
        yield None, tally / count
        
if __name__ == '__main__':
    averageDegree().run()

Writing average_degree.py


##### Execute 

We now run the driver and execute our three jobs. 

In [9]:
!python synDriver.py

Number of Nodes in synNet/synNet.txt
No handlers could be found for logger "mrjob.runner"
(None, 8271)
Number of Links in synNet/synNet.txt
(None, 30567)
Average degree in synNet/synNet.txt
(None, 7.391367428364164)
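
As a consistency check on these three numbers: in an undirected network every link contributes to the degree of two nodes, so the average degree should equal twice the number of links divided by the number of nodes. The snippet below is a small sketch of that arithmetic using the counts reported above; the variable names are illustrative.

```python
# Counts reported by the node and link jobs above.
nodes = 8271
links = 30567

# Each undirected link adds one to the degree of both of its endpoints.
average_degree = 2.0 * links / nodes
print(average_degree)
```

The result agrees with the value reported by `average_degree.py`, which suggests that the sorted-pair deduplication in `number_of_links.py` counts each link exactly once.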


### Question 7.2

*Write (reuse your code from 7.0) an MRJob class to find shortest path graph distances, 
and apply it to the NLTK synonyms network dataset.*

*Proof your code's function by running the job:*

- *shortest path starting at "walk" (index=7827) and ending at "make" (index=536)*

*and showing your code's output. Once again, your output should include the path and the distance.*

*As you develop your code, please be sure to run it locally first (though on the whole dataset). 
Once you have gotten your code to run locally, deploy it on AWS as a systems test
in preparation for our next dataset (which will require AWS).*

#### Solution 

Here, we re-use our code specified in Question 7.0. The only change is the driver function, which is modified to use the synonym NLTK dataset. 

In [12]:
%%writefile syn_path_driver.py

#Driver for iterations
import os
from init_data import initGraphJob
from shortest_path import ShortestPathJob

def findShortestPath(filename, startNode, endNode):
    
    mr_job_init = initGraphJob(args = [filename, '--startNode', startNode]) #insert text file here to change 

    with open('working-graph.txt', 'w+') as myfile:
        
        with mr_job_init.make_runner() as runner:
            runner.run()
            
            for line in runner.stream_output():
                myfile.write(line)


    while True:
        with open('newFile.txt', 'w+') as myfile:
            mr_job = ShortestPathJob(args = ['working-graph.txt'])
            
            with mr_job.make_runner() as runner:
                runner.run()
                
                for line in runner.stream_output():
                    output = mr_job.parse_output_line(line)
                    myfile.write(line)
                
                    # Stop as soon as the target node has been marked visited
                    if output[0] == endNode and output[1][2] == "V":
                        return (output[1][3], output[1][1]) # (path, distance)
            
        os.rename('newFile.txt', 'working-graph.txt')

print 'Shortest path in NLTK synonym data:'
results = findShortestPath('synNet/synNet.txt', '7827', '536')
print 'Path: ' + str(results[0]) + ', with distance ' + str(results[1])

# Clean 
try: 
    os.remove('newFile.txt')
    os.remove('working-graph.txt')

except OSError: 
    print 'Unable to clean files.'

Overwriting syn_path_driver.py


In [13]:
!python syn_path_driver.py

Shortest path in NLTK synonym data:
No handlers could be found for logger "mrjob.runner"
Path: [u'7827', u'4655', u'631', u'536'], with distance 3


### Question 7.3

*Using MRJob, explore the Wikipedia network data on the AWS cloud. Reuse your code from HW 7.1---does it scale well? 
Be cautioned that Wikipedia is a directed network, where links are not symmetric. So, even though a node may be linked to, it will not appear as a primary record itself if it has no out-links. This means that you may have to ADJUST your code (depending on its design). To be sure of your code's functionality in this context, run a systems test on the directed_toy.txt network.*

#### Solution

We use the data found in the S3 bucket. Because these jobs run on EMR (the `-r emr` flag), we execute the class files directly from the command line for the three EDA procedures: number of nodes, number of links, and average degree. 

##### Number of Nodes 

Here we count the number of nodes in the Wikipedia dataset.  

In [1]:
!sudo python number_of_nodes.py -r emr s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt

Got unexpected keyword arguments: ssh_tunnel
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
using existing scratch bucket mrjob-01bc9d239cbfa41e
using s3://mrjob-01bc9d239cbfa41e/tmp/ as our scratch dir on S3
creating tmp directory /tmp/number_of_nodes.root.20160310.020003.113542
writing master bootstrap script to /tmp/number_of_nodes.root.20160310.020003.113542/b.py

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

Copying non-input files into s3://mrjob-01bc9d239cbfa41e/tmp/number_of_nodes.root.20160310.020003.113542/files/
Waiting 5.0s for S3 eventual consistency
Creating Elastic MapReduce job flow
Job flow created with ID: j-2SW79CB2KM3IY
Created new job flow j-2SW79CB2KM3IY
Job launched 30.6s ago, status STARTING: Provisioning Ama

##### Number of Links 

Here we count the number of links for the Wikipedia dataset. 

In [3]:
!sudo python number_of_links.py -r emr s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt

Got unexpected keyword arguments: ssh_tunnel
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
using existing scratch bucket mrjob-01bc9d239cbfa41e
using s3://mrjob-01bc9d239cbfa41e/tmp/ as our scratch dir on S3
creating tmp directory /tmp/number_of_links.root.20160310.024545.134374
writing master bootstrap script to /tmp/number_of_links.root.20160310.024545.134374/b.py

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

Copying non-input files into s3://mrjob-01bc9d239cbfa41e/tmp/number_of_links.root.20160310.024545.134374/files/
Waiting 5.0s for S3 eventual consistency
Creating Elastic MapReduce job flow
Job flow created with ID: j-1R6M3KZXFLQYM
Created new job flow j-1R6M3KZXFLQYM
Job launched 30.6s ago, status STARTING: Provisioning Ama
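
The commands above run the 7.1 classes unchanged. On a directed network, two of their assumptions may not hold: counting one node per input line misses pages that are only ever linked to, and sorting each pair merges the links a -> b and b -> a into one. The sketch below shows counts that avoid both issues on a small in-memory example. It is an illustration under stated assumptions, not the code that was run on EMR; `directed_stats` and the toy graph are hypothetical.

```python
def directed_stats(adjacency):
    """Return (node_count, link_count) for a directed out-link adjacency list."""
    nodes = set(adjacency)
    links = 0
    for source, targets in adjacency.items():
        # Include pages that never appear as a primary record (no out-links).
        nodes.update(targets)
        # Count ordered pairs: a -> b and b -> a are two distinct links.
        links += len(set(targets))
    return len(nodes), links

# Hypothetical directed graph: node '4' has no out-links and no record of its own.
toy = {'1': ['2', '3'], '2': ['1'], '3': ['4']}
print(directed_stats(toy))
```

Here a line count would report three nodes and a sorted-pair count would report three links, while the directed counts are four nodes and four links. In MapReduce terms, the node count would need the mapper to emit every neighbor as well as the primary node before deduplicating.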

### Question 7.4 

*Using MRJob, find shortest path graph distances in the Wikipedia network on the AWS cloud.
Reuse your code from 7.2, but once again be warned of Wikipedia being a directed network.
To be sure of your code's functionality in this context, run a systems test on the directed_toy.txt network.*

*When running your code on the Wikipedia network, proof its function by running the job:*

- *shortest path from "Ireland" (index=6176135) to "University of California, Berkeley" (index=13466359),*

*and show your code's output. Show the shortest path in terms of just page IDs but also in terms of the names of the pages (show off your MapReduce join skills!!)*

*Once your code is running, find some other shortest paths and report your results.*

### Question 7.5 

*Suppose you wanted to find the largest network distance from a single source,
i.e., a node that is the furthest (but still reachable) from a single source.*

*How would you implement this task? 
How is this different from finding the shortest path graph distances?*

*Is this task more difficult to implement than the shortest path distance?*

*As you respond, please comment on program structure, runtimes, iterations, general system requirements, etc...*

#### Solution

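This would be the same difficulty as finding the shortest path (within MRJob). The argument for this is that, in the worst case, to find the shortest path from a starting node to an arbitrary node, I have to compute the distance for all other reachable nodes in the graph. In other words, the distance calculation is identical between the two. The differences are in termination and in how the stream is parsed. The shortest path driver can stop as soon as the target node is marked visited, whereas the largest distance search has to keep iterating until no queued nodes remain, so it always pays for the full set of iterations. Once the search has finished, I look at the largest distance among the visited nodes instead of the distance of one chosen target. 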

### Question 7.6 (Optional)

*Using MRJob, write a code to find the largest graph distance and distance-maximizing nodes from a single-source.
Test your code first on the toy networks and synonyms network to proof its function.*

#### Solution

We re-use our `shortest_path` implementation, but we modify the driver to return the maximal element rather than the end-node that is specified. 
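
The change to the driver comes down to two checks on the parsed output records: whether any node is still queued, and which visited nodes have the largest distance. The sketch below shows both on hypothetical records in the same `(node, [neighbors, distance, label, path])` layout that `parse_output_line` returns for `ShortestPathJob`. The helper names `search_finished` and `farthest_nodes` are illustrative and not part of the code above.

```python
import sys

INFINITY = getattr(sys, 'maxint', sys.maxsize)

def search_finished(records):
    """The search is complete once no node is left in the queue."""
    return all(value[2] != 'Q' for _, value in records)

def farthest_nodes(records):
    """Return (largest distance, nodes at that distance) among visited nodes."""
    visited = [(value[1], node) for node, value in records if value[2] == 'V']
    if not visited:
        return None, []
    largest = max(distance for distance, _ in visited)
    return largest, sorted(node for distance, node in visited if distance == largest)

# Hypothetical final state: node '5' is unreachable and stays unvisited.
records = [
    ('1', [['2'], 0, 'V', ['1']]),
    ('2', [['3', '4'], 1, 'V', ['1', '2']]),
    ('3', [[], 2, 'V', ['1', '2', '3']]),
    ('4', [[], 2, 'V', ['1', '2', '4']]),
    ('5', [[], INFINITY, 'U', []]),
]
print(search_finished(records))
print(farthest_nodes(records))
```

Restricting the maximum to visited nodes keeps unreachable nodes, which still carry the placeholder "infinite" distance, out of the answer.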