CS119 Big Data

Spring 2024

# Part 1 - Functional Programming

## Question 1: Create Arithmetic Functional Functions

In [112]:
from functools import reduce

def add(*args):
    return reduce(lambda x, y: x + y, args, 0)

def sub(*args):
    return reduce(lambda x, y: x - y, args)

def ra_sub(*args):
    if len(args) == 1:
        return args[0]
    else:
        # Unpack the tuple so each element is one argument
        return args[0] - ra_sub(*args[1:]) 


### Test the outputs against known answer

In [113]:
add_result = add(1, 2, 3)
print(add_result)
if add_result == 6:
    print("CORRECT")
else:
    print("INCORRECT")

sub_result = sub(5, 1, 2)
print(sub_result)
if sub_result == 2:
    print("CORRECT")
else:
    print("INCORRECT")

ra_sub_result = ra_sub(5, 1, 2)
print(ra_sub_result)
if ra_sub_result == 6:
    print("CORRECT")
else:
    print("INCORRECT")

6
CORRECT
2
CORRECT
6
CORRECT
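
The recursive `ra_sub` above folds its arguments from the right. As a minimal sketch, assuming the same calling convention, the same result can also be produced with `reduce` by walking the arguments in reverse. The name `ra_sub_reduce` is hypothetical and is not used elsewhere in this assignment.

```python
from functools import reduce

def ra_sub_reduce(*args):
    # Right-associative subtraction: a - (b - (c - ...)).
    # Walking the arguments from the right, the accumulator always holds
    # the value of the parenthesised tail seen so far.
    return reduce(lambda acc, x: x - acc, reversed(args))

print(ra_sub_reduce(5, 1, 2))  # evaluates 5 - (1 - 2)
```

This version avoids Python's recursion limit for long argument lists, at the cost of being a little less direct to read than the recursive form.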


## Question 2: Create Zip Function

In [114]:
from functools import reduce

def add_to_zipped(zipped, seq):
    # Concatenates the elements of the existing zipped, and the sequence, for each element
    return list(map(lambda zipped, seq: [*zipped, seq], zipped, seq))

def my_zip(*args):
    # Starts with a series of empty lists, one for each element of a given sequence (all must be the same length), and appends to it, one sequence at a time
    return reduce(add_to_zipped, args, [[]]*len(args[0]))

### Test the outputs against known answer

In [115]:
zip_result_1 = my_zip([1,2,3],[4,5,6])
print(zip_result_1)
if zip_result_1 == [[1, 4], [2, 5], [3, 6]]:
    print("CORRECT")
else:
    print("INCORRECT")

zip_result_2 = my_zip([1,2,3],[4,5,6],[7,8,9])
print(zip_result_2)
if zip_result_2 == [[1, 4, 7], [2, 5, 8], [3, 6, 9]]:
    print("CORRECT")
else:
    print("INCORRECT")

[[1, 4], [2, 5], [3, 6]]
CORRECT
[[1, 4, 7], [2, 5, 8], [3, 6, 9]]
CORRECT


## Question 3: Create Zipwith Function

In [116]:
def zipwith(f, *args):
    return list(map(f, *args))

### Test the outputs against known answer

In [117]:
zipwith_result_1 = zipwith(add, [1, 2, 3], [4, 5, 6])
print(zipwith_result_1)
if zipwith_result_1 == [5, 7, 9]:
    print("CORRECT")
else:
    print("INCORRECT")

zipwith_result_2 = zipwith(add, [1, 2, 3], [4, 5, 6], [1, 1, 1])
print(zipwith_result_2)
if zipwith_result_2 == [6, 8, 10]:
    print("CORRECT")
else:
    print("INCORRECT")

[5, 7, 9]
CORRECT
[6, 8, 10]
CORRECT


## Question 4: Create Flatten Function

In [118]:
from functools import reduce

def concat_ints(x, y):
    '''
    Concatenates two values, x and y, that may either be ints or lists of ints, into a single list of ints
    '''
    if isinstance(x, int):
        if isinstance(y, int):
            return [x, y]
        else:
            return [x, *y]
    else:
        if isinstance(y, int):
            return [*x, y]
        else:
            return [*x, *y]

def flatten(tree:list):
    # Recursion base case is if all elements of tree are ints rather than lists
    if reduce(lambda prev, x: prev and isinstance(x, int), tree, True):
        return tree
    else:
        return flatten(reduce(concat_ints, tree))

### Test the outputs against known answer

In [119]:
flatten_result = flatten([1, [2, [3, 4], [5, 6], 7], 8, [9, 10]])
print(flatten_result)
if flatten_result == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
    print("CORRECT")
else:
    print("INCORRECT")

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
CORRECT


## Question 5: Create Groupby Function

In [120]:
from copy import deepcopy
from functools import reduce

def add_key_to_dict(d: dict, tup: tuple):
    key, val = tup
    # Need to deepcopy since dicts are mutable - not editing the existing d
    new_dict = deepcopy(d)
    if key in new_dict:
        old_val = d[key]
    else:
        old_val = []
    # This combines dicts into new, updating keys from d with new key
    return new_dict | {key: old_val + [val]}

def group_by(func, seq):
    func_outputs = list(map(func, seq))
    zipped = my_zip(func_outputs, seq)
    return reduce(add_key_to_dict, zipped, {})

### Test the outputs against known answer

In [121]:
groupby_result = group_by(len, ["hi", "dog", "me", "bad", "good"])
print(groupby_result)
if groupby_result == {2: ["hi", "me"], 3: ["dog", "bad"], 4: ["good"]}:
    print("CORRECT")
else:
    print("INCORRECT")

{2: ['hi', 'me'], 3: ['dog', 'bad'], 4: ['good']}
CORRECT


# Part 2 - Confirming Hadoop Installation

## Question 1: Acquire the Cluster

![P2Q1](img/P2Q1.png)


## Question 2: Load the data into the master

Make quiz4 directory on hdfs:

**hadoop fs -mkdir /quiz4**

Get assignment file via curl and put it in the new folder as access.log:

**curl -sS https://raw.githubusercontent.com/singhj/big-data-repo/refs/heads/main/datasets/access.log | hadoop fs -put - /quiz4/access.log**

View the outputs:

**hadoop fs -cat /quiz4/access.log | head**

The output is the top 10 rows of the raw file, as expected.

![P2Q2](img/P2Q2.png)

## Question 3: Run Wordcount on five-books

Run hadoop jar command to create mapreduce job:

**hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount /five-books /books-count**

Get results of job once complete:

**hadoop fs -get /books-count**

View results:

![P2Q3_1](img/P2Q3_1.png)

![P2Q3_2](img/P2Q3_2.png)


## Question 4: Run Wordcount using mapper_noll and aggregate

Run mapred streaming command to create mapreduce job:

**mapred streaming -file ~/big-data-repo/hadoop/mapper_noll.py -mapper mapper_noll.py -input /five-books -reducer aggregate -output /books-stream-count**

Get results of job onto master once complete:

**hadoop fs -get /books-stream-count**

View results:

![P2Q4](img/P2Q4.png)

## Question 5: Run wordcount using mapper_noll and reducer_noll

Run mapred streaming command to create mapreduce job:

**mapred streaming -file ~/big-data-repo/hadoop/mapper_noll.py -file ~/big-data-repo/hadoop/reducer_noll.py -mapper mapper_noll.py -reducer reducer_noll.py -input /five-books -output /books-my-own-counts**

Get results of job onto master once complete:

**hadoop fs -get /books-my-own-counts**

View results, which are word counts formatted according to the custom reducer, as expected:

![P2Q5](img/P2Q5.png)

# Part 3 - Analyzing Server Logs

### Question 1: Get the percentage of each request type (GET, PUT, POST, etc)

#### Command and Results:

Put files from master onto hdfs after uploading

**hadoop fs -put quiz4/Part3Question1_reducer.py quiz4/Part3Question1_mapper.py /quiz4/**

MapReduce Job

**mapred streaming -file ~/quiz4/Part3Question1_mapper.py -file ~/quiz4/Part3Question1_reducer.py -mapper Part3Question1_mapper.py -reducer Part3Question1_reducer.py -input /quiz4/access.log -output /quiz4/Part3Question1**

Get Results from hdfs to master:

**hadoop fs -get /quiz4/Part3Question1 \\quiz4**

Post-processing to turn counts into percentages.
This bash command reads the concatenated reducer results twice: the first pass sums the total occurrences, and the second pass divides each count by that sum.

**awk 'NR==FNR{sum+= $2; next}{$2/=sum; print $0}' <(cat quiz4/Part3Question1/\*) <(cat quiz4/Part3Question1/\*) > quiz4/Part3Question1Results.txt**

Results:

![P3Q1](img/P3Q1.png)

I ran a single MapReduce job to count each unique request type, then a simple bash command to turn the counts into percentages. With multiple reduce tasks, the percentages could not have been computed inside that one job: the overall count across all keys is not known until every reducer has finished, so no individual reducer can compute a percentage. I therefore framed the problem as a word count followed by a post-processing command. This approach generalizes to extremely large data because the reducer output has one row per request type, and there were only 3 request types here, so the bash conversion to percentages is trivial and does not need another MapReduce job.
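
As a rough illustration of the two-pass post-processing described above, here is a minimal Python sketch of the same idea. It assumes the reducer output is a sequence of `key<TAB>count` lines. The function name `counts_to_fractions` and the sample counts are made up for illustration and are not the real results.

```python
def counts_to_fractions(lines):
    """Turn reducer-style 'key<TAB>count' lines into {key: fraction of total}."""
    counts = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        key, count = line.split("\t", 1)
        counts.append((key, int(count)))
    # First pass: the grand total across every key
    total = sum(count for _, count in counts)
    # Second pass: each count as a share of that total
    return {key: count / total for key, count in counts}

# Made-up sample counts, for illustration only (not the real results)
sample = ["RequestType:GET\t6", "RequestType:POST\t3", "RequestType:HEAD\t1"]
fractions = counts_to_fractions(sample)
for key, fraction in fractions.items():
    print(key, fraction)
```

Like the awk command, this needs the grand total before any single share can be computed, which is why the step sits outside the reducers.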

#### Part3Question1_mapper.py

In [None]:
#!/usr/bin/env python
import sys, shlex


def main(argv):
    line = sys.stdin.readline()
    try:
        while line:
            # Use shlex to split, since it matches the log format: fields are space-delimited, and quoted fields may themselves contain spaces
            linelist = shlex.split(line)
            # Only consider input if we get full row exactly - may be slightly different if delimiter changes
            if len(linelist) == 11:
                # The command with the parameter is the 6th column
                request = linelist[5]
                # The request type itself is space-separated from the rest of the command
                request_type = request.split(' ')[0]
                print("RequestType:" + request_type.upper() + "\t" + "1")
            line = sys.stdin.readline()
    except EOFError as error:
        return None

if __name__ == "__main__":
    main(sys.argv)

#### Part3Question1_reducer.py (same as reducer_noll.py)

In [None]:
#!/usr/bin/env python
"""reducer.py"""

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print ('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print ('%s\t%s' % (current_word, current_count))

### Question 2: Get the percentage of each response type (100-199, 200-299, etc)

Very similar process to Question 1 - the main difference is the mapper file

#### Command and Results:

Put files from master onto hdfs

**hadoop fs -put quiz4/Part3Question2_reducer.py quiz4/Part3Question2_mapper.py /quiz4/**

MapReduce Job

**mapred streaming -file ~/quiz4/Part3Question2_mapper.py -file ~/quiz4/Part3Question2_reducer.py -mapper Part3Question2_mapper.py -reducer Part3Question2_reducer.py -input /quiz4/access.log -output /quiz4/Part3Question2**

Get Results from hdfs to master:

**hadoop fs -get /quiz4/Part3Question2 \\quiz4**

Post-Processing to turn counts into Percentages, same command as before with different files

**awk 'NR==FNR{sum+= $2; next}{$2/=sum; print $0}' <(cat quiz4/Part3Question2/\*) <(cat quiz4/Part3Question2/\*) > quiz4/Part3Question2Results.txt**

Results:

![P3Q2](img/P3Q2.png)

#### Part3Question2_mapper.py

In [None]:
#!/usr/bin/env python
import sys, shlex

def main(argv):
    line = sys.stdin.readline()
    try:
        while line:
            linelist = shlex.split(line)
            # Only consider input if we get full row exactly - may be slightly different if delimiter changes
            if len(linelist) == 11:
                response = linelist[6]
                # Only need to consider leading digit of the response code for grouping
                response_type = response[0] + "00-" + response[0] + "99"
                print("ResponseType:" + response_type + "\t" + "1")
            line = sys.stdin.readline()
    except EOFError as error:
        return None

if __name__ == "__main__":
    main(sys.argv)

#### Part3Question2_reducer.py (again, same as reducer_noll.py)

In [None]:
#!/usr/bin/env python
"""reducer.py"""

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print ('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print ('%s\t%s' % (current_word, current_count))

### Question 3: Get the 5 IP addresses that return the most client errors

Very similar process to Question 1 and 2 - the main differences are the mapper file, and the command at the end 

#### Command and Results:

Put files from master onto hdfs

**hadoop fs -put quiz4/Part3Question3_reducer.py quiz4/Part3Question3_mapper.py /quiz4/**

MapReduce Job

**mapred streaming -file ~/quiz4/Part3Question3_mapper.py -file ~/quiz4/Part3Question3_reducer.py -mapper Part3Question3_mapper.py -reducer Part3Question3_reducer.py -input /quiz4/access.log -output /quiz4/Part3Question3**

Get Results from hdfs to master:

**hadoop fs -get /quiz4/Part3Question3 \\quiz4**

Post-Processing to sort the counts, and take the top five

**cat quiz4/Part3Question3/\* | sort -r -n -k 2 | head -5 > quiz4/Part3Question3Results.txt**

This command requires the -r, -n, and -k 2 flags to sort the results in descending order, numerically instead of alphabetically, and by the second column which is the count. 

Results:

![P3Q3](img/P3Q3.png)

This post-processing approach is somewhat less scalable than the approaches in Questions 1 and 2, since extremely large data may contain a very large number of unique IP addresses. The MapReduce job did reduce the data to be processed from 78,252 lines with 12 columns down to the 803 lines with 2 columns of the reducer output. Data with millions or billions of unique IP addresses would require extra thought. The change could be simple, though: for example, the results could be passed into another MapReduce job that keeps only the IP addresses whose count exceeds some threshold (maybe 10), which would greatly reduce the number of rows and thus the sorting workload.
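
A different option from the threshold filter, sketched here only as an assumption about how the final step could be written, is to select the top five without sorting everything. The sketch below uses Python's `heapq.nlargest`, which only ever holds `k` candidates in memory. The function name `top_k_by_count` and the sample rows are hypothetical and are not taken from the real output.

```python
import heapq

def top_k_by_count(lines, k=5):
    """Return the k (key, count) pairs with the largest counts."""
    def parse(line):
        key, count = line.strip().split("\t", 1)
        return key, int(count)

    # Generator: rows are parsed one at a time rather than loaded all at once
    records = (parse(line) for line in lines if line.strip())
    # nlargest keeps at most k candidates while scanning the stream
    return heapq.nlargest(k, records, key=lambda pair: pair[1])

# Made-up rows in the reducer's "key<TAB>count" format, for illustration only
sample = [
    "IPAddress:10.0.0.1\t7",
    "IPAddress:10.0.0.2\t42",
    "IPAddress:10.0.0.3\t3",
    "IPAddress:10.0.0.4\t19",
]
for ip, count in top_k_by_count(sample, k=2):
    print(ip, count)
```

Because the input is consumed as a stream, the same function could read from `sys.stdin` in place of the `sort | head` pipeline.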

#### Part3Question3_mapper.py

In [None]:
#!/usr/bin/env python
import sys, shlex

def main(argv):
    line = sys.stdin.readline()
    try:
        while line:
            linelist = shlex.split(line)
            # Only consider input if we get full row exactly - may be slightly different if delimiter changes
            if len(linelist) == 11:
                response = linelist[6]
                # Is a client error if the response code starts with a 4
                if response[0] == '4':
                    ip_address = linelist[0]
                    print("IPAddress:" + ip_address + "\t" + "1")
            line = sys.stdin.readline()
    except EOFError as error:
        return None

if __name__ == "__main__":
    main(sys.argv)

#### Part3Question3_reducer.py (again, same as reducer_noll.py)

In [None]:
#!/usr/bin/env python
"""reducer.py"""

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print ('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print ('%s\t%s' % (current_word, current_count))

# Part 4 - Presidential Speeches

The process is again similar to each question in Part 3, with custom mapper and reducer files.

#### Command and Results:

Put files from master onto hdfs

**hadoop fs -put quiz4/Part4_reducer.py quiz4/Part4_mapper.py /quiz4/**

MapReduce Job

**mapred streaming -file ~/quiz4/Part4_mapper.py -file ~/quiz4/Part4_reducer.py -mapper Part4_mapper.py -reducer Part4_reducer.py -input /quiz4/prez_speeches/\* -output /quiz4/Part4**

Get Results from hdfs to master:

**hadoop fs -get /quiz4/Part4 \\quiz4**

Post-Processing to sort the counts in descending order by the second column, which is valence

**cat  quiz4/Part4/\* | sort -r -n -k 2 > quiz4/Part4Results.txt**

Results:

![P4_1](img/P4_1.png)

No pattern jumps out to me here, either in presidential disposition or in current events at the time of each presidency.

![P4_2](img/P4_2.png)

The map phase wrote 3,300,913 total bytes.

## Valence Function (for testing)

In [None]:
import requests
import re
import string

def remove_stopwords(stopwords, words):
    list_ = re.sub(r"[^a-zA-Z0-9]", " ", words.lower()).split()
    return [itm for itm in list_ if itm not in stopwords]

def clean_text(stopwords, text):
    text = text.lower()
    text = re.sub(r'\[.*?\]', '', text)
    text = re.sub(r'[%s]' % re.escape(string.punctuation), ' ', text)
    text = re.sub(r'[\d\n]', ' ', text)
    return ' '.join(remove_stopwords(stopwords,  text))

def calc_word_valence(word, afinn_dict):
    if word in afinn_dict:
        return int(afinn_dict[word])
    else:
        return None

def calc_valence(text, afinn_dict):
    '''
    Gets the valence of a line of cleaned text, returned as a list of valences at each word
    '''
    # At this point they will have been cleaned, so we assume a space separator
    word_valences = list(map(lambda word: calc_word_valence(word, afinn_dict), text.split(' ')))
    return list(filter(lambda valence: valence is not None, word_valences))

def valence(text):
    '''
    Gets the valence of a line of raw text
    '''
    # This standalone form loads the stopwords and AFINN data on every call; the mapper version takes them as inputs so they are loaded only once
    stopwords_list = requests.get("https://gist.githubusercontent.com/rg089/35e00abf8941d72d419224cfd5b5925d/raw/12d899b70156fd0041fa9778d657330b024b959c/stopwords.txt").content
    stopwords = list(set(stopwords_list.decode().splitlines()))

    afinn = requests.get('https://raw.githubusercontent.com/fnielsen/afinn/master/afinn/data/AFINN-en-165.txt').content.decode().splitlines()
    afinn_dict = dict(map(lambda x: (x.split('\t')), afinn))
    
    if type(text) != str:
        text = text.decode()
    return calc_valence(clean_text(stopwords, text), afinn_dict)

## Part4_mapper_tests.py

These tests cover a wide variety of edge cases using the mapper form of the function; each case is described in its test's docstring. All tests passed in the .py file.

I repeated these tests for the form of the function that will be tested by the grader (where valence()'s only input is the text), and it also passed all tests.

In [None]:
import unittest
from Part4_mapper import valence, get_afinn_dict
import dis
import requests

class TestValence(unittest.TestCase):
    def setUp(self):
        self.afinn_dict = get_afinn_dict()
        stopwords_list = requests.get("https://gist.githubusercontent.com/rg089/35e00abf8941d72d419224cfd5b5925d/raw/12d899b70156fd0041fa9778d657330b024b959c/stopwords.txt").content
        # Decode first: .content is bytes, and the cleaned tokens it is compared against are str
        self.stopwords = list(set(stopwords_list.decode().splitlines()))

    def test_normal(self):
        '''
        A typical sequence of three words
        '''
        self.assertEqual(valence('yeah winner worst', self.afinn_dict, self.stopwords),[1, 4, -3])

    def test_empty(self):
        '''
        Empty string input
        '''
        self.assertEqual(valence('', self.afinn_dict, self.stopwords),[])

    def test_nonword(self):
        '''
        Words not in afinn dictionary should be skipped
        '''
        self.assertEqual(valence('qqqqqq', self.afinn_dict, self.stopwords),[])

    def test_quotes(self):
        '''
        Words in quotes should still parse correctly
        '''
        self.assertEqual(valence('"yeah" "winner worst"', self.afinn_dict, self.stopwords),[1, 4, -3])

    def test_separators(self):
        '''
        Testing that various separators are removed, and special characters ignored
        '''
        self.assertEqual(valence('yeah\twinner\tworst', self.afinn_dict, self.stopwords),[1, 4, -3])
        self.assertEqual(valence('yeah\t\twinner\t\tworst', self.afinn_dict, self.stopwords),[1, 4, -3])
        self.assertEqual(valence('yeah\nwinner\nworst\t\n', self.afinn_dict, self.stopwords),[1, 4, -3])
        self.assertEqual(valence('yeah! *winner[\n]worst$%^&', self.afinn_dict, self.stopwords),[1, 4, -3])
    
    def test_nonprintable(self):
        '''
        Input made up only of nonprintable or special characters yields no valences
        '''
        self.assertEqual(valence('\n', self.afinn_dict, self.stopwords),[])
        self.assertEqual(valence('\n*@$%&($\n', self.afinn_dict, self.stopwords),[])

    def ex_function():
        '''
        Function to get bytecode of in below test - clean and true both have valences of 2 - no other words in bytecode are present
        '''
        clean = True
    
    def test_bytecode_string(self):
        '''
        Bytecode string should interpret the given instructions
        '''
        bc_string = dis.Bytecode(self.ex_function).dis()
        self.assertEqual(valence(bc_string, self.afinn_dict, self.stopwords),[2, 2])

    def test_bytestring(self):
        '''
        Byte strings should be decoded first
        '''
        self.assertEqual(valence(b'yeah winner worst', self.afinn_dict, self.stopwords),[1, 4, -3])

if __name__ == '__main__':
    unittest.main()

## Part4_mapper.py

This valence function is not exactly the same as the form above, which I adapted for testing. Here I refactored the function's inputs slightly so that the stopwords and AFINN dictionary do not have to be reloaded on every call to valence(), which happens on every line in the map.

In [None]:
#!/usr/bin/env python

import sys
from pathlib import Path
import os
import requests
import re
import string

def remove_stopwords(stopwords, words):
    list_ = re.sub(r"[^a-zA-Z0-9]", " ", words.lower()).split()
    return [itm for itm in list_ if itm not in stopwords]

def clean_text(stopwords, text:str):
    text = text.lower()
    text = re.sub(r'\[.*?\]', '', text)
    text = re.sub(r'[%s]' % re.escape(string.punctuation), ' ', text)
    text = re.sub(r'[\d\n]', ' ', text)
    return ' '.join(remove_stopwords(stopwords, text))

def get_afinn_dict():
    '''
    Create a dict from the afinn data, for easier lookup of each word
    '''
    afinn = requests.get('https://raw.githubusercontent.com/fnielsen/afinn/master/afinn/data/AFINN-en-165.txt').content.decode().splitlines()
    return dict(map(lambda x: (x.split('\t')), afinn))

def calc_word_valence(word, afinn_dict):
    if word in afinn_dict:
        return int(afinn_dict[word])
    else:
        return None

def calc_valence(text, afinn_dict):
    '''
    Gets the valence of a line of cleaned text, returned as a list of valences at each word
    '''
    # At this point they will have been cleaned, so we assume a space separator
    word_valences = list(map(lambda word: calc_word_valence(word, afinn_dict), text.split(' ')))
    return list(filter(lambda valence: valence is not None, word_valences))

def valence(text, afinn_dict, stopwords):
    '''
    Gets the valence of a line of raw text
    '''
    # Using afinn_dict and stopwords as inputs so I don't have to load them anew for every line - just once at beginning of mapper
    if type(text) != str:
        text = text.decode()
    return calc_valence(clean_text(stopwords, text), afinn_dict)

def main(argv):
    stopwords_list = requests.get("https://gist.githubusercontent.com/rg089/35e00abf8941d72d419224cfd5b5925d/raw/12d899b70156fd0041fa9778d657330b024b959c/stopwords.txt").content
    # Decode first: .content is bytes, and the cleaned tokens it is compared against are str
    stopwords = list(set(stopwords_list.decode().splitlines()))
    afinn_dict = get_afinn_dict()
    line = sys.stdin.readline()
    filename = Path(os.environ['mapreduce_map_input_file']).stem
    pres = filename.split('_')[0]
    try:
        while line:
            valencelist = valence(line, afinn_dict, stopwords)
            for v in valencelist:
                print(pres.title() + "\t" + str(v))
            line = sys.stdin.readline()
    except EOFError as error:
        return None

if __name__ == "__main__":
    main(sys.argv)


## Part4_reducer.py

In [None]:
#!/usr/bin/env python
"""reducer.py"""

from operator import itemgetter
import sys

current_pres = None
current_pres_count = 0
current_pres_valence_sum = 0
pres = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    pres, valence = line.split('\t', 1)

    # convert valence (currently a string) to int
    try:
        valence = int(valence)
    except ValueError:
        # valence was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: president) before it is passed to the reducer
    if current_pres == pres:
        current_pres_count += 1
        current_pres_valence_sum += valence
    else:
        if current_pres:
            avg_valence = current_pres_valence_sum/current_pres_count
            # write result to STDOUT
            print ('%s\t%s' % (current_pres, avg_valence))
        current_pres_count = 1
        current_pres_valence_sum = valence
        current_pres = pres

# do not forget to output the last president if needed!
# Avoid divide by zero error
if current_pres == pres and current_pres_count != 0:
    avg_valence = current_pres_valence_sum/current_pres_count
    print ('%s\t%s' % (current_pres, avg_valence))

# Part 5 - Hadoop Errors

When using the modified mapper_noll, all 13 Map task attempts that ran failed, across 5 unique tasks (several had retries that also failed). 

![P5_1](img/P5_1.png)

Two more attempts that were in progress were killed. The kill appears to be triggered once a single task has failed four times, which in this case was task_1728644793739_0003_m_000002.

![P5_2](img/P5_2.png)

The logs of these 13 failed attempts all show the same divide-by-zero error printed to stderr, for 13 error messages in total. Here is one example:

![P5_3](img/P5_3.png)

These failed tasks were spread across both worker nodes: /default-rack/quiz4-cluster-w-0.c.cs119-quiz-4.internal:8042 and /default-rack/quiz4-cluster-w-1.c.cs119-quiz-4.internal:8042

![P5_4](img/P5_4.png)

Comparing this to the logs of the successful version of this job (from Part 2 Question 5), we can see that each task is charged with analyzing anywhere from 500 to 4,800 lines:

![P5_5](img/P5_5.png)

We can verify that "records" refers to lines of text here: in this example there were 11 tasks and 35,119 total lines of text across the five books, so an average of around 3,000 per task is about right. The chance of failure at each line is 1/100, since this is the chance that the randomly generated number is 0.
Therefore, even the smallest task has only a 0.99^500 ≈ 0.0066 (about 0.66%) chance of completing successfully, and the larger tasks are far less likely still. It is no surprise to see every one of them fail.
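
The arithmetic behind this estimate can be checked with a short sketch. It assumes that each line fails independently with probability 1/100 and that a task gets four attempts, as the kill behaviour observed above suggests. The function names are hypothetical.

```python
def p_task_success(num_lines, p_fail_per_line=0.01):
    # A task succeeds only if every one of its lines avoids the failure
    return (1 - p_fail_per_line) ** num_lines

def p_all_attempts_fail(num_lines, attempts=4, p_fail_per_line=0.01):
    # Assumes attempts are independent of one another
    return (1 - p_task_success(num_lines, p_fail_per_line)) ** attempts

# Smallest and largest task sizes reported in the successful run
for lines in (500, 4800):
    print(lines, p_task_success(lines), p_all_attempts_fail(lines))
```

Under these assumptions, even four attempts leave the smallest task very likely to fail, which is consistent with the job being killed.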