In [1]:
import os
import re
import time
import json
from pathlib import Path
from functools import partial
from multiprocessing import Lock, Process, cpu_count

import numpy as np
from joblib import Parallel, delayed
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from gensim.models.doc2vec import Doc2Vec



### `multiprocessing` simple example

In [2]:
lock = Lock()

def spawn(num):
    with lock:
        print("Process [{}]: {}".format(os.getpid(), num))
        

for i in range(25):
    Process(target=spawn, args=(i,)).start()

Process [762620]: 0
Process [762623]: 1
Process [762626]: 2
Process [762631]: 3
Process [762632]: 4
Process [762635]: 5
Process [762636]: 6

Process [762637]: 7Process [762642]: 8
Process [762645]: 9
Process [762648]: 10
Process [762653]: 11
Process [762658]: 12
Process [762661]: 13
Process [762669]: 15
Process [762666]: 14
Process [762670]: 16
Process [762673]: 17
Process [762677]: 19
Process [762676]: 18
Process [762680]: 20
Process [762683]: 21
Process [762686]: 22
Process [762695]: 23
Process [762698]: 24


### `joblib` simple example

In [3]:
lock = Lock()

def f(num):
    with lock:
        print(num**2)
    time.sleep(2)


parallel = Parallel(n_jobs=4, backend="multiprocessing", verbose=50, 
                    batch_size=1, max_nbytes=None, mmap_mode=None)

_ = parallel(delayed(f)(i) for i in range(25))

0
1
4
9
[Parallel(n_jobs=4)]: Using backend MultiprocessingBackend with 4 concurrent workers.
16
25
36
49
[Parallel(n_jobs=4)]: Done   1 tasks      | elapsed:    2.0s
[Parallel(n_jobs=4)]: Done   2 tasks      | elapsed:    2.0s
[Parallel(n_jobs=4)]: Done   3 tasks      | elapsed:    2.0s
[Parallel(n_jobs=4)]: Done   4 tasks      | elapsed:    2.1s
64
81
100
121
[Parallel(n_jobs=4)]: Done   5 tasks      | elapsed:    4.0s
[Parallel(n_jobs=4)]: Done   6 tasks      | elapsed:    4.0s
[Parallel(n_jobs=4)]: Done   7 tasks      | elapsed:    4.0s
[Parallel(n_jobs=4)]: Done   8 tasks      | elapsed:    4.1s
144
169
196
225
[Parallel(n_jobs=4)]: Done   9 tasks      | elapsed:    6.0s
[Parallel(n_jobs=4)]: Done  10 tasks      | elapsed:    6.0s
[Parallel(n_jobs=4)]: Done  11 tasks      | elapsed:    6.0s
[Parallel(n_jobs=4)]: Done  12 tasks      | elapsed:    6.1s
256
289
324
361
[Parallel(n_jobs=4)]: Done  13 tasks      | elapsed:    8.0s
[Parallel(n_jobs=4)]: Done  14 tasks      | elapsed:   

### `multiprocessing` applicable example

#### Defining helper functions

In [6]:
def filter_stocks(stocks):
    return [s for s in stocks if s["wiki"] is not None]


def read_jsonl_file(filename, processing_func=None):
    objects = list()
    with open(filename, "r", encoding="utf8") as fp:
        for line in fp:
            obj = json.loads(line)
            objects.append(obj)
    if processing_func:
        objects = processing_func(objects)
    return objects


def tokenize(text):
    stop_words = set(stopwords.words("english"))
    tokenized = [token.lower() for token in word_tokenize(text) if token.isalpha()]
    filtered = [token for token in tokenized if not token in stop_words]
    return filtered


def read_and_process_articles(article_paths):
    articles = list()
    for i, path in enumerate(article_paths):
        
        # Assering that loop index matches with filename of the article
        assert i == int(path.stem)
        
        with open(path, "r", encoding="utf-8") as fp:
            full_txt = fp.read()
            
        articles.append(tokenize(full_txt))
    
    return articles
        

def get_model_str_repr(model, epochs_param_str_marker="ep"):
    model_str = str(model).replace("/", "-")
    epochs = model.epochs
    return f"{model_str[:-1]},{epochs_param_str_marker}{epochs})"

#### Defining core functions

In [7]:
LOCK = Lock()

def multiproccess_print(status_msg, lock=LOCK):
    if lock:
        with lock:
            print("[PID: {}] {}".format(os.getpid(), status_msg))
    else:
        print(status_msg)


