#Unstructured Data and MapReduce
###April 1, 2015

In [2]:
rows = [
    """2015-03-22 16:36:13,475 - file - DEBUG - Debug FILE""",
    """2015-03-22 16:36:13,477 - werkzeug - INFO -  * Running on http://0.0.0.0:5000/""",
]
for row in rows:
    print row.split()

['2015-03-22', '16:36:13,475', '-', 'file', '-', 'DEBUG', '-', 'Debug', 'FILE']
['2015-03-22', '16:36:13,477', '-', 'werkzeug', '-', 'INFO', '-', '*', 'Running', 'on', 'http://0.0.0.0:5000/']


In [3]:
for row in rows:
    print row.split('-')

['2015', '03', '22 16:36:13,475 ', ' file ', ' DEBUG ', ' Debug FILE']
['2015', '03', '22 16:36:13,477 ', ' werkzeug ', ' INFO ', '  * Running on http://0.0.0.0:5000/']


In [4]:
for row in rows:
    print row.split(' - ')

['2015-03-22 16:36:13,475', 'file', 'DEBUG', 'Debug FILE']
['2015-03-22 16:36:13,477', 'werkzeug', 'INFO', ' * Running on http://0.0.0.0:5000/']


In [6]:
import datetime
def log_format(dt, source, level, desc):
    # this other form would also work, and be simpler:
    #return ' - '.join([str(dt), source, level, desc])
    return "%s - %s - %s - %s" % (dt, source, level, desc,)

print log_format(datetime.datetime.now(), 'file', 'DEBUG', 'Debug FILE')

2015-04-01 19:13:31.069156 - file - DEBUG - Debug FILE


In [10]:
import re
# regex compile allows us to name fields and parse logs in more flexible way.
# format: (?P<field_name>regex)
regex = r'(?P<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d+) - (?P<source>\w+) - (?P<level>\w+) - (?P<desc>[\w\W]+)'
log_parser = re.compile(regex)
m = log_parser.search(rows[0]) ##Can test, if the search does not run, it's not parsing
print m.group('date')
print m.group('desc')
print m.groupdict()
m = log_parser.search(rows[1]) ##Can test, if the search does not run, it's not parsing
print '-' * 20
print 'ROW #1'
print m.group('date')
print m.group('desc')
print m.groupdict()

2015-03-22 16:36:13,475
Debug FILE
{'date': '2015-03-22 16:36:13,475', 'source': 'file', 'level': 'DEBUG', 'desc': 'Debug FILE'}
--------------------
ROW #1
2015-03-22 16:36:13,477
 * Running on http://0.0.0.0:5000/
{'date': '2015-03-22 16:36:13,477', 'source': 'werkzeug', 'level': 'INFO', 'desc': ' * Running on http://0.0.0.0:5000/'}


1. Should identify the program ran [cat, sed, sort] and arguments [error.log, -n '/etc/'] passed at each pipe (|)
    * cat error.log | sed -n '/access denied/p' | sort
2. Should identify AJAX call [POST], filename [congruence1.jpg], status call [200]
    * POST /static/img/congruence1.jpg HTTP/1.1 200
    * POST /static/img/team/arnold.png HTTP/1.1 404
3. Should identify gametime [+20 2nd period, +40 3rd period], team [WPG, NYR], who was penalized [Jiri Tlsusty], the penelty [slashing], penalty against [Carl Hagelin].
    * 1ST PERIOD
        * 05:20   WPG Jiri Tlusty  Slashing against  Carl Hagelin
        * 14:25   NYR Mats Zuccarello  Tripping against  Jim Slater
    * 2ND PERIOD
        * 09:31   WPG Mathieu Perreault  Interference against  Dominic Moore
        * 13:31   NYR Rick Nash  Hooking against  Jacob Trouba
    * 3RD PERIOD
        * 19:21   WPG Dustin Byfuglien  Slashing against  Mats Zuccarello

In [26]:
bash_ex = """cat error.log | sed -n '/access denied/p' | sort"""
delim_bash = bash_ex.split(' | ')

In [46]:
regex = r'(?P<program>\w+) (?P<arguments>\w+\W+\w+)'
log_parser = re.compile(regex)

for i in range(len(delim_bash)):
m = log_parser.search(delim_bash[i]) ##Can test, if the search does not run, it's not parsing
print m.group('program')
print m.group('arguments')
print m.groupdict()

cat
error.log
{'program': 'cat', 'arguments': 'error.log'}
access
denied/p
{'program': 'access', 'arguments': 'denied/p'}


