# Analyzing Wikipedia Pages: MapReduce implementation of "grep"
Implementing a simplified version of the grep command-line utility to search for data in 54 megabytes worth of articles

In [1]:
import os

# os.listdir returns the names of the entries in the directory as a list
file_names = os.listdir("wiki")
# Number of files in the directory
print('Total no of files inthe "wiki" directory')
print(len(file_names))

# Print the first three names; slicing avoids an IndexError when the
# directory holds fewer than three entries.
for name in file_names[:3]:
    print(name)




Total no of files inthe "wiki" directory
999
Bay_of_ConcepciC3B3n.html
Bye_My_Boy.html
Valentin_Yanin.html


# Reading the first file and storing each line in a list

In [2]:

folder_name = "wiki"
# Pick the first directory entry and echo its name
file_name = str(file_names[0])
print(file_name)
# os.path.join builds "<folder>/<file>" with the platform's separator
with open(os.path.join(folder_name, file_name)) as f:
    # readlines() already returns a list holding one string per line
    lines = f.readlines()

Bay_of_ConcepciC3B3n.html


# Adding a custom MapReduce function, built on the multiprocessing module, to the project

In [3]:
import math
#functools has reduce()- reads results from the parallel processing
import functools
# using the Pool object to create and run a group of processes
from multiprocessing import Pool


def make_chunks(data, num_chunks):
    """Split ``data`` into at most ``num_chunks`` consecutive slices.

    Every chunk holds ``ceil(len(data) / num_chunks)`` items except possibly
    the last one, which holds the remainder. Fewer than ``num_chunks``
    chunks are returned when there is not enough data to fill them.

    Args:
        data: A sliceable sequence (list, tuple, str, ...).
        num_chunks: Maximum number of chunks to produce; must be >= 1.

    Returns:
        A list of slices of ``data``; an empty list when ``data`` is empty.

    Raises:
        ValueError: If ``num_chunks`` is less than 1.
    """
    if num_chunks < 1:
        raise ValueError("num_chunks must be at least 1")
    size = len(data)
    if size == 0:
        # ceil(0 / n) == 0 would be an illegal step for range()
        return []
    # Round up so that no more than num_chunks slices are needed
    chunk_size = math.ceil(size / num_chunks)
    return [data[i:i + chunk_size] for i in range(0, size, chunk_size)]

def map_reduce(data, num_processes, mapper, reducer):
    """Apply ``mapper`` to chunks of ``data`` in parallel and fold the results.

    Args:
        data: Sequence to process; it is split by ``make_chunks`` into at
            most ``num_processes`` chunks.
        num_processes: Number of worker processes in the pool.
        mapper: Function called with a single chunk; it runs in a worker
            process.
        reducer: Function of two arguments that merges two mapper results
            into one.

    Returns:
        The value obtained by folding all mapper results with ``reducer``,
        from left to right.
    """
    chunks = make_chunks(data, num_processes)
    pool = Pool(num_processes)
    try:
        # One mapper call per chunk, executed by the worker processes
        chunk_results = pool.map(mapper, chunks)
    finally:
        # Always shut the pool down, even when a mapper raised, so that
        # worker processes are not left behind.
        pool.close()
        pool.join()
    # reduce() combines the results pairwise, starting from the leftmost
    return functools.reduce(reducer, chunk_results)

# Counting the total number of lines on all files

In [4]:
def map_line_count(file_names):
    """Return the combined number of lines of the given files.

    Every name is resolved relative to the "wiki" folder.
    """
    line_total = 0
    for name in file_names:
        path = os.path.join("wiki", name)
        with open(path) as handle:
            # Iterating a file yields one item per line
            line_total += sum(1 for _ in handle)
    return line_total
    
def reduce_line_count(count1, count2):
    """Combine two partial line counts into a single total."""
    combined = count1 + count2
    return combined

# Count the lines of all files using 8 worker processes
map_reduce(
    data=file_names,
    num_processes=8,
    mapper=map_line_count,
    reducer=reduce_line_count,
)

499797

# Given a target string, find the lines on which it occurs in each file
The goal is to locate all lines in all files from the wiki folder that contains a given string.

The mapper function receives a chunk of filenames and calculates all occurrences of the target string on them. If a file contains no occurrences, it is chosen to not include an entry for that file in the result dictionary.

