# Introduction to MapReduce

(based on https://towardsdatascience.com/a-beginners-introduction-into-mapreduce-2c912bb5e6ac)

As Data Scientists, we often have to deal with huge amounts of data. In those cases, many approaches won’t work or won’t be feasible. A massive amount of data is good, very good, and we want to use as much of it as possible.

Here I want to introduce the MapReduce technique, a broad technique used to handle huge amounts of data. There are many implementations of MapReduce, including the famous Apache Hadoop. I won’t talk about implementations here; instead, I’ll try to introduce the concept in the most intuitive way and present both toy and real-life examples.

Let’s start with a straightforward task. You’re given a list of strings, and you need to return the longest string. It’s pretty easy to do in Python:

In [64]:
def find_longest_string(list_of_strings):
    longest_string = None
    longest_string_len = 0
    for s in list_of_strings:
        if len(s) > longest_string_len:
            longest_string_len = len(s)
            longest_string = s
    return longest_string

We go over the strings one by one, compute each length, and keep the longest string seen so far until we reach the end of the list.


It works well even for lists with many more than 3 elements. Here we try it with 3,000 elements:

In [65]:
list_of_strings = ['abc', 'python', 'dima']

large_list_of_strings = list_of_strings*1000

print(find_longest_string(large_list_of_strings))

python


But what if we try it with 300 million elements?

In [66]:
large_list_of_strings = list_of_strings*100000000

max_length = max(large_list_of_strings, key=len)

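Going over 300 million strings on a single CPU takes a long time, and it only gets slower as the data grows: our solution is not scalable. One way out is “horizontal scaling”: we design our code so that it can run in parallel, and it gets much faster as we add more processors (or machines).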

To do that, we need to break our code into smaller components and see how we can execute computations in parallel. The intuition is as follows: 1) break our data into many chunks, 2) execute the `find_longest_string` function for every chunk in parallel and 3) find the longest string among the outputs of all chunks.
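The three steps can already be sketched with the `find_longest_string` function from above, run one chunk after another for now. This is a minimal, self-contained sketch; the `split_into_chunks` helper is hypothetical, and nothing here actually runs in parallel yet.

```python
def find_longest_string(list_of_strings):
    # same logic as the function above: keep the longest string seen so far
    longest_string = None
    longest_string_len = 0
    for s in list_of_strings:
        if len(s) > longest_string_len:
            longest_string_len = len(s)
            longest_string = s
    return longest_string


def split_into_chunks(items, number_of_chunks):
    # hypothetical helper: ceiling division gives at most number_of_chunks chunks
    size = max(1, -(-len(items) // number_of_chunks))
    return [items[i:i + size] for i in range(0, len(items), size)]


data = ['abc', 'python', 'dima'] * 1000

# 1) break the data into chunks
chunks = split_into_chunks(data, 4)

# 2) find the longest string in every chunk; these calls are independent
#    of each other, so they could run in parallel
chunk_results = [find_longest_string(chunk) for chunk in chunks]

# 3) the longest string overall is the longest among the chunk winners
overall = find_longest_string(chunk_results)
print(overall)  # python
```

Step 3 works because it reuses the same function on the (much smaller) list of per-chunk winners.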

Our code is very specific and hard to break up and modify, so instead of using the `find_longest_string` function, we’ll develop a more generic framework that will help us perform different computations on large data in parallel.

The two main things our code does are computing the `len` of each string and comparing it with the longest string so far. We’ll break our code into two steps: 1) compute the `len` of all strings and 2) select the `max` value.

In [67]:
# step 1:
list_of_string_lens = [len(s) for s in large_list_of_strings]
list_of_string_lens = zip(large_list_of_strings, list_of_string_lens)

#step 2:
max_len = max(list_of_string_lens, key=lambda t: t[1])
print(max_len)

('python', 6)


(I compute the lengths of the strings first and then `zip` them with the strings, because this is much faster than doing it in one line and duplicating the list of strings.)

Now our “step 2” gets as input not the original list of strings, but some preprocessed data. This allows us to execute step two on the outputs of other “step two”s! We’ll understand that better in a bit, but first, let’s give those steps names. We’ll call step one a “mapper” because it maps some value into some other value, and we’ll call step two a “reducer” because it gets a list of values and produces a single value (in most cases). Here are the two helper functions for the mapper and the reducer:

In [68]:
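mapper = len

def reducer(p, c):
    # p: the (string, length) pair kept so far, c: the current pair
    # return whichever of the two has the larger length
    if p[1] > c[1]:
        return p
    return c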

The mapper is just the `len` function: it gets a string and returns its length. The reducer gets two `(string, length)` tuples as input and returns the one with the larger length.
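Because the reducer returns a value of the same shape as its inputs (a `(string, length)` tuple), partial results can be reduced again. A minimal sketch, assuming the same `mapper` and `reducer` as above and a small made-up list of strings:

```python
from functools import reduce

mapper = len

def reducer(p, c):
    # keep whichever (string, length) pair has the larger length
    if p[1] > c[1]:
        return p
    return c

strings = ['abc', 'python', 'dima', 'mapreduce', 'go']
pairs = list(zip(strings, map(mapper, strings)))

# reduce everything in one go
all_at_once = reduce(reducer, pairs)

# reduce two halves separately, then reduce the two partial results
left = reduce(reducer, pairs[:2])
right = reduce(reducer, pairs[2:])
in_two_steps = reducer(left, right)

print(all_at_once)   # ('mapreduce', 9)
print(in_two_steps)  # ('mapreduce', 9)
```

Both orders give the same answer because picking the maximum is associative: how the comparisons are grouped does not change the result. This is the property that later lets each chunk be reduced on its own.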

Let’s rewrite our code using `map` and `reduce`. Python even has ready-made functions for this: `map` is a built-in, and `reduce` lives in the `functools` module.

In [69]:
from functools import reduce
#step 1
mapped = map(mapper, large_list_of_strings)
mapped = zip(large_list_of_strings, mapped)

#step 2:
reduced = reduce(reducer, mapped)
print(reduced)

('python', 6)


The code does exactly the same thing. It looks a bit fancier, but it is also more generic, and that will help us parallelize it. Let’s look at it more closely:

Step 1 maps our list of strings into `(string, length)` tuples using the mapper function (here I use `zip` again to avoid duplicating the strings).

Step 2 goes over the tuples from step 1 and applies the reducer function to them one by one, carrying along the longest tuple so far. The result is the tuple with the maximum length.
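To make step 2 concrete: `reduce(f, [a, b, c, d])` computes `f(f(f(a, b), c), d)`. The sketch below spells that out as an explicit loop (`my_reduce` is a hypothetical name, written only for illustration) and compares it with `functools.reduce` on a few tuples:

```python
from functools import reduce

def reducer(p, c):
    # keep whichever (string, length) pair has the larger length
    if p[1] > c[1]:
        return p
    return c

def my_reduce(function, iterable):
    # hypothetical re-implementation of functools.reduce (no initial value,
    # non-empty input assumed)
    iterator = iter(iterable)
    accumulated = next(iterator)  # start from the first element
    for item in iterator:
        accumulated = function(accumulated, item)  # fold in the next element
    return accumulated

pairs = [('abc', 3), ('python', 6), ('dima', 4)]

print(my_reduce(reducer, pairs))  # ('python', 6)
print(reduce(reducer, pairs))     # ('python', 6)
```

Unrolled, the call is `reducer(reducer(('abc', 3), ('python', 6)), ('dima', 4))`: the first call returns `('python', 6)`, which then beats `('dima', 4)`.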

Now let's break our input into chunks and understand how this works before we do any parallelization (we’ll use a `chunkify` function that breaks a large list into chunks of roughly equal size):

In [70]:
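def chunkify(the_list, number_of_chunks):
    # ceiling division, so we never get more than number_of_chunks chunks
    # (the last chunk may be shorter when the length is not evenly divisible)
    chunk_size = max(1, -(-len(the_list) // number_of_chunks))
    chunk_list = [the_list[i:i + chunk_size] for i in range(0, len(the_list), chunk_size)]
    return chunk_list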

data_chunks = chunkify(large_list_of_strings, number_of_chunks=4)

#step 1:
reduced_all = []
for chunk in data_chunks:
    mapped_chunk = map(mapper, chunk)
    mapped_chunk = zip(chunk, mapped_chunk)
    
    reduced_chunk = reduce(reducer, mapped_chunk)
    reduced_all.append(reduced_chunk)
    
#step 2:
reduced = reduce(reducer, reduced_all)
print(reduced)

('python', 6)


In step one, we go over our chunks and find the longest string in each chunk using map and reduce. In step two, we take the output of step one, which is a list of reduced values, and perform a final reduce to get the longest string. We use `number_of_chunks=4` because this is the number of CPUs I have on my machine.
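Rather than hard-coding 4, one option is to ask the operating system how many CPUs it reports with `os.cpu_count()`. The sketch below does that and prints chunk sizes for a list whose length is not divisible by the number of chunks; the `chunkify` here is a variant that uses ceiling division, so it never returns more than `number_of_chunks` chunks.

```python
import os

def chunkify(the_list, number_of_chunks):
    # ceiling division: never more than number_of_chunks chunks,
    # and the last chunk may be shorter than the others
    chunk_size = max(1, -(-len(the_list) // number_of_chunks))
    return [the_list[i:i + chunk_size] for i in range(0, len(the_list), chunk_size)]

# os.cpu_count() may return None, so fall back to a single chunk in that case
number_of_chunks = os.cpu_count() or 1

data = list(range(10))
print([len(chunk) for chunk in chunkify(data, 4)])  # [3, 3, 3, 1]
print(len(chunkify(data, number_of_chunks)))        # depends on the machine

# with plain floor division the step would be 10 // 4 == 2, giving 5 chunks instead of 4
print(len(range(0, len(data), len(data) // 4)))     # 5
```

The extra fifth chunk from floor division is harmless for correctness, but it means one worker gets a second, tiny task after the others have finished.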

We are almost ready to run our code in parallel. The only thing left to improve is to fold the first `reduce` step into a single mapper. We do this because we want our code to consist of two simple steps, and because the first `reduce` works on a single chunk, we want to parallelize it as well. This is what it looks like:

In [71]:
def chunks_mapper(chunk):
    mapped_chunk = map(mapper, chunk) 
    mapped_chunk = zip(chunk, mapped_chunk)
    return reduce(reducer, mapped_chunk)

data_chunks = chunkify(large_list_of_strings, number_of_chunks=4)

#step 1:
mapped = map(chunks_mapper, data_chunks)

#step 2:
reduced = reduce(reducer, mapped)

print(reduced)

('python', 6)


Now we have nice-looking two-step code, and we can parallelize step 1 with the `multiprocessing` module simply by using the `pool.map` function instead of the regular `map` function:

In [72]:
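from multiprocessing import Pool

data_chunks = chunkify(large_list_of_strings, number_of_chunks=4)

# the with-block shuts the worker processes down once we are done
with Pool(4) as pool:
    #step 1: every worker runs chunks_mapper on one chunk
    mapped = pool.map(chunks_mapper, data_chunks)

#step 2: combine the per-chunk results in the main process
reduced = reduce(reducer, mapped)
print(reduced)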

('python', 6)


Our architecture is built using two functions: `map` and `reduce`. Each computation unit maps the input data and executes the initial reduce. Finally, some centralized unit executes the final reduce and returns the output. It looks like this:

![Map-Reduce structure](mapreducestructure.png)

This architecture has two important advantages:

1. It is scalable: if we have more data, the only thing we need to do is add more processing units. No code change needed!
2. It is generic: this architecture supports a vast variety of tasks. We can replace our `map` and `reduce` functions with almost anything and, this way, compute many different things in a scalable way.
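To illustrate the second point, here is a minimal sketch of the same two-step skeleton written once and reused for two different tasks. The names `map_reduce`, `reduce_chunk`, `with_length` and `longer` are hypothetical, and it assumes every chunk is non-empty (`reduce` without an initial value fails on an empty chunk).

```python
import operator
from functools import partial, reduce

def reduce_chunk(chunk, mapper, reducer):
    # per-chunk work: map every item, then reduce the chunk to a single value
    return reduce(reducer, map(mapper, chunk))

def map_reduce(chunks, mapper, reducer, map_function=map):
    # hypothetical generic driver: passing map_function=pool.map would run the
    # chunks in parallel (mapper and reducer must then be picklable,
    # e.g. module-level functions)
    per_chunk = partial(reduce_chunk, mapper=mapper, reducer=reducer)
    partial_results = map_function(per_chunk, chunks)
    # final reduce over the per-chunk results
    return reduce(reducer, partial_results)

def with_length(s):
    # mapper for the longest-string task
    return (s, len(s))

def longer(p, c):
    # reducer for the longest-string task
    return p if p[1] > c[1] else c

chunks = [['abc', 'python'], ['dima', 'mapreduce'], ['go']]

print(map_reduce(chunks, len, operator.add))    # 24 (total number of characters)
print(map_reduce(chunks, with_length, longer))  # ('mapreduce', 9)
```

Only the `mapper` and `reducer` arguments change between the two tasks; the driver stays the same.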

It is important to note that in most cases our data will be very big and static. This means that breaking it into chunks every time is inefficient and actually redundant. So in most real-life applications, we’ll store our data in chunks (or shards) from the very beginning. Then we’ll be able to run different computations on it using the MapReduce technique.
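A minimal sketch of that idea, assuming each shard is a plain text file with one string per line. The file layout and the `shard_mapper` name are hypothetical; the point is that each mapper only receives a file path, so nothing has to be re-split or shipped from the main process.

```python
import os
import tempfile
from functools import reduce

def longer(p, c):
    # keep whichever (string, length) pair has the larger length
    return p if p[1] > c[1] else c

def shard_mapper(path):
    # hypothetical mapper: read one shard (one string per line) and reduce it locally
    with open(path, encoding='utf-8') as f:
        words = (line.rstrip('\n') for line in f)
        return reduce(longer, ((w, len(w)) for w in words))

with tempfile.TemporaryDirectory() as shard_dir:
    # simulate data that was written to disk in shards ahead of time
    shards = [['abc', 'python'], ['dima', 'mapreduce'], ['go']]
    shard_paths = []
    for i, shard in enumerate(shards):
        path = os.path.join(shard_dir, f'shard_{i}.txt')
        with open(path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(shard) + '\n')
        shard_paths.append(path)

    # only file paths are handed to the mappers; with a multiprocessing Pool,
    # pool.map(shard_mapper, shard_paths) could replace the built-in map here
    result = reduce(longer, map(shard_mapper, shard_paths))

print(result)  # ('mapreduce', 9)
```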

Now let's see a more interesting example: Word Count!

Say we have a very big set of news articles and we want to find the 10 most frequently used words, excluding stop words. How would we do that? First, let's get the data:

In [73]:
from sklearn.datasets import fetch_20newsgroups
import os
data_home = os.path.join(os.getcwd(), 'scikit_learn_data')
news = fetch_20newsgroups(data_home=data_home)

For this post, I made the data 5 times larger so that the difference in running time is easier to see.

In [74]:
data = news.data*5

For each text in the dataset, we want to tokenize it, clean it, remove stop words and finally count the words:

In [75]:
import os
nltk_data_folder = os.path.join(os.getcwd(), 'nltk_data')
os.environ['NLTK_DATA'] = nltk_data_folder

# import the English stop words from the NLTK library
import nltk
nltk.download('stopwords', download_dir=nltk_data_folder)
from nltk.corpus import stopwords
ENGLISH_STOP_WORDS = set(stopwords.words("english"))

from collections import Counter
import re

def clean_word(word):
    return re.sub(r'[^\w\s]','',word).lower()

def word_not_in_stopwords(word):
    return word not in ENGLISH_STOP_WORDS and word and word.isalpha()
   
    
def find_top_words(data):
    cnt = Counter()
    for text in data:
        tokens_in_text = text.split()
        tokens_in_text = map(clean_word, tokens_in_text)
        tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
        cnt.update(tokens_in_text)
        
    return cnt.most_common(10)



Let’s see how much time it takes without MapReduce:

In [76]:
%time find_top_words(data)

CPU times: user 20 s, sys: 3.24 ms, total: 20 s
Wall time: 20 s


[('subject', 61260),
 ('lines', 59120),
 ('organization', 55925),
 ('would', 44345),
 ('one', 43235),
 ('writes', 39180),
 ('article', 33770),
 ('people', 29160),
 ('dont', 29065),
 ('like', 28785)]

Now, let’s write our `mapper`, `reducer` and `chunk_mapper`:

In [77]:
def mapper(text):
    tokens_in_text = text.split()
    tokens_in_text = map(clean_word, tokens_in_text)
    tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
    return Counter(tokens_in_text)

def reducer(cnt1, cnt2):
    cnt1.update(cnt2)
    return cnt1

def chunk_mapper(chunk):
    mapped = map(mapper, chunk)
    reduced = reduce(reducer, mapped)
    return reduced
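To see what these functions compute before running them on the whole dataset, here is a self-contained sketch on two tiny made-up texts. It uses the same cleaning logic, but with a small hypothetical stop-word set instead of NLTK's list, so it runs without downloading anything.

```python
import re
from collections import Counter
from functools import reduce

# hypothetical stand-in for NLTK's English stop words
STOP_WORDS = {'the', 'a', 'is', 'and', 'of'}

def clean_word(word):
    # drop punctuation and lower-case the token
    return re.sub(r'[^\w\s]', '', word).lower()

def is_content_word(word):
    # keep non-empty, purely alphabetic tokens that are not stop words
    return word not in STOP_WORDS and word and word.isalpha()

def mapper(text):
    # count the content words of a single document
    tokens = map(clean_word, text.split())
    return Counter(filter(is_content_word, tokens))

def reducer(cnt1, cnt2):
    # merge two counters by adding their counts (cnt1 is modified in place)
    cnt1.update(cnt2)
    return cnt1

texts = ['The cat and the hat.', 'A cat is a cat!']
counts = reduce(reducer, map(mapper, texts))
print(counts.most_common(2))  # [('cat', 3), ('hat', 1)]
```

The first text maps to `Counter({'cat': 1, 'hat': 1})` and the second to `Counter({'cat': 2})`; the reducer adds them up.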

The `mapper` gets a text, splits it into tokens, cleans them, and filters out stop words and non-words; finally, it counts the words within this single text document. The `reducer` function gets 2 counters and merges them. The `chunk_mapper` gets a chunk and does a MapReduce on it. Now let’s run it using the framework we built:

In [78]:
%%time

data_chunks = chunkify(data, number_of_chunks=4)

# the with-block shuts the worker processes down once we are done
with Pool(4) as pool:
    #step 1:
    mapped = pool.map(chunk_mapper, data_chunks)

#step 2:
reduced = reduce(reducer, mapped)

print(reduced.most_common(10))

[('subject', 61260), ('lines', 59120), ('organization', 55925), ('would', 44345), ('one', 43235), ('writes', 39180), ('article', 33770), ('people', 29160), ('dont', 29065), ('like', 28785)]
CPU times: user 495 ms, sys: 483 ms, total: 978 ms
Wall time: 9.09 s


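This is much faster: the wall time drops from about 20 s to about 9 s. (The CPU times reported by `%%time` only cover the main process, not the worker processes, which is why they look so small.) Here we were able to really make use of our computational power, because this task is much more complex: each text requires far more computation than taking a string's length.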

To sum up, MapReduce is an exciting and essential technique for large-scale data processing. It can handle a wide variety of tasks, including counting, search, and supervised and unsupervised learning. Today there are many implementations and tools that can make our lives much easier, but I think it is very important to understand the basics.