AttributeError: 'NoneType' object has no attribute 'group'

In [50]:
splitter = re.compile("(?P<program>\w+)(?:\s+(?P<arg>\S.*))?")

for cmd in delim_bash:
    m = splitter.search(cmd.strip())
    if m:
        print m.groupdict()

{'program': 'cat', 'arg': 'error.log'}
{'program': 'sed', 'arg': "-n '/access denied/p'"}
{'program': 'sort', 'arg': None}


In [58]:
posts = ["""POST /static/img/congruence1.jpg HTTP/1.1 200""",
"""POST /static/img/team/arnold.png HTTP/1.1 404""",
]

In [72]:
regex = r'(?P<ajax>\w+) (/static/img/(?P<filename>\w+.w+))(HTTP/1.1\s(?P<error>\d[))'

regex = r'(?P<ajax>\w+\s)(?P<filename>/(\w+[\s\S])+\s)HTTP/\d.\d (?P<status>\d{3})'

splitter = re.compile(regex)
for post in posts:
    m = splitter.search(post)
    if m:
        print m.groupdict()

In [98]:
hockey = """
1ST PERIOD
05:20   WPG Jiri Tlusty  Slashing against  Carl Hagelin
14:25   NYR Mats Zuccarello  Tripping against  Jim Slater
2ND PERIOD
09:31   WPG Mathieu Perreault  Interference against  Dominic Moore
13:31   NYR Rick Nash  Hooking against  Jacob Trouba
3RD PERIOD
19:21   WPG Dustin Byfuglien  Slashing against  Mats Zuccarello""" 


In [99]:
regex = r'(\d{2}):(\d{2})\s+([A-Z]+)\s+(\w+\s\w+)\s+(\w+) against\s+(\w+\s\w+)'
parser = re.compile(regex)
add_minutes = 0
for line in hockey:
    if line == "2ND PERIOD":
        add_minutes = 20
    elif line == "3RD PERIOD":
        add_minutes = 40

        
matches = parser.findall(hockey)

for match in matches:
    minutes = int(match[0]) + add_minutes
    print "%d:%s|%s|%s|%s|%s" % (minutes, match[1], match[2], match[3], match[4], match[5])

5:20|WPG|Jiri Tlusty|Slashing|Carl Hagelin
14:25|NYR|Mats Zuccarello|Tripping|Jim Slater
9:31|WPG|Mathieu Perreault|Interference|Dominic Moore
13:31|NYR|Rick Nash|Hooking|Jacob Trouba
19:21|WPG|Dustin Byfuglien|Slashing|Mats Zuccarello


In [77]:
csvfile = [
    '36,0,3,0,1',
    '73,1,3,0,1',
    '30,0,3,0,1',
    '49,1,3,0,1',
    '47,1,11,0,1',
    '47,0,11,1,1',
    '46,0,5,0,1',
    '30,0,3,0,1',
    '52,0,4,0,1',
    '30,0,3,0,1',
]

from collections import defaultdict
age = defaultdict(int)
hits = defaultdict(int)
for row in csvfile:
    a, b, c, d, e = row.split(',')
    age[a] += 1
    hits[d] += 1

for k,v in age.items():
    print k, v
    
for k,v in hits.items():
    print k, v

49 1
46 1
47 2
30 3
36 1
52 1
73 1
1 1
0 9


In [78]:
ipythonlogs = [
    """[I 10:17:42.633 NotebookApp] Using MathJax from CDN: https://cdn.mathjax.org/mathjax/latest/MathJax.js""",
    """[W 10:17:42.670 NotebookApp] Terminals not available (error was No module named terminado)""",
    """[I 10:17:42.670 NotebookApp] Serving notebooks from local directory: /Users/macbook/projects/""",
    """[I 10:17:42.670 NotebookApp] 0 active kernels""",
    """[I 10:17:42.670 NotebookApp] The IPython Notebook is running at: http://localhost:8888/""",
    """[I 10:17:42.670 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).""",
    """[W 10:17:51.036 NotebookApp] 404 GET /api/kernels/ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5/channels?session_id=7B3E0E7D6F024DA4A43433953CAF3B73 (::1): Kernel does not exist: ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5""",
    """[W 10:17:51.051 NotebookApp] 404 GET /api/kernels/ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5/channels?session_id=7B3E0E7D6F024DA4A43433953CAF3B73 (::1) 17.13ms referer=None""",
    """[I 10:18:09.405 NotebookApp] Kernel started: 82092ca9-abb3-4196-a967-0694c8a3bec4""",
    """[W 10:18:56.046 NotebookApp] 404 GET /api/kernels/ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5/channels?session_id=7B3E0E7D6F024DA4A43433953CAF3B73 (::1): Kernel does not exist: ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5""",
    """[W 10:18:56.047 NotebookApp] 404 GET /api/kernels/ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5/channels?session_id=7B3E0E7D6F024DA4A43433953CAF3B73 (::1) 2.11ms referer=None""",
]
### aggregating the info [i] vs warning [w] should be relatively straightforward:
loggertypes = defaultdict(int)
for row in ipythonlogs:
    logtype = row[1]
    loggertypes[logtype] += 1

