# MapReduce Assignment

### Importing required Libraries

In [17]:
from collections import defaultdict, Counter
import pandas as pd
import re,datetime
from functools import partial

### Part 1: Completing the In class Notebook

MapReduce is a programming model for performing parallel processing on Big Data. It is powerful, yet relatively simple.

There are two basic steps:
1. _Mapper_ - Turn each item in zero or more key-value pairs.
2. _Reducer_ - Produce output values by grouping together values from each corresponding key.

In [37]:
def tokenize(message):
    message = message.lower()                       # convert to lowercase
    all_words = re.findall("[a-z0-9']+", message)   # extract the words
    return (all_words)                           # remove duplicates

# Old way of counting words
def word_count_old(documents):
    """word count not using MapReudce"""
    return Counter(word
                  for document in documents
                  for word in tokenize(document))

documents = ["data data data science","big data","data problems"]
result = word_count_old(documents)
print(result)

Counter({'data': 5, 'science': 1, 'big': 1, 'problems': 1})


In [38]:
# The mapper functions maps the task
def wc_mapper(document):
    """for each word in document, emit (word,1)"""
    for word in tokenize(document):
        yield (word,1)
        
# The reducer function collects the results
def wc_reducer(word, counts):
    """sum up the counts for a word"""
    yield (word, sum(counts))
    
def word_count(documents):
    """count the words in the input documents using MapReduce"""
    
    # place to store grouped values
    collector = defaultdict(list)
    
    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)
            
    # add a statement to print the collector here
    print(collector)        
    return [output
            # replace items() with iteritems() if you get an error
           for word, counts in collector.items()
           for output in wc_reducer(word,counts)]

Create a list of documents where there is some overlap in the words in each document (don't use more than a total of about 5-6 words). Use your word_count function on this list.

e.g., ["data science", "big data", "science fiction", "data mining"]

Add a print statment to the function so you can see the values in collector after the mapper function has run.

What if a document has more than one occurence of a word? e.g., "data data science" Can you alter the tokenize function to fix this problem?

In [39]:
# make a list of documents here
documents = ["data science", "big data", "science fiction", "data mining"]
word_count(documents)

defaultdict(<class 'list'>, {'data': [1, 1, 1], 'science': [1, 1], 'big': [1], 'fiction': [1], 'mining': [1]})


[('data', 3), ('science', 2), ('big', 1), ('fiction', 1), ('mining', 1)]

In [40]:
def map_reduce(inputs, mapper, reducer):
    """runs MapReduce on input using functions mapper and reducer"""
    collector = defaultdict(list)
    
    # write a for loop over the inputs that calls mapper
    collector = defaultdict(list)
    
    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)
            
    # add a statement to print the collector here
    #print(collector)        
    
    # write a return statement that calls the reducer
    return [output
            # replace items() with iteritems() if you get an error
           for input, values in collector.items()
           for output in reducer(input,values)]
word_counts = map_reduce(documents, wc_mapper, wc_reducer)
print(word_counts)

[('data', 3), ('science', 2), ('big', 1), ('fiction', 1), ('mining', 1)]


In [41]:
def reduce_values_using(aggregation_fn, key, values):
    """reduces a key-values pair by applying aggregation_fn"""
    yield (key, aggregation_fn(values))
    
def values_reducer(aggregation_fn):
    """turns a functions (values->output) into a reducer that
    maps (key, values)->(key, output)"""
    return partial(reduce_values_using, aggregation_fn)

sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))

In [25]:
word_counts = map_reduce(documents, wc_mapper, sum_reducer)
print(word_counts)

[('data', 3), ('science', 2), ('big', 1), ('fiction', 1), ('mining', 1)]


In [26]:
word_counts = map_reduce(documents, wc_mapper, max_reducer)
print(word_counts)

[('data', 1), ('science', 1), ('big', 1), ('fiction', 1), ('mining', 1)]


In [27]:
word_counts = map_reduce(documents, wc_mapper, min_reducer)
print(word_counts)

[('data', 1), ('science', 1), ('big', 1), ('fiction', 1), ('mining', 1)]


In [28]:
word_counts = map_reduce(documents, wc_mapper, count_distinct_reducer)
print(word_counts)

[('data', 1), ('science', 1), ('big', 1), ('fiction', 1), ('mining', 1)]


