## Initial Load, Mapper, and Reducer Definitions

In [1]:
import csv
import json

INITIAL_CSV = 'M.csv'
JSON_FILE_NAME = 'dictionary.json'

def convert_csv_to_json(csv_file=None, json_file=None):
    """Build the initial PageRank dictionary from an edge-list CSV.

    Each CSV row is read as ``source, target``; any further columns are
    ignored. Every node seen in either column becomes a key whose value is
    ``(initial_rank, out_links)`` with ``initial_rank = 1 / number_of_nodes``.
    The dictionary is written to the JSON file (tuples are serialized as
    JSON arrays).

    Args:
        csv_file: Path of the edge-list CSV. Defaults to ``INITIAL_CSV``.
        json_file: Path of the JSON file to write. Defaults to
            ``JSON_FILE_NAME``.

    Returns:
        A view of the node names (the keys of the written dictionary).
    """
    # Resolve defaults at call time so the module constants stay the default.
    if csv_file is None:
        csv_file = INITIAL_CSV
    if json_file is None:
        json_file = JSON_FILE_NAME

    adjacency_dict = {}
    with open(csv_file, newline='') as f:
        for row in csv.reader(f):
            # Blank lines are parsed as [] and short rows have no target;
            # indexing them would raise IndexError, so skip them.
            if len(row) < 2:
                continue
            input_link, output_link = row[0], row[1]
            # Register the source (if new) and record the edge.
            adjacency_dict.setdefault(input_link, []).append(output_link)
            # A target that never appears as a source still needs an entry.
            adjacency_dict.setdefault(output_link, [])

    # Every node starts with the same share of the total rank.
    node_count = len(adjacency_dict)
    result = {
        in_link: (1.0 / node_count, out_links)
        for in_link, out_links in adjacency_dict.items()
    }
    with open(json_file, "w") as outfile:
        json.dump(result, outfile)
    return result.keys()

# writes the results of the mapping piece of PageRank to a small json called {link}.json
def map(link, json_file=None):
    """Run the map step of PageRank for one node.

    Reads the rank dictionary, splits the node's current rank evenly over
    its out-links, and writes ``{out_link: (share, out_link's out_links)}``
    to ``{link}.json`` in the current working directory.

    A node without out-links contributes nothing: an empty object is
    written instead of dividing by zero, so that node's rank is not
    redistributed.

    NOTE: this name shadows the builtin ``map`` wherever it is in scope.

    Args:
        link: Node name. It is converted with ``str`` for both the
            dictionary lookup and the output filename.
        json_file: Path of the rank dictionary. Defaults to
            ``JSON_FILE_NAME``.

    Raises:
        KeyError: If ``link`` (or one of its out-links) is not a key of
            the dictionary.
    """
    if json_file is None:
        json_file = JSON_FILE_NAME
    with open(json_file) as json_in:
        dictionary = json.load(json_in)

    entry = dictionary[str(link)]
    rank, out_links = entry[0], entry[1]

    if out_links:
        p = rank / len(out_links)
        map_result = {
            out_link: (p, dictionary[out_link][1]) for out_link in out_links
        }
    else:
        # Dangling node: there is nobody to hand the rank to.
        map_result = {}

    # f-string rather than ``link + ".json"`` so non-string links work too.
    with open(f"{link}.json", "w") as outfile:
        json.dump(map_result, outfile)

# reduces all of the results of the mapping jsons and overwrites the dictionary.json
def reduce(links, json_file=None):
    """Run the reduce step of PageRank and overwrite the rank dictionary.

    Sums, per node, the shares found in every ``{link}.json`` file written
    by the map step, then writes ``{node: [rank, out_links]}`` back to the
    dictionary file.

    Every node already present in the dictionary file is kept, starting at
    rank 0.0. Without this, a node that receives no share (no inbound
    links) would vanish from the file and the next map step for that node
    would fail with ``KeyError``.

    Args:
        links: Iterable of node names whose ``{link}.json`` files are read
            from the current working directory.
        json_file: Path of the rank dictionary to read and overwrite.
            Defaults to ``JSON_FILE_NAME``.
    """
    if json_file is None:
        json_file = JSON_FILE_NAME

    # Seed from the current dictionary so no node is lost; a missing file
    # simply means there is nothing to seed from.
    try:
        with open(json_file) as json_in:
            previous = json.load(json_in)
    except FileNotFoundError:
        previous = {}
    new_ranks = {node: [0.0, entry[1]] for node, entry in previous.items()}

    for link in links:
        with open(f"{link}.json") as json_in:
            result = json.load(json_in)
        for key, value in result.items():
            if key in new_ranks:
                new_ranks[key][0] += value[0]
            else:
                new_ranks[key] = [value[0], value[1]]

    with open(json_file, "w") as outfile:
        json.dump(new_ranks, outfile)

## MapReduce

In [2]:
NUM_ITERATIONS = 2            # number of map + reduce rounds to run
links = convert_csv_to_json() # writes the initial dictionary JSON and returns the node names

# Each round: run the map step once per node (each call writes {link}.json),
# then reduce those per-node files back into the dictionary JSON.
# Note: `map` here is the function defined above, not the builtin.
for i in range(NUM_ITERATIONS):
    for link in links:
        map(link)
    reduce(links)

## Results

In [3]:
# Load the final dictionary and print each node's rank, ordered by node name.
with open(JSON_FILE_NAME) as json_file:
    dictionary = json.load(json_file)
# Keys are unique, so sorting the (key, entry) pairs orders by key alone.
print({key: entry[0] for key, entry in sorted(dictionary.items())})

{'1': 0.3333333333333333, '2': 0.2222222222222222, '3': 0.2222222222222222, '4': 0.2222222222222222}
