<a href="https://colab.research.google.com/github/stavis1/GST_colloquium/blob/main/GST_colloquium.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#GST Colloquium on Python coding


*   random grab-bag of tips and tricks
*   hyper-optimization is not encouraged in Python but these techniques can provide large performace gains for little-to-no extra work
*   In many cases these optimizations don't matter much and we've chosen small examples for practicality but when working on large datasets they can provide major practical improvements
*   examples are tasks our lab performs and negative examples are inspired by our own learning process



In [None]:
#TESTING ONLY
#this allows us to interact with google drive from within python
from google.colab import drive
#we mount our google drive which means that we make it acessable as a hard drive
#in our file system under the path '/content/drive'
drive.mount('/content/drive')
!cp /content/drive/MyDrive/GST_colloquium/* .

In [None]:
#colab comes with most common packages installed already but we want to use a
#few uncommon ones so we install these ourselves.
#The "!" tells jupyter to pass the line to the bash shell, this allows us to run
#command line tools like pip from within jupyter notebook.
!pip install brain-isotopic-distribution
!pip install multiprocesspandas
!pip install xlsx2csv

In [None]:
#it is generally recommended best practice to import the packages that come with
#python first before importing third party packages.
from collections import defaultdict
from collections import Counter
from functools import cache
from multiprocessing import Pool
from multiprocessing import Manager
import time
import os
import warnings
from io import StringIO
import re
import traceback

from brainpy import isotopic_variants
from numba import jit
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from multiprocesspandas import applyparallel
from scipy.optimize import minimize
from scipy.stats import zscore
from scipy.stats import binom
from sortedcontainers import SortedList
from xlsx2csv import Xlsx2csv
from sklearn.metrics import mean_squared_error

In [None]:
#This defines a function decorator, a concept which is used in §1.2.
#It times a function call and prints the elapsed time while suppressing warnings
def time_this(func):
  #the way this works is that time_this() creates and returns a new function
  #that replaces whatever funciton follows @time_this. But, because
  def inner(*args, **kwargs):
    #this context manager suppresses all warnings. DO NOT DO THIS NORMALLY
    #We have extensively tested this code and data and know that the warnings
    #can be safely ignored for this tutorial but that is not true for most code
    with warnings.filterwarnings('ignore'):
      #time.time() gives unix time in units of seconds
      start_time = time.time()
      #the try: ... except: block is a technique for gracefully handling errors.
      #first the code under try: is run and if an exception is raised the code
      #under except: is run.
      try:
        result = func(*args, **kwargs)
      except:
        #we want to still time a function call even if it fails so we catch any
        #exception generated and use the traceback package to print the error
        #message
        traceback.print_exc()
        #if func() fails it does not return anything so result is undefined
        #in order to continue our script we need to define result
        result = None
      elapsed_time = time.time() - start_time
    #prepending f to a string allows you to place expressions within braces {}
    #which are evaluated and the result is placed within the string
    print(f'Running {func.__name__} took {elapsed_time} seconds.')
    return result
  #we return the function inner which is then evaluated with the origional
  #arguments. Using @time_this is equivalent to running
  #time_this(func)(*args, **kwargs)
  return inner

#A test version without the warning suppression
def time_this(func):
  def inner(*args, **kwargs):
    start_time = time.time()
    try:
      result = func(*args, **kwargs)
    except:
      traceback.print_exc()
      result = None
    elapsed_time = time.time() - start_time
    print(f'Running {func.__name__} took {elapsed_time} seconds.')
    return result
  return inner

##1: Data Structures

The way data are stored in memory can have an enormous effect on the speed with which they can be processed for particular tasks. Choosing the correct data structure can be the difference between a runtime of hours or minutes in some situations.

###1.1 Hashmaps

Hashmaps are data structures that store key-value pairs in a way that allows you to very quickly look up a value (some data) based on a key (the datorum label). Dictionaries in python are hashmaps which can store any object as a value and can use any hashable object as a key. Sets are specialized hashmaps which only store true or false as values. These can be used to efficiently look up whether or not a key is in the set.

A hash is a function that maps objects of arbitrary size and complexity to a finite range of numbers. In the context of a hashmap data structure the finite range represents locations in memory and the hash function calculates where in memory a value is stored based on its key which means that the time it takes to find a value is only dependent on the time it takes to calculate the hash of a key. This contrasts with looking up values in a list which requires iterating over each item in the list until you find what you're looking for, an operation that becomes more expensive on average as the list grows in size.


<img align="right" src="https://github.com/stavis1/GST_colloquium/blob/main/Hashmapfigure-margin.png?raw=true" alt="Figure 1. Hashmap Data Call" title="Figure 1: Accessing Data in a Hashmap" width="440" hspace="20" vspace="10"> The tradeoff that you have to make with hashmaps is that there are some cases in which two keys have the same hash so their values must be stored in a list and iterated over for lookup. To minimize this problem the range of memory locations needs to be significantly larger than the number of values stored. This means that hash maps will generally be larger in memory than more compact data structures like lists. This is not as much of a problem as it first appears because  Python does not store the actual value at the memory locaiton calculated by the hash function. Instead the memory location it calculates is the index of a list that stores memory addresses which point to where the values are actually stored (see figure). This means that each block of memory indexed by the hashmap can be very small while still being able to point to values of arbitrary size. Therefore the memory overhead is only likely to be siginficant if you're storing a large number of small things.





**TLDR:** Storing data in hashmaps (like a python dictionary) can significantly improve data access speed, thereby speeding up the whole script.





###Example 1: file parsing


###1.2: Memoization


###Example 2: isotope abundance calculations

###1.3: Search trees

###Example 3: fuzzy matching

##2: Pandas and Numpy

###2.1: Faster .xlsx import
Large .xlsx files can be slow to import with pandas. This is because they are actually zipped folders of mostly XML files which need to be decompressed before going through the somewhat inefficient process of XML parsing. This setup gives excel the flexability to store images, graphs, formatting, and functions but, if all you want is a table of data, this flexability comes at significant performance costs. Thus it is generally preferable to store tabular data as a text file (.csv or .tsv) unless you absolutely need to store these   extra types of information.

If you have no choice but to use large .xlsx files there is still a faster way to import the data as a pandas dataframe. The xlsx2csv package is sigificantly faster at parsing .xlsx files than pandas. So, what we can do is parse the .xlsx file to an in-memory .csv (meaning that the file is stored only in RAM and not written to our hard drive) then use the very efficient pandas .csv parser to get the data into a dataframe. This trick comes from [stackoverflow](https://stackoverflow.com/questions/28766133/faster-way-to-read-excel-files-to-pandas-dataframe#comment136215400_28769537).


###Example 4: metabolomics data
metabolomics_data.xlsx is an example of the files produced during metabolomics experiments in our lab. It is an export of unfiltered search results from Compound Discoverer which we use for downstream analysis.

For practicality purposes we have reduced the size of the .xlsx file for this demonstration significantly. In its original form (256547 rows by 506 columns) pandas .read_excel() took 16 minutes to load the file, the Xlsx2csv trick took 7 minutes, and reading in the .tsv version of the file with pandas .read_csv() took 16 seconds. The remarkable increase in speed between the original document and its current version is not merely due to cutting the number of rows in half. The extra formatting information that was included in the export, which was stripped during our file processing that went .xlsx -> .tsv -> .xlsx,  has a far larger impact. This can be seen in the relatively modest improvement in load time for the tsv reader as compared to the impressive improvements for the two direct .xlsx readers. We expect that these improvements are due to the removed formatting having massively complicated the XML representation of the data.



In [None]:
#The simple .tsv import
@time_this
def tsv_import(file_path):
  df = pd.read_csv(file_path, sep = '\t')
  return df

#The in-memory .csv conversion trick described above
@time_this
def fast_import(file_path):
    #buffer is the in-memory variable in which the converted .csv is placed
    buffer = StringIO()
    #here we do the actual .xlsx parsing and conversion to .csv
    #note that skip_hidden_rows is true by default unlike other .xlsx readers
    Xlsx2csv(file_path,
             outputencoding="utf-8",
             skip_hidden_rows = False).convert(buffer,sheetid=0)
    buffer.seek(0)
    #now we import the .csv with pandas exactly as if it were a regular file
    df = pd.read_csv(buffer, low_memory=False, skiprows=1)
    return df

#the pandas native approach to excel import
@time_this
def pandas_import(file_path):
  df = pd.read_excel(file_path, 0)
  return df

In [None]:
tsv_df = tsv_import('metabolomics_data.tsv')
fast_df = fast_import('metabolomics_data.xlsx')
pandas_df = pandas_import('metabolomics_data.xlsx')

###2.2: Using vectorized operations
Numpy carries out most data processing operations using highly optimized pre-compiled code that is orders of magnitude faster than anything Python can natively accomplish. The optimizations include single instruction, multiple data (SIMD) steps in which a single operation that would normally be applied serially to values in an array can be applied simultaniously to blocks of 4-8 values. When doing mathematical operations it is therefore best to let numpy handle as much of the processing as possible. This means that we should avoid writing explicit loops or accessing individual values within numpy arrays as much as possible and instead perform operations on whole arrays at once using numpy functions.

This strategy holds true for other packages like Pandas, Scipy, and Scikit-Learn as well because Numpy provides most of the data processing functionality for these packages.  

###Example 5: normalization
Proteomic normalization and imputation are important to minimize non-biological variation and ensure reliability of your dataset. Normalization is used to minimize variation coming from factors such as sample preparation inconsistencies, injection volume errors, and fluctuations in instrument sensitivity, which can happen with extended use (i.e. sensitivity drift with instrument cleanliness, or column age). By accounting for these variations, normalization allows for the comparison of biological variances rather than technical ones. Following normalization, imputation fills missing values with low abundance numbers that mimic the instrument's noise floor, allowing for statistical testing across conditions. Both normalization and imputation create a baseline from which conditions can be compared in a more robust way.


In [None]:
#The data processing produces one file per condition that we read into a list
#of dataframes
prot_files = [f for f in os.listdir() if f.endswith('Proteins.txt')]
dataframes = [pd.read_csv(f, sep = '\t') for f in prot_files]
key_cols = ['Accession','Description','MW [kDa]','# AAs']
#We need to merge these data into a single dataframe for further processing
proteins = dataframes.pop()
for df in dataframes:
  #on = key_cols is the list of columns that we use as a key for which rows to merge
  #how = 'outer' means we use the union of these keys
  proteins = proteins.merge(df, how = 'outer', on = key_cols)
#we want a list of the columns containing protein abundances for processing
abundance_cols = [c for c in proteins.columns if c.startswith('Abundance')]
#For this example we only want to work with the key and abundance columns
#so we get rid of all the other data in this dataframe.
#The * operator unpacks an interable so [1,*[2,3,4]] becomes [1,2,3,4].
proteins = proteins[[*key_cols, *abundance_cols]]

In [None]:
rng = np.random.default_rng(1)

@time_this
def vector_norm_impute(values):
  #processing is done in the log space because intensity is log-normally
  #distributed across proteins
  values = np.log(values)
  #we calculated the mean and standard deviation of all values in all samples
  gmean = np.nanmean(values)
  gstd = np.nanstd(values)
  #intensities are z-scored by column (axis = 0), which corresponds to files
  values = zscore(values, axis = 0, nan_policy = 'omit')
  #we un-do the z-scoring with the grand mean and standard deviation so that
  #all files now have the same log-mean and log-standard deviation
  values = (values*gstd)+gmean
  #we impute by taking draws from a normal distribution centered 1.8 standard
  #deviations below the mean to simulate the correlation between missingness and
  #intensity
  loc = gmean - (1.8*gstd)
  scale = gstd*0.3
  #The strategy here is to generate a random number for each value in our data
  #and then only transfer random values to our data at the missing value
  #positions. We do generate significantly more random values than necessary
  #this way but the vectorized approach is so much faster than repeated calls to
  #rng that it is still the preferred approach
  imputed_values = rng.normal(loc, scale, values.shape)
  mask = np.isnan(values) #an array of true/false for if a value is missing
  values[mask] = imputed_values[mask] #only missing values are replaced
  #the data are taken back out of log space for downstream processing
  values = np.exp(values)
  return values

@time_this
def loop_norm_impute(values):
  #this is how you do a nested loop within a list comprehension
  logvalues = [np.log(value) for sample in values for value in sample]
  #as above we pre-calculate variables that depend on all values
  gmean = np.nanmean(logvalues)
  gstd = np.nanstd(logvalues)
  loc = gmean - (1.8*gstd)
  scale = gstd*0.3
  #in this verison we construct a list of lists
  new_values = []
  for i in range(values.shape[1]):
    sample = values[:,i]
    new_sample = []
    #in order to do the normalization elementwise we have to precalculate
    #sample-specific values
    smean = np.nanmean(sample)
    sstd = np.nanstd(sample)
    for value in sample:
      #all the processing from here out is the same but done on one element
      #at a time
      if np.isnan(value):
        new_sample.append(rng.normal(loc, scale))
      else:
        value = np.log(value)
        zscore = (value-smean)/sstd
        value = (zscore*gstd)+gmean
        value = np.exp(value)
        new_sample.append(value)
    new_values.append(new_sample)
  #the .T is a transpose function which swaps rows for columns
  return np.array(new_values).T


In [None]:
vec_proteins = proteins.copy()
vec_proteins[abundance_cols] = vector_norm_impute(vec_proteins[abundance_cols].to_numpy())
loop_proteins = proteins.copy()
loop_proteins[abundance_cols] = loop_norm_impute(loop_proteins[abundance_cols].to_numpy())

###2.3: Building and accessing dataframes


###Example 6: protein summary statistics

In [None]:
#This is the ideal approach for data processing if what you are doing can be
#done with numpy. axis = 1 means that an operation is applied rowwise while
#axis = 0 is applied columnwise
@time_this
def numpy_stats(df):
  df['mean'] = np.nanmean(df[abundance_cols], axis = 1)
  df['std'] = np.nanstd(df[abundance_cols], axis = 1)
  return df

#If the function can't be done with numpy (dictionary lookups are a common
#example of this class of tasks) then this strategy is probably your best bet.
#What we do here is zip together the columns we care about processing and then
#apply a function over them within a list comprehension. Remember that zip
#collects elements into tuples e.g. zip([1,2,3],[4,5,6]) -> ((1,4),(2,5),(3,6)).
#We turn the zip into a list so that it can be used twice.
@time_this
def listcomp_stats(df):
  abundance_iter = list(zip(*[df[c] for c in abundance_cols]))
  df['mean'] = [np.nanmean(row) for row in abundance_iter]
  df['std'] = [np.nanstd(row) for row in abundance_iter]
  return df

#Iterrows behaves similarly to the above code but is generally slower. It is
#somewhat easier to read than the zip(*[df[c] for c in cols]) ideom above so for
#uses where performance is not a concern it could be preferred.
@time_this
def iterrows_stats(df):
  means = []
  stds = []
  #iterrows gives both the index and row at each iteration
  for i,row in df[abundance_cols].iterrows():
    #sequential calls to .append() are also slower than using list comprehension
    means.append(np.nanmean(row))
    stds.append(np.nanstd(row))
  df['mean'] = means
  df['std'] = stds
  return df

#Here we construct empty columns in the data frame and fill them element by
#element using df.at to access individual values.
#Accessing, and especially modifying, individual entries in a dataframe is
#extremely slow and should be done only as a very last resort. If you are
#filling or modifying an entire column or row then there is never a need to use
#this approach.
@time_this
def at_stats(df):
  #these two lines initialize empty columns
  df['mean'] = ''
  df['std'] = ''
  for i,row in df[abundance_cols].iterrows():
    df.at[i,'mean'] = np.nanmean(row)
    df.at[i,'std'] = np.nanstd(row)
  return df


In [None]:
numpy_df = numpy_stats(proteins.copy())
listcomp_df = listcomp_stats(proteins.copy())
iterrows_df = iterrows_stats(proteins.copy())
at_df = at_stats(proteins.copy())

In [None]:
#The pandas native way of doing this
@time_this
def melt(df):
  df = df.melt(id_vars = key_cols, value_vars = abundance_cols)
  return df

#Here we make a list of dataframes, one per analyte column, that each have the
#metadata columns and a single analyte intensity column. Finally this list is
#concatenated together with a single call to pd.concat()
@time_this
def loop_melt(df):
  dfs = [ ]
  for a in abundance_cols:
    tmp_df = df[[*key_cols,a]].copy()
    #we have to rename the analyte column so that they all end up as a single
    #column in the concatenated dataframe
    tmp_df.columns = [*key_cols,'Abundnace']
    #to keep track of what analyte each row represents we make a list of the
    #column name with as many elements as the dataframe has rows and add that
    #as a column
    tmp_df['sample'] = [a]*df.shape[0]
    dfs.append(tmp_df)
  df = pd.concat(dfs)
  return df

#this works the same way as loop_melt() but at each iteration the temp_df is
#concatenated to a growing dataframe. These repeated calls to pd.concat() are
#expensive and result in noticably poorer performance.
@time_this
def accumulator_melt(df):
  new_df = pd.DataFrame()
  for a in abundance_cols:
    tmp_df = df[[*key_cols,a]].copy()
    tmp_df.columns = [*key_cols,'Abundnace']
    tmp_df['sample'] = [a]*df.shape[0]
    new_df = pd.concat([new_df, tmp_df])
  return new_df

In [None]:
melt_df = melt(proteins.copy())
print(melt_df.shape)
loop_melt_df = loop_melt(proteins.copy())
print(loop_melt_df.shape)
accum_melt_df = accumulator_melt(proteins.copy())
print(accum_melt_df.shape)


##3: Black Box Optimizers

###scipy.optimize
Optimizers are useful tools for a variety of tasks. If you ever find yourself doing a "guess and check" procedure, consider automating it with an optimizer function. The scipy package contains a variety of off-the-shelf black box optimizers that are easily applicable to a variety of situations. All you need is a way of numerically summarizing task "success". This is accomplished with a loss function (function to calculate error). The optimize will start with an initial guess you provide and iteratively adjust the value searching for an error minimum.

**Possible Pitfall:** If your initial guess is too far off, the scipy optimizer can give pathological results, without nessesarily throwing an error.

###Example 7: isotopic packet fitting
In one Hettich-lab project, it was nessesary to estimate the level of lipid deuteration by fitting a binomial distribution to isotope abundance data. Given the setup of the experiment, the binomial probability would correspond to the deuteration percent in the lipid. Someone whose name is definitely **not** Matthew built an excel spreadsheet to generate binomial distributions from a given probability. Matthew--\*ahem\*--this nameless person proceeded to generate binomial distributions from a variety of probabilities, plot them against the observed data, and generate new probabilites based on the perceived goodness of fit.

This can be done more precisely and much faster using scipy.optimize.minimize. Basically the data are fed into minimize() which then changes the binomial probablility a bunch of times until it identifies a minimum point. In this case, we are calculating the root-mean squared error in our loss function.

In [None]:
class hashabledict(dict): #if needed to prevent collisions when memoizing formulae
    def __hash__(self):
        return hash(tuple(sorted(self.items())))

@cache
def formparser(chemformula): #This version uses memoize to speed up if there are many calculations with repeated formulae
    chemform = chemformula.strip(" ")
    parsed = re.findall(r'([A-Z][a-z]?)(\d*)',chemform)
    composition = {p[0]:int(p[1]) if p[1] else 1 for p in parsed}
    #return(hashabledict(composition))
    return(composition)

def deu_binom(totalhydrogen, pD, nonexH = 3): #Generate binomial distribution; need to adjust nonexH based on how many hydrogen in the formula are expected to back exchange with the solvent and be unlabeled
    lit = []
    for n in range(totalhydrogen-nonexH+1):
        lit.append(binom.pmf(n, totalhydrogen-nonexH, pD))
    arr = np.array(lit)
    return arr

@cache
def natisodist(formuladict): # Generated natural isotope distribution for correction; uses memoize to potentially speed processing
    theoretical_isotopic_cluster = isotopic_variants(formuladict, npeaks=5, charge=1)
    array = np.empty(0)
    for peak in theoretical_isotopic_cluster:
        array = np.append(array, peak.intensity)
    return(array)

def natcor_deudist(formula, pD, scaled = True): #Combine natural isotope distribution with deuterium distibution
    try:
      pD =  pD[0] if hasattr(pD, '__len__') else pD
      formdict = hashabledict(formparser(str(formula)))
      nH = formdict.pop('H')
      natdist = natisodist(formdict)
      deudist = deu_binom(nH, pD)
      totdist = np.convolve(natdist, deudist)
      if scaled:
          totdist = totdist/max(list(totdist))
      return(totdist)
    except:
      print(formula, pD, scaled)
      print(type(pD))

def error(pD,formula,actualdata):
    '''
    calculate expected dist from elements given using above function.
    calculate RMS error from actualdata as compared to calculated data.
    '''
    expecteddata = natcor_deudist(formula,pD)
    actualdataext = np.append(actualdata, [0,0,0,0,0,0])
    actualdatatrim = actualdataext[:len(expecteddata)]
    return(mean_squared_error(actualdatatrim, expecteddata, squared=False))

def optimize(pD, formula, actualdata):
    '''
    Use err function for least squared optimization of pD.
    Takes RMS error calculated by err and varies pD to minimize RMSE
    Returns least_squares optimization output
    Needs pD to be in the ballpark of the best value, otherwise behaviour can be weird
    Assumes Carbon, Hydrogen, Oxygen, Nitrogen, and actualdata are defined prior to using optimize
    '''
    results = minimize(error,
                       x0 = [pD],
                       bounds = [[0,1]],
                       method = 'Nelder-Mead',
                       args=(formula, actualdata)) #trf/dogbox
    param = results.x
    return(results)

@time_this
def deuterium_fitting(df):
    int_cols = [*df.columns[7:]]
    int_iter = list(zip(*[df[c] for c in int_cols]))
    pDs = list(df["pD"])
    formula = list(df["Formula"])
    test = [optimize(pD, form, row) for row, pD, form in zip(int_iter, pDs, formula)]
    maxiso = int(str(df.iloc[:,-1].name)[1:])
    df["BestFit pD"] = [t['x'][0] for t in test]
    df["RMSE"] = [t['fun'] for t in test]
    df["fun termination"] = [t['message'] for t in test]
    def exp_dist(formula, x):
        dist = natcor_deudist(formula, x)
        dist = np.concatenate((dist,np.zeros(maxiso - len(dist))))
        return dist
    dists = np.array([exp_dist(f, t['x'][0]) for f,t in zip(formula, test)])
    dists = pd.DataFrame(dists, columns= [f'isotope_{i}' for i in range(dists.shape[1])])
    df = pd.concat([df,dists], axis = 1)
    return df



In [None]:
df = pd.read_csv("DL_Example_3-25-24.csv", sep=',', encoding="UTF-8")

dfproc = deuterium_fitting(df)

##4: Multiprocessing

Most modern computers that we are likely to do any amount of bioinformatics on have more than one core, meaning they can process more than one instruction at a time. Python is not typically capable of taking advantage of this ability because it is constrained to run only one instruction at a time. This constriant has the advantage of making python code less prone to bugs and far easier to write, but sometimes we need the speed that parallelism provides. Python provides the multiprocessing package that provides a suite of tools for creating, running and communicating with multiple processes concurrently. A process is the in-memory representation of your code, associated data, and the instructions being run by the CPU.  

There is a distinction to be made between logical and pysical CPU cores. A physical core is a self-contained computational unit capable of carrying out all the tasks we expect of a CPU. Having multiple physical cores always means that you can do multiple computations at once. Some physical cores are capable of doing two computations at a time if, and only if, those two tasks are different, e.g. adding two numbers while bit shifting another. This capacity is represented by the OS as "logical cores". If you're doing many different tasks having two logical cores is almost as good as having two physical cores but if there is significant overlap in tasks there's much less of an advantage. Google colab has only a single physical core with two logical cores. This is not a good processor for demonstrating the advantages of multiprocessing so be aware that you can expect far more dramatic performance improvements on other systems than what you see here.

In the ideal case, splitting up a task across N cores will run in 1/N the time that the task took to run serially. However, this is never achieved in practice because the parallelization process itself requires extra work to carry out. In python specifically there is a lot of extra work to be done because the entire interpreter (the program that reads and runs your code) needs to be duplicated for each process.

Additionally, in order to communicate between processes, e.g. to pass inputs to or results from a paralellized function, the data need to be serialized into a string of bytes then unpacked at the other end. This process can be qite slow for very large variables like dataframes or arrays. On Linux (and google colab runs on Linux) there is a workaround for this. The default way for a new process to be created on Linux is for the child process to share its parent memory until one of the processes modifies a chunk of that memory, at which point only that chunk is copied into a process-specific memory pool. This means that any variables available in the global scope when a child process is created remains available to that child process. On other operating systems this behavior is simulated by re-running the script from the top in each child process which can have significant performance costs if you aren't careful.



###4.1: multiprocessing.Pool
Python's built-in multiprocessing package has many tools for paralell processing that allow an advanced user to set up complex archetectures. We, however, want something simple and the Pool module provides that. Pool will set up a pool of worker processes then, with a function and an iterable of inputs, will distribute the inputs across worker processes to be evaluated in parallel.
We care about two methods that Pool provides: .map(func, jobs) works for functions with a single input, and .starmap(func, jobs) works for functions with multiple inputs where jobs = [(input1a, input1b, ...), ..., (inputNa, inputNb, ...)]

In [42]:
#this function passes the input data to optimize() directly which means it must
#be serialized to send it between processes
@time_this
def mp_row(lipid_data):
  lipids = lipid_data.copy()
  start_cols = list(lipids.columns)
  input_cols = ['pD', 'Formula', *lipids.columns[7:]]
  maxiso = int(str(lipids.iloc[:,-1].name)[1:])
  jobs = zip(zip(*[lipids[c] for c in input_cols]), [maxiso]*lipids.shape[0])

  if __name__ == '__main__':
    with Pool() as p:
      results = p.starmap(optimize, jobs)
  results = pd.DataFrame(np.array(results)).T
  lipids = pd.concat((lipids,results))
  new_cols = ["BestFit pD",
              "RMSE",
              "func termination",
              *[f'isotope_{i}' for i in range(results.shape[1] - 3)]]
  lipids.columns = start_cols + new_cols
  return lipids

def idx_opt(i):
  row = jobs[i]
  return optimize(row,maxiso)

@time_this
def mp_idx(lipid_data):
  lipids = lipid_data.copy()
  start_cols = list(lipids.columns)
  input_cols = ['pD', 'Formula', *lipids.columns[7:]]
  global maxiso
  maxiso = int(str(lipids.iloc[:,-1].name)[1:])
  global jobs
  jobs = list(zip(*[lipids[c] for c in input_cols]))

  if __name__ == '__main__':
    with Pool() as p:
      results = p.map(idx_opt, range(len(jobs)))
  results = pd.DataFrame(np.array(results)).T
  lipids = pd.concat((lipids,results))
  new_cols = ["BestFit pD",
              "RMSE",
              "func termination",
              *[f'isotope_{i}' for i in range(results.shape[1] - 3)]]
  lipids.columns = start_cols + new_cols
  return lipids

In [None]:
lipids = pd.read_csv('DL_Example_3-25-24.csv')

row_results = mp_row(lipids)
idx_results = mp_idx(lipids)

One downside of multiprocess.Pool is that if a fatal error is encountered in a function call only the process that encountered the error will termninate. This means that all the other jobs are run by the remaining worker processes before the error is communincated to the main process and your script finally stops. Debugging then becomes slow and painful. What follows is a code snippit that will skip function evaluation for all jobs if one worker encounters and error. This trick comes from [here.](https://superfastpython.com/multiprocessing-pool-stop-all-tasks/)

In [None]:
#Don't bother running this cell. It is only included to make it easy to
#copy-paste it into your own code
from multiprocessing import Pool
from multiprocessing import Manager
#this is a built-in package for printing out more informative error messages
import traceback

#the function we wish to parallelize
def task(input):
  #event is a shared object that can be seen by all child processes.
  #if .is_set() is True that means there's been an error so function evaluation
  #is skipped
  if event.is_set():
    return
  #the try: ... except: block is a technique for gracefully handling errors.
  #first the code under try: is run and if an exception is raised the code under
  #except: is run.
  try:
    #your code here
    return
  except:
    #we print the error message to the console
    traceback.print_exc()
    #and we set the event so that .is_set() returns True and all remaining
    #function evaluations are skipped
    event.set()

if __name__ == '__main__':
  jobs = [] #a list of inputs to task()
  #the manager takes care of communicating the state of event between processes
  with Manager() as manager:
    shared_event = manager.Event()

    #this function will be called first when starting a child process it makes
    #event visible as a global variable within that process
    def init_worker(shared_event):
      global event
      event = shared_event

    with Pool(initializer=init_worker, initargs=(shared_event,)) as p:
      results = p.map(task, jobs)

In [38]:
def fail_optimize(i):
  row = jobs[i]
  pD, formula = row[:2]
  actualdata = row[2:]
  results = minimize(error,
                      x0 = pD,
                      bounds = [[0,1]],
                      method = 'Nelder-Mead',
                      args=(formula, actualdata))
  param = results.x
  dist = natcor_deudist(formula, param[0])
  dist = np.concatenate((dist,np.zeros(maxiso - len(dist))))
  results = (param[0], results.fun, results.message, *dist)
  if i == 0:
    raise Exception()
  return results

def safe_task(input):
  if event.is_set():
    return
  try:
    return fail_optimize(input)
  except:
    traceback.print_exc()
    event.set()
    return None

def unsafe_task(input):
    return fail_optimize(input)

@time_this
def run_safe(lipid_data):
  if __name__ == '__main__':
    lipids = lipid_data.copy()
    start_cols = list(lipids.columns)
    input_cols = ['pD', 'Formula', *lipids.columns[7:]]
    global maxiso
    maxiso = int(str(lipids.iloc[:,-1].name)[1:])
    global jobs
    jobs = list(zip(*[lipids[c] for c in input_cols]))
    with Manager() as manager:
      shared_event = manager.Event()

      def init_worker(shared_event):
        global event
        event = shared_event

      with Pool(initializer=init_worker, initargs=(shared_event,)) as p:
        return p.map(safe_task, range(len(jobs)))

@time_this
def run_unsafe(lipid_data):
  if __name__ == '__main__':
    lipids = lipid_data.copy()
    start_cols = list(lipids.columns)
    input_cols = ['pD', 'Formula', *lipids.columns[7:]]
    global maxiso
    maxiso = int(str(lipids.iloc[:,-1].name)[1:])
    global jobs
    jobs = list(zip(*[lipids[c] for c in input_cols]))
    with Pool() as p:
      return p.map(unsafe_task, range(len(jobs)))

In [None]:
lipids = pd.read_csv('DL_Example_3-25-24.csv')
run_safe(lipids)
run_unsafe(lipids)

###Example 9: bootstrapping


###4.2 Multiprocesspandas

The multiprocesspandas package implements a drop-in replacement for the pandas .apply() called .apply_parallel() which allows you to easily apply a function to either rows or columns of a data frame in parallel. Parallelization reqires some amount of overhead so for simple functions that are fast to compute this overhead may result in the total runtime being longer than taking a serial approach. In these cases overhead can be minimized by trying different values for the n_chunks parameter, although the default should be ideal in most circumstances.

Internally multiprocesspandas usess multiprocessing.pool so the considerations outlined in §4 apply here as well. Which, in light of colab only having 2 CPU cores is why we see very little improvement from parallelization in this situation.

In [None]:
def optimize(row, niso):
    '''
    Use err function for least squared optimization of pD.
    Takes RMS error calculated by err and varies pD to minimize RMSE
    Returns least_squares optimization output
    Needs pD to be in the ballpark of the best value, otherwise behaviour can be weird
    Assumes Carbon, Hydrogen, Oxygen, Nitrogen, and actualdata are defined prior to using optimize
    '''
    pD, formula = row[:2]
    actualdata = row[2:]
    results = minimize(error,
                       x0 = pD,
                       bounds = [[0,1]],
                       method = 'Nelder-Mead',
                       args=(formula, actualdata))
    param = results.x
    dist = natcor_deudist(formula, param[0])
    dist = np.concatenate((dist,np.zeros(niso - len(dist))))
    return (param[0], results.fun, results.message, *dist)

@time_this
def parallel_fit(lipid_data):
    lipids = lipid_data.copy()
    start_cols = list(lipids.columns)
    input_cols = ['pD', 'Formula', *lipids.columns[7:]]
    maxiso = int(str(lipids.iloc[:,-1].name)[1:])
    results = lipids[input_cols].apply_parallel(optimize, niso = maxiso)
    results = pd.DataFrame(np.array(results)).T
    lipids = pd.concat((lipids,results))
    new_cols = ["BestFit pD",
                "RMSE",
                "func termination",
                *[f'isotope_{i}' for i in range(results.shape[1] - 3)]]
    lipids.columns = start_cols + new_cols
    return lipids


@time_this
def serial_fit(lipid_data):
    lipids = lipid_data.copy()
    start_cols = list(lipids.columns)
    input_cols = ['pD', 'Formula', *lipids.columns[7:]]
    maxiso = int(str(lipids.iloc[:,-1].name)[1:])
    results = lipids[input_cols].apply(optimize, niso = maxiso, axis = 1)
    results = pd.DataFrame(np.array(results)).T
    lipids = pd.concat((lipids,results))
    new_cols = ["BestFit pD",
                "RMSE",
                "func termination",
                *[f'isotope_{i}' for i in range(results.shape[1] - 3)]]
    lipids.columns = start_cols + new_cols
    return lipids

In [None]:
lipids = pd.read_csv('DL_Example_3-25-24.csv')

parallel = parallel_fit(lipids)
serial = serial_fit(lipids)