In [29]:
%%time
dict0 = {'a':[1, 2], 'b':['+', '*'], 'c':['!', '@']}
for a in dict0['a']:
    for b in dict0['b']:
        for c in dict0['c']:
            print(f'a {a}, b {b}, c {c}')

a 1, b +, c !
a 1, b +, c @
a 1, b *, c !
a 1, b *, c @
a 2, b +, c !
a 2, b +, c @
a 2, b *, c !
a 2, b *, c @
CPU times: user 94 μs, sys: 5 μs, total: 99 μs
Wall time: 102 μs


In [31]:
%%time
from itertools import product
dict0 = {'a':[1, 2], 'b':['+', '*'], 'c':['!', '@']}
jobs = (dict(zip(dict0, i)) for i in product(*dict0.values()))
for i in jobs:
    print(i)

{'a': 1, 'b': '+', 'c': '!'}
{'a': 1, 'b': '+', 'c': '@'}
{'a': 1, 'b': '*', 'c': '!'}
{'a': 1, 'b': '*', 'c': '@'}
{'a': 2, 'b': '+', 'c': '!'}
{'a': 2, 'b': '+', 'c': '@'}
{'a': 2, 'b': '*', 'c': '!'}
{'a': 2, 'b': '*', 'c': '@'}
CPU times: user 336 μs, sys: 0 ns, total: 336 μs
Wall time: 310 μs


### Single thread vs Multithreading vs Multiprocessing

In [61]:
import numpy as np
def main0():
    r = np.random.normal(0, .01, size=(1000, 10000))
    t = barrierTouch(r)
    return t

def barrierTouch(r, width=0.5):
    # find the index of the earliest barrier touch
    t, p = {}, np.log((1+r).cumprod(axis=0))
    for j in range(r.shape[1]):
        for i in range(r.shape[0]):
            if p[i,j] >= width or p[i, j] <= -width:
                t[j] = i
                continue
    return t

if __name__ =='__main__':
    import timeit
    print(min(timeit.Timer('main0()', setup='from __main__ import main0').repeat(5,10)))

15.201339991996065


In [70]:
import numpy as np
import multiprocessing as mp
def main1():
    r, numThreads = np.random.normal(0, 0.01, size=(1000, 10000)), mp.cpu_count()
    parts = np.linspace(0, r.shape[0], min(numThreads, r.shape[0])+1)
    parts, jobs = np.ceil(parts).astype(int), []
    for i in range(1, len(parts)):
        jobs.append(r[:,parts[i-1]:parts[i]]) # parallel jobs
    pool, out = mp.Pool(processes=numThreads), []
    outputs = pool.imap_unordered(barrierTouch, jobs)
    for out_ in outputs:
        out.append(out_)
    pool.close()
    pool.join()
    return out

if __name__=='__main__':
    import timeit
    print(min(timeit.Timer('main1()', setup='from __main__ import main1').repeat(5,10)))

1.8688174480048474


In [3]:
def linParts(numAtoms, numTreads):
    parts = np.linspace(0, numAtoms, min(numTreads, numAtoms)+1)
    parts = np.ceil(parts).astype(int)
    return parts
linParts(1000, 6)

array([   0,  167,  334,  500,  667,  834, 1000])

In [4]:
def nestedParts(numAtoms, numThreads, upperTriang=False):
    # partition of atoms with an inner loop
    parts, numThreads_ = [0], min(numThreads, numAtoms)
    for num in range(numThreads_):
        part = 1 + 4*(parts[-1]**2+parts[-1]+numAtoms*(numAtoms+1.)/numThreads_)
        part = (-1+part**.5)/2.
        parts.append(part)

    parts=np.round(parts).astype(int)
    if upperTriang:
        parts=np.cumsum(np.diff(parts)[::-1])
        parts=np.append(np.array([0]), parts)
    return parts

In [6]:
nestedParts(1000, 10, True)

array([   0,   51,  106,  163,  226,  293,  368,  453,  553,  684, 1000])

In [15]:
def expandCall(kargs):
    # Expand the arguments of a callback hunction kargs['func']
    func = kargs['func']
    del kargs['func']
    out = func(**kargs)
    return out

In [18]:
def processJobs_(jobs):
    # run jobs sequentially, for debugging
    out = []
    for job in jobs:
        out_ = expandCall(job)
        out.append(out_)
    return out