print loggertypes

### How do we aggregate types by minute?
loggertypes = defaultdict(int)
for row in ipythonlogs:
    logtype = row[1:8]
    # "sort" by time instead, so let's reorder the key:
    logtype = ' '.join([logtype.split()[1],logtype.split()[0]]) 
    loggertypes[logtype] += 1

print loggertypes

defaultdict(<type 'int'>, {'I': 6, 'W': 5})
defaultdict(<type 'int'>, {'10:18 W': 2, '10:17 I': 5, '10:18 I': 1, '10:17 W': 3})


**Work through a larger subset of penalties to aggregate**

1. penalties by team
2. penalties by player (either who did it or who it targeted)
3. penalties by period

In [94]:
penalties = """1ST PERIOD
02:16   ANA Tim Jackman  Fighting (maj) against  John Scott
02:16   SJS John Scott  Fighting (maj) against  Tim Jackman
12:53   SJS Adam Burish  Slashing against  Tim Jackman
15:38   SJS Matt Nieto  Hooking against  Matt Beleskey
18:38   ANA Tim Jackman  Hooking against  Tommy Wingels
2ND PERIOD
04:35   SJS Justin Braun  Holding against  Jakob Silfverberg
09:40   SJS Scott Hannan  Hi-sticking against  Sami Vatanen
09:40   ANA Sami Vatanen  Embellishment against  Scott Hannan
09:52   SJS Mirco Mueller  Unsportsmanlike conduct against  Ryan Getzlaf
09:52   ANA Ryan Getzlaf  Unsportsmanlike conduct against  Mirco Mueller
15:23   SJS Adam Burish  Delay Gm - Face-off Violation
18:57   SJS Tommy Wingels  Fighting (maj) against  Corey Perry
18:57   ANA Corey Perry  Fighting (maj) against  Tommy Wingels
3RD PERIOD
07:38   ANA Sami Vatanen  Holding against  Tommy Wingels
10:18   SJS Joe Pavelski  Fighting (maj) against  Ben Lovejoy
10:18   ANA Ben Lovejoy  Fighting (maj) against  Joe Pavelski
10:18   ANA Tim Jackman  Roughing against  Marc-Edouard Vlasic
10:18   ANA Tim Jackman  Roughing against  Marc-Edouard Vlasic
12:58   SJS Justin Braun  Misconduct (10 min) against  Corey Perry
12:58   ANA Corey Perry  Misconduct (10 min) against  Justin Braun
12:58   ANA Corey Perry  Roughing against  Justin Braun
12:58   SJS Justin Braun  Roughing against  Corey Perry
13:56   SJS Adam Burish  Roughing against  Nate Thompson
13:56   SJS Adam Burish  Misconduct (10 min)
13:56   SJS John Scott  Game misconduct
13:56   SJS John Scott  Fighting (maj) against  Tim Jackman
13:56   SJS John Scott  Instigator against  Tim Jackman
13:56   SJS John Scott  Player leaves bench - bench against  Hampus Lindholm
13:56   ANA Nate Thompson  Misconduct (10 min)
13:56   ANA Nate Thompson  Roughing against  Adam Burish
13:56   ANA Nate Thompson  Roughing against  Adam Burish
13:56   ANA Tim Jackman  Misconduct (10 min) against  John Scott
16:54   ANA Matt Beleskey  Misconduct (10 min)
16:54   ANA William Karlsson  Slashing against  Marc-Edouard Vlasic
16:54   ANA Ryan Getzlaf  Fighting (maj) against  James Sheppard
16:54   SJS James Sheppard  Fighting (maj) against  Ryan Getzlaf
16:54   ANA Ryan Kesler  Misconduct (10 min)"""


In [95]:
by_per = re.split(r'(\d{1}\w{2} PERIOD)', penalties)
print by_per

