In [1]:
#!/usr/bin/env python
# coding: utf8
"""
Example of multi-processing with Joblib and getting entity relations by thread.

Prerequisites: pip install joblib
"""
from __future__ import print_function, unicode_literals

from pathlib import Path
from joblib import Parallel, delayed
from functools import partial
import pkg_resources
#pkg_resources.require('SpaCy==2.1.0')
import plac
import spacy
from spacy.util import minibatch
#import neuralcoref
import pandas as pd
import pathos.multiprocessing

@plac.annotations(
    model=("Model name (needs tagger)", "positional", None, str),
    n_jobs=("Number of workers", "option", "n", int),
    batch_size=("Batch-size for each process", "option", "b", int),
    limit=("Limit of entries from the dataset", "option", "l", int),
)

def init():
    global nlp
    nlp = spacy.load('en_core_web_sm')
# def main(output_dir=Path('/test'), model="en_core_web_sm", 
#          n_jobs=5, batch_size=1000, limit=1000):
#     nlp = spacy.load(model)  # load spaCy model
#     # serialize_spacy(nlp)
#     print("Loaded model '%s'" % model)
#     if not output_dir.exists():
#         output_dir.mkdir()
#     # load and pre-process emails
#     print("Loading data...")
#     base_df, focused_df = load_processed_data()
#     message_ids = focused_df.iloc[:, 0].tolist()
#     print("Processing texts...")
#     p = multiprocessing.Pool(12)
#     p.map(process_texts(nlp, b, focused_df['Subject'])
# #     partitions = minibatch(message_ids, size=batch_size)
# #     executor = Parallel(n_jobs=n_jobs, backend="multiprocessing", prefer="processes")
# #     do = delayed(partial(process_texts))
# #     tasks = (do(i, batch, output_dir, focused_df=focused_df, base_df=base_df) for i, batch in enumerate(partitions))
# #     executor(tasks)    


def load_processed_data(data="emails_processed.csv"):
    base_df = pd.read_csv(data, nrows=5000)
    # base_df = base_df.set_index('Message-ID', drop=False)
    # base_df = base_df[0:5000]
    focused_df = base_df[['Message-ID', 'Subject']]
    return base_df, focused_df
    
# def process_texts(nlp, batch_id, message_ids, output_dir, focused_df, base_df):
#     batch_df = focused_df[focused_df['Message-ID'].isin(message_ids)]
#     print(nlp.pipe_names)
#     out_path = Path(output_dir) / ("%d.pkl" % batch_id)
#     print("Processing batch", batch_id)
#     threads = process_threads(nlp, batch_df, base_df)
#     output = open(out_path, 'wb')
#     pickle.dump(threads, output)
#     output.close()
#     print("Processed {} emails in batch {}".format(len(message_ids), batch_id))
#     return threads


def clean_threads(subject_df, subject_col='Subject'):
    subject_df = subject_df.astype(str)

    for index, row in subject_df.iterrows():
        row[subject_col] = row[subject_col].replace('Re:', '')
        row[subject_col] = row[subject_col].replace('re:', '')
        row[subject_col] = row[subject_col].replace('RE:', '')
        row[subject_col] = row[subject_col].replace('FW:', '')
        row[subject_col] = row[subject_col].strip()
    return subject_df


def process_threads(nlp, subject_df, base_df, subject_col='Subject', content_col='content'):
    subject_df = clean_threads(subject_df)

    # Group by subject
    subject_dict = subject_df.groupby(subject_col).groups

    # Get content for emails in same thread
    thread_dict = defaultdict(list)
    thread_dict_processed = defaultdict(list)
    for key, value in subject_dict.items():
        for value in subject_dict[key]:
            thread_dict[key].append(base_df.loc[value][content_col])

    for key in thread_dict.keys():
        thread_dict_processed[key] = ' '.join(thread_dict[key])

    # Pass processed strings to SpaCy pipeline
    with mp.Pool(initializer=init) as pool:
        for key in thread_dict_processed.keys():
            thread_dict_processed[key] = pool.map(nlp.pipe, thread_dict_processed[key])

    return thread_dict_processed


NameError: Annotating non-existing argument: model

In [None]:
main()


In [None]:
batch_size = 1000
n_jobs = 5
output_dir = Path('/test')

In [None]:
def load_processed_data(data="emails_processed.csv"):
    base_df = pd.read_csv(data, nrows=5000)
    focused_df = base_df[['Message-ID', 'Subject']]
    return base_df, focused_df

In [None]:
nlp = spacy.load("en_core_web_sm")  # load spaCy model
print("Loaded model '%s'" % "en_core_web_sm")
# load and pre-process emails
print("Loading data...")
base_df, focused_df = load_processed_data()
message_ids = focused_df.iloc[:, 0].tolist()
print("Processing texts...")
partitions = minibatch(message_ids, size=batch_size)
executor = Parallel(n_jobs=n_jobs, backend="multiprocessing", prefer="processes")
do = delayed(partial(process_texts, nlp))
tasks = (do(i, batch, output_dir, focused_df=focused_df, base_df=base_df) for i, batch in enumerate(partitions))
executor(tasks)   