In [5]:
def map_grep(file_names):
    """Find the lines of each file that contain the global ``target``.

    Returns a dict that maps a file name to the list of zero-based indices
    of its matching lines. Files without any match get no entry.
    """
    matches_by_file = {}
    for path in file_names:
        # Read the whole file, one string per line
        with open(path) as handle:
            content = handle.readlines()

        for position, text in enumerate(content):
            if target not in text:
                continue
            # The first hit for a file creates its list of line indices
            matches_by_file.setdefault(path, []).append(position)

    return matches_by_file
            
#reducer function to be used in map_reduce
def reduce_grep(lines1, lines2):
    """Merge the grep results of two chunks.

    ``lines1`` is modified in place and returned; an entry of ``lines2``
    replaces an entry of ``lines1`` that has the same key.
    """
    for file_name, matches in lines2.items():
        lines1[file_name] = matches
    return lines1

# Generalised with a path argument so it can work with custom directories
def mapreduce_grep(path, num_processes):
    """Run the case-sensitive grep over every entry of ``path``."""
    full_paths = []
    # Prefix each directory entry with the directory itself
    for entry in os.listdir(path):
        full_paths.append(os.path.join(path, entry))
    return map_reduce(full_paths, num_processes, map_grep, reduce_grep)

# Finding the occurrences of "data"

In [6]:

# Search term read by the mapper function
target = "data"
data_occurrences = mapreduce_grep(path="wiki", num_processes=8)

# Allowing for case insensitive matches
We can allow case insensitive matches by converting both the target string and the file contents to lowercase before we match.

In [7]:
def map_grep_insensitive(file_names):
    """Find the lines of each file containing the global ``target``, ignoring case.

    Both the search term and the file contents are lowercased before they
    are compared.

    Args:
        file_names: Paths of the files to scan.

    Returns:
        A dict mapping each file path with at least one match to the list
        of zero-based indices of its matching lines.
    """
    # Lowercase the search term once instead of once per line
    needle = target.lower()
    results = {}
    for fn in file_names:
        # Read each file with every line converted to lower case
        with open(fn) as f:
            lines = [line.lower() for line in f]
        for line_index, line in enumerate(lines):
            if needle in line:
                # The file name is the key; the value lists the indices of
                # the lines where the target appears
                results.setdefault(fn, []).append(line_index)
    return results

def mapreduce_grep_insensitive(path, num_processes):
    """Run the case-insensitive grep over every entry of ``path``."""
    full_paths = []
    # Prefix each directory entry with the directory itself
    for entry in os.listdir(path):
        full_paths.append(os.path.join(path, entry))
    return map_reduce(full_paths, num_processes, map_grep_insensitive, reduce_grep)



In [8]:
# Same search term, this time matched without regard to case
target = "data"
new_data_occurrences = mapreduce_grep_insensitive(path="wiki", num_processes=8)
#print(new_data_occurrences)

# Checking whether the two implementations give different results


In [9]:

for fn, new_hits in new_data_occurrences.items():
    if fn in data_occurrences:
        # File matched in both runs: report only the additional lines
        gained = len(new_hits) - len(data_occurrences[fn])
        if gained <= 0:
            continue
    else:
        # File matched only in the case-insensitive run: all lines are new
        gained = len(new_hits)
    print("Found {} new matches on file {}".format(gained, fn))

Found 1 new matches on file wiki/Table_Point_Formation.html
Found 1 new matches on file wiki/Ingrid_GuimarC3A3es.html
Found 2 new matches on file wiki/Jules_Verne_ATV.html
Found 1 new matches on file wiki/Pictogram.html
Found 2 new matches on file wiki/Claire_Danes.html
Found 1 new matches on file wiki/PTPRS.html
Found 1 new matches on file wiki/A_Beautiful_Valley.html
Found 1 new matches on file wiki/Mudramothiram.html
Found 2 new matches on file wiki/Gordon_Bau.html
Found 1 new matches on file wiki/Embraer_Unidade_GaviC3A3o_Peixoto_Airport.html
Found 3 new matches on file wiki/Code_page_1023.html
Found 1 new matches on file wiki/Cryptographic_primitive.html
Found 1 new matches on file wiki/Alex_Kurtzman.html
Found 1 new matches on file wiki/Filip_Pyrochta.html
Found 1 new matches on file wiki/Morgana_King.html
Found 1 new matches on file wiki/Don_Parsons_(ice_hockey).html
Found 1 new matches on file wiki/Bias.html
Found 2 new matches on file wiki/Tomohiko_ItC58D_(director).html
Found

# Subfunction that stores the indices within a line where "target" substring appears

In [10]:
def find_match_indices(line, target):
    """Return the start offsets of every occurrence of ``target`` in ``line``.

    Occurrences may overlap, because the search resumes one character after
    the previous hit.
    """
    positions = []
    start = 0
    while True:
        # str.find returns -1 once there is no further match
        hit = line.find(target, start)
        if hit == -1:
            return positions
        positions.append(hit)
        start = hit + 1