['', '1ST PERIOD', '\n02:16   ANA Tim Jackman  Fighting (maj) against  John Scott\n02:16   SJS John Scott  Fighting (maj) against  Tim Jackman\n12:53   SJS Adam Burish  Slashing against  Tim Jackman\n15:38   SJS Matt Nieto  Hooking against  Matt Beleskey\n18:38   ANA Tim Jackman  Hooking against  Tommy Wingels\n', '2ND PERIOD', '\n04:35   SJS Justin Braun  Holding against  Jakob Silfverberg\n09:40   SJS Scott Hannan  Hi-sticking against  Sami Vatanen\n09:40   ANA Sami Vatanen  Embellishment against  Scott Hannan\n09:52   SJS Mirco Mueller  Unsportsmanlike conduct against  Ryan Getzlaf\n09:52   ANA Ryan Getzlaf  Unsportsmanlike conduct against  Mirco Mueller\n15:23   SJS Adam Burish  Delay Gm - Face-off Violation\n18:57   SJS Tommy Wingels  Fighting (maj) against  Corey Perry\n18:57   ANA Corey Perry  Fighting (maj) against  Tommy Wingels\n', '3RD PERIOD', '\n07:38   ANA Sami Vatanen  Holding against  Tommy Wingels\n10:18   SJS Joe Pavelski  Fighting (maj) against  Ben Lovejoy\n10:18   

In [None]:
penalty_string = r'(?P<time>\d{2}:\d{2})   (?P<team>\w{3}) (?P<given_to>\w+ \w+)  (?P<penalty>[\w\(\-\)\s]+)?( against  )?(?P<player>\w+ \w+)?'

#Check in on this stuff ^^ maybe talk to teachers at office hours

#Creating a Mini Map Reduce Process 
##(Not MapReduce, that's Google's Thing)

In [107]:
def mapper(line):
    result = []
    # remove leading and trailing whitespace
    line = line.strip()
    # remove odd symbols from the text
    line = re.sub('[!"§$%&/()=?*#()\[\],.<>:;~_-]',"", line)
    # split the line into words
    words = line.split(" ")
    # insert the cleaned words into the results list
    for word in words:
        result.append((word.lower(), 1))
    # output is a list of (key, value) pairs
    return result

print mapper('the quick brown fox jumped over the lazy dog')

[('the', 1), ('quick', 1), ('brown', 1), ('fox', 1), ('jumped', 1), ('over', 1), ('the', 1), ('lazy', 1), ('dog', 1)]


In [108]:
def reducer(key, values):
    return key, sum(values)

In [109]:
def shuffle(words, print_shuffle=False):
    tmp = ""
    val_list = []
    for i in words:
        if tmp and i[0] != tmp:
            if print_shuffle:
                print tmp, val_list
            print reducer(tmp,val_list)
            val_list=[]
        tmp = i[0]
        val_list.append(i[1])
    # Don't forget to print out the last key value pair!
    if print_shuffle:
        print tmp, val_list
    print reducer(tmp,val_list)

In [104]:
# while this looks "backwards," mapper is called first,
# then we sort,
#then shuffle runs the reducer.
shuffle(sorted(mapper('the quick brown fox jumped over the lazy dog')), print_shuffle=True)

brown [1]
('brown', 1)
dog [1]
('dog', 1)
fox [1]
('fox', 1)
jumped [1]
('jumped', 1)
lazy [1]
('lazy', 1)
over [1]
('over', 1)
quick [1]
('quick', 1)
the [1, 1]
('the', 2)


In [114]:
green_eggs="I will not eat them in a house, i will not eat them with a mouse, i will not eat them in a box i will not eat them with a fox, i will not eat them here of there i will not eat them anywhere, I do not like green eggs and ham i do not like them sam i am."

In [115]:
shuffle(sorted(mapper(green_eggs)))

('a', 4)
('am', 1)
('and', 1)
('anywhere', 1)
('box', 1)
('do', 2)
('eat', 6)
('eggs', 1)
('fox', 1)
('green', 1)
('ham', 1)
('here', 1)
('house', 1)
('i', 9)
('in', 2)
('like', 2)
('mouse', 1)
('not', 8)
('of', 1)
('sam', 1)
('them', 7)
('there', 1)
('will', 6)
('with', 2)


In [116]:
import multiprocessing

def word_mapper(word):
    return (word, 1)

line = 'big big data big data science!'
words = line.split()
pool = multiprocessing.Pool(len(words))

mapped_words = pool.map(word_mapper, words)
pool.terminate()

shuffle(mapped_words)

('big', 2)
('data', 1)
('big', 1)
('data', 1)
('science!', 1)
