Dataquest Guided Project - Multiprocessing in Python

This project implements grep-like functionality for searching for strings in HTML files, using multiprocessing to spread the work across several processes. We use Wikipedia articles because they share a consistent HTML structure while covering a wide variety of topics.

In [2]:
import os
# os.listdir returns entries in arbitrary order; sorting makes the chunk
# assignment and every downstream result (dict order, CSV rows) reproducible.
file_names = sorted(
    name for name in os.listdir("wiki") if name.endswith('.html')
)
display(file_names)

['100_Greatest_Romanians.html',
 '104th_Logistic_Support_Brigade_(United_Kingdom).html',
 '16th_Virginia_Infantry.html',
 '1896_Indiana_Hoosiers_football_team.html',
 '1898_Colgate_football_team.html',
 '1910_in_literature.html',
 '1915_Montana_football_team.html',
 '1951_National_League_tiebreaker_series.html',
 '1953E2809354_FA_Cup_qualifying_rounds.html',
 '1958_Wightman_Cup.html',
 '1988_State_of_Origin_series.html',
 '1st_Strategic_Aerospace_Division.html',
 '2001_Australian_Individual_Speedway_Championship.html',
 '2001_NCAA_Division_I_Field_Hockey_Championship.html',
 '2004_Tuvalu_ADivision.html',
 '2005E2809306_in_Welsh_football.html',
 '2007E2809308_Huddersfield_Town_A.F.C._season.html',
 '2008_Fed_Cup_World_Group_II.html',
 '2009_English_cricket_season.html',
 '2009_World_Junior_Ice_Hockey_Championships_rosters.html',
 '2010_Karshi_Challenger_E28093_Singles.html',
 '2011E2809312_Western_Collegiate_Hockey_Association_women27s_ice_hockey_season.html',
 '2011_ITU_Duathlon_World_

In [3]:
# Number of .html articles found in the wiki folder.
num_files = len(file_names)
display(num_files)

999

In [4]:
# Peek at the raw HTML of one article to see what the line-based search works
# with. Only the first few lines are shown; dumping the whole page floods the
# notebook output.
with open(os.path.join('wiki', file_names[0]), 'r', encoding='utf-8') as file:
    file_text = file.readlines()
display(file_text[:10])

['<!DOCTYPE html>\n',
 '<html class="client-nojs" lang="en" dir="ltr">\n',
 '<head>\n',
 '<meta charset="UTF-8"/>\n',
 '<title>100 Greatest Romanians - Wikipedia</title>\n',
 '<script>document.documentElement.className = document.documentElement.className.replace( /(^|\\s)client-nojs(\\s|$)/, "$1client-js$2" );</script>\n',
 '<script>(window.RLQ=window.RLQ||[]).push(function(){mw.config.set({"wgCanonicalNamespace":"","wgCanonicalSpecialPageName":false,"wgNamespaceNumber":0,"wgPageName":"100_Greatest_Romanians","wgTitle":"100 Greatest Romanians","wgCurRevisionId":739997309,"wgRevisionId":739997309,"wgArticleId":5885981,"wgIsArticle":true,"wgIsRedirect":false,"wgAction":"view","wgUserName":null,"wgUserGroups":["*"],"wgCategories":["Use dmy dates from November 2012","Articles containing Romanian-language text","Greatest Nationals","Lists of Romanian people","Romanian Television","Romanian television series"],"wgBreakFrames":false,"wgPageContentLanguage":"en","wgPageContentModel":"wikitext

In [8]:
# Copied map-reduce function
import math
import functools
from multiprocessing import Pool

def make_chunks(data, num_chunks):
    """Split a sliceable sequence into at most ``num_chunks`` contiguous slices.

    Args:
        data: any sequence supporting ``len`` and slicing (list, str, ...).
        num_chunks: desired number of chunks.

    Returns:
        A list of slices of ``data`` in order. Every slice except possibly the
        last has ``ceil(len(data) / num_chunks)`` items. Empty ``data`` yields
        an empty list.
    """
    # max(1, ...) guards empty data: the computed size would be 0 and range()
    # rejects a zero step with ValueError.
    chunk_size = max(1, math.ceil(len(data) / num_chunks))
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def map_reduce(data, num_processes, mapper, reducer):
    """Run ``mapper`` over chunks of ``data`` in parallel, then fold the results.

    ``data`` is split with ``make_chunks`` into at most ``num_processes``
    chunks. Each chunk is passed to ``mapper`` in a worker process, and the
    per-chunk results are combined left to right with ``reducer``.

    Args:
        data: sliceable sequence of work items.
        num_processes: number of worker processes (and target chunk count).
        mapper: picklable callable taking one chunk.
        reducer: callable combining two mapper results into one.

    Returns:
        The reduced result.

    Note:
        ``functools.reduce`` is called without an initial value, so there must
        be at least one chunk (empty ``data`` is not supported).
        NOTE(review): functions defined in a notebook can only be sent to
        workers when the 'fork' start method is in use (Linux default); under
        'spawn' (Windows/macOS) they generally cannot be pickled.
    """
    chunks = make_chunks(data, num_processes)
    # Using the pool as a context manager shuts the workers down once the
    # results are collected. Without it, every call leaves a live pool behind.
    with Pool(num_processes) as pool:
        chunk_results = pool.map(mapper, chunks)
    return functools.reduce(reducer, chunk_results)

In [None]:
# Count the total number of lines across all files
import time
def map_line_count(file_chunk: list):
    """Mapper: count the total number of lines in a chunk of wiki files.

    Args:
        file_chunk: file names relative to the ``wiki`` directory.

    Returns:
        Total number of lines across all files in the chunk (a final line
        without a trailing newline still counts).
    """
    total_lines = 0
    for file_name in file_chunk:
        f_path = os.path.join("wiki", file_name)
        # The articles are UTF-8 (the sample page declares charset UTF-8), so
        # don't depend on the platform default encoding, which differs on Windows.
        with open(f_path, encoding="utf-8") as file:
            # Iterating the file yields one line at a time without loading it whole.
            total_lines += sum(1 for _ in file)
    return total_lines

def reduce_line_count(line_count1: int, line_count2: int):
    """Reducer: combine two partial line counts into a single total."""
    combined = line_count1 + line_count2
    return combined

# perf_counter is the clock intended for measuring elapsed intervals.
start = time.perf_counter()
wikipedia_lines = map_reduce(file_names, 4, map_line_count, reduce_line_count)
process_time = time.perf_counter() - start
# Display after stopping the clock so output rendering isn't timed.
display(wikipedia_lines)
print(f'Process used {process_time} seconds')
# 4 processes: 0.1118 seconds (on DQ)
# 8 processes: 0.1703 seconds (on DQ)
# 16 processes: 0.2262 seconds (on DQ)

In [None]:
# Count the lines containing the lowercase substring "data" in the wiki folder
# map-reduce to a dictionary mapping file names to the indexes of lines containing "data"

def map_data_count(file_chunk: list):
    """Mapper: find lines containing the substring "data" (case-sensitive).

    Args:
        file_chunk: file names relative to the ``wiki`` directory.

    Returns:
        dict mapping file name -> list of zero-based indexes of lines that
        contain "data", in file order. A line is listed once no matter how many
        times "data" appears in it. Files without a matching line are omitted.
    """
    line_map = {}
    for file_name in file_chunk:
        f_path = os.path.join("wiki", file_name)
        # The articles are UTF-8; don't depend on the platform default encoding.
        with open(f_path, encoding="utf-8") as file:
            for index, line in enumerate(file):
                if "data" in line:
                    line_map.setdefault(file_name, []).append(index)
    return line_map

def reduce_data_count(lines1: dict, lines2: dict):
    """Reducer: merge two ``{file name: [hits]}`` mapper results into a new dict.

    If a file appears in both inputs, its hit lists are concatenated
    (``lines1`` first) instead of ``lines2`` overwriting ``lines1``. The chunks
    from ``make_chunks`` are disjoint, so this normally doesn't matter, but the
    merge stays correct if they ever overlap. Neither input is mutated.
    """
    file_lines = {name: list(hits) for name, hits in lines1.items()}
    for name, hits in lines2.items():
        file_lines.setdefault(name, []).extend(hits)
    return file_lines

start = time.perf_counter()
data_lines = map_reduce(file_names, 4, map_data_count, reduce_data_count)
process_time = time.perf_counter() - start
print(f'Process used {process_time} seconds')
# Each value is a list of matching line indexes, so the sum is the number of
# lines containing "data" (not the number of occurrences).
file_data_counts = [len(line_indexes) for line_indexes in data_lines.values()]
print(sum(file_data_counts))

In [None]:
# Count the lines containing "data" in the wiki folder, regardless of case ("Data", "DATA", ... also match)
# map-reduce to a dictionary mapping file names to the indexes of matching lines

def map_data_count(file_chunk: list):
    """Mapper: find lines containing "data" in any letter case.

    Args:
        file_chunk: file names relative to the ``wiki`` directory.

    Returns:
        dict mapping file name -> list of zero-based indexes of lines whose
        lowercased text contains "data", in file order. A line is listed once
        regardless of how many matches it has. Files without a match are omitted.
    """
    line_map = {}
    for file_name in file_chunk:
        f_path = os.path.join("wiki", file_name)
        # The articles are UTF-8; don't depend on the platform default encoding.
        with open(f_path, encoding="utf-8") as file:
            for index, line in enumerate(file):
                if "data" in line.lower():
                    line_map.setdefault(file_name, []).append(index)
    return line_map

def reduce_data_count(lines1: dict, lines2: dict):
    """Reducer: merge two ``{file name: [line indexes]}`` results into a new dict.

    Lists for a file present in both inputs are concatenated (``lines1`` first)
    rather than the second silently replacing the first. Inputs are not mutated.
    """
    file_lines = {name: list(hits) for name, hits in lines1.items()}
    for name, hits in lines2.items():
        file_lines.setdefault(name, []).extend(hits)
    return file_lines

start = time.perf_counter()
caseless_data_lines = map_reduce(file_names, 4, map_data_count, reduce_data_count)
process_time = time.perf_counter() - start
print(f'Process used {process_time} seconds')
# Number of lines containing "data" in any letter case.
file_data_counts = [len(line_indexes) for line_indexes in caseless_data_lines.values()]
print(sum(file_data_counts))

Lines containing "data" (case-sensitive): 10339
Lines containing "data" (case-insensitive): 10504
Difference: 165

In [None]:
# Report, per file, how many extra lines the case-insensitive search found.
# Iterate the case-insensitive results: a file whose only hits are "DATA" or
# "Data" never appears in data_lines, so looping over data_lines would miss it.
for filename, anycase_hits in caseless_data_lines.items():
    lowercase_data = len(data_lines.get(filename, []))
    anycase_data = len(anycase_hits)
    if anycase_data > lowercase_data:
        print(filename, (anycase_data - lowercase_data))

In [None]:
# Record the (line index, column) position of every occurrence of "data"
import re
def map_data_count(file_chunk: list):
    """Mapper: locate every case-insensitive occurrence of "data" in each file.

    Args:
        file_chunk: file names relative to the ``wiki`` directory.

    Returns:
        dict mapping file name -> list of ``(line_index, column)`` tuples, both
        zero-based, one tuple per occurrence (a line with two matches
        contributes two tuples). Files without a match are omitted.
    """
    line_map = {}
    # Matching with IGNORECASE against the original line, rather than a
    # lowercased copy, keeps match.start() aligned with the unmodified text
    # that later context extraction slices.
    data_pat = re.compile(r'data', re.IGNORECASE)
    for file_name in file_chunk:
        f_path = os.path.join("wiki", file_name)
        # The articles are UTF-8; don't depend on the platform default encoding.
        with open(f_path, encoding="utf-8") as file:
            # Every line is searched case-insensitively. The previous version
            # lowercased only the first line, so "DATA"/"Data" on later lines
            # were missed.
            for index, line in enumerate(file):
                for match in data_pat.finditer(line):
                    line_map.setdefault(file_name, []).append((index, match.start()))
    return line_map

def reduce_data_count(lines1: dict, lines2: dict):
    """Reducer: merge two ``{file name: [(line, column), ...]}`` results into a new dict.

    Hit lists for a file present in both inputs are concatenated (``lines1``
    first) instead of being overwritten. Inputs are not mutated.
    """
    file_lines = {name: list(hits) for name, hits in lines1.items()}
    for name, hits in lines2.items():
        file_lines.setdefault(name, []).extend(hits)
    return file_lines

start = time.perf_counter()
caseless_data_indexes = map_reduce(file_names, 4, map_data_count, reduce_data_count)
process_time = time.perf_counter() - start
print(f'Process used {process_time} seconds')
# Each value holds one (line, column) tuple per match, so this sum counts
# individual occurrences rather than matching lines.
file_data_counts = [len(positions) for positions in caseless_data_indexes.values()]
print(sum(file_data_counts))

In [None]:
import pandas as pd
pd.options.display.max_columns = None
pd.options.display.max_colwidth = 100

# Characters of surrounding text kept on each side of a match.
CONTEXT_CHARS = 25

csv_rows = []
for filename, matches in caseless_data_indexes.items():
    with open(os.path.join("wiki", filename), encoding="utf-8") as file:
        file_lines = file.readlines()
    for line_no, col in matches:
        line_text = file_lines[line_no]
        # Clamp the window to the line. An unclamped negative start would wrap
        # around to the end of the string and give wrong or empty context for
        # matches near the start of a line.
        start = max(col - CONTEXT_CHARS, 0)
        stop = min(col + CONTEXT_CHARS, len(line_text))
        csv_rows.append([filename, line_no, col, line_text[start:stop]])
df = pd.DataFrame(csv_rows, columns=['File', 'Line', 'Index', 'Context'])
df.to_csv('wiki_grep_data.csv')

In [None]:
# Last expression in the cell: Jupyter renders the grep results table.
df

SUGGESTIONS FOR IMPROVEMENT FROM DATAQUEST:

- Consider files located in subdirectories.
- Use the `re` module to make it possible to search for regular expressions.
- Make it possible to specify the search options rather than having a separate search function for each set of options.