def infer_article_vectors(model, articles, lock=LOCK, vectors_per_article=100, epochs=20):
    
    # Initializing empty array for article vectors
    avs = np.empty((len(articles), vectors_per_article, model.vector_size))
    
    # Inferring vectors for articles
    for i, article in enumerate(articles):
        for j in range(vectors_per_article):
            status_msg = ("Inferring vector #{} for article {}.txt with model {}"
                              .format(j+1, i, get_model_str_repr(model)))
            multiproccess_print(status_msg, lock)  # Displaying status message
            avs[i, j, :] = model.infer_vector(article, epochs=epochs)
    
    return avs


def infer_entity_vectors(model, stock, lock=LOCK, vectors_per_entity=10, epochs=20):
    
    # Initializing empty arrays for entity vectors
    evs_full = np.empty((vectors_per_entity, model.vector_size))
    evs_summary = np.empty((vectors_per_entity, model.vector_size))
    evs_child = np.empty((len(stock["entities"]), vectors_per_entity, model.vector_size))
    
    # Displaying status message
    status_msg = ("Inferring entity {} vectors with model {}"
                      .format(stock["ticker"], get_model_str_repr(model)))
    multiproccess_print(status_msg, lock)
    
    # Inferring vectors for the parent entity
    for i in range(vectors_per_entity):
        evs_full[i, :] = model.infer_vector(stock["content"], epochs=epochs)
        evs_summary[i, :] = model.infer_vector(stock["summary"], epochs=epochs)
        
    # Inferring vectors for child entities
    for i, child_entity in enumerate(stock["entities"]):
        for j in range(vectors_per_entity):
            evs_child[i, j, :] = model.infer_vector(child_entity["summary"], epochs=epochs)
    
    return evs_full, evs_summary, evs_child

            
def infer_vectors(articles, stocks, model_path, lock=LOCK, vectors_per_article=100, 
                  vectors_per_entity=10, epochs=20):
    # Load model
    model = Doc2Vec.load(str(model_path))
    model_str_repr = get_model_str_repr(model)
    
    # Set up directories for article vectors
    avs_filename = ("vpa{vpa}.ep{ep}.av.vectors.npy"
                        .format(vpa=vectors_per_article, ep=epochs))
    avs_path = (Path(PATH_TO_RESULTS) 
                   / Path("multiprocess" if lock else "single_process")
                   / Path(model_str_repr)  
                   / Path(ARTICLE_VECTORS_DIR)
                   / avs_filename)
    
    # Infer article vectors and save them to a file if they don't already exist
    if not avs_path.exists():
        avs_path.parent.mkdir(parents=True, exist_ok=True)
        avs = infer_article_vectors(model, articles, lock=lock, 
                                    vectors_per_article=vectors_per_article, 
                                    epochs=epochs)
        multiproccess_print(("Saving article vectors for {} to {}..."
                                 .format(model_str_repr, avs_path)), lock)
        np.save(avs_path, avs)
    else:
        multiproccess_print(f"Article vectors for {model_str_repr} already exist.", lock)
    
    for stock in stocks:
        # Set up directories for entity vectors
        entity = stock["ticker"]
        evs_filename = ("{ticker}.vpe{vpe}.ep{ep}.ev.vectors.npy"
                            .format(ticker=entity, vpe=vectors_per_entity, ep=epochs))
        evs_path = (Path(PATH_TO_RESULTS) 
                       / Path("multiprocess" if lock else "single_process")
                       / Path(model_str_repr)  
                       / Path(ENTITY_VECTORS_DIR)
                       / evs_filename)
        
        # Infer entity vectors and save them to a file if they don't already exist
        if not Path(str(evs_path) + ".npz").exists(): # NumPy array archives have additional .npy suffix
            evs_path.parent.mkdir(parents=True, exist_ok=True)
            evs_full, evs_summary, evs_child = infer_entity_vectors(model, stock, lock=lock, 
                                                                    vectors_per_entity=vectors_per_entity,
                                                                    epochs=epochs)
            multiproccess_print(("Saving entity {} vectors for {} to {}..."
                                     .format(entity, model_str_repr, evs_path)), lock)
            np.savez(evs_path, evs_full=evs_full, evs_summary=evs_summary, evs_child=evs_child)
        else:
            multiproccess_print(f"Entity {entity} vectors for {model_str_repr} already exist.", lock)

#### Setting up paths

In [8]:
PATH_TO_STOCKS = "./stocks/"
PATH_TO_MODELS = "./models/"
PATH_TO_ARTICLES = "./articles/"
PATH_TO_RESULTS = "./results/"
ARTICLE_VECTORS_DIR = "article_vectors"
ENTITY_VECTORS_DIR = "entity_vectors"

stocks_path = list(Path(PATH_TO_STOCKS).glob("*.jsonl"))[0]
model_paths = [path for path in Path(PATH_TO_MODELS).glob("*/*")
               if path.suffix == ""]
article_paths = sorted(list(Path("./articles/").glob("*.txt")), 
                       key=lambda x: int(x.stem))

