## Quiz 4: Hadoop for Fun and Profit 
Conor Meade\
CS 119

In [31]:
import functools
import requests
import string
import os

## 1. Functional Programming [25 points]

### (1.1) add(), sub(), and ra_sub()

In [36]:
def add(*num_list):
    # Fold the arguments left to right with +; avoids shadowing the built-in sum()
    total = functools.reduce(lambda x, y: x + y, num_list)
    return total

print(add(1, 55, 45))
print(add(0, 1, 1, 2, 3, 5, 8, 13))
print(add(1, 2, 3))

def sub(*num_list):
    difference = (functools.reduce(lambda x, y: x-y, num_list))
    return difference

print(sub(5,1,2))

def ra_sub(*num_list):
    difference = (functools.reduce(lambda x, y: y - x, reversed(num_list)))
    return difference

print(ra_sub(5, 1, 2))
print(ra_sub(5))
print(ra_sub(5, 1, 2, 4, 6))

101
33
6
2
6
5
8


### (1.2) zip()

In [35]:
def zip(*num_sequences):
    zipped_lists = [list(map(lambda s: s[i], num_sequences)) for i in range(len(num_sequences[0]))]
    return zipped_lists

print(zip([1], [2]))
print(zip([1, 2, 3], [4, 5, 6]))
print(zip([1, 2, 3], [4, 5, 6], [7, 8, 9]))

[[1, 2]]
[[1, 4], [2, 5], [3, 6]]
[[1, 4, 7], [2, 5, 8], [3, 6, 9]]


### (1.3) zipwith

In [37]:
def zipwith(func, *num_sequences):
    result = list(map(lambda *args: func(*args), *num_sequences))
    return result

print(zipwith(add, [1, 2, 3], [4, 5, 6]))  # [5, 7, 9]
print(zipwith(add, [1, 2, 3], [4, 5, 6], [1, 1, 1]))
print(zipwith(sub, [1, 2, 3], [4, 5, 6], [1, 1, 1]))

[5, 7, 9]
[6, 8, 10]
[-4, -4, -4]


### (1.4) flatten()

In [38]:
def flatten(*tree):
    flat_list = functools.reduce(lambda acc, node: acc + flatten(*node) if isinstance(node, list) else acc + [node], tree, [])
    # flat_list = functools.reduce(lambda x,y: x+y, tree)
    return flat_list

print(flatten([1, [2, [3, 4], [5, 6], 7], 8, [9, 10]]))
print(flatten([[2, 3, 4], 6, [4, 4], [[1, 2], 3, [4, 7, 99]]]))


[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[2, 3, 4, 6, 4, 4, 1, 2, 3, 4, 7, 99]


### (1.5) group_by()

In [39]:
def group_by(func, num_sequences):
    func_dict = {}

    for s in num_sequences:
        k = func(s)
        if k in func_dict.keys():
            func_dict[k].append(s)
        else:
            func_dict[k] = [s]
    return func_dict


group_by(len, ["hi", "dog", "me", "bad", "good"])

{2: ['hi', 'me'], 3: ['dog', 'bad'], 4: ['good']}

## 2. Confirming Hadoop Installation [15 points]


### (2.1) Acquire the cluster

![Acquire Cluster](Part2/create_cluster.jpeg)

After changing the settings to allow access from any IP address (not just internal ones) and entering the settings provided in Professor J's directions, I was able to create and run my cluster.

### (2.2)  Load the data into the master, move the data into HDFS

![Load Data, move data into HDFS](Part2/MoveFilesHDFS.png)

First I SSH'd into my cluster, confirmed my Hadoop version (`hadoop version`), and cloned the repo (`git clone https://github.com/singhj/big-data-repo.git`). Both ran fine and produced the expected output. Then I used mkdir to create my directories in the Hadoop file system; there were no errors, and ls listed the new directories. `hadoop fs -put ~/big-data-repo/five-books/* /user/singhj/five-books` put the five-books data into my Hadoop file system, and `hadoop fs -ls /user/singhj/five-books` confirmed that all five books made it into my cluster. The output can be seen in the attached picture.

### (2.3)  Without writing any code of your own, verify that you have a good installation of hadoop by running wordcount on five-books. The command is similar to...
![Books Count 1](Part2/books_count_1.png)
![Books Count 2](Part2/books_count_2.png)

I confirmed that I had a good installation of Hadoop by running `hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount /user/singhj/five-books /books-count`. The job applies a map phase and a reduce phase to the five-books data. The output can be seen in the attached pictures: the middle of the first picture shows the map and reduce completion percentages, followed by a message that the MR job completed successfully. I then fetched the /books-count directory using `hadoop fs -get /books-count` and listed the results using `ls -la books-count/`, which returned:

```
total 320
drwxr-xr-x  2 cmeade6479 cmeade6479   4096 Oct 12 17:23 .
drwxr-xr-x 12 cmeade6479 cmeade6479   4096 Oct 12 17:23 ..
-rw-r--r--  1 cmeade6479 cmeade6479      0 Oct 12 17:23 _SUCCESS
-rw-r--r--  1 cmeade6479 cmeade6479 105799 Oct 12 17:23 part-r-00000
-rw-r--r--  1 cmeade6479 cmeade6479 103061 Oct 12 17:23 part-r-00001
-rw-r--r--  1 cmeade6479 cmeade6479 104969 Oct 12 17:23 part-r-00002
```
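
As a rough mental model of what the wordcount example does, here is a sketch in plain Python. It is not the Java implementation shipped in the examples jar; the function names `map_words` and `reduce_counts` and the sample lines are made up for illustration. Each mapper emits a `(word, 1)` pair per token, and the reducers sum the pairs for each word:

```python
from collections import defaultdict


def map_words(line):
    """Map phase: emit a (word, 1) pair for every whitespace-separated token."""
    return [(word, 1) for word in line.split()]


def reduce_counts(pairs):
    """Reduce phase: sum the counts for each distinct word."""
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


# Hypothetical input standing in for the five-books text
sample_lines = ["the cat sat", "the dog sat down"]
pairs = [pair for line in sample_lines for pair in map_words(line)]
word_counts = reduce_counts(pairs)
print(word_counts)
```

The three `part-r-0000N` files in the listing are consistent with three reduce tasks, each writing the totals for its own share of the words.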

### (2.4)  Run wordcount using the provided mapper_noll.py and the default reducer aggregate

![mapper 1](Part2/mapred_1.png)
![mapper 2](Part2/mapred_2.png)
![mapper 3](Part2/mapred_3.png)

As in the last question, I ran the commands from the directions and the MR tasks completed without issues. I fetched the results using `hadoop fs -get /books-stream-count`, which ran without error, and then ran `ls -la books-stream-count/` to confirm that the word count worked. The listing includes the `_SUCCESS` marker file:

```
total 116
drwxr-xr-x  2 cmeade6479 cmeade6479  4096 Oct 12 17:27 .
drwxr-xr-x 13 cmeade6479 cmeade6479  4096 Oct 12 17:27 ..
-rw-r--r--  1 cmeade6479 cmeade6479     0 Oct 12 17:27 _SUCCESS
-rw-r--r--  1 cmeade6479 cmeade6479 34743 Oct 12 17:27 part-00000
-rw-r--r--  1 cmeade6479 cmeade6479 34964 Oct 12 17:27 part-00001
-rw-r--r--  1 cmeade6479 cmeade6479 33989 Oct 12 17:27 part-00002
```


### (2.5)  Run wordcount using the provided mapper_noll.py and the provided reducer reducer_noll.py

![mapper 4](Part2/mapred_4.png)
![mapper 5](Part2/mapred_5.png)
![mapper 6](Part2/mapred_6.png)

As in the previous two questions, I ran the commands from the directions and the MR tasks completed without major issues. The one small issue was that the `-files` option did not work, so I used `-file` as in the last question and the command ran without error. I fetched the results using `hadoop fs -get /books-my-own-counts`, which ran without error, and then ran `ls -la books-my-own-counts/` to confirm that the word count worked. The listing includes the `_SUCCESS` marker file:

```
total 244
drwxr-xr-x  2 cmeade6479 cmeade6479  4096 Oct 12 17:31 .
drwxr-xr-x 14 cmeade6479 cmeade6479  4096 Oct 12 17:31 ..
-rw-r--r--  1 cmeade6479 cmeade6479     0 Oct 12 17:31 _SUCCESS
-rw-r--r--  1 cmeade6479 cmeade6479 79255 Oct 12 17:31 part-00000
-rw-r--r--  1 cmeade6479 cmeade6479 79515 Oct 12 17:31 part-00001
-rw-r--r--  1 cmeade6479 cmeade6479 77539 Oct 12 17:31 part-00002
```

## 3. Analyzing Server Logs [55 points]


### (3.1)  What is the percentage of each request type (GET, PUT, POST, etc.)

For this question, I wrote my own mapper and reducer, `request-type-mapper.py` and `request-type-reducer-count.py`. The mapper emits the request type along with a 1 for each log line. The output looks like: \

GET 1 \
POST 1 \
GET 1 \
GET 1 \
POST 1 \
HEAD 1 \
...

The reducer takes these per-line counts of 1 and returns a total count for each request type.  
I cloned the repo for this project using `git clone https://github.com/ConorMeade/Quiz4`. 
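
The mapper and reducer described above can be tried out locally before submitting the job. The following is only a sketch: the sample log lines are invented, the helper names `map_line` and `reduce_pairs` are hypothetical, and it uses `search` where the real mapper uses `findall`. The regular expression is the same one the mapper uses.

```python
import re
from collections import Counter

# Same pattern as the mapper: the word right after the opening quote of the request field
METHOD_PATTERN = re.compile(r'"(\w+)\s')


def map_line(line):
    """Return a (method, 1) pair for a log line, or None if no method is found."""
    match = METHOD_PATTERN.search(line)
    return (match.group(1), 1) if match else None


def reduce_pairs(pairs):
    """Sum the 1s emitted for each request method."""
    totals = Counter()
    for key, count in pairs:
        totals[key] += count
    return dict(totals)


# Hypothetical log lines, assumed to resemble the access log format
sample_log = [
    '10.0.0.1 - - [13/Oct/2024:00:00:01 +0000] "GET /index.html HTTP/1.1" 200 512',
    '10.0.0.2 - - [13/Oct/2024:00:00:02 +0000] "POST /login HTTP/1.1" 302 0',
    '10.0.0.1 - - [13/Oct/2024:00:00:03 +0000] "GET /about HTTP/1.1" 404 128',
]
pairs = [pair for pair in map(map_line, sample_log) if pair is not None]
method_counts = reduce_pairs(pairs)
print(method_counts)
```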

Next, run the map reduce command

```console
$ mapred streaming -file ~/Quiz4/request-type-mapper.py ~/Quiz4/request-type-reducer-count.py \
-mapper request-type-mapper.py   \
-reducer request-type-reducer-count.py \
-input /user/cmeade/access.log.txt \
-output /request-type-counts
```

`/request-type-counts` will have the part output files
```console
$ hdfs dfs -ls /request-type-counts
```
Found 4 items \
-rw-r--r--   1 cmeade6479 hadoop          0 2024-10-13 00:52 /request-type-counts/_SUCCESS \
-rw-r--r--   1 cmeade6479 hadoop         21 2024-10-13 00:52 /request-type-counts/part-00000 \
-rw-r--r--   1 cmeade6479 hadoop          0 2024-10-13 00:52 /request-type-counts/part-00001 \
-rw-r--r--   1 cmeade6479 hadoop          9 2024-10-13 00:52 /request-type-counts/part-00002


The counts are split across different parts (00000 and 00002), so combine all the part output files into a text file, req_counts.txt.
```console
$ hdfs dfs -text /request-type-counts/part* > req_counts.txt
```

```text
<!-- req_counts.txt -->
LongValueSum:GET     33414
LongValueSum:POST    44584
LongValueSum:HEAD    253
```


Clean up cluster space by deleting the directory since the txt file has been generated.
```console
$ hdfs dfs -rm -r /request-type-counts
```

### Post Process Step (3.1)

The request-type-reducer-count reducer (see below) produces the text output of the counts. The get_percentages_req() post-processing function then determines the percentage of each request type.

In [40]:
def get_percentages_req():
    total_reqs = 0
    request_percentages = {}
    with open('req_counts.txt', 'r') as c:
        for line in c:
            # print(line)
            if line.strip():  # skip blank lines
                # columns are separated by a varying amount of whitespace, so split on any run of it
                request_type, count = line.split()
                count = int(count)
                request_percentages[request_type] = count
                total_reqs += count

    # calculate percentage
    for request_type, count in request_percentages.items():
        percentage = (count / total_reqs) * 100
        print(f"{request_type}\t{count}\t{percentage:.2f}%")

get_percentages_req()


GET	33414	42.70%
POST	44584	56.98%
HEAD	253	0.32%


### Mapper and Reducer functions

Here are the mapper and reducer used in the MapReduce job. mapper() was used for debugging locally.

#### MAPPER (3.1)

In [31]:
#!/usr/bin/env python
'''request-type-mapper.py'''
import sys
import re


def main(argv):
    line = sys.stdin.readline()
    pattern = re.compile(r'\"(\w+)\s')
    try:
        while line:
            match = pattern.findall(line)
            if match:
                method = match[0] # Extract the request method (e.g., GET, POST, HEAD)
                print(f"LongValueSum:{method}\t1")
            line = sys.stdin.readline()
                
    except EOFError as error:
        return None


if __name__ == "__main__":
    main(sys.argv)


def mapper(debug=False):
    # actually see what function returns, debug
    output_filename = 'mapper_output.txt'
    with open(output_filename, 'w') as output_file:
        with open('access.log.txt', 'r') as f:
            for line in f:
                # line = sys.stdin.readline()
                pattern = re.compile(r'\"(\w+)\s')
                match = pattern.findall(line)
                if match:
                    method = match[0]  # Extract the request method (e.g., GET, POST)
                    output_file.write(f"{method} 1\n")
                    # print(f"{method}\t1")

mapper(debug=True)

#### REDUCER (3.1)

In [41]:
#!/usr/bin/env python

'''request-type-reducer-count.py'''
import sys

request_count = {}


for line in sys.stdin:
    method, count = line.strip().split('\t', 1)
    try:
        count  = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if method not in request_count:
        request_count[method] = 0

    request_count[method] += count

for method, count in request_count.items():
    print(f"{method}\t{count}")



### (3.2) What percent of the responses fall into each of the following five types?
For this question, I wrote my own mapper and reducer, `request-code-mapper.py` and `request-code-reducer-count.py`. The mapper emits the response code along with a 1 for each log line. The output looks like: \
 200	1 \
 200	1 \
 200	1 \
 200	1 \
 200	1 \
 301	1 \
 200	1 \
 301	1 \
 301	1 \
 404	1 \
 301	1 \
 200	1 
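
To illustrate how a response code can be pulled from a log line and mapped to its class, here is a small sketch. It assumes the status code is the three-digit number right after the closing quote of the request field; the sample line and the helper names `extract_status` and `status_class` are hypothetical and are not part of the submitted mapper.

```python
import re

# Closing quote of the request field, a space, then the three-digit status code
STATUS_PATTERN = re.compile(r'" (\d{3}) ')


def extract_status(line):
    """Return the status code as an int, or None if the line has no match."""
    match = STATUS_PATTERN.search(line)
    return int(match.group(1)) if match else None


def status_class(code):
    """Map a status code to its class label, e.g. 404 -> '4xx'."""
    return f"{code // 100}xx"


# Hypothetical log line
sample_line = '10.0.0.1 - - [13/Oct/2024:00:00:03 +0000] "GET /about HTTP/1.1" 404 128'
code = extract_status(sample_line)
print(code, status_class(code))
```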



To use the log file, mapper, and reducer, I have to pull them into the repo that exists in the cluster
```console
$ cd Quiz4
$ git pull
```

Next, run the map reduce command

```console
$ mapred streaming -file ~/Quiz4/request-code-mapper.py ~/Quiz4/request-code-reducer-count.py \
-mapper request-code-mapper.py   \
-reducer request-code-reducer-count.py \
-input /user/cmeade/access.log.txt \
-output /request-code-counts
```

`/request-code-counts` will have the part output files
```console
$ hdfs dfs -ls /request-code-counts
```
Found 4 items \
-rw-r--r--   1 cmeade6479 hadoop          0 2024-10-13 02:39 /request-code-counts/_SUCCESS \
-rw-r--r--   1 cmeade6479 hadoop         15 2024-10-13 02:39 /request-code-counts/part-00000 \
-rw-r--r--   1 cmeade6479 hadoop         29 2024-10-13 02:39 /request-code-counts/part-00001 \
-rw-r--r--   1 cmeade6479 hadoop         27 2024-10-13 02:39 /request-code-counts/part-00002


The counts are split across different parts (00000, 00001, and 00002), so combine all the part output files into a text file, code_counts.txt.
```console
$ hdfs dfs -text /request-code-counts/part* > code_counts.txt
```
```text
<!-- code_counts.txt -->
LongValueSum:303     1857
LongValueSum:405     1
LongValueSum:301     957
LongValueSum:304     115
LongValueSum:400     1
LongValueSum:403     63
LongValueSum:200     70559
LongValueSum:206     125
LongValueSum:404     4573
```


Clean up cluster space by deleting the directory since the txt file has been generated.
```console
$ hdfs dfs -rm -r /request-code-counts
```


### Post Process Step (3.2)

The request-code-reducer-count reducer (see below) produces the text output of the counts. The get_percentages_code() post-processing function then determines the percentage of responses in each class.

In [45]:
def get_percentages_code():
    response_code_counts = {
        "Informational responses (100–199)": 0,
        "Successful responses (200–299)": 0, 
        "Redirection messages (300–399)": 0,
        "Client error responses (400–499)": 0,
        "Server error responses (500–599)": 0
    }

    with open('code_counts.txt', 'r') as c:
        for line in c:
            # print(line)
            if line.strip():  # skip blank lines
                # columns are separated by a varying amount of whitespace, so split on any run of it
                response_code, count = line.split()
                # drop the aggregate prefix (e.g. "LongValueSum:") if the key still carries it
                response_code = response_code.split(':')[-1]
                try:
                    response_code = int(response_code)
                    count = int(count)
                except ValueError:
                    # count or response_code was not a number, so silently
                    # ignore/discard this line
                    continue
                if 100 <= response_code < 200:
                    response_code_counts["Informational responses (100–199)"] += count
                elif 200 <= response_code < 300:
                    response_code_counts["Successful responses (200–299)"] += count
                elif 300 <= response_code < 400:
                    response_code_counts["Redirection messages (300–399)"] += count
                elif 400 <= response_code < 500:
                    response_code_counts["Client error responses (400–499)"] += count
                elif 500 <= response_code < 600:
                    response_code_counts["Server error responses (500–599)"] += count
                else:
                    print(f"Unknown Code reached {response_code}")

                
    total_reqs = sum(response_code_counts.values())

    for response, count in response_code_counts.items():
        percentage = (count / total_reqs) * 100
        print(f"{response}\t{percentage:.2f}%")

get_percentages_code()



Informational responses (100–199)	0.00%
Successful responses (200–299)	90.33%
Redirection messages (300–399)	3.74%
Client error responses (400–499)	5.93%
Server error responses (500–599)	0.00%


### Mapper and Reducer functions

Here are the mapper and reducer used in the MapReduce job. The reducer is essentially the same as in 3.1, with variable names changed to reflect response codes rather than request types. mapper_code() was used for debugging locally.

#### MAPPER (3.2)

In [None]:
#!/usr/bin/env python
'''request-code-mapper.py'''
import sys
import re


def main(argv):
    line = sys.stdin.readline()
    pattern = re.compile(r'\" (\d{3})')  # capture only the three-digit code after the quoted request
    try:
        while line:
            match = pattern.findall(line)
            if match:
                code = match[0] # Extract the response code (e.g., 200, 404, 500)
                print(f"LongValueSum:{code}\t1")
            line = sys.stdin.readline()
                
    except EOFError as error:
        return None


if __name__ == "__main__":
    main(sys.argv)


def mapper_code(debug=False):
    # Local debugging helper: write the mapper output to a file for inspection
    output_filename = 'mapper_log_code_output.txt'
    # Same pattern as main(): the three-digit code after the quoted request
    pattern = re.compile(r'\" (\d{3})')
    with open(output_filename, 'w') as output_file:
        with open('access.log.txt', 'r') as f:
            for line in f:
                match = pattern.findall(line)
                if match:
                    response_code = match[0]  # Extract the response code (e.g., 200, 404, 500)
                    output_file.write(f"{response_code}\t1\n")

mapper_code(debug=True)


#### REDUCER (3.2)

In [None]:
#!/usr/bin/env python

'''request-code-reducer-count.py'''
import sys

request_count = {}


for line in sys.stdin:
    code, count = line.strip().split('\t', 1)
    try:
        count  = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if code not in request_count:
        request_count[code] = 0

    request_count[code] += count

for code, count in request_count.items():
    print(f"{code}\t{count}")


### (3.3) What 5 IP addresses generate the most client errors

For this question, I wrote my own mapper and reducer, `ip-addr-mapper.py` and `ip-addr-reducer.py`. For each log line with a client error (400–499), the mapper emits the IP address along with a 1.
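
The filtering step can be sketched locally as follows. The log lines are invented and the helper name `client_error_ip` is hypothetical. The regular expression mirrors the one in the mapper, and `Counter.most_common` stands in for the sort done in post-processing.

```python
import re
from collections import Counter

# An IPv4 address, then later in the line a closing quote followed by a 4xx status code
CLIENT_ERROR_PATTERN = re.compile(r'(?P<ip>\d{1,3}(?:\.\d{1,3}){3}) .*" 4\d\d ')


def client_error_ip(line):
    """Return the client IP if the line records a 4xx response, else None."""
    match = CLIENT_ERROR_PATTERN.search(line)
    return match.group("ip") if match else None


# Hypothetical log lines
sample_log = [
    '10.0.0.1 - - [14/Oct/2024:16:00:01 +0000] "GET /missing HTTP/1.1" 404 128',
    '10.0.0.2 - - [14/Oct/2024:16:00:02 +0000] "GET /index.html HTTP/1.1" 200 512',
    '10.0.0.1 - - [14/Oct/2024:16:00:03 +0000] "POST /admin HTTP/1.1" 403 64',
    '10.0.0.3 - - [14/Oct/2024:16:00:04 +0000] "GET /gone HTTP/1.1" 410 0',
]
error_counts = Counter(ip for ip in map(client_error_ip, sample_log) if ip)
top_ips = error_counts.most_common(2)
print(top_ips)
```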

To use the log file, mapper, and reducer, I have to pull them into the repo that exists in the cluster.
```console
$ cd Quiz4
$ git pull
```

Next, run the map reduce command

```console
$ mapred streaming -file ~/Quiz4/ip-addr-mapper.py ~/Quiz4/ip-addr-reducer.py \
-mapper ip-addr-mapper.py   \
-reducer ip-addr-reducer.py \
-input /user/cmeade/access.log.txt \
-output /ip_address_client_errors_counts
```

`/ip_address_client_errors_counts` will have the part output files
```console
$ hdfs dfs -ls /ip_address_client_errors_counts
```
Found 4 items \
-rw-r--r--   1 cmeade6479 hadoop          0 2024-10-14 16:59 /ip_address_client_errors_counts/_SUCCESS \
-rw-r--r--   1 cmeade6479 hadoop       7897 2024-10-14 16:59 /ip_address_client_errors_counts/part-00000 \
-rw-r--r--   1 cmeade6479 hadoop       7887 2024-10-14 16:59 /ip_address_client_errors_counts/part-00001 \
-rw-r--r--   1 cmeade6479 hadoop       7643 2024-10-14 16:59 /ip_address_client_errors_counts/part-00002



The counts are split across different parts (00000, 00001, and 00002), so combine all the part output files into a text file, ip_counts.txt.
```console
$ hdfs dfs -text /ip_address_client_errors_counts/part* > ip_counts.txt
```

This outputs a long file compared with the first two questions. The format looks like:

```text
<!-- ip_counts.txt -->
LongValueSum:1.53.169.162       3
LongValueSum:101.100.129.49     1
LongValueSum:101.108.192.95     1
LongValueSum:101.51.59.107      1
LongValueSum:102.105.72.99      1
LongValueSum:102.156.146.164    1
LongValueSum:102.159.154.249    1
LongValueSum:102.42.167.175     1
LongValueSum:103.105.28.177     2
LongValueSum:103.105.35.113     4
LongValueSum:103.129.32.65      17
...
```


Clean up cluster space by deleting the directory since the txt file has been generated. The access log can also be deleted to free up space.
```console
$ hdfs dfs -rm -r /ip_address_client_errors_counts
$ hdfs dfs -rm -r  /user/cmeade/access.log.txt
```

### Post Process Step  (3.3)

The ip-addr-reducer reducer produces the text output of the counts. The top_five_ips_client_errors() post-processing function sorts these counts and returns the top 5. After post-processing, the 5 IPs with the most client errors (in descending order) are 173.255.176.5, 212.9.160.24, 13.77.204.88, 51.210.243.185, and 193.106.30.100.

In [3]:
def top_five_ips_client_errors():
    ip_counts = {}
    with open('ip_counts.txt', 'r') as c:
        for line in c:
            # print(line)
            if line.strip():  # skip blank lines
                # columns are separated by a varying amount of whitespace, so split on any run of it
                ip, count = line.split()
                count = int(count)
                ip_counts[ip] = count
                # total_reqs += count

    sorted_ip_counts = sorted(ip_counts.items(), key=lambda x: x[1], reverse=True)

    return sorted_ip_counts[0:5]

top_five_ips_client_errors()

[('LongValueSum:173.255.176.5', 2059),
 ('LongValueSum:212.9.160.24', 126),
 ('LongValueSum:13.77.204.88', 78),
 ('LongValueSum:51.210.243.185', 58),
 ('LongValueSum:193.106.30.100', 53)]

### Mapper and Reducer functions

Here are the mapper and reducer functions used in map reduce processing.

#### MAPPER (3.3)

In [None]:
#!/usr/bin/env python
'''ip-addr-mapper.py'''
import sys
import re


def main(argv):
    line = sys.stdin.readline()
    pattern = re.compile(r'(?P<ip>\d{1,3}(?:\.\d{1,3}){3}) .*" [4][0-9][0-9] ')
    try:
        while line:
            match = pattern.findall(line)
            if match:
                ip = match[0] # Extract the client IP address
                print(f"LongValueSum:{ip}\t1")
            line = sys.stdin.readline()
                
    except EOFError as error:
        return None


if __name__ == "__main__":
    main(sys.argv)

#### REDUCER (3.3)

In [None]:
#!/usr/bin/env python

'''ip-addr-reducer.py'''
import sys

ip_count = {}


for line in sys.stdin:
    ip, count = line.strip().split('\t', 1)
    try:
        count  = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if ip not in ip_count:
        ip_count[ip] = 0

    ip_count[ip] += count

for ip, count in ip_count.items():
    print(f"{ip}\t{count}")


## 4. Presidential Speeches [15 points]

First we fetch the prez_speeches.zip file within the cluster

```console
$ wget https://raw.githubusercontent.com/singhj/big-data-repo/main/datasets/prez_speeches.zip
```

Then we unzip the speeches and put all directories containing .txt files into the GitHub repo under ~/Quiz4/uncompressed_speeches

We then put these files into hdfs

```console
$ hadoop fs -mkdir /user/cmeade/prez_speeches
$ hadoop fs -put ~/Quiz4/uncompressed_speeches /user/singhj/prez_speeches  # accidentally put this in the wrong directory
```

Confirm the files got there
```console
$ hdfs dfs -ls /user/singhj/prez_speeches
```

```
Found 43 items
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:23 /user/singhj/prez_speeches/adams
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/arthur
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/bharrison
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/buchanan
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/bush
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/carter
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/cleveland
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:23 /user/singhj/prez_speeches/clinton
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/coolidge
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/eisenhower
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/fdroosevelt
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/fillmore
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:23 /user/singhj/prez_speeches/ford
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:23 /user/singhj/prez_speeches/garfield
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/grant
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/gwbush
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/harding
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/harrison
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:23 /user/singhj/prez_speeches/hayes
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/hoover
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/jackson
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:20 /user/singhj/prez_speeches/jefferson
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:23 /user/singhj/prez_speeches/johnson
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/jqadams
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/kennedy
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/lbjohnson
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/lincoln
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:23 /user/singhj/prez_speeches/madison
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/mckinley
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/monroe
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/nixon
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:20 /user/singhj/prez_speeches/obama
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/pierce
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/polk
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:23 /user/singhj/prez_speeches/reagan
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:23 /user/singhj/prez_speeches/roosevelt
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/taft
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/taylor
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:21 /user/singhj/prez_speeches/truman
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/tyler
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:20 /user/singhj/prez_speeches/vanburen
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/washington
drwxr-xr-x   - cmeade6479 hadoop          0 2024-10-15 22:22 /user/singhj/prez_speeches/wilson
```


With all the files loaded into hdfs, now the directory can be passed in as input to the MR command.

```console
$ mapred streaming -file ~/Quiz4/prez_speeches_mapper.py ~/Quiz4/prez_speeches_reducer.py \
-mapper prez_speeches_mapper.py   \
-reducer prez_speeches_reducer.py \
-input /user/singhj/prez_speeches/*/* \
-output /president_speech_valence/all_prez
```


If running on an individual president, the command would be:

```console
$ mapred streaming -file ~/Quiz4/prez_speeches_mapper.py ~/Quiz4/prez_speeches_reducer.py \
-mapper prez_speeches_mapper.py   \
-reducer prez_speeches_reducer.py \
-input /user/singhj/prez_speeches/<president last name>/<president last name> \
-output /president_speech_valence/<president last name>
```

`/president_speech_valence/all_prez` will have the part output files
```console
$ hdfs dfs -ls /president_speech_valence/all_prez
```


The outputs are split across different part files, so combine all of them into a single results txt file.
```console
$ hdfs dfs -text /president_speech_valence/<president last name>/part* > <president name>_results.txt
# OR (to get all presidents)
$ hdfs dfs -text /president_speech_valence/all_prez/part* > president_results.txt

```
For example: `hdfs dfs -text /president_speech_valence/harrison_2/part*`

### Post Process Step (4)
The mapper prints one line per valence word, of the form `{president_name}\t{valence_score}`, for example:

adams	-2

The reducer output parts, when combined, contain two kinds of lines:

- `{president_name}\ttotal\t{total_valence_words}`, for example `adams   total   93`
- `{name}\t{net_v_score}`, for example `adams   53`



### Mapper and Reducer functions (4)

Here are the mapper and reducer used in the MapReduce job. The mapper maintains a dict of valence words and their scores, cleans the text, removes stop words, and outputs the president name and the corresponding valence score for each valence word used. The reducer takes the mapper output and computes the net valence score and the total number of valence words for each president.
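
The scoring idea can be shown on a toy example. This is a sketch under stated assumptions: `toy_lexicon` is a made-up three-word stand-in for the AFINN word list, and `score_words` and `summarize` are hypothetical helpers rather than the functions in the submitted files.

```python
# Hypothetical three-word lexicon standing in for the AFINN word list
toy_lexicon = {"good": 3, "hope": 2, "bad": -3}


def score_words(text, lexicon):
    """Return the valence of every lexicon word in text, in order of appearance."""
    return [lexicon[word] for word in text.lower().split() if word in lexicon]


def summarize(scores):
    """Mimic the reducer: net valence score and number of valence words."""
    return sum(scores), len(scores)


scores = score_words("I hope it is good not bad and good again", toy_lexicon)
net_score, word_count = summarize(scores)
print(scores, net_score, word_count)
```

Words that are not in the lexicon contribute nothing, so an empty or all-neutral line produces no mapper output at all.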

#### MAPPER (4)

In [28]:
#!/usr/bin/env python3


'''prez_speeches_mapper.py'''
import sys
import re
import requests
import string
import os

stopwords_list = requests.get("https://gist.githubusercontent.com/rg089/35e00abf8941d72d419224cfd5b5925d/raw/12d899b70156fd0041fa9778d657330b024b959c/stopwords.txt").content
stopwords = list(set(stopwords_list.decode().splitlines()))
valence_words_data = requests.get("https://raw.githubusercontent.com/fnielsen/afinn/master/afinn/data/AFINN-en-165.txt").content
valencewords = list(set(valence_words_data.decode().splitlines()))
valence_dict = {}

for v in valencewords:
    word, score = v.split('\t')
    valence_dict[word] = score

def remove_stopwords(words):
    list_ = re.sub(r"[^a-zA-Z0-9]", " ", words.lower()).split()
    return [itm for itm in list_ if itm not in stopwords]

def clean_text(text):
    text = text.lower()
    # raw strings avoid invalid-escape warnings for \[ and \d
    text = re.sub(r'\[.*?\]', '', text)
    text = re.sub('[%s]' % re.escape(string.punctuation), ' ', text)
    text = re.sub(r'[\d\n]', ' ', text)
    return ' '.join(remove_stopwords(text))

def valence(text):
    return calc_valence(text)

def calc_valence(text):
    v = []
    if isinstance(text, bytes):
        text = text.decode('utf-8')
    text_list = text.split(' ')
    for w in text_list:
        if w in valence_dict:
            v.append(valence_dict[w])

    return v

def main(argv):
    president_name = 'missing prez name'
    line = sys.stdin.readline()
    try:
        while line:
            clean_line = clean_text(line) # returns the line as a space-separated string of cleaned words
            # fetch president name
            if "mapreduce_map_input_file" in os.environ:
                president_file_name = os.environ['mapreduce_map_input_file']
                president_name = re.sub(r'.*/|_speeches_\d+\.txt$', '', president_file_name)
            valence_vals = valence(clean_line)
            for v in valence_vals:
                print(f"{president_name}\t{v}")
            line = sys.stdin.readline()
    except EOFError as error:
        return None

if __name__ == "__main__":
    main(sys.argv)

#### REDUCER (4)

In [13]:
#!/usr/bin/env python
'''prez_speeches_reducer.py'''
import sys

valence_aggregate = {}
valence_words_ct = {}

for line in sys.stdin:
# with open('adams.txt', 'r') as c:  # debug local file
    president_name, valence_score = line.strip().split('\t', 1)
    try:
        valence_score  = int(valence_score)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    if president_name not in valence_aggregate:
        valence_aggregate[president_name] = 0

    valence_aggregate[president_name] += valence_score
    
    if president_name not in valence_words_ct:
        valence_words_ct[president_name] = 0

    valence_words_ct[president_name] += 1

for name, total_valence_words in valence_words_ct.items():
    print(f"{name}\t{total_valence_words}")

for name, net_v_score in valence_aggregate.items():
    print(f"{name}\ttotal\t{net_v_score}")



#### (4) calc_valence(text) is a function that you write. Be sure to test this function under any imaginable conditions, for example:
- When text is empty,
- When text is a string of non-printable characters,
- When text is a bytecode string,


In [42]:
def valence(text):
    return calc_valence(text)

def calc_valence(text):
    v = []
    if isinstance(text, bytes):
        text = text.decode('utf-8')
    text_list = text.split(' ')
    for w in text_list:
        if w in valence_dict:
            v.append(valence_dict[w])

    return v

In [43]:
calc_valence('''<title="Address to Congress on Yalta">
<date="March 1, 1945">
I hope that you will pardon me for this unusual posture of sitting down during the presentation of what I want to say, but I know that you will realize that it makes it a lot easier for me not to have to carry about ten pounds of steel around on the bottom of my legs; and also because of the fact that I have just completed a fourteen-thousand-mile trip.
First of all, I want to say, it is good to be home.
It has been a long journey. I hope you will also agree that it has been, so far, a fruitful one.''')
# calc_valence on empty string
calc_valence('') # works, returns nothing
# calc_valence on string of non printable chars
non_printable = ''.join(chr(i) for i in range(32))
print(non_printable)
calc_valence(non_printable) # works, returns nothing
# calc_valence on a bytecode string
byte_str = b'foobar'
print(byte_str)
calc_valence(byte_str) 

 	

b'foobar'


[]

[2 points] How much data, in bytes, was emitted by the mappers?

When running on Taft's speeches, the mappers emitted 252743 bytes.

When running on all speeches,

## 5. Hadoop Errors [15 points]

### 5.1 Where (what server & location) did the divide-by-zero error messages show up and how many did you find?

MR command:

```console
mapred streaming -files ~/Quiz4/mapper_noll_part5.py ~/big-data-repo/hadoop/reducer_noll.py \
-mapper mapper_noll_part5.py   \
-reducer reducer_noll.py \
-input /user/singhj/five-books \
-output /books-counts-part-5
```


#### MAPPER (5)

In [None]:
#!/usr/bin/env python

'''mapper_noll_part5.py'''
import sys, re
import random

def main(argv):
    line = sys.stdin.readline()
    pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
    try:
        while line:
            x = 1 / random.randint(0,99)
            for word in pattern.findall(line):
                print ("LongValueSum:" + word.lower() + "\t" + "1")
                # x = 1 / random.randint(0,99)
            line = sys.stdin.readline()
    except EOFError as error:
        return None

if __name__ == "__main__":
    main(sys.argv)

### 5.2 How many such messages did you find? Is the count you found consistent with what you might expect from random.randint(0,99)?
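
As a back-of-the-envelope check for this question, the sketch below models only the `random.randint(0, 99)` draw in the mapper above and makes no claim about how Hadoop schedules or retries tasks. `randint(0, 99)` is inclusive on both ends, so each input line has a 1-in-100 chance of drawing 0, and the number of lines a mapper reads up to its first `ZeroDivisionError` follows a geometric distribution with mean 100. The helper names and the seed are arbitrary choices for illustration.

```python
import random

P_ZERO = 1 / 100  # randint(0, 99) picks one of 100 equally likely values


def prob_error_within(n_lines, p=P_ZERO):
    """Probability that at least one of n_lines draws a zero."""
    return 1 - (1 - p) ** n_lines


def lines_until_first_zero(rng):
    """Simulate one mapper: count draws up to and including the first zero."""
    draws = 1
    while rng.randint(0, 99) != 0:
        draws += 1
    return draws


rng = random.Random(119)  # fixed, arbitrary seed so the run is repeatable
trials = [lines_until_first_zero(rng) for _ in range(10_000)]
mean_draws = sum(trials) / len(trials)
print(f"P(error within 100 lines) = {prob_error_within(100):.3f}")
print(f"simulated mean lines until first zero = {mean_draws:.1f}")
```

Because the mapper only catches `EOFError`, the first zero ends that mapper process, so a single task attempt can raise at most one such error rather than one for every hundredth line.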