In [29]:
status_updates = [
    {"id": 1, 
     "username" : "joelgrus", 
     "text" : "Is anyone interested in a data science book?",
     "created_at" : datetime.datetime(2013, 12, 21, 11, 47, 0),
     "liked_by" : ["data_guy", "data_gal", "bill"] },
    # add your own
    {"id": 2, 
     "username" : "dnisarg13", 
     "text" : "I love nirva & data science",
     "created_at" : datetime.datetime(2013, 4, 21, 11, 47, 0),
     "liked_by" : ["krupa", "data_gal", "jahanvi"] },
    {"id": 3, 
     "username" : "dnisarg13", 
     "text" : "I love nirva & data science",
     "created_at" : datetime.datetime(2013, 4, 21, 11, 47, 0),
     "liked_by" : ["krupa", "data_gal", "jahanvi"] },
    {"id": 4, 
     "username" : "nirvavyas", 
     "text" : "i love nisarg",
     "created_at" : datetime.datetime(2013, 6, 21, 11, 47, 0),
     "liked_by" : ["data_guy", "nisarg", "nirva"] }
]

In [178]:
def data_science_day_mapper(status_update):
    """yields (day_of_week, 1) if status_update contains "data science" """
    if "data science" in status_update["text"].lower():
        day_of_week = status_update["created_at"].weekday()
        yield (day_of_week, 1)
        
data_science_days = map_reduce(status_updates, 
                               data_science_day_mapper, 
                               sum_reducer)
print(data_science_days)

[(5, 1), (6, 2)]


Let's imagine another task. Let's say we want to profile each user by the most common word they put their status update. There are really three possible approaches. Which is right?
1. key is username, values are words and counts
2. key is word, values are usernames and counts
3. key is username and word, values are counts

Let's define a mapper and reducer for this task.

In [42]:
def words_per_user_mapper(status_update):
    user=status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user,(word,1))
            
def most_popular_word_reducer(user, words_and_counts):
    """given a sequence of (word, count) pairs, 
    return the word with the highest total count"""
    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count
    
    # find most common word and retun that (key,value) pair
    word,count = word_counts.most_common(1)[0]
    
    yield (user,(word,count))
    
user_words = map_reduce(status_updates,
                        words_per_user_mapper, 
                        most_popular_word_reducer)
print(user_words)

[('joelgrus', ('is', 1)), ('dnisarg13', ('i', 2)), ('nirvavyas', ('i', 1))]


In [180]:
def liker_mapper(status_update):
    """return (user,liker) pairs"""
    user = status_update["username"]
    
    for liker in status_update["liked_by"]:
        yield (user,liker)
distinct_likers_per_user = map_reduce(status_updates,
                                     liker_mapper,
                                     count_distinct_reducer)
print(distinct_likers_per_user)

[('joelgrus', 3), ('dnisarg13', 3), ('nirvavyas', 3)]


Let's end this lesson by defining a mapper and reducer for matrix multiplication. Let's assume an $m\times n$ matrix $A$, and an $n\times k$ matrix $B$.

$C_{ij} = \sum_{l=1}^n A_{il}B_{lj}$

Assume the matrices
A = [[3, 2, 0],
    [0, 0, 0]]
B = [[4, -1, 0],
    [10, 0, 0],
    [0, 0, 0]]
are stored in a common list organized as so:

entries = [("A",0,0,3), ("A",0,1,2),
            ("B",0,0,4), ("B",0,1,-1), ("B",1,0,10)]

Our mapper will return the key-value pair ((row,col) of $C$, (col of $A$, value of $A$)) for elements of $A$ and ((row,col) of $C$, (row of $B$, value of $B$)) for elements of $B$.

In [183]:
def matrix_multiply_mapper(n, element):
    """n is the common dimension (columns of A, rows of B)
    element is a tuple (matrix_name, i, j, value)"""
    matrix, i, j, value = element

    # if matrix is A then output the key-value pairs ((i,column), (j,value)) over all columns of C
    if matrix == 'A':
        for column in range(n):
            yield ((i,column),(j,value))
    else:
        for row in range(n):
            yield ((row,j),(i,value))
            
    
    # else if matrix is B then output the key-value pairs ((row, j), (i, value)) over all rows of C
     
def matrix_multiply_reducer(n, key, indexed_values):
    results_by_index = defaultdict(list)
    
    # this reducer works the same as the word count reducer,
    # collecting all the pairs of A and B for each element of C
    for index,value in indexed_values:
        results_by_index[index].append(value)
        
    # sum up all the products of the positions with two (non-zero) results
    sum_product = sum(results[0]*results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)
    # finally if the terms are != 0 then yield (key, value), where value is the result of the sum-product
    if sum_product != 0:
        yield (key,sum_product)

