In [1]:
from multiprocessing import Pool
import os
import timeit
import json
import pandas as pd
import nltk
import numpy as np

In a previous lesson, you created a corpus preprocessor to process the featured articles from Wikipedia. In this exercise, you will re-implement the preprocessor to use Python’s multiprocessing library.

Throughout this exercise, you will use Python’s timeit library to measure the performance of certain parts of your code. Implement your code so you can specify the number of processes your code is allowed to use. Report your performance results for N=1, 2, 4, 8, and 16 processes using the following format.

In [2]:
class wiki_articles:
    def __init__(self, path, filetype, njobs):
        self.json_files = []
        self.all_articles = []
        self.njobs = njobs
        self.get_articles(path, filetype)
        self.read_entire_directory()
    def get_articles(self, path, filetype):
        # Collect the full path of every file with the requested extension.
        for file in os.listdir(path):
            if file.endswith(filetype):
                self.json_files.append(os.path.join(path, file))
    @staticmethod
    def read_articles(article_file):
        # Runs in a worker process, so it returns its result instead of
        # appending to self; changes made in a worker never reach the parent.
        with open(article_file, encoding="utf-8") as f:
            return [json.loads(line) for line in f]
    def read_entire_directory(self):
        with Pool(self.njobs) as pool:
            per_file = pool.map(self.read_articles, self.json_files)
        # Flatten the per-file lists into a single list of articles.
        self.all_articles = [a for articles in per_file for a in articles]
    def ret_articles(self):
        return self.all_articles

In [3]:
jobs = [1,2,4,8,16]

In [None]:
for i in jobs:
    %time reading_articles = wiki_articles("C:\\Users\\Dan Siegel\\Desktop\\Classes\\550\\data\\wikipedia\\featured-articles\\", "jsonl", i)
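
The `%time` magic above only works inside IPython. As a rough sketch of the same measurement with `timeit`, the helper below times a callable once per process count and keeps the best of a few runs. The names `time_per_njobs` and `fake_workload` are hypothetical and used only for illustration; in the notebook, the callable would construct `wiki_articles` with the given number of processes.

```python
import timeit

def time_per_njobs(func, jobs=(1, 2, 4, 8, 16), repeat=3):
    """Return {njobs: best wall-clock seconds} for func(njobs)."""
    results = {}
    for n in jobs:
        # number=1 runs func once per measurement; repeat takes several
        # independent measurements, and the minimum is the least noisy.
        timings = timeit.repeat(lambda: func(n), number=1, repeat=repeat)
        results[n] = min(timings)
    return results

def fake_workload(njobs):
    # Hypothetical stand-in for the real task; it ignores njobs.
    return sum(range(10_000))

if __name__ == "__main__":
    for n, seconds in time_per_njobs(fake_workload).items():
        print(f"N={n:>2}: {seconds:.6f} s")
```

Taking the minimum rather than the mean is a common choice for timing, since slower runs usually reflect interference from other processes rather than the code itself.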

b. Tokenize articles

Create a function or class method that tokenizes each of the articles from the previous step. Measure the time it takes to tokenize all of the articles for the different number of processes.

The following provides a rough outline of the code for this task.

In [None]:
class wiki_articles:
    def __init__(self, path, filetype, njobs):
        self.json_files = []
        self.all_articles = []
        self.njobs = njobs
        self.get_articles(path, filetype)
        self.read_articles()
    def get_articles(self, path, filetype):
        for file in os.listdir(path):
            if file.endswith(filetype):
                self.json_files.append(os.path.join(path, file))
    def read_articles(self):
        # Read serially so that only tokenization is parallelized and timed.
        for i in self.json_files:
            with open(i, encoding="utf-8") as f:
                self.all_articles.extend(json.loads(line) for line in f)
    @staticmethod
    def tokenize_document(document):
        # Static so the pool does not have to pickle self (and every article)
        # for each task. Assumes 'section_texts' is a string or a list of strings.
        text = document['section_texts']
        if not isinstance(text, str):
            text = " ".join(text)
        return nltk.word_tokenize(text)
    def tokenize_documents(self):
        with Pool(self.njobs) as pool:
            return pool.map(self.tokenize_document, self.all_articles)
    def ret_articles(self):
        return self.all_articles

In [None]:
for i in jobs:
    reading_articles = wiki_articles("C:\\Users\\Dan Siegel\\Desktop\\Classes\\550\\data\\wikipedia\\featured-articles\\", "jsonl", i)
    %time reading_articles.tokenize_documents()

c. Word Count

Create a function or a class method that counts the number of times each word appears throughout the entire collection of articles. The result should be a Python dictionary containing each word as a key and the word count as a value.

You should use the map-reduce paradigm when implementing your code. The previous two steps demonstrated the use of the map function. The map function simply applies a function to each item in a list of inputs. It scales naturally because each result depends only on its own input, so the calls are independent of one another. In a cluster computing environment like Hadoop, the map function is used to distribute tasks across multiple worker nodes. Each map task processes data that is local to the computer it is working on, and there is no need to share information between the worker nodes.

The reduce function combines the results from all the different map tasks into a single result. The output of each map task is a collection of key/value pairs (in our case a Python dictionary where the key is the word and the value is the word count). The reduce part of the algorithm applies a function to task results that share the same key. For this exercise, that function is the sum function that adds all of the word counts together.

While the details of how Hadoop performs parallel computations are beyond the scope of this exercise, the map-reduce paradigm should give you a general idea of how tasks are distributed across multiple machines.
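
As a minimal sketch of the paradigm described above, the example below counts words in a few toy strings. The function names (`count_tokens`, `merge_counts`, `word_count`) and the sample documents are assumptions made for illustration, and whitespace splitting stands in for real tokenization.

```python
from collections import Counter
from functools import reduce
from multiprocessing import Pool

def count_tokens(text):
    # Map step: one document in, one {word: count} mapping out.
    return Counter(text.split())

def merge_counts(total, counts):
    # Reduce step: sum the values of keys shared by both mappings.
    total.update(counts)  # Counter.update adds counts instead of replacing
    return total

def word_count(texts, njobs=1):
    if njobs > 1:
        with Pool(njobs) as pool:
            partial_counts = pool.map(count_tokens, texts)
    else:
        # A single process needs no pool; the built-in map is enough.
        partial_counts = map(count_tokens, texts)
    # The reduce step runs in the parent process only.
    return dict(reduce(merge_counts, partial_counts, Counter()))

if __name__ == "__main__":
    docs = ["the cat sat", "the dog sat", "the end"]
    print(word_count(docs, njobs=2))
```

Passing an empty `Counter()` as the initial value keeps `reduce` from failing on an empty input list and avoids mutating the first map result.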

The following code provides a very rough outline of how to implement word counting using the map-reduce paradigm. You should ensure the map portion of the code runs using multiple processes. The reduce portion will not use multiple processes. Perform the word count task using the different number of processes and report the performance results.

In [None]:
from functools import reduce
from collections import Counter
class wiki_articles:
    def __init__(self, path, filetype, njobs):
        self.json_files = []
        self.all_articles = []
        self.njobs = njobs
        self.get_articles(path, filetype)
        self.read_articles()
    def get_articles(self, path, filetype):
        for file in os.listdir(path):
            if file.endswith(filetype):
                self.json_files.append(os.path.join(path, file))
    def read_articles(self):
        for i in self.json_files:
            with open(i, encoding="utf-8") as f:
                self.all_articles.extend(json.loads(line) for line in f)
    @staticmethod
    def get_text(document):
        # Assumes 'section_texts' is a string or a list of strings.
        text = document['section_texts']
        return text if isinstance(text, str) else " ".join(text)
    @staticmethod
    def tokenize_document(document):
        return nltk.word_tokenize(wiki_articles.get_text(document))
    def tokenize_documents(self):
        with Pool(self.njobs) as pool:
            return pool.map(self.tokenize_document, self.all_articles)
    def ret_articles(self):
        return self.all_articles
    @staticmethod
    def count_words(document):
        # Map step: count the whitespace-separated words of one document.
        return Counter(wiki_articles.get_text(document).split())
    @staticmethod
    def dict_sum(total, counts):
        # Reduce step: add one document's counts into the running total.
        total.update(counts)
        return total
    def count_all_words(self):
        # The map step uses multiple processes; the reduce step does not.
        with Pool(self.njobs) as pool:
            word_count_dicts = pool.map(self.count_words, self.all_articles)
        return dict(reduce(self.dict_sum, word_count_dicts, Counter()))

In [None]:
for i in jobs:
    reading_articles = wiki_articles("C:\\Users\\Dan Siegel\\Desktop\\Classes\\550\\data\\wikipedia\\featured-articles\\", "jsonl", i)
    %time reading_articles.count_all_words()

# 2 Workflow
Create workflows and the associated tasks for the following:
- Text preprocessing
- Text classification
    - Binary text classification, model evaluation, and selection
    - Multi-class text classification, model evaluation, and selection
- Topic modeling


In [4]:
#Text Preprocessing:
def task1(param1,param2):
    ''' ingest and find all the input file(s) that will be used in the preprocessing
    Args:
        Param1(str): File Path
        Param2(str): File Extension
    Returns:
        List:  all files and their full paths to be read into the workflow
    '''
def task2(param2,param3):
    ''' takes the files identified by task1 and reads them into a list based on their extension type
    Args:
        Param2(str): File Extension
        Param3(list): Full Filepaths of all expected files
    Returns:
        List: all files read into their own list index  
    '''
def task3(param4):
    '''Takes the raw text that was read into a list, removes punctuation and
    stopwords, and stems the words
    Args:
        Param4(list): list of all unprocessed text, each document representing its own index
    Returns:
        List: List of stemmed words without punctuation or stopwords. 
    '''
#Text classification
def task4(param5, param6, param7):
    '''Takes the cleaned text and runs it through classification models
    Args:
        Param5(list): stemmed words without punctuation or stopwords
        Param6(list): list of models to apply to the corpus
        Param7(bool): whether this is a binary classification
    Returns:
        List: List of stemmed words without punctuation or stopwords and models trained on the data and tested.
        Matrix: returns a confusion matrix with scoring
    '''
#topic modeling
def task5(param5):
    '''Takes the cleaned text and runs it through topic modeling models
    Args:
        Param5(list): stemmed words without punctuation or stopwords
    Returns:
        List: returns predictions of topics 
    '''

In [5]:
workflow = {
    "tasks": [
        (task1, task2),
        (task1, task3),
        (task1, task5),
        (task2, task5),
        (task3, task5),
        (task2, task4),
        (task3, task4)
    ],
    "params": {
        # Dictionary keys must be unique; repeating "param" keeps only the last entry.
        "param1": "File Path",
        "param2": "File Extension",
        "param3": "Full Filepaths of all expected files",
        "param4": "list of all unprocessed text, each document representing its own index",
        "param5": "stemmed words without punctuation or stopwords",
        "param6": "list of models to apply to the corpus",
        "param7": "if this is a binary classification yes or no"
    }
}
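
One way to turn the task pairs into a run order is a topological sort. The sketch below assumes each pair in `workflow["tasks"]` means (upstream, downstream), and it uses task names as strings so that it runs on its own. The `edges` list and the `execution_order` helper are hypothetical and not part of the exercise.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical edge list mirroring workflow["tasks"], with task names as
# strings; each pair is assumed to mean (upstream, downstream).
edges = [
    ("task1", "task2"), ("task1", "task3"), ("task1", "task5"),
    ("task2", "task5"), ("task3", "task5"),
    ("task2", "task4"), ("task3", "task4"),
]

def execution_order(edges):
    sorter = TopologicalSorter()
    for upstream, downstream in edges:
        # add(node, *predecessors): downstream must wait for upstream.
        sorter.add(downstream, upstream)
    # static_order raises graphlib.CycleError if the edges form a cycle.
    return list(sorter.static_order())

if __name__ == "__main__":
    print(execution_order(edges))
```

Any order it returns places every upstream task before the tasks that depend on it. Tasks with no dependency between them, such as `task4` and `task5` here, may appear in either order.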