# Big Data Analysis

This week is about big data analysis.

## EXERCISE 1: The Three Vs of Big Data

Discuss per table:
1. your own experiences with Big Data and which kinds of challenges you or your organisation are facing. 
2. Are all three Vs required to make a data analytics problem a Big Data problem? 
3. Can you think of any other characteristics which could be helpful to define the meaning of Big Data?

# STOP PLEASE. THE FOLLOWING IS FOR THE NEXT EXERCISE. THANKS.

## EXERCISE 2: Map Reduce Word Count

In the following exercise we will look at the principles of Map Reduce programming.<br/>
Note that this exercise is not assuming any parallelism or distribution of code, but that everything is running on a single computer.

The idea behind Map Reduce can be shown on the simple example of Word Count:

Given a potentially large collection of text documents, the goal is to determine the number of individual words used in these texts.

A straight-foward, procedural approach to solve this problem in Python would be as follows.
Note: The following example is taken from "Data Science from Scratch":

In [1]:
# first we introduce a utility function to tokenize a given document string into a list of words

import re

def tokenize(message):
    message = message.lower()                       # convert to lowercase
    all_words = re.findall("[a-z0-9']+", message)   # extract the words
    return set(all_words)                           # remove duplicates

In [2]:
from collections import Counter

def word_count_old(documents):
    """word count not using MapReduce"""
    return Counter(word
        for document in documents
            for word in tokenize(document))

Let's try this out with a short test set of the following three *documents* (a document is simply a sequence of words):

<pre>{"data science",
 "big data",
 "science fiction"}</pre>

In [3]:
word_count_old({"data science", "big data", "science fiction"})

Counter({'big': 1, 'data': 2, 'fiction': 1, 'science': 2})

This approach reads and counts all word occurences in a sequential way. The more documents and the longer these documents, the longer this approach takes. For a real Big Data setting with large volumes it will however get too slow. Hence we would like to turn this into a parallelisable Map-Reduce approach.

For this we need to provide three things:
1. a mapper function
2. a reducer function
3. some control code that manages the data flow from the inputs through the mapper to the reducer

**Mapper**. The fist step is to write a mapper function that transforms the given input - in our case a text document - into a set of intermediate (key,value) pairs. For the given WordCount example, we write a *wc_mapper()* function that turns a given document into a sequence of word-count pairs. We chose single words as the key because we need this output in the next step to be grouped by word to be able to determine the overall word count. The mapper will emit for each word just the value 1 to indicate that this pair corresponds to one occurrence of the word:

In [4]:
def wc_mapper(document):
    """for each word in the document, emit (word,1)"""
    for word in tokenize(document):
        yield (word, 1)

**Reducer.** The reducer expects as input a key and a list of associated values. In our case, the "plumming" code (which we will discuss in the next step) will call the reducer with a word as key and a list of all its occurrences counts that the mapper above had emitted. The reducer then produces a single word count for the given word by summing over all the individual counts given:

In [5]:
def wc_reducer(word, counts):
    """sum up the counts for a word"""
    yield (word, sum(counts))

Finally, we need the "plumming" code that controls the data flow between mapper and reducer.
Let’s think about how we would do this on just one computer:

**Control Code:** The following code loops through all the documentsm calls the *wc_mapper()* for each document and collects all emitted *(word, 1)* in an intermediate collector dictionary. This internal *collector* dictionary data structure does a lot of useful work for us, including groupping counts for the same word together.

After mapping all input document, the control code then will go through the collected word-count pairs and call the *wc_reducer()* function for each word with all its counts together. 

In [6]:
from collections import defaultdict

def word_count(documents):
    """count the words in the input documents using MapReduce"""

    # place to store grouped values
    collector = defaultdict(list)

    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)

    return [output
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]

Finally, let's try this code.

## YOUR TASK 1

Imagine that we have the same three documents as above:
<pre> ["data science", "big data", "science fiction"] </pre>

1. Write the code to call the *word_count()* on those three documents and document the result.
2. Also check what the result of just the *wc_mapper()* is with this input data.

