
# Unstructured Data and MapReduce

## Objectives

* Gain familiarity with NoSQL and nonrelational data
* Parse and aggregate unstructured data from the command line and in Python
* Get exposure to the MapReduce framework

## Class Notes

### What is NoSQL?

NoSQL databases do not use the table-and-field model we are familiar with from SQL databases. Some of the more common kinds include:

* **Key/Value**: a simple store of values looked up by key, used by systems like Redis and Memcached. They are primarily used for caching and fast retrieval of small pieces of data.
* **Document (JSON/BSON)**: stores each record as a JSON-like document. MongoDB stores documents as BSON, a binary JSON derivative, while CouchDB stores JSON. In MongoDB terms, rows become "documents" and tables become "collections".
* **Graph**: takes the idea of a relationship from relational databases and makes it first-class: entities are nodes, relationships are edges, and together they form networks. The best-known example is Neo4j; its use cases are summarized [here](http://neo4j.com/use-cases/).
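To make the three models concrete, here is a toy sketch using plain Python structures. The data and names (`cache`, `users`, `follows`, `followers_of`) are made up for illustration; real stores such as Redis, MongoDB, or Neo4j add persistence, indexing, and query languages on top of these ideas.

```python
# Toy illustration only: plain Python structures standing in for each data model.

# Key/value: an opaque value fetched by a single key, as in a cache.
cache = {'session:42': 'user=ada;expires=3600'}

# Document: self-describing, possibly nested records. A list of them
# plays the role of a "collection"; documents need not share fields.
users = [
    {'_id': 1, 'name': 'Ada', 'languages': ['python', 'sql']},
    {'_id': 2, 'name': 'Grace'},
]

# Graph: nodes plus explicit relationships ("Ada follows Grace").
follows = {
    'Ada': {'Grace'},
    'Grace': {'Ada', 'Linus'},
    'Linus': set(),
}


def followers_of(graph, person):
    """Walk the relationships backwards: who points at `person`?"""
    return {src for src, targets in graph.items() if person in targets}
```

The graph query is the interesting one: in a table you would need a join to answer "who follows Ada?", while here the relationship itself is the data we traverse.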


#### What about data not in a database?
Hadoop, for example, is _not_ a database: it stores data as files in a distributed filesystem (HDFS). To think about how Hadoop works, we need to step away from databases and think about how we interact with data in files.

### How do we generate structure from unstructured data?

Imagine we have data coming from logs, like so:

```
2013-07-22 16:36:13,475 - file - DEBUG - Debug FILE
2013-07-22 16:36:13,477 - werkzeug - INFO -  * Running on http://0.0.0.0:5000/
```

There is often some hidden structure in the data, even if it is not crystal clear or simple. For example, consider using Python's `str.split()`:

In [2]:
rows = [
    """2015-03-22 16:36:13,475 - file - DEBUG - Debug FILE""",
    """2015-03-22 16:36:13,477 - werkzeug - INFO -  * Running on http://0.0.0.0:5000/""",
]
for row in rows:
    print row.split()

['2015-03-22', '16:36:13,475', '-', 'file', '-', 'DEBUG', '-', 'Debug', 'FILE']
['2015-03-22', '16:36:13,477', '-', 'werkzeug', '-', 'INFO', '-', '*', 'Running', 'on', 'http://0.0.0.0:5000/']


1. What does using str.split() do well, in this case?
2. Where does it seem to fall apart?

What could be another delimiter for us to split on?

In [3]:
for row in rows:
    print row.split('-')

['2015', '03', '22 16:36:13,475 ', ' file ', ' DEBUG ', ' Debug FILE']
['2015', '03', '22 16:36:13,477 ', ' werkzeug ', ' INFO ', '  * Running on http://0.0.0.0:5000/']


1. How does using the `-` delimiter improve things?
2. How could we make this even better?
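One possible refinement, assuming every line uses ` - ` (space, hyphen, space) as its separator: split on that whole string, and cap the number of splits at 3 so any hyphens inside the description are left alone. This is a sketch of one option, not the only answer.

```python
rows = [
    "2015-03-22 16:36:13,475 - file - DEBUG - Debug FILE",
    "2015-03-22 16:36:13,477 - werkzeug - INFO -  * Running on http://0.0.0.0:5000/",
]

parsed = []
for row in rows:
    # Split on the full ' - ' separator, at most 3 times (positional maxsplit
    # works in Python 2 and 3), so hyphens in the date or the description
    # are never treated as delimiters.
    date, source, level, desc = row.split(' - ', 3)
    parsed.append((date, source, level, desc.strip()))
```

A line with fewer than three separators raises a `ValueError` when unpacked, which is a useful signal that the line does not follow the pattern.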

We _could_ keep iterating until we find the best "cleaned up" transformation. We _should_ also read through the log and work out its pattern. A line like:

```
2015-03-22 16:36:13,475 - file - DEBUG - Debug FILE
```

is really built from a template like this:

```
datetime - source - log level - log description
```

Filling in that template reproduces the log lines:

In [4]:
import datetime
def log_format(dt, source, level, desc):
    # this other form would also work, and be simpler:
    #return ' - '.join([str(dt), source, level, desc])
    return "%s - %s - %s - %s" % (dt, source, level, desc,)

print log_format(datetime.datetime.now(), 'file', 'DEBUG', 'Debug FILE')

2015-04-01 16:10:34.870869 - file - DEBUG - Debug FILE
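Notice that `str(datetime)` prints microseconds after a dot, while the original logs show milliseconds after a comma. That comma format matches what Python's standard `logging` module produces with the format string below, so it is a reasonable guess (the logs themselves don't confirm it) that they were written this way. The sketch builds a single `LogRecord` by hand instead of configuring a logger, just to show the formatting step.

```python
import logging

# By default, %(asctime)s renders as 'YYYY-MM-DD HH:MM:SS,mmm'.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Build one record by hand (normally logger.debug(...) does this for you).
record = logging.LogRecord(
    name='file', level=logging.DEBUG, pathname='', lineno=0,
    msg='Debug FILE', args=None, exc_info=None,
)
line = formatter.format(record)
# line looks like '<current date and time>,<ms> - file - DEBUG - Debug FILE'
```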


How do we reverse the pattern?

In [5]:
import re
# named groups, written (?P<field_name>regex), label each field we capture;
# compiling the pattern once lets us reuse it on every line.
regex = r'(?P<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d+) - (?P<source>\w+) - (?P<level>\w+) - (?P<desc>.+)'
log_parser = re.compile(regex)
m = log_parser.search(rows[0])
print m.group('date')
print m.group('desc')
print m.groupdict()

2015-03-22 16:36:13,475
Debug FILE
{'date': '2015-03-22 16:36:13,475', 'source': 'file', 'level': 'DEBUG', 'desc': 'Debug FILE'}
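Applied to a whole log, the same pattern needs two extra pieces: a plan for lines that don't match (`search` returns `None`) and, usually, converting the date text back into a real `datetime`. A sketch, where `parse_logs` and the third "Traceback" line are hypothetical additions for illustration:

```python
import datetime
import re

# Same fields as above; '.+' matches the rest of a single-line description.
LOG_PATTERN = re.compile(
    r'(?P<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d+) - '
    r'(?P<source>\w+) - (?P<level>\w+) - (?P<desc>.+)'
)


def parse_logs(lines):
    """Return (records, unmatched): a dict per matching line, plus raw lines that did not match."""
    records, unmatched = [], []
    for line in lines:
        m = LOG_PATTERN.search(line)
        if m is None:  # search() returns None when the pattern does not match
            unmatched.append(line)
            continue
        record = m.groupdict()
        # Reverse the formatting step: %f accepts the 3-digit millisecond field.
        record['date'] = datetime.datetime.strptime(record['date'], '%Y-%m-%d %H:%M:%S,%f')
        record['desc'] = record['desc'].strip()
        records.append(record)
    return records, unmatched


rows = [
    "2015-03-22 16:36:13,475 - file - DEBUG - Debug FILE",
    "2015-03-22 16:36:13,477 - werkzeug - INFO -  * Running on http://0.0.0.0:5000/",
    "Traceback (most recent call last):",  # made-up line that does not fit the pattern
]
records, unmatched = parse_logs(rows)
```

Keeping the unmatched lines, rather than silently dropping them, makes it easy to check how much of the log the pattern actually covers.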


### Try it out: Mapping Key Values

For each of the following three examples, write a regex string (r'') with named groups for the fields described. If your search returns None, the pattern is not matching. Refer to Sally's tools to practice the concept:

[1] should identify the programs run `[cat, sed, sort]` and the arguments `[error.log, -n '/access denied/p']` passed at each pipe (|)
```
cat error.log | sed -n '/access denied/p' | sort
```

[2] should identify the HTTP method `[POST]`, filename `[congruence1.jpg]`, and status code `[200]`
```
POST /static/img/congruence1.jpg HTTP/1.1 200
POST /static/img/team/arnold.png HTTP/1.1 404
```

[3] should identify the game time `[+20 minutes for the 2nd period, +40 for the 3rd period]`, the team `[WPG, NYR]`, who was penalized `[Jiri Tlusty]`, the penalty `[slashing]`, and who the penalty was against `[Carl Hagelin]`.
```
1ST PERIOD
05:20   WPG Jiri Tlusty  Slashing against  Carl Hagelin
14:25   NYR Mats Zuccarello  Tripping against  Jim Slater
2ND PERIOD
09:31   WPG Mathieu Perreault  Interference against  Dominic Moore
13:31   NYR Rick Nash  Hooking against  Jacob Trouba
3RD PERIOD
19:21   WPG Dustin Byfuglien  Slashing against  Mats Zuccarello
```
**Bonus**: Load the resulting dictionaries into a pandas DataFrame.

### Aggregating unstructured data
Loading unstructured data into a pandas DataFrame for exploration is intuitive and incredibly useful. But what about when we already know the shape we need and want to optimize performance? Let's practice aggregating data as we process it.

We can start with a CSV file. It is technically structured, but we can still aggregate it line by line in the traditional way:

```
36,0,3,0,1
73,1,3,0,1
30,0,3,0,1
49,1,3,0,1
47,1,11,0,1
47,0,11,1,1
46,0,5,0,1
30,0,3,0,1
52,0,4,0,1
30,0,3,0,1
```

We're interested in aggregating two different columns: the first (let's call it age) and the fourth (let's call it hits). The Pythonista in us says:

1. Find the first column.
2. Make that value our "key".
3. Initialize its count or add 1 to it.

And likewise for the fourth column.

In [6]:
csvfile = [
    '36,0,3,0,1',
    '73,1,3,0,1',
    '30,0,3,0,1',
    '49,1,3,0,1',
    '47,1,11,0,1',
    '47,0,11,1,1',
    '46,0,5,0,1',
    '30,0,3,0,1',
    '52,0,4,0,1',
    '30,0,3,0,1',
]

from collections import defaultdict
age = defaultdict(int)
hits = defaultdict(int)
for row in csvfile:
    a, b, c, d, e = row.split(',')
    age[a] += 1
    hits[d] += 1

for k,v in age.items():
    print k, v
    
for k,v in hits.items():
    print k, v

49 1
46 1
47 2
30 3
36 1
52 1
73 1
1 1
0 9
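The standard library already has a `dict` subclass built for this kind of tally, `collections.Counter`, and `csv.reader` takes care of splitting each row (including quoted fields that a bare `split(',')` would break). A sketch of the same aggregation over the same ten rows:

```python
import csv
from collections import Counter

csvfile = [
    '36,0,3,0,1', '73,1,3,0,1', '30,0,3,0,1', '49,1,3,0,1', '47,1,11,0,1',
    '47,0,11,1,1', '46,0,5,0,1', '30,0,3,0,1', '52,0,4,0,1', '30,0,3,0,1',
]

age = Counter()
hits = Counter()
# csv.reader accepts any iterable of lines: an open file, sys.stdin, or this list.
for row in csv.reader(csvfile):
    age[row[0]] += 1   # first column
    hits[row[3]] += 1  # fourth column, the same one `d` unpacks above

top_ages = age.most_common(2)  # the two most frequent ages with their counts
```

Because the loop touches each line once and keeps only the running counts, it works the same whether `csvfile` is ten lines or a file too large to load into memory.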


We know pandas would be faster here, but instead, think of this counter as a _task_. If we are not working with structured data, loading it into pandas and then shaping the summary is an additional task.

What if we were working with IPython logs?

In [7]:
ipythonlogs = [
    """[I 10:17:42.633 NotebookApp] Using MathJax from CDN: https://cdn.mathjax.org/mathjax/latest/MathJax.js""",
    """[W 10:17:42.670 NotebookApp] Terminals not available (error was No module named terminado)""",
    """[I 10:17:42.670 NotebookApp] Serving notebooks from local directory: /Users/macbook/projects/""",
    """[I 10:17:42.670 NotebookApp] 0 active kernels""",
    """[I 10:17:42.670 NotebookApp] The IPython Notebook is running at: http://localhost:8888/""",
    """[I 10:17:42.670 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).""",
    """[W 10:17:51.036 NotebookApp] 404 GET /api/kernels/ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5/channels?session_id=7B3E0E7D6F024DA4A43433953CAF3B73 (::1): Kernel does not exist: ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5""",
    """[W 10:17:51.051 NotebookApp] 404 GET /api/kernels/ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5/channels?session_id=7B3E0E7D6F024DA4A43433953CAF3B73 (::1) 17.13ms referer=None""",
    """[I 10:18:09.405 NotebookApp] Kernel started: 82092ca9-abb3-4196-a967-0694c8a3bec4""",
    """[W 10:18:56.046 NotebookApp] 404 GET /api/kernels/ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5/channels?session_id=7B3E0E7D6F024DA4A43433953CAF3B73 (::1): Kernel does not exist: ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5""",
    """[W 10:18:56.047 NotebookApp] 404 GET /api/kernels/ff97dfc7-e80a-49a5-9064-6d68d4fdbeb5/channels?session_id=7B3E0E7D6F024DA4A43433953CAF3B73 (::1) 2.11ms referer=None""",
]
### aggregating info [I] vs warning [W] lines should be relatively straightforward:
loggertypes = defaultdict(int)
for row in ipythonlogs:
    logtype = row[1]
    loggertypes[logtype] += 1

print loggertypes

### How do we aggregate types by minute?
loggertypes = defaultdict(int)
for row in ipythonlogs:
    # row[1:8] is e.g. 'I 10:17': the log level plus the hour and minute
    level, minute = row[1:8].split()
    # put the minute first so the keys read by time
    logtype = ' '.join([minute, level])
    loggertypes[logtype] += 1

print loggertypes

defaultdict(<type 'int'>, {'I': 6, 'W': 5})
defaultdict(<type 'int'>, {'10:18 W': 2, '10:17 I': 5, '10:18 I': 1, '10:17 W': 3})
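The dictionary above is unordered, so putting the minute first only helps once we sort. A sketch of an alternative that uses a tuple `(minute, level)` as the key: no string rebuilding, and `sorted` orders tuples element by element, so the result comes out chronologically. The log lines are a subset of the ones above, trimmed after the first few words.

```python
from collections import Counter

ipythonlogs = [
    "[I 10:17:42.633 NotebookApp] Using MathJax from CDN",
    "[W 10:17:42.670 NotebookApp] Terminals not available",
    "[I 10:17:42.670 NotebookApp] 0 active kernels",
    "[I 10:18:09.405 NotebookApp] Kernel started",
    "[W 10:18:56.046 NotebookApp] 404 GET /api/kernels/",
    "[W 10:18:56.047 NotebookApp] 404 GET /api/kernels/",
]

by_minute = Counter()
for row in ipythonlogs:
    level, timestamp = row[1:].split()[:2]  # e.g. 'I', '10:17:42.633'
    minute = timestamp[:5]                  # keep only 'HH:MM'
    by_minute[(minute, level)] += 1         # tuple key instead of a joined string

# Tuples sort by minute first, then by level.
ordered = sorted(by_minute.items())
```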


#### Try it out: Aggregating Key Values

[1] Work through a larger subset of penalties to aggregate

1. penalties by team
2. penalties by player (either who did it or who it targeted)
3. penalties by period

```
1ST PERIOD
02:16   ANA Tim Jackman  Fighting (maj) against  John Scott
02:16   SJS John Scott  Fighting (maj) against  Tim Jackman
12:53   SJS Adam Burish  Slashing against  Tim Jackman
15:38   SJS Matt Nieto  Hooking against  Matt Beleskey
18:38   ANA Tim Jackman  Hooking against  Tommy Wingels
2ND PERIOD
04:35   SJS Justin Braun  Holding against  Jakob Silfverberg
09:40   SJS Scott Hannan  Hi-sticking against  Sami Vatanen
09:40   ANA Sami Vatanen  Embellishment against  Scott Hannan
09:52   SJS Mirco Mueller  Unsportsmanlike conduct against  Ryan Getzlaf
09:52   ANA Ryan Getzlaf  Unsportsmanlike conduct against  Mirco Mueller
15:23   SJS Adam Burish  Delay Gm - Face-off Violation
18:57   SJS Tommy Wingels  Fighting (maj) against  Corey Perry
18:57   ANA Corey Perry  Fighting (maj) against  Tommy Wingels
3RD PERIOD
07:38   ANA Sami Vatanen  Holding against  Tommy Wingels
10:18   SJS Joe Pavelski  Fighting (maj) against  Ben Lovejoy
10:18   ANA Ben Lovejoy  Fighting (maj) against  Joe Pavelski
10:18   ANA Tim Jackman  Roughing against  Marc-Edouard Vlasic
10:18   ANA Tim Jackman  Roughing against  Marc-Edouard Vlasic
12:58   SJS Justin Braun  Misconduct (10 min) against  Corey Perry
12:58   ANA Corey Perry  Misconduct (10 min) against  Justin Braun
12:58   ANA Corey Perry  Roughing against  Justin Braun
12:58   SJS Justin Braun  Roughing against  Corey Perry
13:56   SJS Adam Burish  Roughing against  Nate Thompson
13:56   SJS Adam Burish  Misconduct (10 min)
13:56   SJS John Scott  Game misconduct
13:56   SJS John Scott  Fighting (maj) against  Tim Jackman
13:56   SJS John Scott  Instigator against  Tim Jackman
13:56   SJS John Scott  Player leaves bench - bench against  Hampus Lindholm
13:56   ANA Nate Thompson  Misconduct (10 min)
13:56   ANA Nate Thompson  Roughing against  Adam Burish
13:56   ANA Nate Thompson  Roughing against  Adam Burish
13:56   ANA Tim Jackman  Misconduct (10 min) against  John Scott
16:54   ANA Matt Beleskey  Misconduct (10 min)
16:54   ANA William Karlsson  Slashing against  Marc-Edouard Vlasic
16:54   ANA Ryan Getzlaf  Fighting (maj) against  James Sheppard
16:54   SJS James Sheppard  Fighting (maj) against  Ryan Getzlaf
16:54   ANA Ryan Kesler  Misconduct (10 min)
```

[2] In `sample.access.log`, filter down to the 404s and, for each day, aggregate the full URL and the client (user agent). Example below:

url: `/tag/dork/`  
client: `Mozilla/5.0 (compatible; Ezooms/1.0; ezooms.bot@gmail.com)`
```
127.0.0.1 - - [05/Dec/2011:16:45:39 -0500] "GET /tag/dork/ HTTP/1.0" 404 29262 "-" "Mozilla/5.0 (compatible; Ezooms/1.0; ezooms.bot@gmail.com)"
```

For question 2, read from Python's [standard input](http://stackoverflow.com/questions/1450393/how-do-you-read-from-stdin-in-python) and save your code as a script, so that it runs as shown below and prints to standard output.

```sh
cat sample.access.log | python log_agg.py
```


### What is MapReduce?
MapReduce is a model for splitting work across a cluster of (often small, inexpensive) computers. However the work is split, processing follows these steps:

1. Map: produce key/value pairs, depending on the task.
2. Shuffle/Sort: sort the key/value pairs so that all values for the same key end up together.
3. Reduce: combine the values for each key into a single output.

Ideally this follows the pattern across a distributed network of computers:

1. Mappers: computers tasked to take in data and run a map task
2. Sorters: computers tasked to take the data from the mappers and sort
3. Reducers: computers tasked to take in the sorted data and reduce the information

<img src='img/mapreduce.jpg' width='800'>

The three phases run in sequence: no computer starts reducing until all mappers have finished their jobs. It's important for the framework to split the data processing as evenly as possible: if one mapper does all the work, there is no advantage!
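A single-machine sketch of both ideas, with plain lists standing in for machines. The helper names (`split_evenly`, `map_chunk`, `merge_counts`) are hypothetical, and the "mappers" here run one after another rather than in parallel. Records are dealt round-robin so every mapper gets a similar share, each mapper produces its own partial counts, and the merge only runs once every partial result exists.

```python
from collections import Counter


def split_evenly(items, n_chunks):
    """Deal items round-robin into n_chunks lists whose sizes differ by at most one."""
    chunks = [[] for _ in range(n_chunks)]
    for i, item in enumerate(items):
        chunks[i % n_chunks].append(item)
    return chunks


def map_chunk(words):
    """One 'mapper': counts only the words in its own chunk."""
    return Counter(words)


def merge_counts(partial_counts):
    """The 'reduce' step: add the partial counts together."""
    total = Counter()
    for partial in partial_counts:
        total.update(partial)  # Counter.update adds counts instead of replacing them
    return total


words = 'big big data big data science'.split()
chunks = split_evenly(words, 3)
partials = [map_chunk(chunk) for chunk in chunks]  # every mapper finishes...
totals = merge_counts(partials)                    # ...before the merge starts
```

Splitting by position balances the number of records per mapper, which is a reasonable proxy for balancing work when records are similar in size.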

Tasks are also (typically) solved linearly: we want to limit the number of passes through the data. So, consider how we could solve each of these reducer tasks:

`count`: We've solved this already! Take the previous count and add to it. MapReduce presorts the keys so the reducer is not searching for keys on the fly. What else could be solved similarly to count?

`max or min`: How would we solve these?
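One possible answer, sketched under the assumption that each reducer receives a key and an iterable of its values: every reducer keeps a single piece of running state and makes exactly one pass, so it works even when the values arrive as a stream it cannot rewind.

```python
def count_reducer(key, values):
    """Running total: one pass, constant memory."""
    total = 0
    for value in values:
        total += value
    return key, total


def max_reducer(key, values):
    """Running maximum: keep the best value seen so far (None if there were no values)."""
    best = None
    for value in values:
        if best is None or value > best:
            best = value
    return key, best


def min_reducer(key, values):
    """Same idea as max_reducer with the comparison flipped."""
    best = None
    for value in values:
        if best is None or value < best:
            best = value
    return key, best
```

The built-in `max(values)` and `min(values)` also make a single pass; the explicit loops just make the running state visible.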

Where would machine learning make sense in map reduce? Where would it not?

#### Sidenote: Where does MapReduce come from?
MapReduce began as Google's internal platform; the model has since been generalized, most notably by the open-source Hadoop MapReduce.

#### How can we reproduce this behavior on our own computers?
There are two relatively straightforward ways to emulate MapReduce:

1. Run Python scripts in the shell, with the shell `sort` command in between, to get a sense of how data is funnelled:
    ```sh
    cat data.txt | python mapper.py | sort | python reducer.py
    ```
2. Run a Python script with `multiprocessing` pools to see how multi-core computers can speed up processing.


#### mapper example

A mapper takes in a stream of data and emits a stream of key/value pairs. To pass data between functions here, we'll use a list of tuples (since tuples are immutable); in a shell pipeline we'd write to standard output instead.

In [8]:
def mapper(line):
    result = []
    # remove leading and trailing whitespace
    line = line.strip()
    # remove odd symbols from the text (raw string so the backslashes reach the regex)
    line = re.sub(r'[!"§$%&/()=?*#\[\],.<>:;~_-]', "", line)
    # split the line into words (split() with no argument ignores repeated spaces)
    words = line.split()
    # insert the cleaned words into the results list
    for word in words:
        result.append((word, 1))
    # output is a list of (key, value) pairs
    return result

print mapper('the quick brown fox jumped over the lazy dog')

[('the', 1), ('quick', 1), ('brown', 1), ('fox', 1), ('jumped', 1), ('over', 1), ('the', 1), ('lazy', 1), ('dog', 1)]


#### reducer example
Our reducer takes in a key and a list of values to reduce:

In [9]:
def reducer(key, values):
    return key, sum(values)

#### shuffle example

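Shuffle works much like our earlier aggregations, with one difference: it expects the pairs to be sorted by key already. It collects values while the key stays the same and passes each finished group to the reducer: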

In [10]:
def shuffle(words, print_shuffle=False):
    tmp = ""
    val_list = []
    for i in words:
        if tmp and i[0] != tmp:
            if print_shuffle:
                print tmp, val_list
            print reducer(tmp,val_list)
            val_list=[]
        tmp = i[0]
        val_list.append(i[1])
    # Don't forget to print out the last key value pair!
    if print_shuffle:
        print tmp, val_list
    print reducer(tmp,val_list)

In [11]:
# while this looks "backwards," mapper is called first,
# then we sort,
# then shuffle runs the reducer.
shuffle(sorted(mapper('the quick brown fox jumped over the lazy dog')))

('brown', 1)
('dog', 1)
('fox', 1)
('jumped', 1)
('lazy', 1)
('over', 1)
('quick', 1)
('the', 2)
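A variant worth comparing, sketched with the standard library's `itertools.groupby`: it groups consecutive equal keys (which is exactly why sorting first matters), and it returns results instead of printing them. `shuffle_and_reduce` is a hypothetical name, and the function sorts its own input so callers cannot forget that step.

```python
from itertools import groupby
from operator import itemgetter


def reducer(key, values):
    return key, sum(values)


def shuffle_and_reduce(pairs, reduce_fn=reducer):
    """Sort (key, value) pairs, group equal keys, and reduce each group into a result list."""
    results = []
    for key, group in groupby(sorted(pairs), key=itemgetter(0)):
        results.append(reduce_fn(key, [value for _, value in group]))
    return results


pairs = [('the', 1), ('quick', 1), ('brown', 1), ('the', 1)]
word_counts = shuffle_and_reduce(pairs)
```

Unlike the loop above, an empty input simply produces an empty list rather than a reduced `('', 0)` pair.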


You can see another example by piping text (with `echo` or `cat`) into `mapper.py`, sorting the output, and then running `reducer.py`:

```sh
echo 'big big data big data science!' | python mapper.py | sort | python reducer.py
```
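The `mapper.py` and `reducer.py` scripts aren't shown here, so below is one hedged sketch of what their core logic could look like. It assumes each intermediate line is `word<TAB>1`, the key/value convention Hadoop Streaming uses by default. The functions take any iterable of lines so they can be tried in a notebook; inside the scripts you would pass `sys.stdin` and write the results with `sys.stdout.writelines(...)`.

```python
from itertools import groupby


def map_lines(lines):
    """mapper.py logic: emit 'word<TAB>1' for every word on every input line."""
    for line in lines:
        for word in line.split():
            yield '%s\t1\n' % word


def parse_pair(line):
    """Split 'key<TAB>count' back into (key, int(count))."""
    key, _, count = line.rstrip('\n').partition('\t')
    return key, int(count)


def reduce_lines(sorted_lines):
    """reducer.py logic: input must be sorted so equal keys are adjacent (the `sort` step)."""
    pairs = (parse_pair(line) for line in sorted_lines if line.strip())
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        yield '%s\t%d\n' % (key, sum(count for _, count in group))


# Notebook stand-in for: echo '...' | python mapper.py | sort | python reducer.py
output = list(reduce_lines(sorted(map_lines(['big big data big data science!']))))
```

Because both functions are generators, the scripts built on them stream one line at a time and never hold the whole input in memory.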

Likewise, here is our example running the map function in parallel with `multiprocessing`:

In [12]:
import multiprocessing

def word_mapper(word):
    return (word, 1)

line = 'big big data big data science!'
words = line.split()
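pool = multiprocessing.Pool()  # defaults to one worker process per CPU core

mapped_words = pool.map(word_mapper, words)
pool.close()  # no more tasks; let the workers exit cleanly
pool.join()

# shuffle expects its input sorted by key, just like the mapper | sort | reducer pipeline
shuffle(sorted(mapped_words))

('big', 3)
('data', 2)
('science!', 1)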


## Practice 

Pick one of the larger tasks above (either the long penalty list or the access log) and practice the mapper | reducer split. For the most part, this means taking your code apart a bit and deciding what you are mapping into key/value pairs and how you are reducing them (the reducer will probably be very similar, if not identical, to the one above).

## Review, Reading, Next Steps

#### More on multiprocessing in python
* [mapreduce with python multiprocessing](https://mikecvet.wordpress.com/2010/07/02/parallel-mapreduce-in-python/)
* [parallelization in ipython](http://ipython.org/ipython-doc/dev/parallel/)

#### Hadoop
If you want to dig further and see how a _basic_ Hadoop system works, consider:

* Reading through the two tutorials on [setting up a hadoop cluster](http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/) and [running python scripts in hadoop](http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/). Consider using a virtual machine to keep everything tidy so it does not mess with your computer!
* [Hortonworks](http://hortonworks.com/tutorials/#tuts-analysts) provides a variety of tutorials for data analysts and scientists to learn hadoop and hadoop's scripting languages, and they have a sandbox virtual machine you can practice on.
* This [Spark](http://lintool.github.io/SparkTutorial/) tutorial can start you in a newer direction; Spark is currently a very popular technology.

Additional reading:

* [Forbes: Is it Time for Hadoop Alternatives?](http://www.forbes.com/sites/johnwebster/2014/12/08/is-it-time-for-hadoop-alternatives/)
* [IBM: What is MapReduce?](http://www-01.ibm.com/software/data/infosphere/hadoop/mapreduce/)
* [Wakari MapReduce IPython notebook](https://www.wakari.io/sharing/bundle/nkorf/MapReduce%20Example)