# Multiprocessing practice

In [1]:
# **********************************************

The current problem is that the parallel-pool methods do not update `drop_tally`, `batch`, or the `featureSet_MI` dataframe.

The simpler, serial methods might not update them either; I haven't checked.

In [1]:
# **********************************************

In [2]:
%run 'UNSEEN_helper_functions.ipynb'
%store -r

Below are the libraries that would be imported if I ran 'UNSEEN_helper_functions.ipynb'.

In [3]:
import nest_asyncio
nest_asyncio.apply()
import numpy
import pandas
pandas.set_option('display.max_colwidth', None)
from datetime import date, datetime
import itertools
import scipy.stats
import sklearn.metrics
import math
import os
import matplotlib.pyplot
from google.cloud import bigquery, exceptions
from IPython.display import display, Markdown, Latex
from IPython import get_ipython
from tqdm import tqdm
import pyarrow.parquet
import pathlib
import timeit
import re
import rpy2.ipython

In [21]:
#if 'df_fs_database' not in globals():
#    %run ./"UNSEEN_create_database_feature_sets.ipynb"
my_featureSet_array = df_fs_database
featureSet_array = my_featureSet_array.head()
casenessVector = caseness_array[['person_id','CMHD']]
m = 3
representation = 'all'
source = 'database'
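# df_ppl_and_codes is already defined in this session, so it needs no re-assignment.
# A `global` statement has no effect at the top level of a notebook cell.
verbose = True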
savelocation = None

In [5]:
# Check order of feature set. If not provided,
# default to m = 1.    
if m is None:
    order_int = 1
    order_label = "Individuals"
    print("\nNo value for m provided." +
          "\n...Default value of m = 1 will be used.")
elif m == 1:
    order_int = m
    order_label = "Individuals"
elif m == 2:
    order_int = m
    order_label = "Pairs"
elif m == 3:
    order_int = m
    order_label = "Triplets"
else:
    # Stop here: order_label would otherwise be undefined further down.
    raise ValueError("Integer value between 1 and 3 not supplied for m.")

# Check and set save location.
if savelocation is None:
    savelocation = "Mutual information saves/" + order_label
    print("\nNo save location provided." +
          "\n...Defaulting to ~/" + savelocation)

# ## Check representation. If not provided,
# ## default to the 'ALL' representation.
if representation is None:
    representation_label = "ALL"
    print("\nNo representation provided." +
          "\n...Defaulting to '" + representation_label + "' representation.")
elif representation == "all":
    representation_label = "ALL"
elif representation == "multi":
    representation_label = "MULTI"
else:
    print("\n** Error: Representation value from",
          "{'all', 'multi'} not provided.**\n")

# ## Check the source argument is provided.
if source is None:
    print("\n** Error: No source argument provided.",
          "**\n")

# ## Set save string for particular caseness variable.
caseness_type = casenessVector.columns.values[-1]
if caseness_type == 'CMHD': 
    caseness_label = 'multinomial'
elif caseness_type == 'CMHD_dx_and_rx': 
    caseness_label = 'definite'
elif caseness_type == 'CMHD_rx_not_dx': 
    caseness_label = 'possible'
elif caseness_type == 'CMHD_control': 
    caseness_label = 'control'

print("\n\n\n****************************************")  
print("Calculating mutual information values...")

# Instantiate specific storage for mutual information.
featureSet_MI = \
    pandas.DataFrame(columns = ['Feature_set', 'Mutual_information'])

# Instantiate batch number.
batch = 0

# Instantiate tally of feature sets that are dropped due to low entropy.
drop_tally = 0

# Define entropy of the particular caseness variable.
entropy_caseness = \
    scipy.stats.entropy(casenessVector.iloc[:,-1].value_counts(),
                        base = math.e)


No save location provided.
...Defaulting to ~/Mutual information saves/Pairs



****************************************
Calculating mutual information values...


# **********************************************************************************

# Benchmark ways to call `processdatabasefs()`

Firstly, set the count of SNOMED-CT codes, k, whose combinations will be assessed.

In [16]:
k = 5
portion_size = 10
n_workers = 6

The trick to get multiprocessing to work in IPython is to import the worker function from a separate `.py` file rather than define it in the notebook itself.
Sources:
 - https://medium.com/@grvsinghal/speed-up-your-python-code-using-multiprocessing-on-windows-and-jupyter-or-ipython-2714b49d6fac
 - https://stackoverflow.com/questions/57103984/why-cant-jupyter-notebooks-handle-multiprocessing-on-windows

Below I assess the following options to expedite the processing of database feature sets:
- FOR loop
- `map()`
- list comprehension
- `itertools.starmap()`
- `multiprocessing.Pool.map()`
- `multiprocessing.Pool.imap_unordered()`
- `multiprocessing.Pool.starmap()`

## FOR loop with portioned generator

Applies a function to items, whereby:
- items are processed in series
- one at a time
- but only after converting all items to a list
- results are returned in the order in which they were submitted.
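
The pattern can be sketched as follows. This is a minimal illustration, not the notebook's actual code: `portion_maker_sketch` and `score` are hypothetical stand-ins for `portion_maker()` and `processdatabasefs()`, whose real implementations live in `UNSEEN_helper_functions` and may differ.

```python
import itertools

def portion_maker_sketch(gen, portion_size):
    """Yield successive lists of up to `portion_size` items from `gen`."""
    iterator = iter(gen)
    while True:
        portion = list(itertools.islice(iterator, portion_size))
        if not portion:
            return
        yield portion

def score(combo):
    # Hypothetical stand-in for processdatabasefs(): returns the sum of the codes.
    return sum(combo)

results = []
gen = itertools.combinations(range(5), 3)   # 10 triplets, produced lazily
for portion in portion_maker_sketch(gen, 4):
    # Only the current portion is held in memory as a list.
    for combo in portion:
        results.append(score(combo))
```

Portioning keeps memory use bounded by `portion_size` even when the number of combinations is very large.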

## `map()` with portioned generator

Applies a function to items, whereby:
- items are processed in series
- one batch at a time
- lazily: in Python 3, `map()` returns an iterator and nothing runs until it is consumed
- wrapping the call in `list()` returns all results together, in submission order, once every item has been processed.
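
In Python 3, the built-in `map()` returns a lazy iterator, so the function is only called when the results are consumed. The sketch below makes that visible by recording each call; `score` is a hypothetical stand-in for `processdatabasefs()`.

```python
calls = []

def score(combo):
    # Hypothetical stand-in for processdatabasefs(); records each call it receives.
    calls.append(combo)
    return sum(combo)

portion = [(1, 2, 3), (1, 2, 4), (1, 3, 4)]

lazy_results = map(score, portion)   # nothing has been processed yet
calls_before = len(calls)

results = list(lazy_results)         # consuming the iterator runs score() on each item
calls_after = len(calls)
```

If the return values are not needed, the iterator still has to be consumed (for example with `list()`), otherwise the function never runs.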

## List comprehension with portioned generator

Applies a function to items, whereby:
- items are processed in series
- in one batch
- all items are processed before being converted to a list
- results are returned in the order in which they were submitted.

## `itertools.starmap` with portioned generator

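Like base `map()` but:
- each item is itself an iterable (for example a tuple), whose elements are unpacked as the function's arguments.

The benefit of lazy iterables such as generators is that items are produced on demand, so the full sequence never has to be held in memory.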
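
As a small illustration of the difference from `map()`, the sketch below applies the built-in `pow()` to argument tuples. The data are toy values chosen for this example only.

```python
import itertools

argument_tuples = [(2, 5), (3, 2), (10, 3)]

# starmap unpacks each tuple, calling pow(2, 5), pow(3, 2), pow(10, 3).
with_starmap = list(itertools.starmap(pow, argument_tuples))

# The equivalent with map() needs the unpacking to be written out.
with_map = list(map(lambda args: pow(*args), argument_tuples))
```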

## `multiprocessing.Pool.map` with portioned generator, and n_workers = 6

Like base `map()` but:
- items are processed in parallel
- the whole iterable is first converted to a list and split into chunks, which are then sent to the workers
- the call blocks until every item is done, and results are returned in the order in which they were submitted.

In [25]:
%%time
# Prerequisites
#
# The IPYNB file has already been run in this notebook but I'm repeating
# the run based on guidance from this blog:
# https://medium.com/@grvsinghal/speed-up-your-python-code-using-multiprocessing-on-windows-and-jupyter-or-ipython-2714b49d6fac
import itertools
from multiprocessing import set_start_method, Pool
from UNSEEN_helper_functions import processdatabasefs, init_worker, portion_maker
#set_start_method('spawn', force = True)
#%run 'UNSEEN_helper_functions.ipynb'

# Do the main work.
if __name__ ==  '__main__':
    # Make variable values that would otherwise exist within featuresetmi().
    featureSet_MI = \
        pandas.DataFrame(columns = ['Feature_set', 'Mutual_information'])
    batch = 0
    drop_tally = 0
    gen = itertools.combinations(df_fs_database.snomedcode[0:k], m)
    for portion in portion_maker(gen, portion_size):
        print(f"This batch is {portion}.")
        with Pool(processes = n_workers,
                  initializer = init_worker,
                  initargs = \
                      (
                      df_ppl_and_codes,
                      caseness_array,
                      drop_tally,
                      batch,
                      featureSet_MI,
                      entropy_caseness
                      )) as pool:
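            # Each worker receives its own copy of the initargs, so any changes the
            # workers make to them do not reach the parent's variables.
            # pool.map() already returns a list; the return values are discarded here.
            pool.map(processdatabasefs, portion)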

This batch is [(140004, 166001, 216004), (140004, 166001, 219006), (140004, 166001, 251007), (140004, 216004, 219006), (140004, 216004, 251007), (140004, 219006, 251007), (166001, 216004, 219006), (166001, 216004, 251007), (166001, 219006, 251007), (216004, 219006, 251007)].
CPU times: user 69.3 ms, sys: 126 ms, total: 195 ms
Wall time: 10.4 s


In [18]:
print(f'drop_tally = {drop_tally}')
print(f'batch = {batch}')

drop_tally = 0
batch = 0
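
The unchanged values are consistent with how worker processes handle state: each worker has its own memory, so a global that is incremented inside a worker changes only that worker's copy. One common workaround is to have the worker function return what it found and to aggregate in the parent. The sketch below illustrates this with toy data. `init_worker_sketch`, `process_sketch` and `count_drops` are hypothetical names, not the helpers in `UNSEEN_helper_functions`, and the "drop even sums" rule is invented for the example. It assumes the code is run as a script; in a notebook on Windows or macOS the worker functions would need to be imported from a separate module, as noted earlier.

```python
import itertools
from multiprocessing import Pool

def init_worker_sketch(start_tally):
    # Runs once in every worker; the tally becomes a worker-local global.
    global worker_tally
    worker_tally = start_tally

def process_sketch(combo):
    # Hypothetical stand-in for processdatabasefs(). Toy rule: drop even sums.
    global worker_tally
    dropped = sum(combo) % 2 == 0
    if dropped:
        worker_tally += 1   # updates this worker's copy only
    return combo, dropped

def count_drops(results):
    # Aggregate in the parent from the values the workers returned.
    return sum(1 for _, dropped in results if dropped)

if __name__ == "__main__":
    drop_tally = 0
    combos = list(itertools.combinations(range(5), 3))
    with Pool(processes=2, initializer=init_worker_sketch,
              initargs=(drop_tally,)) as pool:
        results = pool.map(process_sketch, combos)
    # The workers incremented their own copies, so the parent's value is unchanged.
    print(f"after pool.map: drop_tally = {drop_tally}")
    drop_tally += count_drops(results)
    print(f"after aggregating: drop_tally = {drop_tally}")
```

The same idea would apply to `featureSet_MI`: workers could return rows, and the parent could build the dataframe from the collected results.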


## `multiprocessing.Pool.imap_unordered` with portioned generator, and n_workers = 4

Like base `map()` but:
- items are processed in parallel
- items are not converted into list to begin processing
- results are returned as each one completes, rather than in the order in which they were submitted.
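
A minimal sketch of this call, using toy data. `score` is a hypothetical stand-in for `processdatabasefs()`, and the code assumes it is run as a script (in a notebook on Windows or macOS the function would need to be imported from a separate module). Because results arrive in completion order, each result carries its input so that it can be matched up afterwards.

```python
import itertools
from multiprocessing import Pool

def score(combo):
    # Hypothetical stand-in for processdatabasefs(); returns the input with its result.
    return combo, sum(combo)

if __name__ == "__main__":
    gen = itertools.combinations(range(5), 3)   # passed as a generator, not a list
    with Pool(processes=2) as pool:
        # Consume the results inside the `with` block, while the pool is still alive.
        scores = dict(pool.imap_unordered(score, gen))
    print(f"{len(scores)} combinations scored")
```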

## `multiprocessing.Pool.starmap` with portioned generator, and n_workers = 4

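Like `itertools.starmap()` but:
- the argument tuples are processed in parallel
- results are returned in the order in which they were submitted.

Unlike `itertools.starmap()`, `Pool.starmap()` converts the whole iterable to a list before dispatching work, so the memory benefit of a lazy iterable does not carry over.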
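
A minimal sketch of the call, using a hypothetical `weighted_sum` function and toy arguments. It assumes the code is run as a script; in a notebook on Windows or macOS the function would need to be imported from a separate module.

```python
from multiprocessing import Pool

def weighted_sum(a, b, weight):
    # Hypothetical worker function that takes several positional arguments.
    return (a + b) * weight

if __name__ == "__main__":
    argument_tuples = [(1, 2, 10), (3, 4, 10), (5, 6, 10)]
    with Pool(processes=2) as pool:
        # Each tuple is unpacked into weighted_sum(a, b, weight) in a worker.
        totals = pool.starmap(weighted_sum, argument_tuples)
    print(totals)
```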