In [184]:
entries = [("A", 0, 0, 3), ("A", 0, 1,  2),
           ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]
mapper = partial(matrix_multiply_mapper, 3) # what does partial do here?
reducer = partial(matrix_multiply_reducer, 3)
map_reduce(entries,mapper,reducer) # [((0, 0), 32), ((0, 1), -3)]

[((0, 0), 32), ((0, 1), -3)]

## My Detailed Solution for Mapper of Mappers in Class Notebook
### Part 1. Design the Mappers of Mapper
#### Designing the mapper of mapper. I've designed the mapper inside mapper. The inherent method of calling mappers. Mapping can be parallelized so I did design the mapper which can be called and executed within the different parent mapper.

#### Input the File names from User to Map & Reduce

In [113]:
#importing names of files
filenames = ["data/genesis.txt",
            "data/Luke.txt",
            "data/Kings.txt"]

### Designning the Sub Mapper which will get call on the main mappers.
#### I've designed the mappers of mappers below which uses the following mappers

#### I've designed the custom mappers who maps locally withing one main mapper for each files .
- Mapping files locally
- One Main function to map everything
- One only Fn calling require for n number of files
- Multitasker mapper function for each and every work.

In [30]:
#creating lists

mapper_list1 = []
mapper_list2 = []
mapper_list3 = []

#creating sub mapper 1
def sub_mapper1(file):    
        with open(f1,'r') as file:
            for line in file:
                for word in line.strip().split():
                    mapper_list1.append((word,1))
            return mapper_list1

#creating sub mapper 2
def sub_mapper2(file):                
        with open(f2,'r') as file:
            for line in file:
                for word in line.strip().split():
                    mapper_list2.append((word,1))
            return mapper_list2

#creating sub mapper 3
def sub_mapper3(file):                
        with open(f3,'r') as file:
            for line in file:
                for word in line.strip().split():
                    mapper_list3.append((word,1))
            return mapper_list3

#### Internal behavior of main mapper below 
#### Submappers Calling Internally in the main mapper

- f1,f2,f3 = filenames
- sub_mapper1(f1)
- sub_mapper2(f2)
- sub_mapper3(f3)

### Mapper of Mapper
#### Main Mapper function within the mapper. Which uses sub mappers to map different files.

In [159]:
#desgning the main mapper
def mapper0f_mapper(filenames):
    f1,f2,f3 = filenames
    return sub_mapper1(f1),sub_mapper2(f2),sub_mapper3(f3)

### Reducer for Mappers of Mappers'
#### Unified reducer to reduce to the N most occuring words in the files
- Reducing output of mappers for Sum_Reducer
- One fn for every sum_reducer
- Calling within simplified MapReduce Procedure
- More procedural dependence on given files within simplified single main MapReduce procedure.

In [145]:
#implement the most popular word reducer
def most_popular_word_reducer(mapper_list,n):
    """given a sequence of (word, count) pairs, 
    return the word with the highest total count"""
    c = []
    word_counts = Counter()
    for word, count in mapper_list:
        word_counts[word] += count
    
    # find most common word and retun that (key,value) pair
    word,count = word_counts.most_common(n)[n-1]
    
    c.append((word,count))
    return c

### Top function for getting top buzzwords from the list of documents
- Creates Customization for getting desired top N words
- Calls within single reducer for n number of files
- Optimized for as Time & Space complexity because of task handling methods.

In [32]:
#creating list to pop n most occuring word elements
pop = []

def top(mapper_list,n):
    for i in range(1,n+1):
        mp = most_popular_word_reducer(mapper_list,i)
        #list append
        pop.append(mp)
    
    print("The top %d words in the document are:" %n)
    
    #generating dataframe for better presentation
    df = pd.DataFrame(pop,columns=['(Word,Frquency)'])
    return df

### Part 2: Design Reducer which returns the top n words from list of document files
### Document one MapReduce Results
#### Displaying top 10 words occuring in the first document.

In [140]:
#calling top function to MapReduce the given documents.
top(mapper_list1,10)

The top 10 words in the document are:


Unnamed: 0,"(Word,Frquency)"
0,"(the, 2401)"
1,"(and, 2354)"
2,"(of, 1339)"
3,"(And, 1239)"
4,"(his, 637)"
5,"(he, 634)"
6,"(to, 603)"
7,"(unto, 588)"
8,"(in, 574)"
9,"(that, 470)"