# Sanity check on a lowercased sample sentence
s = "Data science ,data mining,big data.".lower()
print(find_match_indices(line=s, target="data"))

[0, 14, 30]


# Finding all match locations 

We can use the above function in combination with the previously created functions to store the file name, the line within the file where "target" appears, and the indices in that line where "target" appears.

In [11]:
def map_grep_match_indices(file_names):
    """Locate every case-insensitive occurrence of the global ``target``.

    Args:
        file_names: Paths of the files to scan.

    Returns:
        A dict mapping each file path with at least one match to a list of
        ``(line_index, match_index)`` tuples, where ``line_index`` is the
        zero-based line number and ``match_index`` is the offset of the
        match inside the lowercased line.
    """
    # Lowercase the search term once instead of twice per line
    needle = target.lower()
    results = {}
    for fn in file_names:
        # Read each file with every line converted to lower case
        with open(fn) as f:
            lines = [line.lower() for line in f]
        for line_index, line in enumerate(lines):
            match_indices = find_match_indices(line, needle)
            # A non-empty list already proves that the line matches, so no
            # separate substring test is needed.
            if match_indices:
                results.setdefault(fn, []).extend(
                    (line_index, match_index) for match_index in match_indices
                )
    return results

def mapreduce_grep_match_indices(path, num_processes):
    """Collect (line, offset) match positions for every entry of ``path``."""
    full_paths = []
    # Prefix each directory entry with the directory itself
    for entry in os.listdir(path):
        full_paths.append(os.path.join(path, entry))
    return map_reduce(full_paths, num_processes, map_grep_match_indices, reduce_grep)


# Finding "science" in given list of files

In [12]:
# Search term for the match-position grep
target = "science"
occurrences = mapreduce_grep_match_indices(path="wiki", num_processes=8)
# Optional preview of the first N entries:
#import itertools
#N=3
#out = dict(itertools.islice(occurrences.items(), N)) 
#print(str(out))

# Displaying the results
We will create a CSV file listing all occurrences, with the file, line, index and context where the word "science" appears.

In [13]:
import csv

# How many characters to show before and after the match
context_delta = 30

# newline="" lets the csv module control the line endings itself
with open("results.csv", "w", newline="") as f:
    writer = csv.writer(f)
    rows = [["File", "Line", "Index", "Context"]]
    # Each key is a file path; each value is a list of (line, index) pairs
    for fn in occurrences:
        # A separate name keeps the CSV handle `f` from being shadowed
        with open(fn) as source:
            # Only trailing whitespace is removed: the stored indices refer
            # to the unstripped line, so removing leading whitespace would
            # shift every context window.
            lines = [line.rstrip() for line in source]
        for line, index in occurrences[fn]:
            # Clamp the window start at the beginning of the line
            start = max(index - context_delta, 0)
            end = index + len(target) + context_delta
            rows.append([fn, line, index, lines[line][start:end]])

    writer.writerows(rows)

In [14]:
import pandas
# Load the generated CSV and show its first ten rows
df = pandas.read_csv(filepath_or_buffer="results.csv")
df.head(n=10)


Unnamed: 0,File,Line,Index,Context
0,wiki/Valentin_Yanin.html,6,840,"embers of the USSR Academy of Sciences"",""Full ..."
1,wiki/Valentin_Yanin.html,6,890,"ers of the Russian Academy of Sciences"",""Demid..."
2,wiki/Valentin_Yanin.html,66,90,"href=""/wiki/Soviet_Academy_of_Sciences"" class=..."
3,wiki/Valentin_Yanin.html,66,145,"ect"" title=""Soviet Academy of Sciences"">Soviet..."
4,wiki/Valentin_Yanin.html,66,173,"f Sciences"">Soviet Academy of Sciences</a>; he..."
5,wiki/Valentin_Yanin.html,144,1440,"rs_of_the_USSR_Academy_of_Sciences"" title=""Cat..."
6,wiki/Valentin_Yanin.html,144,1502,"rs of the USSR Academy of Sciences"">Full Membe..."
7,wiki/Valentin_Yanin.html,144,1548,rs of the USSR Academy of Sciences</a></li><li...
8,wiki/Valentin_Yanin.html,144,1632,"of_the_Russian_Academy_of_Sciences"" title=""Cat..."
9,wiki/Valentin_Yanin.html,144,1697,"of the Russian Academy of Sciences"">Full Membe..."
