In [1]:
# multiprocessing


import multiprocessing as mp

In [2]:
def square(x):
    """Return ``x`` raised to the second power."""
    return pow(x, 2)

In [3]:
# Input for the first map demo: the integers 0..99.
data = list(range(100))

In [4]:
%time
seq=[square(e) for e in data]

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.01 µs


In [5]:
# Worker pool with the default size (no explicit process count given).
pool = mp.Pool()

In [6]:
%time
res=pool.map(square, data)
pool.terminate()
pool.join()

CPU times: user 3 µs, sys: 1 µs, total: 4 µs
Wall time: 5.96 µs


In [7]:
# Report the CPU count seen by multiprocessing.
print('Numero de procesadores: ', mp.cpu_count())

Numero de procesadores:  12


In [8]:
import numpy as np
# 20000 rows x 5 columns of random integers drawn from [0, 10).
array = np.random.randint(0, 10, size=(20000, 5))
# Convert to plain nested lists, one inner list per row.
data = array.tolist()
data[:5]

[[4, 1, 6, 1, 4],
 [6, 1, 9, 2, 7],
 [1, 1, 7, 9, 0],
 [5, 1, 7, 4, 3],
 [3, 8, 0, 6, 9]]

In [9]:
def rango(row, minimo, maximo):
    """Count the elements of ``row`` that lie in the closed interval [minimo, maximo]."""
    return sum(1 for valor in row if minimo <= valor <= maximo)

In [13]:
%time
res=[rango(e, 4, 8) for e in data]
res[:10]

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 4.77 µs


[3, 2, 1, 3, 2, 3, 3, 3, 3, 5]

In [11]:
# One worker process per CPU reported by multiprocessing.
pool = mp.Pool(processes=mp.cpu_count())

In [12]:
%time
res=pool.starmap(rango, [(e, 4, 8) for e in data])
pool.close()

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.01 µs


In [14]:
# asynchronous variant
def rango2(i, row, minimo, maximo):
    """Return ``(i, count)``, where count is how many elements of ``row`` lie in [minimo, maximo].

    ``i`` is passed through unchanged so the caller can tell which input the
    count belongs to.
    """
    return i, sum(1 for valor in row if minimo <= valor <= maximo)

In [15]:
# Values handed to collect() accumulate in this module-level list.
res = []


def collect(result):
    """Append ``result`` to the module-level ``res`` list."""
    res.append(result)

In [16]:
# Fresh pool for the asynchronous demo, one worker per reported CPU.
pool = mp.Pool(processes=mp.cpu_count())

In [17]:
%time
for i,e in enumerate(data):
    pool.apply_async(rango2, args=(i,e,4,8), callback=collect)
pool.close()
pool.join()

CPU times: user 4 µs, sys: 4 µs, total: 8 µs
Wall time: 11.9 µs


In [19]:
# Callbacks append in completion order, which need not match submission order;
# sort on the index carried in each tuple so the preview is deterministic.
sorted(res)[:5]

[(0, 3), (1, 2), (2, 1), (3, 3), (4, 2)]

In [20]:
# Completion order is not guaranteed, so order the (index, count) pairs by
# index before dropping it; final_res[k] then corresponds to data[k].
final_res = [cuenta for indice, cuenta in sorted(res)]
final_res[:5]

[3, 2, 1, 3, 2]

In [23]:
# without callback: keep the objects returned by apply_async and read them later

pool = mp.Pool(processes=mp.cpu_count())
res_obj = [
    pool.apply_async(rango2, args=(i, e, 4, 8))
    for i, e in enumerate(data)
]

In [28]:
# get() waits for the task's result; element [1] of each (index, count) pair is the count.
res = [pendiente.get()[1] for pendiente in res_obj]
res[:5]

[3, 2, 1, 3, 2]

In [30]:
pool.close()
# Wait for the worker processes to exit before the name `pool` is reused.
pool.join()

In [31]:
pool = mp.Pool(processes=mp.cpu_count())
# starmap_async returns a handle right away; get() then waits for the full result list.
res = pool.starmap_async(
    rango2,
    [(i, e, 4, 8) for i, e in enumerate(data)],
).get()

In [35]:
print(res[:5])
pool.close()
# Wait for the worker processes to exit after closing the pool.
pool.join()

[(0, 3), (1, 2), (2, 1), (3, 3), (4, 2)]


In [36]:
import pandas as pd

In [65]:
# 50 rows x 4 columns of random integers drawn from [3, 10).
df = pd.DataFrame(np.random.randint(3, 10, size=(50, 4)))
df.head()

Unnamed: 0,0,1,2,3
0,4,8,6,5
1,9,8,3,3
2,7,6,3,7
3,4,5,7,5
4,6,8,8,5


In [39]:
def hipotenusa(x):
    """Return the square root of ``x[1]**2 + x[2]**2``.

    The sum of squares is rounded to two decimals before the root is taken.
    """
    suma_cuadrados = x[1] ** 2 + x[2] ** 2
    return round(suma_cuadrados, 2) ** 0.5

In [58]:
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.itertuples.html
# https://www.reddit.com/r/learnpython/comments/7zr7cb/multiprocessingpool_interaction_between_chunksize/

# por filas
%time
with mp.Pool(mp.cpu_count()) as pool:
    res=pool.imap(hipotenusa, df.itertuples(name=None), chunksize=10)
    out=[round(e,2) for e in res]
out[:5]

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.25 µs


[9.22, 6.4, 10.0, 6.4, 12.73]

In [59]:
# by columns

def cuadrados(x):
    """Return the sum of the squares of the values in ``x[1]``.

    ``x`` is indexable and its element at position 1 is iterable, for example
    a (label, values) pair.
    """
    return sum(valor ** 2 for valor in x[1])

In [60]:
%time
with mp.Pool(mp.cpu_count()) as pool:
    res=pool.imap(cuadrados, df.iteritems(), chunksize=10)
    out=[round(e,2) for e in res]
out[:5]

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 6.2 µs


[1859, 2047]

In [73]:
%time
df2=df.T.copy()
with mp.Pool(mp.cpu_count()) as pool:
    res=pool.imap(cuadrados, df.itertuples(name=None), chunksize=10)
    #out=[round(e,2) for e in res]
res

CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 6.2 µs


<generator object Pool.imap.<locals>.<genexpr> at 0x118cfdd50>

In [74]:
# Pathos

from pathos.multiprocessing import ProcessingPool as Pool

In [75]:
def func(df):
    """Return the ``shape`` attribute of ``df``."""
    forma = df.shape
    return forma

In [77]:
# One chunk of rows per CPU reported by multiprocessing.
cores = mp.cpu_count()
# axis 0 is array_split's default, so the frame is cut along its rows.
df_split = np.array_split(df, cores)

In [78]:
# pathos pool created with `cores` as its size argument.
pool = Pool(cores)

In [79]:
%time
df_out=pd.DataFrame(np.vstack(pool.map(func, df_split)))
pool.close()
pool.join()
pool.clear()
df_out.head()

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 6.2 µs


Unnamed: 0,0,1
0,5,4
1,5,4
2,4,4
3,4,4
4,4,4