#### Displaying top 10 words occuring in the second document.

In [144]:
top(mapper_list2,10)

The top 10 words in the document are:


Unnamed: 0,"(Word,Frquency)"
0,"(the, 1307)"
1,"(and, 1185)"
2,"(of, 777)"
3,"(And, 694)"
4,"(he, 625)"
5,"(to, 529)"
6,"(that, 443)"
7,"(unto, 391)"
8,"(in, 342)"
9,"(they, 330)"


#### Displaying top 10 words occuring in the third document.

In [147]:
top(mapper_list3,10)

The top 10 words in the document are:


Unnamed: 0,"(Word,Frquency)"
0,"(the, 2115)"
1,"(and, 1247)"
2,"(of, 1150)"
3,"(And, 588)"
4,"(to, 460)"
5,"(he, 442)"
6,"(in, 437)"
7,"(that, 323)"
8,"(his, 311)"
9,"(unto, 279)"


### Some Tweaks in the same procedure
### Automation in given calling procedure for ease of usage
- Processes Every Documents in the list
- Outputs the dataframes which contains top N buzzwords

In [51]:
doclists = [mapper_list1,mapper_list2,mapper_list3]
def auto_top(doclists):
    for doclist in doclists:
        return top(doclist,10)

### Part 3: Write a mapper and reducer to process all the files available at https://www.ssa.gov/oact/babynames/names.zip

#### Getting files in list

In [1]:
#getting filenames into list
#babynamefiles = ["names/yob1880.txt","names/yob1881.txt"]
babynamefiles = ["names/yob1880.txt","names/yob1881.txt","names/yob1882.txt","names/yob1883.txt","names/yob1884.txt","names/yob1885.txt","names/yob1886.txt","names/yob1887.txt","names/yob1888.txt","names/yob1889.txt","names/yob1890.txt","names/yob1891.txt","names/yob1892.txt","names/yob1893.txt","names/yob1894.txt","names/yob1895.txt","names/yob1896.txt","names/yob1897.txt","names/yob1898.txt","names/yob1899.txt","names/yob1900.txt","names/yob1901.txt","names/yob1902.txt","names/yob1903.txt","names/yob1904.txt","names/yob1905.txt","names/yob1906.txt","names/yob1907.txt","names/yob1908.txt","names/yob1909.txt","names/yob1910.txt","names/yob1911.txt","names/yob1912.txt","names/yob1913.txt","names/yob1914.txt","names/yob1915.txt","names/yob1916.txt","names/yob1917.txt","names/yob1918.txt","names/yob1919.txt","names/yob1920.txt","names/yob1921.txt","names/yob1922.txt","names/yob1923.txt","names/yob1924.txt","names/yob1925.txt","names/yob1926.txt","names/yob1927.txt","names/yob1928.txt","names/yob1929.txt","names/yob1930.txt","names/yob1931.txt","names/yob1932.txt","names/yob1933.txt","names/yob1934.txt","names/yob1935.txt","names/yob1936.txt","names/yob1937.txt","names/yob1938.txt","names/yob1939.txt","names/yob1940.txt","names/yob1941.txt","names/yob1942.txt","names/yob1943.txt","names/yob1944.txt","names/yob1945.txt","names/yob1946.txt","names/yob1947.txt","names/yob1948.txt","names/yob1949.txt","names/yob1950.txt","names/yob1951.txt","names/yob1952.txt","names/yob1953.txt","names/yob1954.txt","names/yob1955.txt","names/yob1956.txt","names/yob1957.txt","names/yob1958.txt","names/yob1959.txt","names/yob1960.txt","names/yob1961.txt","names/yob1962.txt","names/yob1963.txt","names/yob1964.txt","names/yob1965.txt","names/yob1966.txt","names/yob1967.txt","names/yob1968.txt","names/yob1969.txt","names/yob1970.txt","names/yob1971.txt","names/yob1972.txt","names/yob1973.txt","names/yob1974.txt","names/yob1975.txt","names/yob1976.txt","names/yob1977.txt","names/yob1978.txt","names/yob1979.txt","names/yob1980.txt","names/yob1981.txt","names/yob1982.txt","names/yob1983.txt","names/yob1984.txt","names/yob1985.txt","names/yob1986.txt","names/yob1987.txt","names/yob1988.txt","names/yob1989.txt","names/yob1990.txt","names/yob1991.txt","names/yob1992.txt","names/yob1993.txt","names/yob1994.txt","names/yob1995.txt","names/yob1996.txt","names/yob1997.txt","names/yob1998.txt","names/yob1999.txt","names/yob2000.txt","names/yob2001.txt","names/yob2002.txt","names/yob2003.txt","names/yob2004.txt","names/yob2005.txt","names/yob2006.txt","names/yob2007.txt","names/yob2008.txt","names/yob2009.txt","names/yob2010.txt","names/yob2011.txt","names/yob2012.txt","names/yob2013.txt","names/yob2014.txt","names/yob2015.txt","names/yob2016.txt",]