In [7]:
# your code here
docs = ["data science", "big data", "science fiction"] 
word_count(docs)
for doc in docs:
    for x in wc_mapper(doc):
        print (x)
        
#for x in [wc_mapper(doc) for doc in docs]:
#    print(next(x))

('science', 1)
('data', 1)
('big', 1)
('data', 1)
('science', 1)
('fiction', 1)


### MapReduce more general

If you think about it for a minute, all of the word-count-specific code in the previous example is contained in the *wc_mapper* and *wc_reducer* functions. This means that with a couple of changes we have a much more general framework (that still runs on a single machine):

In [8]:
def map_reduce(inputs, mapper, reducer):
    """runs MapReduce on the inputs using mapper and reducer"""
    collector = defaultdict(list)

    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)

    return [output
            for key, values in collector.items()
            for output in reducer(key,values)]

## YOUR TASK 2

Use above's *map_reduce()* function and the previously defined *wc_mapper()* and *wc_reducer()* functions to count all words in the same set of there documents than in the previous task.

In [9]:
map_reduce ( docs, wc_mapper, wc_reducer)


[('science', 2), ('data', 2), ('big', 1), ('fiction', 1)]

**Further Generalisation:** 

Above's code gives us the flexibility to solve a wide variety of problems.
Before we proceed, observe that wc_reducer is just summing the values corresponding to each key. This kind of aggregation is common enough that it’s worth abstracting it out:

In [10]:
from functools import partial

def reduce_with(aggregation_fn, key, values):
    """reduces a key-values pair by applying aggregation_fn to the values"""
    yield (key, aggregation_fn(values))

def values_reducer(aggregation_fn):
    """turns a function (values -> output) into a reducer"""
    return partial(reduce_with, aggregation_fn)

sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))

## YOUR TASK 3

Use the appropriate generalised reducer in above's *map_reduce()* function to count all words in the same set of there documents than in the previous task.

In [11]:
# your code here
map_reduce(docs, wc_mapper, sum_reducer)


[('science', 2), ('data', 2), ('big', 1), ('fiction', 1)]

# STOP PLEASE. THE FOLLOWING IS FOR THE NEXT EXERCISE. THANKS.

## EXERCISE 3: More Complex Data Analysis using MapReduce

In this next exercise, we want to further generalise the MapReduce approach and use it to analyse data which has a bit more structure than just linear text.

**Input Data Format**

Assume that we want to analyse a collection of *status updates* which get processed in some web system. Further assume that these status updates are available in JSON format such as the following example:
<pre>
{"id": 1,
 "username" : "joelgrus",
 "text" : "Is anyone interested in a data science book?",
 "created_at" : datetime.datetime(2013, 12, 21, 11, 47, 0),
 "liked_by" : ["data_guy", "data_gal", "mike"] }</pre>

** Example: Filtering with MapReduce: ** (taken from "Data Science from Scratch")

"Let’s say we need to figure out which day of the week people talk the most about data science. In order to find this, we’ll just count how many data science updates there are on each day of the week. This means we’ll need to group by the day of week, so that’s our key. And if we emit a value of 1 for each update that contains “data science,” we can simply get the total number using sum:"

In [12]:
def data_science_day_mapper(status_update):
    """yields (day_of_week, 1) if status_update contains "data science" """
    if "data science" in status_update["text"].lower():
        day_of_week = status_update["created_at"].weekday()
        yield (day_of_week, 1)

## YOUR TASK 4

Use above's *data_science_day_mapper()* function to determine the distribution of updates-per-day_of_week for the following list of status updates:

In [13]:
import datetime

