This is a __data engineering__ exercise in which we implement a simplified version of the `grep` command-line utility and use it to search roughly 54 megabytes of *Wikipedia* articles.

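For this project we create a **map_reduce** function that uses the *multiprocessing* and *functools* libraries. It splits the list of files into chunks, runs a mapper function on the chunks in parallel worker processes, and then folds the partial results into one with a reducer function. Spreading the work over several processes reduces the time needed to go through all the files.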

The main parameters of the map_reduce function are the mapper and the reducer functions. There are several versions of the mapper and reducer functions, each serving a different purpose. For instance, there is one mapper function for the case-sensitive search and another for the case-insensitive search.

The final output of this project is a dataframe containing the filename, line number, index and an excerpt containing the search string. It can be exported as a CSV file for use in other projects.

<img src="dataframe-wikipedia-pages.PNG" alt="Sample dataframe output" height="500" width="500">

<span style="display: block; text-align: center; font-size: 10px;">
      Figure 1. A sample output for this project showing the result for searching the word "data" in the Wikipedia HTML files.
</span>


Load the libraries

In [1]:
import os
import numpy as np
import pandas as pd
import math
import functools
from multiprocessing import Pool

Explore the wiki directory

In [2]:
# Directory that holds the Wikipedia HTML files. It is defined first because the
# directory listing below and every mapper function read this module-level name.
folder_name = "wiki"

# Check how many files
file_names = os.listdir(folder_name)
print(f'Number of files: {len(file_names)}')

# Contents of a sample file
file_name = file_names[0]
with open(os.path.join(folder_name, file_name)) as f:
    # readlines() already returns a new list, so no extra copy is needed.
    lines = f.readlines()

# uncomment the next line to print a sample content
# print(f"Sample file content: {lines}")

Number of files: 999


Create the map_reduce function

In [3]:
def make_chunks(data, num_chunks):
    """
    Split data into at most num_chunks consecutive slices of near-equal size.

    Parameters:
        data - a sequence that supports len() and slicing (list, string, ...)
        num_chunks - the desired number of chunks; must be at least 1
    Outputs:
        Returns a list of slices of data. Every slice has ceil(len(data)/num_chunks)
        items except possibly the last one, which holds the remainder. Fewer than
        num_chunks slices are returned when the data does not divide that far.
        An empty data yields an empty list.
    Raises:
        ValueError - if num_chunks is less than 1
    """
    if num_chunks < 1:
        # A zero or negative count would otherwise divide by zero or silently drop all data.
        raise ValueError("num_chunks must be at least 1")
    if len(data) == 0:
        # Without this guard chunk_size would be 0 and range() rejects a zero step.
        return []
    chunk_size = math.ceil(len(data) / num_chunks)
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def map_reduce(data, num_processes, mapper, reducer):
    """
    Run mapper over chunks of data in parallel and fold the results with reducer.

    Parameters:
        data - the sequence to process; it is split with make_chunks
        num_processes - the number of worker processes, also used as the number of chunks
        mapper - a function that takes one chunk and returns a partial result
        reducer - a function that takes two partial results and returns their combination
    Outputs:
        Returns the single value obtained by reducing all chunk results from left to right.
        functools.reduce is called without an initial value, so data is expected to be non-empty.
    """
    chunks = make_chunks(data, num_processes)
    # The with-block shuts the worker processes down as soon as map() has returned
    # all results, so repeated calls do not leave idle workers behind.
    with Pool(num_processes) as pool:
        chunk_results = pool.map(mapper, chunks)

    return functools.reduce(reducer, chunk_results)

Count the total number of lines in all files

In [4]:
def mapper_count(filenames_chunk):
    """
    Mapper function.
    Parameters:
        filenames_chunk - list of filenames located in folder_name
    Outputs:
        Returns the number of lines of all the files in the chunk added together.
    """
    total_lines = 0
    for name in filenames_chunk:
        path = os.path.join(folder_name, name)
        with open(path) as handle:
            file_lines = handle.readlines()
        total_lines += len(file_lines)
    return total_lines

def reducer_count(count1, count2):
    """
    Reducer function.
    Parameters:
        count1 - a partial line count
        count2 - another partial line count
    Outputs:
        Returns the sum of the two counts.
    """
    combined = count1 + count2
    return combined

In [5]:
total = map_reduce(file_names, 4, mapper_count, reducer_count)
total

499797

Find the occurrences of the word "data" in all the files

In [6]:
# Term to look for. The mapper_search* functions read this module-level name
# directly instead of taking it as a parameter.
search_string = "data"

def mapper_search_case_insensitive(filenames_chunk):
    """
    Mapper function.
    Parameters:
        filenames_chunk - list of filenames located in folder_name
    Outputs:
        Returns a dictionary with the filenames as the keys and, as the values, the list of
        0-based line numbers whose text contains search_string when letter case is ignored.
    """
    # Lower-case the search term once instead of once per line of every file.
    needle = search_string.lower()
    occurrences = {}
    for filename in filenames_chunk:
        with open(os.path.join(folder_name, filename)) as f:
            # Iterating the file object yields the same lines as readlines()
            # without building the whole list first.
            occurrences[filename] = [
                index for index, line in enumerate(f) if needle in line.lower()
            ]
    return occurrences

def mapper_search(filenames_chunk):
    """
    Mapper function.
    Parameters:
        filenames_chunk - list of filenames located in folder_name
    Outputs:
        Returns a dictionary with the filenames as the keys and, as the values, the list of
        0-based line numbers whose text contains search_string (case-sensitive).
    """
    found = {}
    for name in filenames_chunk:
        path = os.path.join(folder_name, name)
        with open(path) as handle:
            all_lines = handle.readlines()

        matching = []
        for line_no, text in enumerate(all_lines):
            if search_string in text:
                matching.append(line_no)
        found[name] = matching
    return found
            
def reducer_search(occurrences1, occurrences2):
    """
    Reducer function.
    Parameters:
        occurrences1 - a dictionary of filename to matches
        occurrences2 - a dictionary of filename to matches
    Outputs:
        Returns a new dictionary holding the entries of both inputs. For a key present
        in both, the value from occurrences2 is kept. The inputs are left unchanged.
    """
    combined = dict(occurrences1)
    combined.update(occurrences2)
    return combined



def mapper_search_case_insensitive_pair(filenames_chunk):
    """
    Mapper function.
    Parameters:
        filenames_chunk - list of filenames located in folder_name
    Outputs:
        Returns a dictionary with the filenames as the keys and, as the values, the list of
        (line_number, index) tuples, one for every case-insensitive match of search_string.
        A file without any match maps to an empty list.
    """
    occurrences = {}
    for filename in filenames_chunk:
        with open(os.path.join(folder_name, filename)) as f:
            # get_indices returns an empty list for a line without a match,
            # so no separate "is the term in this line" test is needed.
            occurrences[filename] = [
                pair
                for line_number, line in enumerate(f)
                for pair in get_indices(line, search_string, line_number)
            ]
    return occurrences

# HELPER FUNCTION
def get_indices(string_line, search_string, line_number):
    """
    Helper function.
    Finds every position in string_line that matches the search_string, ignoring letter case.
    Parameters:
        string_line - the line of string to be searched on.
        search_string - the string that will be searched for in the string_line
        line_number - the line number from the contents of the file
    Outputs:
        Returns a list of (line_number, index) tuples, one per match, in ascending index order.
        Overlapping matches are all reported. The list is empty when there is no match.
        The index is a position in the lower-cased copy of string_line.
    """
    # Lower-case both strings once, outside the search loop.
    lowered_line = string_line.lower()
    lowered_search = search_string.lower()
    start_index = 0
    indices = []
    while start_index < len(string_line):
        index_num = lowered_line.find(lowered_search, start_index)
        if index_num < 0:
            break
        # Resume one character after the match start so overlapping matches are found too.
        start_index = index_num + 1
        indices.append((line_number, index_num))
    return indices
                       

In [7]:
# Use the mapper_search and reducer_search to find the line numbers that contain the search_string "data".

results = map_reduce(file_names, 4, mapper_search, reducer_search)

# Show a sample output of the results.
# The same key is used for the label and for the lookup, so the printed pair always
# belongs together and no particular filename has to exist in the folder.
sample_file = next(iter(results))
print(f"{sample_file} : {results[sample_file]}")


Bay_of_ConcepciC3B3n.html : [6, 45, 58, 60, 62, 105, 188, 205]


In [8]:
# Use the mapper_search_case_insensitive and reducer_search to find, for every file,
# the 0-based line numbers that contain search_string "data" regardless of letter case.
# (This mapper records line numbers only, not the position inside the line.)

results_case_insensitive = map_reduce(file_names, 4, mapper_search_case_insensitive, reducer_search)


Display the filenames and the additional line numbers that are matched only by the case-insensitive search

In [9]:
# Uncomment the following lines of code to print the data on the additional indices matched when using case insensitive search
# for filename in results:
#     if len(results[filename]) < len(results_case_insensitive[filename]):
#         print(filename," : ", [index_num for index_num in results_case_insensitive[filename] if index_num not in results[filename]])
   

In [10]:
def mapper_search_case_ins_list2(filenames_chunk):
    """
    Mapper function.
    Takes the line_number and index pairs stored in the global results_case_insensitive_pair.
    For every pair it creates a variable named context that is an excerpt from the line
    in the file that contained the search_string.

    Parameters:
        filenames_chunk - list of filenames located in folder_name; each one must be
                          a key of results_case_insensitive_pair

    Outputs:
        Outputs a list of rows, each row being [filename, line_number, index, context].
        The context covers up to 20 characters before the match index and up to
        10 characters from the match index onwards, clipped to the line boundaries.
    """
    results_array = []
    for filename in filenames_chunk:
        matches = results_case_insensitive_pair[filename]
        if not matches:
            # Nothing to excerpt, so the file does not need to be opened at all.
            continue

        # Read the file once per file instead of once per match.
        with open(os.path.join(folder_name, filename)) as f:
            lines = f.readlines()

        for line_number, index in matches:
            line = lines[line_number]
            start_idx = max(index - 20, 0)
            end_idx = min(index + 10, len(line))
            context = line[start_idx:end_idx]
            results_array.append([filename, line_number, index, context])

    return results_array
  
    
def reducer_search_case_ins_list3(occurrences1, occurrences2):
    """
    Reducer function.
    Parameters:
        occurrences1 - a list (or numpy array) containing rows of filenames, line_number, index, context
        occurrences2 - a list (or numpy array) containing rows of filenames, line_number, index, context
    Output:
        Returns the rows of occurrences1 followed by the rows of occurrences2 as one numpy array.
        When one of the inputs has no rows, the other one is returned as a numpy array.

    Prints a "." on every call as a progress indicator.
    """
    arr = np.asarray(occurrences1)
    arr2 = np.asarray(occurrences2)

    # An input without rows becomes a 1-D array of length 0, which cannot be
    # concatenated with a 2-D array of rows, so it is handled separately.
    if arr.size == 0:
        combined = arr2
    elif arr2.size == 0:
        combined = arr
    else:
        combined = np.concatenate([arr, arr2])

    print(".")
    return combined

    
def mapper_search_case_ins_list(filenames_chunk):
    """
    Mapper function.
    Parameters:
        filenames_chunk - list of filenames from the wiki folder

    Outputs:
        Returns a list with one entry per file. Each entry is the list of rows
        produced by get_comp_list for the lines of that file that contain
        search_string when letter case is ignored.
    """
    per_file_rows = []
    for name in filenames_chunk:
        file_rows = []
        with open(os.path.join(folder_name, name)) as handle:
            for line_no, text in enumerate(handle.readlines()):
                for row in get_comp_list(name, text, search_string, line_no):
                    if search_string.lower() in text.lower():
                        file_rows.append(row)

        per_file_rows.append(file_rows)
    return per_file_rows

def reducer_search_case_ins_list(occurrences1, occurrences2):
    """
    Reducer function.

    Parameters:
        occurrences1 - a list
        occurrences2 - a list

    Outputs:
        Returns a new list holding the items of occurrences1 followed by the items of
        occurrences2. The inputs are left unchanged.

    Prints a "." on every call as a progress indicator.
    """
    # Concatenate instead of nesting: wrapping both inputs in a new two-item list
    # would add one level of nesting per reduce step instead of merging.
    merged = list(occurrences1)
    merged.extend(occurrences2)
    print(".")
    return merged

    

In [11]:
# HELPER FUNCTIONS
def get_comp_list(filename, string_line, search_string, line_number):
    """
    Helper function.
    Finds every position in string_line that matches the search_string, ignoring letter case,
    and builds one result row per match.
    Parameters:
        filename - the name of the file the line comes from; copied into every row
        string_line - the line of string to be searched on.
        search_string - the string that will be searched for in the string_line
        line_number - the line number from the contents of the file
    Outputs:
        Returns a list of rows [filename, line_number, index, context], one per match in
        ascending index order, where context is the excerpt returned by get_line_context.
        The list is empty when there is no match.
    """
    # Lower-case both strings once, outside the search loop.
    lowered_line = string_line.lower()
    lowered_search = search_string.lower()
    start_index = 0
    filename_details_rows = []
    while start_index < len(string_line):
        index_num = lowered_line.find(lowered_search, start_index)
        if index_num < 0:
            break
        # Resume one character after the match start so overlapping matches are found too.
        start_index = index_num + 1
        filename_details_rows.append(
            [filename, line_number, index_num, get_line_context(string_line, index_num)]
        )
    return filename_details_rows


def get_line_context(string_line, index_num, context_length=10):
    """
    Helper function.
    Parameters:
        string_line - a line of string from a file.
        index_num - the index corresponding to the index of the matched search string value
        context_length - the number of characters to keep before index_num and the
                         number of characters to keep from index_num onwards (default 10)

    Outputs:
        Returns an excerpt from the string_line sliced from the computed start and end indexes,
        i.e. string_line[index_num - context_length : index_num + context_length],
        clipped to the boundaries of the line.
    """
    # The start must be clamped explicitly: a negative start would make the
    # slice count from the end of the line instead of from its beginning.
    start_idx = max(index_num - context_length, 0)
    end_idx = min(index_num + context_length, len(string_line))

    return string_line[start_idx:end_idx]

In [12]:
# Collect, per file, the (line_number, index) pair of every case-insensitive match.
# The result is stored in a module-level name because mapper_search_case_ins_list2 reads it.
results_case_insensitive_pair = map_reduce(file_names, 4, mapper_search_case_insensitive_pair, reducer_search)

In [13]:
# Build one row [filename, line_number, index, context] per match.
# mapper_search_case_ins_list2 looks the matches up in results_case_insensitive_pair,
# so that variable must already exist. The reducer prints a "." each time it is called.
comp_data_list = map_reduce(file_names, 4, mapper_search_case_ins_list2, reducer_search_case_ins_list3)

.
.
.


In [14]:
# Tabulate the match rows; the column names follow the row layout
# [filename, line_number, index, context] built by mapper_search_case_ins_list2.
# NOTE(review): the rows pass through np.array with mixed str/int values, so Line and
# Index are presumably stored as strings here - confirm before doing numeric work.
df = pd.DataFrame(comp_data_list , columns=["File","Line","Index","Context"])

Display the final dataframe output

In [15]:
# Bare expression as the last statement of the cell, so the notebook displays the dataframe.
df

Unnamed: 0,File,Line,Index,Context
0,Bay_of_ConcepciC3B3n.html,6,422,"""Coordinates on Wikidata"",""All"
1,Bay_of_ConcepciC3B3n.html,45,628,"-quiriquina.jpg 2x"" data-file-"
2,Bay_of_ConcepciC3B3n.html,45,650,"ta-file-width=""960"" data-file-"
3,Bay_of_ConcepciC3B3n.html,58,447,"l photos, and other data for t"
4,Bay_of_ConcepciC3B3n.html,58,692,"l photos, and other data for t"
...,...,...,...,...
20620,William_McDonald_(Australian_politician).html,117,40,"s"" class=""catlinks"" data-mw=""i"
20621,William_McDonald_(Australian_politician).html,200,1111,"ef=""https://www.wikidata.org/w"
20622,William_McDonald_(Australian_politician).html,200,1161,"=""Link to connected data repos"
20623,William_McDonald_(Australian_politician).html,200,1205,""" accesskey=""g"">Wikidata item<"