### Mapper of Files
#### I'm designing the mapper for all files which maps data from every file into (NAME,FREQUENCY) pair.
- Input the list of various files
- Gives output bu mapping those file parallelly in different namelists
- Input mapperlist to reducers

In [2]:
#create a empty list
namelist = []

#mapper fn
def mapper(namefiles):
    for namefile in namefiles:
        #for all files, open each file
        with open(namefile,'r') as file:
            for line in file:
                for word in line.strip().split(','):
                    #append word into list
                    namelist.append((word))
    #return final list
    return namelist

### The List values removal function which is supporting function of reducer

In [3]:
#for removing values
def remove_values_from_list(the_list, val):
        while val in the_list:
            the_list.remove(val)

### The Reducer for finding & grouping the name data with correspoding frequency of occurence
#### The Reducer returns,
####     -  Baby names starting from perticular letter
####     -  Occurences of those names
####     -  Top N values starting with perticular character.

In [4]:
#reducer fn
def reducer(namelist,startingwith,topnwords):
    #organizing values
    remove_values_from_list(namelist,"F" and "M")
    
    #extracting values
    list_names = namelist[0::2]
    list_count = namelist[1::2]
    
    #creating dataframe for clean representation
    babyname_list = pd.DataFrame(
    {'Names': list_names,
     'Names total Count': list_count
    })
    
    #extracting required rows
    babyname_list['Starting with?'] = babyname_list['Names'].str.extract('(^%s*)'%startingwith, expand=False).str.strip()
    babyname_list.sort_values(by='Names total Count', ascending=0)
    result = babyname_list.loc[babyname_list['Starting with?'] == "M"]
    
    return result.head(topnwords)

### MapReduce Routine which incorporates my custom designed Mapper + Reducer
- Unified modelling of mapper+reducer in singular function
- Contains Mappers & Reducer functionality for list of files

In [5]:
def MapReduce(namefiles,namelist,startingwith,topnwords):
    #Calling Mapper
    mapper(namefiles)
    #returning result from reducer
    return reducer(namelist,startingwith,topnwords)

### Calling MapReduce Routine for Final Results

In [8]:
# passing filename with starting letter that we wanna extract & no of top results.
MapReduce(babynamefiles,namelist,"M",10)

Unnamed: 0,Names,Names total Count,Starting with?
0,Mary,F,M
6,Minnie,F,M
30,Maude,F,M
42,Mattie,F,M
81,Marie,F,M
84,May,F,M
105,Mae,F,M
120,Mollie,F,M
150,Matilda,F,M
180,Mildred,F,M


### Part 4: Writing Mapper and Reducer that returns the top 'n' names from a list of files that contain (anywhere within the name) a given string.

#### Getting filenames into list