#### Reading data

In [9]:
stocks = read_jsonl_file(stocks_path)
articles = read_and_process_articles(article_paths)

#### Without `multiprocessing`

In [12]:
t0 = time.time()

for model_path in model_paths:
    infer_vectors(articles[:1], stocks[:3], model_path, vectors_per_article=10)
    
t1 = time.time() - t0
print("Single CPU core bound task performance: {:.0f}s".format(t1))

[PID: 762600] Article vectors for Doc2Vec(dm-c,d100,n20,w4,mc5,s1e-05,t4,ep20) already exist.
[PID: 762600] Entity AMD vectors for Doc2Vec(dm-c,d100,n20,w4,mc5,s1e-05,t4,ep20) already exist.
[PID: 762600] Entity AA vectors for Doc2Vec(dm-c,d100,n20,w4,mc5,s1e-05,t4,ep20) already exist.
[PID: 762600] Entity BABA vectors for Doc2Vec(dm-c,d100,n20,w4,mc5,s1e-05,t4,ep20) already exist.
[PID: 762600] Article vectors for Doc2Vec(dm-c,d100,n20,w3,mc5,s1e-05,t4,ep20) already exist.
[PID: 762600] Entity AMD vectors for Doc2Vec(dm-c,d100,n20,w3,mc5,s1e-05,t4,ep20) already exist.
[PID: 762600] Entity AA vectors for Doc2Vec(dm-c,d100,n20,w3,mc5,s1e-05,t4,ep20) already exist.
[PID: 762600] Entity BABA vectors for Doc2Vec(dm-c,d100,n20,w3,mc5,s1e-05,t4,ep20) already exist.
[PID: 762600] Article vectors for Doc2Vec(dm-c,d100,n20,w2,mc5,s1e-05,t4,ep20) already exist.
[PID: 762600] Entity AMD vectors for Doc2Vec(dm-c,d100,n20,w2,mc5,s1e-05,t4,ep20) already exist.
[PID: 762600] Entity AA vectors for Doc

#### With `multiprocessing`

In [13]:
t0 = time.time()

lock = Lock()
processes = []

for model_path in model_paths:
    p = Process(target=infer_vectors, args=(articles[:1], stocks[:3], model_path),
                kwargs=dict(lock=lock, vectors_per_article=10))
    processes.append(p)
    p.start()
    multiproccess_print("Process [PID:{}] started".format(p.pid))
    
for p in processes:
    p.join()
    
t1 = time.time() - t0
print("Multiple CPU core bound task performance: {:.0f}s".format(t1))