status_updates = [
    {"id": 1,
 "username" : "joelgrus",
 "text" : "Is anyone interested in a data science book?",
 "created_at" : datetime.datetime(2015, 12, 21, 11, 47, 0),
 "liked_by" : ["data_guy", "data_gal", "mike"] },
    {"id": 2,
 "username" : "ben",
 "text" : "data science is fun",
 "created_at" : datetime.datetime(2015, 12, 21, 11, 47, 0),
 "liked_by" : ["mike"] },
    {"id": 3,
 "username" : "uwe",
 "text" : "databases are fun",
 "created_at" : datetime.datetime(2015, 12, 22, 10, 50, 0),
 "liked_by" : ["data_gal"] },
    {"id": 4,
 "username" : "ben",
 "text" : "spark for data science",
 "created_at" : datetime.datetime(2015, 12, 23, 11, 17, 0),
 "liked_by" : ["data_guy", "mike"] },
    {"id": 5,
 "username" : "joelgrus",
 "text" : "what is this big data thing anyway?",
 "created_at" : datetime.datetime(2015, 12, 23, 11, 33, 0),
 "liked_by" : ["data_gal", "data_guy"] },
    {"id": 6,
 "username" : "ben",
 "text" : "lots of data science, lots of fun",
 "created_at" : datetime.datetime(2015, 12, 24, 12, 17, 0),
 "liked_by" : ["mike"] }
]

In [14]:
# your code here
map_reduce(status_updates,data_science_day_mapper, sum_reducer)


[(0, 2), (2, 1), (3, 1)]

## YOUR TASK 5

Modify above's *data_science_day_mapper()* function so that it instead determines the distribution of updates-per-day_of_week for a the user 'ben':

In [15]:
# your code here
def data_science_day_mapper_for_ben(status_update):
    """yields (day_of_week, 1) if status_update contains "data science" """
    if "data science" in status_update["text"].lower() and status_update["username"].lower() == "ben":
        day_of_week = status_update["created_at"].weekday()
        yield (day_of_week, 1)
map_reduce(status_updates,data_science_day_mapper_for_ben, sum_reducer)


[(0, 1), (2, 1), (3, 1)]

## YOUR TASK 6

Write a new *liker_mapper()* function that determines the number of distinct status-likers for each user:

In [16]:
# your code here
def liker_mapper(status_update):
    user = status_update["username"]
    likers = status_update["liked_by"]
    for liker in likers:
        yield (user, liker)
    
map_reduce(status_updates, liker_mapper, count_distinct_reducer)

[('joelgrus', 3), ('ben', 2), ('uwe', 1)]

### Example: Top-K (Sub-)Counting

As a more complicated example, assume that we want to find out for each user the most common word that s/he is using in her status updates. In this case, we would like to group by *username* ('for each user') and count the individual words used per user. To do the latter, we tokenize the status update text in the mapper, and in the reducer we count the number of occurences of each word by a given user - and report just he one with the highest count:

In [17]:
def words_per_user_mapper(status_update):
    user = status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user, (word, 1))

def most_popular_word_reducer(user, words_and_counts):
    """given a sequence of (word, count) pairs,
    return the word with the highest total count"""

    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count

    word, count = word_counts.most_common(1)[0]

    yield (user, (word, count))

user_words = map_reduce(status_updates,
                        words_per_user_mapper,
                        most_popular_word_reducer)
print(user_words)

[('joelgrus', ('data', 2)), ('ben', ('science', 3)), ('uwe', ('fun', 1))]


## YOUR TASK 7

1. Adjust the above most_popular_word_reducer() function so that you only show users who have used a word more than once.
2. How do you need to change above's code to get a general 'Top-k' counter which allows to determine the *k* most common words per user?

In [18]:
# your code here
def most_popular_word_nosingles_reducer(user, words_and_counts):
    """given a sequence of (word, count) pairs,
    return the word with the highest total count"""

    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count

    word, count = word_counts.most_common(1)[0]

    if count > 1:
        yield (user, (word, count))

map_reduce(status_updates,
                        words_per_user_mapper,
                        most_popular_word_nosingles_reducer)

# your code here
def most_popular_k_words_reducer(k, user, words_and_counts):
    """given a sequence of (word, count) pairs,
    return the word with the highest total count"""
    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count

    top_k = word_counts.most_common(k)
    yield (user, top_k)

map_reduce(status_updates,
                        words_per_user_mapper,
                        partial(most_popular_k_words_reducer, 4))

[('joelgrus', [('data', 2), ('is', 2), ('interested', 1), ('anyone', 1)]),
 ('ben', [('science', 3), ('data', 3), ('fun', 2), ('is', 1)]),
 ('uwe', [('fun', 1), ('are', 1), ('databases', 1)])]