In [None]:
import multiprocessing as mp
import time
import datetime as dt
import sys
#------------------------------
def reportProgress(jobNum, numJobs, time0, task):
    # Report progress as asynch jobs are completed
    msg = [float(jobNum)/numJobs, (time.time()-time0)/60.]
    msg.append(msg[1]*(1/msg[0]-1))
    timeStamp = str(dt.datetime.fromtimestamp(time.time()))
    msg = timeStamp+' '+str(round(msg[0]*100, 2))+'% '+task+' done after '+\
        str(round(msg[1], 2))+' minutes. Remaining '+str(round(msg[2], 2))+' minutes.'
    if jobNum<numJobs:
        sys.stderr.write(msg+'\r')
    else:
        sys.stderr.write(msg+'\n')
    return

#--------------------------------
def processJobs(jobs, task=None, numThreads=24):
    # run in parallel
    # jobs must contain a 'func' callback, for expandCall
    if task is None:
        task = jobs[0]['func'].__name__
    pool = mp.Pool(processes=numThreads)
    outputs, out, time0 = pool.imap_unordered(expandCall, jobs), [], time.time()
    # process asynchronous output, report progress
    for i, out_ in enumerate(outputs, 1):
        out.append(out_)
        reportProgress(i, len(jobs), time0, task)
    pool.close()
    pool.join()
    return out

In [118]:
jobs=[{'func': barrierTouch, 'r': np.random.normal(0, .1, size=(100,1000))}, {'func': barrierTouch, 'r':np.random.normal(0, .01, size=(100,109))}]
print(jobs[0]['func'].__name__)
out = processJobs(jobs, task=None, numThreads=2)
len(out)

barrierTouch


2024-11-19 14:24:47.367496 100.0% barrierTouch done after 0.0 minutes. Remaining 0.0 minutes.


2

In [151]:
mpPandasObj(barrierTouch, ('r', r_df.columns), numThreads=2)

AttributeError: 'RangeIndex' object has no attribute 'cumprod'

In [174]:
r.shape


(1000, 10000)

In [171]:
barrierTouch(r_df)

