
# Map Reduce

This notebook implements MapReduce in a simplified form in Python. The computation is not distributed across nodes; the purpose is rather to explore how to write a map or reduce function, assuming the surrounding functionality is provided by a library like the one described in [Dean and Ghemawat](https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf).


This notebook first defines an identity mapper and an identity reducer, along with a `run` function which you may change if necessary. An intermediate sort function is also provided.

Implement the `mapper` and `reducer` in the Term Vector section, and use the run cell as provided.
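
As a warm-up, the overall flow (map, group by key, reduce) can be sketched with the word-count example from the paper. This is only an illustrative sketch: the names `wc_mapper`, `wc_reducer`, and `run_in_memory` are hypothetical and are not used elsewhere in this notebook, and everything runs in memory in a single process.

```python
from itertools import groupby
from operator import itemgetter

def wc_mapper(key, value):
    # Emit (word, 1) for every whitespace-separated token in the document
    for word in value.split():
        yield (word, 1)

def wc_reducer(key, list_value):
    # Sum all the counts collected for one word
    yield (key, sum(list_value))

def run_in_memory(docs):
    # Map phase: apply the mapper to every (doc_id, text) pair
    mapped = [pair for k, v in docs.items() for pair in wc_mapper(k, v)]
    # Shuffle phase: order by key, then collect the values of each key
    mapped.sort(key=itemgetter(0))
    grouped = [(k, [v for _, v in g]) for k, g in groupby(mapped, key=itemgetter(0))]
    # Reduce phase: one reducer call per distinct key
    return [pair for k, vs in grouped for pair in wc_reducer(k, vs)]

counts = run_in_memory({"D1": "the cat sat on the mat",
                        "D2": "the dog sat on the log"})
print(counts)
```

The mapper and reducer are generators here, matching the `yield`-based style of the cells below.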


In [1]:
from itertools import groupby
from operator import itemgetter
import re
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
%config Completer.use_jedi = False


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [2]:
# Identity MAPPER
def mapper(key, value):
    """
    User-defined mapper function.
    :param key: identifier of the input source
    :param value: content of the input source
    """
    yield (key, value)

In [3]:
# Identity REDUCER

def reducer(key, list_value):
    """
    User-defined reducer function.
    :param key: key shared by all the collected values
    :param list_value: list of values emitted by the mappers for this key
    """
    yield (key, list_value)

In [4]:
def cleaner(line):
    # lowercase all words and keep alphabetical characters only, retaining
    # apostrophes for the time being
    stop_words = set(stopwords.words('english'))
    words = re.findall(r'[a-z\']+', line.lower())
    for word in words:
        # omit apostrophes, assuming users won't type them in a search
        word = word.replace("'", '')
        # compare with ==, not `is`: identity checks against a literal are unreliable
        if not (word == '' or word in stop_words):
            yield word

def intermediate_sort(data):
    """
    Collect values by key.
    """
    # sort on the key only, so values never have to be comparable with each other
    data = sorted(data, key=itemgetter(0))
    return [(k, [v for _, v in g]) for k, g in groupby(data, itemgetter(0))]

def run(sources_dict):
    """
    Since we are focusing on the mapper and reducer functions here, we have to
    provide the boilerplate code that a MapReduce library typically would. This
    function does that in a simple way (we ignore distributing it for now).
    :param sources_dict: dictionary of key -> fully qualified filename,
        for example {'doc_id': '/home/fileX'}
    """
    map_result = []
    reduce_result = []
    # open the files and apply map to each of them (could be done in parallel,
    # but we prefer to keep it simple).
    for k, v in sources_dict.items():
        # do map per source
        # this could happen in its own process / worker typically
        with open(v, 'r') as f:
            map_result += list(mapper(k, f.read()))
        # ::alt (map line by line instead of per file)
        # with open(v, 'r') as f:
        #     for line in f.readlines():
        #         map_result += list(mapper(k, line))
    # this would be written to disk in the original paradigm,
    # but we keep it in memory for ease of use
    intermediate_result = intermediate_sort(map_result)
    # now that the data has been 'collected' and grouped by key it can be handed
    # to the reducers. They would run over partitions or chunks usually, but we
    # will just iterate through the keys we have and call them
    for elem in intermediate_result:
        reduce_result.append(list(reducer(elem[0], elem[1])))
    return map_result, intermediate_result, reduce_result



In [5]:
# EXAMPLE
!mkdir -p input/
!echo -e 'D1 : the cat sat on the mat' > input/d1.txt
!echo -e 'D2 : the dog sat on the log' > input/d2.txt

_, _, res = run({'D1': 'input/d1.txt' , 'D2': 'input/d2.txt'})

res

[[('D1', ['D1 : the cat sat on the mat\n'])],
 [('D2', ['D2 : the dog sat on the log\n'])]]
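
Grouping with `groupby` only works when the pairs are already ordered by key. The sketch below isolates that step; `group_by_key` is a hypothetical helper written for this illustration, and it sorts on the key alone so that it also works when the values are not orderable (plain dicts in this example).

```python
from itertools import groupby
from operator import itemgetter

def group_by_key(pairs):
    # Stable sort on the key only; values are never compared with each other
    ordered = sorted(pairs, key=itemgetter(0))
    # groupby merges runs of adjacent pairs that share the same key
    return [(k, [v for _, v in g]) for k, g in groupby(ordered, key=itemgetter(0))]

pairs = [("b", {"x": 1}), ("a", {"y": 2}), ("b", {"z": 3})]
grouped = group_by_key(pairs)
print(grouped)
```

Because the sort is stable, values that share a key keep the order in which the mappers emitted them.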

# Term Vector

The paper states:

> Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of 〈word, frequency〉 pairs. The map function emits a 〈hostname, term vector〉 pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final〈hostname, term vector〉 pair.
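
The map side of this description has two ingredients: extracting a hostname from a URL and counting words in a document. A minimal sketch of both follows, assuming whitespace tokenization for simplicity; `host_of` and `term_vector` are hypothetical names used only for this illustration.

```python
from collections import Counter
from urllib.parse import urlparse

def host_of(url):
    # urlparse only fills in netloc when a scheme (or a leading //) is present,
    # so a default scheme is prepended to bare URLs
    if "://" not in url:
        url = "http://" + url
    return urlparse(url).netloc

def term_vector(text):
    # A term vector as a Counter mapping word -> raw count
    return Counter(text.lower().split())

print(host_of("www.somesite.com/page/1"))
print(term_vector("the cat sat on the mat"))
```

Without the scheme, `urlparse` treats the whole string as a path and leaves `netloc` empty, which is why the prefix is added first.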

Regarding

> throwing away infrequent terms

write your code so that only terms occurring at least twice are retained.

Hint:
  * Consider how they use the word 'frequency' elsewhere in the paper.
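
Reading 'frequency' as a raw occurrence count, the reduce side amounts to summing the per-document counters and applying a threshold. The following is a sketch under that assumption; `merge_and_filter` and its `min_count` parameter are hypothetical names chosen for illustration.

```python
from collections import Counter

def merge_and_filter(term_vectors, min_count=2):
    # Add the per-document counts together
    total = Counter()
    for tv in term_vectors:
        total.update(tv)
    # Keep only the terms that reach the threshold
    return {term: n for term, n in total.items() if n >= min_count}

doc_a = Counter({"bangkok": 1, "party": 2, "thai": 1})
doc_b = Counter({"bangkok": 1, "weather": 1})
merged = merge_and_filter([doc_a, doc_b])
print(merged)
```

Note that the threshold is applied after merging, so a term that appears once in each of two documents of the same host is still retained.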


In [6]:
# your mapper
# the map function emits a 〈hostname, term vector〉 pair for each input
# document (where the hostname is extracted from the URL of the document)

from urllib.parse import urlparse
import re
from collections import Counter

def mapper(key, value):
    if not key.startswith(('http://', 'https://')):
        key = 'http://' + key

    url = urlparse(key)
    hostname = url.netloc

    words = re.findall(r'\b\w+\b', value.lower())
    term_vector = Counter(words)

    yield (hostname, term_vector)

# The reduce function is passed all per-document term vectors for a given host.
# It adds these term vectors together, throwing away infrequent terms, and then
# emits a final〈hostname, term vector〉 pair.

def reducer(key, list_value):
    combined_term_vector = Counter()

    for term_vector in list_value:
        combined_term_vector.update(term_vector)

    # frequency is a raw word count; keep only terms occurring at least twice
    filtered = {term: count for term, count in combined_term_vector.items() if count >= 2}

    yield (key, filtered)

# Write example .txt files

page1_content = """
Everything about the destination makes it a paradise for party people.
The Thai people’s culture, Bangkok’s infamous party and red light scene to
hosting Asia’s largest parties.
"""

page2_content = """
The weather is hot and sticky all year round, the traffic is crazy, yet the people still come.
Bangkok now holds the title of the world’s most-visited city.
"""

with open('page1.txt', 'w') as file1:
    file1.write(page1_content)

with open('page2.txt', 'w') as file2:
    file2.write(page2_content)

In [7]:
x, y, res = run({'www.somesite.com/page/1': 'page1.txt', 'www.somesite.com/page/2': 'page2.txt'})

In [13]:
# x

[('www.somesite.com',
  Counter({'everything': 1,
           'about': 1,
           'the': 2,
           'destination': 1,
           'makes': 1,
           'it': 1,
           'a': 1,
           'paradise': 1,
           'for': 1,
           'party': 2,
           'people': 2,
           'thai': 1,
           's': 3,
           'culture': 1,
           'bangkok': 1,
           'infamous': 1,
           'and': 1,
           'red': 1,
           'light': 1,
           'scene': 1,
           'to': 1,
           'hosting': 1,
           'asia': 1,
           'largest': 1,
           'parties': 1})),
 ('www.somesite.com',
  Counter({'the': 5,
           'weather': 1,
           'is': 2,
           'hot': 1,
           'and': 1,
           'sticky': 1,
           'all': 1,
           'year': 1,
           'round': 1,
           'traffic': 1,
           'crazy': 1,
           'yet': 1,
           'people': 1,
           'still': 1,
           'come': 1,
           'bangkok': 1,
           'n

In [14]:
# y

[('www.somesite.com',
  [Counter({'everything': 1,
            'about': 1,
            'the': 2,
            'destination': 1,
            'makes': 1,
            'it': 1,
            'a': 1,
            'paradise': 1,
            'for': 1,
            'party': 2,
            'people': 2,
            'thai': 1,
            's': 3,
            'culture': 1,
            'bangkok': 1,
            'infamous': 1,
            'and': 1,
            'red': 1,
            'light': 1,
            'scene': 1,
            'to': 1,
            'hosting': 1,
            'asia': 1,
            'largest': 1,
            'parties': 1}),
   Counter({'the': 5,
            'weather': 1,
            'is': 2,
            'hot': 1,
            'and': 1,
            'sticky': 1,
            'all': 1,
            'year': 1,
            'round': 1,
            'traffic': 1,
            'crazy': 1,
            'yet': 1,
            'people': 1,
            'still': 1,
            'come': 1,
            'bangkok'

In [15]:
# res

[[('www.somesite.com',
   {'the': 7,
    'party': 2,
    'people': 3,
    's': 4,
    'bangkok': 2,
    'and': 2,
    'is': 2})]]

In [16]:
# def m(key, value):
#     if not key.startswith(('http://', 'https://')):
#         key = 'http://' + key

#     url = urlparse(key)
#     hostname = url.netloc

#     words = re.findall(r'\b\w+\b', value.lower())
#     term_vector = Counter(words)

#     return [(hostname, term_vector)]

# a = m('www.somesite.com/page/1', page1_content)
# b = m('www.somesite.com/page/2', page2_content)

# print(a)
# print(b)

[('www.somesite.com', Counter({'s': 3, 'the': 2, 'party': 2, 'people': 2, 'everything': 1, 'about': 1, 'destination': 1, 'makes': 1, 'it': 1, 'a': 1, 'paradise': 1, 'for': 1, 'thai': 1, 'culture': 1, 'bangkok': 1, 'infamous': 1, 'and': 1, 'red': 1, 'light': 1, 'scene': 1, 'to': 1, 'hosting': 1, 'asia': 1, 'largest': 1, 'parties': 1}))]
[('www.somesite.com', Counter({'the': 5, 'is': 2, 'weather': 1, 'hot': 1, 'and': 1, 'sticky': 1, 'all': 1, 'year': 1, 'round': 1, 'traffic': 1, 'crazy': 1, 'yet': 1, 'people': 1, 'still': 1, 'come': 1, 'bangkok': 1, 'now': 1, 'holds': 1, 'title': 1, 'of': 1, 'world': 1, 's': 1, 'most': 1, 'visited': 1, 'city': 1}))]


In [17]:
# def r(key, list_value):
#     combined_term_vector = Counter()

#     for term_vector in list_value:
#         combined_term_vector.update(term_vector)

#     filtered = {term: count for term, count in combined_term_vector.items() if count >= 2}

#     return key, filtered

# c = r(a[0][0], [a[0][1], b[0][1]])

# print(c)

('www.somesite.com', {'the': 7, 'party': 2, 'people': 3, 's': 4, 'bangkok': 2, 'and': 2, 'is': 2})
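
The `s` entry in the results comes from the `\b\w+\b` pattern: the typographic apostrophe in words such as "people’s" is not a word character, so the possessive suffix is split off as its own token. One possible way to avoid this is sketched below. It assumes that dropping a trailing `'s` is acceptable (which also turns "it's" into "it"), and `tokenize` is a hypothetical helper, not part of the cells above.

```python
import re
from collections import Counter

def tokenize(text):
    # Normalize the typographic apostrophe (U+2019) to a plain one
    text = text.lower().replace("\u2019", "'")
    # Drop the possessive/contracted 's suffix (an assumption of this sketch)
    text = re.sub(r"'s\b", "", text)
    # Keep alphabetical runs only
    return re.findall(r"[a-z]+", text)

sample = "The Thai people\u2019s culture, Bangkok\u2019s infamous party"
tokens = tokenize(sample)
print(Counter(tokens))
```

With this tokenizer, "people’s" and "Bangkok’s" are counted under "people" and "bangkok", and no stray `s` token is produced.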