[PID: 762600] Process [PID:763268] started
[PID: 762600] Process [PID:763269] started
[PID: 762600] Process [PID:763270] started
[PID: 762600] Process [PID:763271] started
[PID: 763270] Article vectors for Doc2Vec(dm-c,d100,n20,w2,mc5,s1e-05,t4,ep20) already exist.
[PID: 763270] Entity AMD vectors for Doc2Vec(dm-c,d100,n20,w2,mc5,s1e-05,t4,ep20) already exist.
[PID: 763271] Article vectors for Doc2Vec(dm-c,d100,n20,w1,mc5,s1e-05,t4,ep20) already exist.
[PID: 763270] Entity AA vectors for Doc2Vec(dm-c,d100,n20,w2,mc5,s1e-05,t4,ep20) already exist.
[PID: 763271] Entity AMD vectors for Doc2Vec(dm-c,d100,n20,w1,mc5,s1e-05,t4,ep20) already exist.
[PID: 763271] Entity AA vectors for Doc2Vec(dm-c,d100,n20,w1,mc5,s1e-05,t4,ep20) already exist.
[PID: 763271] Entity BABA vectors for Doc2Vec(dm-c,d100,n20,w1,mc5,s1e-05,t4,ep20) already exist.
[PID: 763270] Entity BABA vectors for Doc2Vec(dm-c,d100,n20,w2,mc5,s1e-05,t4,ep20) already exist.
[PID: 763269] Article vectors for Doc2Vec(dm-c,d100,n20,w3

#### Checking for side effects
The purpose of this check is to see wether using `multiprocessing` affected inferred vectors in any way comparing them to the same vectors computer linearly.

In [14]:
def cosine_similarity(v1, v2):
    return np.einsum('ij,ij->i', v1, v2) / (np.linalg.norm(v1, axis=1)*np.linalg.norm(v2, axis=1))

In [15]:
# avs - article vectors
avs_single_paths = sorted(list((Path(PATH_TO_RESULTS) / "single_process").glob("*/article_vectors/*.npy")),
                         key=lambda x: int(re.search("w\d{1}", str(x)).group()[1:]))
avs_multi_paths = sorted(list((Path(PATH_TO_RESULTS) / "multiprocess").glob("*/article_vectors/*.npy")),
                        key=lambda x: int(re.search("w\d{1}", str(x)).group()[1:]))


for paths in zip(avs_single_paths, avs_multi_paths):
    assert paths[0].parent.parent.name == paths[1].parent.parent.name
    model_str = print(paths[0].parent.parent.name)
    
    avs_single = np.load(paths[0])[0, :, :]
    avs_multi = np.load(paths[1])[0, :, :]
    
    mean_similarity = cosine_similarity(avs_single, avs_multi).mean()
    print("Mean cosine similarity between articles:", mean_similarity,  "\n")

In [16]:
evs_single_paths = sorted(list((Path(PATH_TO_RESULTS) / "single_process").glob("*/entity_vectors/*.npz")),
                         key=lambda x: int(re.search("w\d{1}", str(x)).group()[1:]))
evs_multi_paths = sorted(list((Path(PATH_TO_RESULTS) / "multiprocess").glob("*/entity_vectors/*.npz")),
                        key=lambda x: int(re.search("w\d{1}", str(x)).group()[1:]))

In [17]:
for paths in zip(evs_single_paths, evs_multi_paths):
    assert paths[0].parent.parent.name == paths[1].parent.parent.name
    assert paths[0].name.split(".")[0] == paths[1].name.split(".")[0]
    
    model_str = paths[0].parent.parent.name
    entity = paths[0].name.split(".")[0]
    
    print(model_str, entity)
    
    npzfile_single = np.load(paths[0])
    npzfile_multi = np.load(paths[1])
    
    evs_full_single = npzfile_single["evs_full"]
    evs_summary_single = npzfile_single["evs_summary"]
    evs_summary_child = npzfile_single["evs_child"]
    
    evs_full_multi = npzfile_multi["evs_full"]
    evs_summary_multi = npzfile_multi["evs_summary"]
    evs_summary_multi = npzfile_multi["evs_child"]
    
    for key in ["evs_full", "evs_summary", "evs_child"]:
        evs_single = npzfile_single[key]
        evs_multi = npzfile_multi[key]
        if key == "evs_child":
            evs_single = evs_single.reshape((evs_single.shape[0]*evs_single.shape[1],
                                             evs_single.shape[2]))
            evs_multi = evs_multi.reshape((evs_multi.shape[0]*evs_multi.shape[1],
                                             evs_multi.shape[2]))
        similarities = cosine_similarity(evs_single, evs_multi)
        print(f"{key} mean cosine similarity {similarities.mean()}")
    print("\n")

There are negligible differences between similarities as expected, due to the random vector initialization nature of Doc2Vec

###  `joblib` applicable example

In [18]:
t0 = time.time()

parallel = Parallel(n_jobs=4, backend="multiprocessing", verbose=50, 
                    batch_size=1, max_nbytes=None, mmap_mode=None)

partial_infer_vectors = partial(infer_vectors, articles[:1], stocks[:3], vectors_per_article=10)
parallel(delayed(partial_infer_vectors)(model_path) for model_path in model_paths)
    
    
t1 = time.time() - t0
print("Multiple CPU core bound task performance: {:.0f}s".format(t1))

[Parallel(n_jobs=4)]: Using backend MultiprocessingBackend with 4 concurrent workers.
[PID: 763340] Article vectors for Doc2Vec(dm-c,d100,n20,w2,mc5,s1e-05,t4,ep20) already exist.
[PID: 763340] Entity AMD vectors for Doc2Vec(dm-c,d100,n20,w2,mc5,s1e-05,t4,ep20) already exist.
[PID: 763340] Entity AA vectors for Doc2Vec(dm-c,d100,n20,w2,mc5,s1e-05,t4,ep20) already exist.
[PID: 763340] Entity BABA vectors for Doc2Vec(dm-c,d100,n20,w2,mc5,s1e-05,t4,ep20) already exist.
[PID: 763338] Article vectors for Doc2Vec(dm-c,d100,n20,w4,mc5,s1e-05,t4,ep20) already exist.

[PID: 763338] Entity AMD vectors for Doc2Vec(dm-c,d100,n20,w4,mc5,s1e-05,t4,ep20) already exist.[PID: 763338] Entity AA vectors for Doc2Vec(dm-c,d100,n20,w4,mc5,s1e-05,t4,ep20) already exist.
[PID: 763338] Entity BABA vectors for Doc2Vec(dm-c,d100,n20,w4,mc5,s1e-05,t4,ep20) already exist.
[PID: 763341] Article vectors for Doc2Vec(dm-c,d100,n20,w1,mc5,s1e-05,t4,ep20) already exist.
[PID: 763341] Entity AMD vectors for Doc2Vec(dm-c,