{174: 285,
 190: 884,
 191: 884,
 192: 884,
 193: 884,
 194: 884,
 199: 884,
 223: 130,
 224: 752,
 225: 752,
 227: 130,
 228: 130,
 242: 752,
 243: 884,
 244: 884,
 245: 285,
 246: 285,
 247: 285,
 248: 130,
 249: 130,
 250: 752,
 251: 752,
 252: 752,
 253: 752,
 254: 752,
 255: 752,
 256: 752,
 257: 752,
 258: 752,
 259: 752,
 260: 752,
 261: 752,
 262: 752,
 263: 752,
 264: 752,
 265: 752,
 266: 752,
 267: 884,
 268: 884,
 269: 884,
 270: 884,
 271: 884,
 272: 884,
 273: 884,
 274: 884,
 275: 884,
 276: 884,
 277: 884,
 278: 884,
 279: 884,
 280: 884,
 281: 884,
 282: 884,
 283: 884,
 284: 884,
 285: 884,
 286: 884,
 287: 884,
 288: 884,
 289: 884,
 290: 884,
 291: 884,
 292: 884,
 293: 884,
 294: 884,
 295: 884,
 296: 884,
 297: 884,
 298: 884,
 299: 884,
 300: 884,
 301: 752,
 302: 752,
 303: 564,
 304: 752,
 305: 752,
 306: 752,
 307: 752,
 308: 752,
 309: 884,
 310: 884,
 311: 884,
 312: 884,
 313: 884,
 314: 884,
 315: 884,
 316: 884,
 317: 884,
 318: 884,
 319: 884,
 320: 884,

In [170]:
barrierTouch(r)

AttributeError: 'numpy.ndarray' object has no attribute 'iloc'

In [169]:
def barrierTouch(r, width=0.5):
    # find the index of the earliest barrier touch
    t, p = {}, np.log((1+r).cumprod(axis=1))
    for j in range(r.shape[1]):
        for i in range(r.shape[0]):
            if p.iloc[i,j] >= width or p.iloc[i, j] <= -width:
                t[j] = i
                continue
    return t

In [139]:
def mpPandasObj(func, pdObj, numThreads=24, mpBatches=1, linMols=True, **kargs):
    '''
    Parallelize jobs, return a DataFrame or Series
    + func: function to be parallelized. Return a DataFrame
    + pdObj[0]: Name of argument used to pass the molecule
    + pdObj[1]: List of atoms that will be grouped into molecules
    + kargs: any other argument needed by func
    Example: df1=mpPandasObj(func, ('molecule', df0.index), 24, **kargs)
    '''
    import pandas as pd
    if linMols:
        parts = linParts(len(pdObj[1]), numThreads*mpBatches)
    else:
        parts = nestedParts(len(pdObj[1]), numThreads*mpBatches)
    jobs = []
    for i in range(1, len(parts)):
        job = {pdObj[0]: pdObj[1][parts[i-1]:parts[i]], 'func': func}
        job.update(kargs)
        jobs.append(job)
    if numThreads==1:
        out = processJobs_(jobs)
    else:
        out = processJobs(jobs, numThreads=numThreads)
    if isinstance(out[0], pd.DataFrame):
        df0 = pd.DataFrame()
    elif isinstance(out[0], pd.Series):
        df0 = pd.Series()
    else:
        return out
    for i in out:
        df0 = df0.append(i)
    df0 = df0.sort_index()
    return df0

In [149]:
import pandas as pd
r, numThreads = np.random.normal(0, 0.01, size=(1000, 10000)), mp.cpu_count()
r_df = pd.DataFrame(r)
r_df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,9990,9991,9992,9993,9994,9995,9996,9997,9998,9999
0,-0.005903,-0.000240,-0.002719,0.018674,0.000468,-0.007335,0.020087,-0.008969,0.005708,0.007534,...,0.013227,-0.013003,0.019447,-0.005109,0.003363,-0.001806,-0.003766,0.006356,-0.007618,0.000072
1,-0.001566,0.012316,-0.011838,0.006854,0.012018,0.022015,0.015066,0.012578,-0.004078,0.012986,...,0.011143,0.024651,-0.002400,0.006477,-0.005514,0.004693,-0.001387,-0.008130,-0.005142,0.004398
2,0.001624,-0.009201,0.013672,0.005975,-0.008323,-0.004871,0.004385,0.013819,-0.013569,0.000566,...,-0.006085,0.009932,0.019720,0.004600,0.000237,-0.014965,0.007566,0.008936,0.010396,0.003925
3,-0.001594,0.019432,-0.000668,0.001886,-0.012902,0.003088,-0.001451,0.004720,0.000026,0.003313,...,-0.004436,0.005891,0.006981,-0.004615,0.029167,-0.018055,-0.004879,0.001245,-0.009374,-0.001815
4,0.005434,0.008022,0.007291,-0.002785,-0.024075,-0.005332,0.005052,0.000670,-0.002520,0.025439,...,0.008038,-0.000322,0.012288,-0.005940,-0.003783,0.000959,-0.005319,0.005022,0.016415,-0.007710
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,0.011379,-0.004405,-0.008386,0.011844,0.018529,0.001451,0.004766,-0.003885,0.008601,0.017487,...,0.009275,0.014715,-0.013639,-0.013474,-0.008285,-0.014599,-0.001153,-0.008538,-0.011991,0.006492
996,0.014737,-0.014939,0.011768,-0.009393,0.003819,-0.001524,0.006797,0.015416,0.006207,0.006994,...,-0.002951,-0.013691,-0.000990,0.002976,-0.021102,0.001462,0.015271,0.000792,-0.000379,-0.025849
997,0.000794,0.013066,0.007464,0.008197,-0.010781,-0.010902,-0.004268,0.001316,0.001496,-0.009059,...,-0.003662,-0.002409,0.006002,-0.001682,0.004280,0.006081,0.003705,0.016735,0.013714,0.010441
998,0.005519,0.002432,-0.002651,0.001258,-0.002647,0.022158,-0.002740,-0.007805,-0.019501,0.011441,...,0.017623,0.002649,0.002454,-0.012575,-0.009026,0.004086,-0.004467,-0.003217,0.004202,0.006013


In [None]:
import numpy as np
import multiprocessing as mp
def main1():
    r, numThreads = np.random.normal(0, 0.01, size=(1000, 10000)), mp.cpu_count()
    parts = np.linspace(0, r.shape[0], min(numThreads, r.shape[0])+1)
    parts, jobs = np.ceil(parts).astype(int), []
    for i in range(1, len(parts)):
        jobs.append(r[:,parts[i-1]:parts[i]]) # parallel jobs
    pool, out = mp.Pool(processes=numThreads), []
    outputs = pool.imap_unordered(barrierTouch, jobs)
    for out_ in outputs:
        out.append(out_)
    pool.close()
    pool.join()
    return out

if __name__=='__main__':
    import timeit
    print(min(timeit.Timer('main1()', setup='from __main__ import main1').repeat(5,10)))