###  <font color='green' size=5> Lab 1: Map-Reduce on Hadoop  </font> 

**Data Engineering** <br>
**ANLT 214** <br>
**September 23, 2018** <br>
<br>

<font color='green' size=4> By: Ali Taheri</font>

In this lab, we gain hands-on experience with the Map-Reduce paradigm on Hadoop, running on an AWS instance of the University of the Pacific. 
We selected about 50 of Professor Mike Williamson's S3 log files and try to extract some information and insights from them. 

The first phase of our work is as follows:

<ul>
<li>Capture the <a href="https://en.wikipedia.org/wiki/List_of_HTTP_status_codes">HTTP response codes</a> that are in the logs, and perform the following summary:

<ol>
<li>Get a <strong>list</strong> of all of the <strong>response codes</strong> and <strong>how many times</strong> they occurred.</li>
    

For this part, we developed the following mapper and reducer. <br>
The mapper is as follows:

In [8]:
#!/usr/bin/env python3
"""mapper.py

This code extracts HTTP response codes.
"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # split the line into whitespace-separated tokens
    strings = line.split()
    # the response code is the token right after the one ending in HTTP/1.1"
    # (stop one short of the end so that strings[k + 1] always exists)
    for k in range(len(strings) - 1):
        if strings[k].endswith('HTTP/1.1"'):
            # write the results to STDOUT (standard output);
            # what we output here will be the input for the
            # Reduce step, i.e. the input for reducer.py
            #
            # tab-delimited; each occurrence counts as 1
            print(strings[k + 1], 1, sep="\t")

Every response code comes right after the token ending in 'HTTP/1.1"', so we look for all instances of that token and then extract the code that follows each one. <br>
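
As a quick check of this idea, the sketch below applies the same token test to a single log line. The sample line and the helper name `extract_status` are assumptions made up for illustration; they are not taken from the actual log files.

```python
def extract_status(line):
    """Return the token after the first token ending in HTTP/1.1", or None."""
    tokens = line.split()
    # Skip the last token: nothing can follow it.
    for k, token in enumerate(tokens[:-1]):
        if token.endswith('HTTP/1.1"'):
            return tokens[k + 1]
    return None


# Hypothetical log line in the general shape of an S3 access log entry.
sample = (
    'OWNER_ID example-bucket [23/Sep/2018:10:00:00 +0000] 192.0.2.10 '
    'REQUESTER_ID REQ123 REST.GET.OBJECT notes.txt '
    '"GET /example-bucket/notes.txt HTTP/1.1" 200 - 1024 1024 12 11 "-" "curl/7.61" -'
)

print(extract_status(sample))
```

A line without an `HTTP/1.1"` token yields `None`, which is why such lines contribute nothing to the counts.
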
The reducer is as follows:

In [None]:
#!/usr/bin/env python3
"""reducer.py

This code counts how many times each response code occurred.
"""

import sys

current_code = None
current_count = 0

# input comes from STDIN
for line in sys.stdin:
    # parse the "<code> <count>" pair we got from mapper.py
    fields = line.split()
    if len(fields) != 2:
        # malformed or empty line, so silently ignore/discard it
        continue
    code, count = fields

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: the response code) before it is passed to the reducer
    if current_code == code:
        current_count += count
    else:
        if current_code is not None:
            # write result to STDOUT
            print(current_code, current_count)
        current_count = count
        current_code = code

# do not forget to output the last code, if there was any input at all
if current_code is not None:
    print(current_code, current_count)
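
The reducer's reliance on sorted input can be illustrated without a cluster. The sketch below is a simplified local stand-in for the map, sort, and reduce steps, not the streaming scripts themselves; the function names and the shortened log lines are hypothetical.

```python
from itertools import groupby


def map_line(line):
    """Yield (code, 1) for each token that follows a token ending in HTTP/1.1"."""
    tokens = line.split()
    for k, token in enumerate(tokens[:-1]):
        if token.endswith('HTTP/1.1"'):
            yield tokens[k + 1], 1


def reduce_sorted(pairs):
    """Sum the counts of consecutive pairs sharing a key (input must be sorted)."""
    for code, group in groupby(pairs, key=lambda pair: pair[0]):
        yield code, sum(count for _, count in group)


# Hypothetical, shortened log lines; only the request and status parts matter here.
lines = [
    '"GET /a HTTP/1.1" 200 -',
    '"GET /b HTTP/1.1" 404 -',
    '"GET /c HTTP/1.1" 200 -',
]

mapped = [pair for line in lines for pair in map_line(line)]
shuffled = sorted(mapped)  # stands in for Hadoop's sort between map and reduce
result = list(reduce_sorted(shuffled))
print(result)
```

Feeding `mapped` to `reduce_sorted` without sorting would report code 200 twice, once per run of consecutive equal keys, which is the failure the comment in the reducer warns about.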

Now we run Map-Reduce over all of our log files. The command is as below:

yarn jar /home/ec2-user/hadoop-3.1.1/share/hadoop/tools/lib/hadoop-streaming-3.1.1.jar \
-files /home/ec2-user/hadoop-3.1.1/ali/http_classification/mapper.py,/home/ec2-user/hadoop-3.1.1/ali/http_classification/reducer.py \
-mapper /home/ec2-user/hadoop-3.1.1/ali/http_classification/mapper.py \
-reducer /home/ec2-user/hadoop-3.1.1/ali/http_classification/reducer.py \
-input ali_logfiles/* \
-output ali_http_results

Running the above yarn command writes the result to a new folder named "ali_http_results" on HDFS. It can be displayed with the following command:

[ec2-user@ip-172-31-10-136 ~]$ hdfs dfs -cat ali_http_results/part-00000

200 792	
204 1	
206 78	
301 7	
307 43	
403 1	
404 3	

The Hadoop report for this job is as follows:

<img src="1.jpg">

The second part of the work is:

<li>Then <strong>aggregate</strong> these response codes into their <strong>generalized categories</strong>:

<ul>
<li><code>1xx</code>: info, <code>2xx</code>: success, <code>3xx</code>: redirection, <code>4xx</code>: client errors, and <code>5xx</code>: server errors</li>
<li>So you should have 2 summaries, one that has the individual code &amp; the number of times it occurred, and another that has the generalized higher level code &amp; the number of times it occurred.</li>
</ul></li>
</ol></li>

We could have found the categories in the reducer above, but we wrote a separate reducer, shown below, to gain more experience.

In [None]:
#!/usr/bin/env python3
"""aggregator.py

This code counts HTTP response codes per general category (1xx to 5xx).
"""

import sys

cat_numbers = {"1xx": 0, "2xx": 0, "3xx": 0, "4xx": 0, "5xx": 0}

# input comes from STDIN
for line in sys.stdin:
    # parse the "<code> <count>" pair we got from mapper.py
    fields = line.split()
    if len(fields) != 2:
        # malformed or empty line, so silently ignore/discard it
        continue
    code, count = fields

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # the first digit of the code selects the category; adding to the
    # dictionary directly does not depend on the order of the input
    category = code[0] + "xx"
    if category in cat_numbers:
        cat_numbers[category] += count

total = sum(cat_numbers.values())
for cat in cat_numbers:
    # guard against division by zero when there was no input
    percent = round(100 * cat_numbers[cat] / total) if total else 0
    print(cat, ":", cat_numbers[cat], "   " + str(percent) + "%")
print("Total = " + str(total))

Now we run the following yarn command:

yarn jar /home/ec2-user/hadoop-3.1.1/share/hadoop/tools/lib/hadoop-streaming-3.1.1.jar \
-files /home/ec2-user/hadoop-3.1.1/ali/http_categorization/mapper.py,/home/ec2-user/hadoop-3.1.1/ali/http_categorization/reducer.py \
-mapper /home/ec2-user/hadoop-3.1.1/ali/http_categorization/mapper.py \
-reducer /home/ec2-user/hadoop-3.1.1/ali/http_categorization/reducer.py \
-input ali_logfiles/* \
-output ali_http_cat_results


After running yarn, the results are as follows:

[ec2-user@ip-172-31-10-136 ~]$ hdfs dfs -cat ali_http_cat_results/part-00000

 1xx : 000  ------  0%	
 2xx : 871  ------ 94%	
 3xx : 050  ------ 5%	
 4xx : 004  ------ 0%	
 5xx : 000  ------ 0%	
 Total = 925
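
These category totals can be cross-checked against the per-code counts from the first job. The sketch below is only a local sanity check, not part of the Hadoop job; it re-aggregates the numbers printed from `ali_http_results` by the first digit of each code.

```python
from collections import Counter

# Per-code counts as reported by the first job (ali_http_results).
code_counts = {
    "200": 792, "204": 1, "206": 78,
    "301": 7, "307": 43,
    "403": 1, "404": 3,
}

# The first digit of a response code determines its general category.
category_counts = Counter()
for code, count in code_counts.items():
    category_counts[code[0] + "xx"] += count

total = sum(category_counts.values())
for category in sorted(category_counts):
    share = round(100 * category_counts[category] / total)
    print(category, category_counts[category], f"{share}%")
print("Total =", total)
```

The sums agree with the job output: 871 for 2xx, 50 for 3xx, 4 for 4xx, and 925 in total.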

We can see that the large majority of the HTTP codes (94%) report successful operations, and only a tiny number are client errors. <br>
The Hadoop report is as below:

<img src="2.jpg">

In this section we do a fun part! We want to extract all IP addresses from which Professor Mike has connected to AWS. 

<li><strong>Discover</strong> my (Prof. Mike&#39;s) <strong>home IP address</strong> and what my <a href="https://docs.aws.amazon.com/general/latest/gr/acct-identifiers.html"><strong>canonical user id</strong></a> is at AWS.

<ul>
<li>This is perhaps a bit trickier, but maybe more fun, too. Remember what courses I teach (Data Wrangling, Intro to Data Viz, Software Methods, Visual Storytelling, and Dynamic Visualization, along with this course).</li>
</ul></li>
</ul>

Our mapper is as follows:

In [None]:
#!/usr/bin/env python3
"""mapper.py

This code extracts IP addresses and AWS canonical user IDs.
Ali Taheri
"""

import sys

# first four letters of the course names we look for
COURSE_PREFIXES = ("data", "intr", "soft", "visu", "dyna")

# input comes from STDIN (standard input)
for line in sys.stdin:
    # split the line into whitespace-separated tokens
    strings = line.split()
    for k in range(len(strings)):
        if strings[k].endswith('+0000]'):
            # the IP address is the token right after the timestamp
            if k + 1 < len(strings):
                # write the results to STDOUT (standard output);
                # what we output here will be the input for the
                # Reduce step, i.e. the input for reducer.py
                #
                # tab-delimited; each occurrence counts as 1
                print(strings[k + 1], 1, sep="\t")
        elif (strings[k].startswith(COURSE_PREFIXES)
              and not strings[k].endswith(".tar.gz")):
            # the canonical user ID is the token right before the match
            if k > 0:
                print(strings[k - 1], 1, sep="\t")

The code above extracts all IP addresses and canonical user IDs from the log files. <br>
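
To make the two rules concrete, the sketch below applies them to one log line. The sample line, the bucket name `data-wrangling-example`, and the helper names are hypothetical. It also assumes that the token matching a course prefix is the bucket name, so that the token before it is the bucket owner's canonical ID.

```python
COURSE_PREFIXES = ("data", "intr", "soft", "visu", "dyna")


def extract_ip(line):
    """Return the token right after the timestamp token ending in +0000]."""
    tokens = line.split()
    for k, token in enumerate(tokens[:-1]):
        if token.endswith("+0000]"):
            return tokens[k + 1]
    return None


def extract_owner(line):
    """Return the token right before the first course-prefixed token."""
    tokens = line.split()
    # Start at 1 so that a match always has a preceding token.
    for k in range(1, len(tokens)):
        if tokens[k].startswith(COURSE_PREFIXES) and not tokens[k].endswith(".tar.gz"):
            return tokens[k - 1]
    return None


# Hypothetical log line; OWNER_ID stands in for a 64-character canonical ID.
sample = (
    "OWNER_ID data-wrangling-example [23/Sep/2018:10:00:00 +0000] "
    "192.0.2.10 REQUESTER_ID REQ123 REST.GET.OBJECT notes.txt"
)

print(extract_ip(sample), extract_owner(sample))
```

Because both rules key on neighbouring tokens rather than fixed positions, a token such as an object key that happens to start with a course prefix would also match, which is the case the `.tar.gz` exclusion is meant to limit.
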
The reducer is as below:

In [None]:
#!/usr/bin/env python3
"""reducer.py

This code counts occurrences of IP addresses and AWS canonical user IDs.
Ali Taheri
"""

import sys

current_code = None
current_count = 0

# input comes from STDIN
for line in sys.stdin:
    # parse the "<key> <count>" pair we got from mapper.py
    fields = line.split()
    if len(fields) != 2:
        # malformed or empty line, so silently ignore/discard it
        continue
    code, count = fields

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: the IP address or user ID) before it is passed to the reducer
    if current_code == code:
        current_count += count
    else:
        if current_code is not None:
            # write result to STDOUT
            print(current_code, current_count)
        current_count = count
        current_code = code

# do not forget to output the last key, if there was any input at all
if current_code is not None:
    print(current_code, current_count)


Now we can run the following yarn command:

yarn jar /home/ec2-user/hadoop-3.1.1/share/hadoop/tools/lib/hadoop-streaming-3.1.1.jar \
-files /home/ec2-user/hadoop-3.1.1/ali/ip_address/mapper.py,/home/ec2-user/hadoop-3.1.1/ali/ip_address/reducer.py \
-mapper /home/ec2-user/hadoop-3.1.1/ali/ip_address/mapper.py \
-reducer /home/ec2-user/hadoop-3.1.1/ali/ip_address/reducer.py \
-input ali_logfiles/* \
-output ali_ip_cid_results

The results are as follows:

[ec2-user@ip-172-31-10-136 ~]$ hdfs dfs -cat ali_ip_cid_results/part-00000

10.233.7.37   ----    1	
136.60.153.92  ----    1	
138.9.5.26 ---- 1	
138.9.5.27 ---- 81	
138.9.5.39 ---- 233	
138.9.5.61 ---- 773	
138.9.57.32 ---- 1	
5.189.142.136 ---- 1	
52.95.24.235 ---- 1	
54.212.180.219 ---- 1	
66.192.183.244 ---- 1	
71.199.44.171 ---- 1	
99.73.92.216 ---- 110	
f0a3b2b89cd97cc38e2763d36fd696e8d106c2ed968e0c81643c6688f994076c ---- 1203	

A lookup shows that the 138.9.x.x subnet belongs to the University of the Pacific in Stockton, which indicates that Professor Mike connects to AWS a lot from the Stockton campus of UOP. <br> <br>
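
The share of campus traffic can be quantified from the counts above. The sketch below uses Python's standard `ipaddress` module and assumes that "138.9.x.x" means the whole 138.9.0.0/16 block; the exact campus allocation is not confirmed by the logs.

```python
import ipaddress

# Hit counts per address as reported by the job (ali_ip_cid_results).
ip_counts = {
    "10.233.7.37": 1, "136.60.153.92": 1, "138.9.5.26": 1, "138.9.5.27": 81,
    "138.9.5.39": 233, "138.9.5.61": 773, "138.9.57.32": 1, "5.189.142.136": 1,
    "52.95.24.235": 1, "54.212.180.219": 1, "66.192.183.244": 1,
    "71.199.44.171": 1, "99.73.92.216": 110,
}

# Assumption: the campus range is the whole 138.9.0.0/16 block ("138.9.x.x").
campus = ipaddress.ip_network("138.9.0.0/16")

campus_hits = sum(
    count for ip, count in ip_counts.items()
    if ipaddress.ip_address(ip) in campus
)
other_hits = sum(ip_counts.values()) - campus_hits
print("campus:", campus_hits, "other:", other_hits)
```

Under that assumption, 1089 of the 1206 counted requests come from the campus range, and 99.73.92.216 accounts for 110 of the remaining 117.
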
The IP address 99.73.92.216 appears to be Professor Mike's home address. Its lookup points to San Mateo with AT&T (U-verse) as the internet provider, which suggests that he lives there. <br> <br>

The other IP addresses are very rare; he may have connected to AWS from those places. They map to the following cities:<br>
1- Herriman, Utah <br>
2- Salt Lake City, Utah <br>
3- Seattle, Washington <br>
4- Boardman, Oregon <br>
5- Columbus, Ohio <br>

Those IP addresses do not prove that he was in those places, because providers sometimes use the same IP addresses in different places. This can happen especially in mobile networks. <br> <br>

Finally, his canonical user ID is "f0a3b2b89cd97cc38e2763d36fd696e8d106c2ed968e0c81643c6688f994076c", which appears 1203 times. <br> <br>

The Hadoop report is as below:


<img src="3.jpg">