In [9]:
#babynamefiles = ["names/yob1880.txt","names/yob1881.txt"]
babynamefiles = ["names/yob1880.txt","names/yob1881.txt","names/yob1882.txt","names/yob1883.txt","names/yob1884.txt","names/yob1885.txt","names/yob1886.txt","names/yob1887.txt","names/yob1888.txt","names/yob1889.txt","names/yob1890.txt","names/yob1891.txt","names/yob1892.txt","names/yob1893.txt","names/yob1894.txt","names/yob1895.txt","names/yob1896.txt","names/yob1897.txt","names/yob1898.txt","names/yob1899.txt","names/yob1900.txt","names/yob1901.txt","names/yob1902.txt","names/yob1903.txt","names/yob1904.txt","names/yob1905.txt","names/yob1906.txt","names/yob1907.txt","names/yob1908.txt","names/yob1909.txt","names/yob1910.txt","names/yob1911.txt","names/yob1912.txt","names/yob1913.txt","names/yob1914.txt","names/yob1915.txt","names/yob1916.txt","names/yob1917.txt","names/yob1918.txt","names/yob1919.txt","names/yob1920.txt","names/yob1921.txt","names/yob1922.txt","names/yob1923.txt","names/yob1924.txt","names/yob1925.txt","names/yob1926.txt","names/yob1927.txt","names/yob1928.txt","names/yob1929.txt","names/yob1930.txt","names/yob1931.txt","names/yob1932.txt","names/yob1933.txt","names/yob1934.txt","names/yob1935.txt","names/yob1936.txt","names/yob1937.txt","names/yob1938.txt","names/yob1939.txt","names/yob1940.txt","names/yob1941.txt","names/yob1942.txt","names/yob1943.txt","names/yob1944.txt","names/yob1945.txt","names/yob1946.txt","names/yob1947.txt","names/yob1948.txt","names/yob1949.txt","names/yob1950.txt","names/yob1951.txt","names/yob1952.txt","names/yob1953.txt","names/yob1954.txt","names/yob1955.txt","names/yob1956.txt","names/yob1957.txt","names/yob1958.txt","names/yob1959.txt","names/yob1960.txt","names/yob1961.txt","names/yob1962.txt","names/yob1963.txt","names/yob1964.txt","names/yob1965.txt","names/yob1966.txt","names/yob1967.txt","names/yob1968.txt","names/yob1969.txt","names/yob1970.txt","names/yob1971.txt","names/yob1972.txt","names/yob1973.txt","names/yob1974.txt","names/yob1975.txt","names/yob1976.txt","names/yob1977.txt","names/yob1978.txt","names/yob1979.txt","names/yob1980.txt","names/yob1981.txt","names/yob1982.txt","names/yob1983.txt","names/yob1984.txt","names/yob1985.txt","names/yob1986.txt","names/yob1987.txt","names/yob1988.txt","names/yob1989.txt","names/yob1990.txt","names/yob1991.txt","names/yob1992.txt","names/yob1993.txt","names/yob1994.txt","names/yob1995.txt","names/yob1996.txt","names/yob1997.txt","names/yob1998.txt","names/yob1999.txt","names/yob2000.txt","names/yob2001.txt","names/yob2002.txt","names/yob2003.txt","names/yob2004.txt","names/yob2005.txt","names/yob2006.txt","names/yob2007.txt","names/yob2008.txt","names/yob2009.txt","names/yob2010.txt","names/yob2011.txt","names/yob2012.txt","names/yob2013.txt","names/yob2014.txt","names/yob2015.txt","names/yob2016.txt",]

### Designing Mapper
#### I'm designing the mapper for all files which maps data from every file into (NAME,FREQUENCY) pair.
- Input the list of various files
- Gives output bu mapping those file parallelly in different namelists
- Input mapperlist to reducers

In [10]:
namelist = []
def mapper(namefiles):
    for namefile in namefiles:
        #opening file
        with open(namefile,'r') as file:
            for line in file:
                for word in line.strip().split(','):
                    #appending pair into list
                    namelist.append((word))
    return namelist

### The Reducer for finding & grouping the name data with correspoding frequency of occurence
#### The Reducer returns,
####     -  Baby names which contains letters from perticular word record
####     -  Occurences of those names
####     -  Top N values which contains those perticular characters.

In [11]:
def reducer(namelist,contains,topnwords):
    remove_values_from_list(namelist,"F" and "M")
    
    #organizing list
    list_names = namelist[0::2]
    list_count = namelist[1::2]
    
    #creating dataframe for better representation
    babyname_list = pd.DataFrame(
    {'Names': list_names,
     'Names total Count': list_count
    })
    
    #extracting required rows from datafame
    babyname_list['Contains?'] = babyname_list['Names'].str.extract('(%s)'%contains, expand=False).str.strip()
    babyname_list.sort_values(by='Names total Count', ascending=0)
    result = babyname_list.loc[babyname_list['Contains?'] == "ar"]
    
    #return the results list
    return result.head(topnwords)

### MapReduce Routine which incorporates my custom designed Mapper + Reducer

In [12]:
def MapReduce(namefiles,namelist,contains,topnwords):
    mapper(namefiles)
    return reducer(namelist,contains,topnwords)

### Calling MapReduce Routine for Final Results

In [13]:
#fn call for final output
MapReduce(babynamefiles,namelist,"ar",10)

Unnamed: 0,Names,Names total Count,Contains?
0,Mary,F,ar
69,Pearl,F,ar
81,Marie,F,ar
108,Harriet,F,ar
111,Caroline,F,ar
135,Charlotte,F,ar
195,Maria,F,ar
204,Marion,F,ar
243,Carolyn,F,ar
306,Marian,F,ar
