In [41]:
%load_ext autotime
import pandas as pd
import numpy as np
import concurrent.futures

The autotime extension is already loaded. To reload it, use:
  %reload_ext autotime
time: 92.5 ms


# Example data

We'll use a synthetic population file for the simple parallel examples; read the data in using `pandas`.

In [62]:
population = pd.read_csv('dat/2005_2009_ver2_42065_synth_people.txt')

time: 121 ms


In [63]:
# Just to show the contents of the dataframe
population.head()

Unnamed: 0,p_id,hh_id,serialno,stcotrbg,age,sex,race,sporder,relate,school_id,workplace_id
0,416175660,261526469,2005000002176,420659503002,86,2,1,1,0,,
1,416175661,261526469,2005000002176,420659503002,92,1,1,2,1,,
2,416175676,261533970,2005000002176,420659508002,86,2,1,1,0,,
3,416175677,261533970,2005000002176,420659508002,92,1,1,2,1,,
4,416175678,261526897,2005000002176,420659503003,86,2,1,1,0,,


time: 22.2 ms


# Task: randomly sample the population and summarize the age distribution

This is an example function to execute on the population dataframe using both threading and multiprocessing.

In [44]:
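def get_mean_age(sample_size):
    # Draw `sample_size` row labels at random (with replacement) and average their ages.
    # Relies on the module-level `population` dataframe and its default integer index.
    rows = np.random.randint(0, len(population), sample_size)
    return population.loc[rows, 'age'].mean()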

time: 12.5 ms
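
One caveat when a sampling function like `get_mean_age` runs in a process pool: it draws from NumPy's global random state, and depending on how the worker processes are started they may inherit identical copies of that state and so draw overlapping samples. A minimal sketch of one way around this, passing an explicit seed with each task, follows. It uses the standard library's `random` module and a small made-up `ages` list as stand-ins for NumPy and the population dataframe; `seeded_mean_age` and `ages` are hypothetical names, not part of this notebook.

```python
import random
import statistics
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the 'age' column of the population dataframe
ages = [86, 92, 86, 92, 86, 34, 12, 57, 41, 23]

def seeded_mean_age(seed, sample_size):
    # A private generator per task: the result depends only on the seed,
    # not on which worker runs the task or on any shared global state.
    rng = random.Random(seed)
    return statistics.fmean(rng.choices(ages, k=sample_size))

num_samples, sample_size = 4, 5
with ThreadPoolExecutor() as executor:
    # map accepts several iterables and passes one item from each per call
    means = list(executor.map(seeded_mean_age,
                              range(num_samples),
                              [sample_size] * num_samples))
```

Because each task carries its own seed, rerunning the cell gives the same means regardless of how the tasks are scheduled.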


In [45]:
# Here we use a process pool and the map function
def test_process_pool(num_samples, sample_size):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        samples = list(range(num_samples))
        # The results are discarded; we only want to time the execution
        for sample_number, mean_age in zip(
            samples, executor.map(get_mean_age, [sample_size]*num_samples)):
            pass

time: 14.5 ms


In [46]:
# Note the consistent syntax: only the executor class changes
def test_threading_pool(num_samples, sample_size):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        samples = list(range(num_samples))
        # The results are discarded; we only want to time the execution
        for sample_number, mean_age in zip(
            samples, executor.map(get_mean_age, [sample_size]*num_samples)):
            pass

time: 8.62 ms


## Testing performance of the process pool compared to threading

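Note that the process pool carries significant overhead relative to the thread pool: worker processes have to be started, and arguments and results are pickled to pass between them. For a small amount of work, that overhead dominates.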

In [47]:
test_process_pool(10,10)

time: 149 ms


In [48]:
test_threading_pool(10,10)

time: 30.5 ms


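But when the amount of work increases, the independent execution in the process pool wins out: each worker process has its own interpreter, so the samples are not serialized by the global interpreter lock (GIL).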

In [36]:
test_process_pool(100,1000)

time: 1.79 s


In [35]:
test_threading_pool(100,1000)

time: 4.92 s


# Example: apply a parallel function to a pandas data frame

Pandas `groupby` can be used to provide the chunks to operate on in parallel.

This can work especially well with the threading backend if the function to be applied is a compiled extension that releases the GIL.
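
As a rough illustration of that point, the sketch below maps a GIL-releasing function over chunks using the thread pool. It assumes `hashlib.sha256` as the compiled function (CPython's `hashlib` releases the GIL while hashing larger inputs) and uses made-up byte strings in place of dataframe groups; `chunks` and `digest_chunk` are hypothetical names.

```python
import hashlib
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for groups: four 1 MB byte strings
chunks = {name: bytes([i]) * 1_000_000 for i, name in enumerate("abcd")}

def digest_chunk(data):
    # The hashing runs in C, so other threads can proceed in the meantime
    return hashlib.sha256(data).hexdigest()

with ThreadPoolExecutor(max_workers=4) as executor:
    # map yields results in input order, so they can be zipped with the keys
    digests = dict(zip(chunks, executor.map(digest_chunk, chunks.values())))
```

Threads share memory, so the chunks are not copied or pickled on their way to the workers.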

The following example uses `submit` rather than `map` to illustrate this more flexible way to execute functions in parallel.  `submit` allows straightforward specification of other arguments to the target function.

In [50]:
def summarize_group(group_df):
    return group_df.age.mean()

group_futures = {}
with concurrent.futures.ProcessPoolExecutor() as executor:
    for group, group_df in population.groupby('race'):
        group_futures[group] = executor.submit(summarize_group, group_df)
    

time: 139 ms
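
To illustrate the point about other arguments: `submit` forwards any additional positional and keyword arguments to the target function. The sketch below is self-contained and uses a thread pool with made-up data; `summarize_values`, its `stat` and `ndigits` parameters, and the `groups` dict are hypothetical and not part of this notebook.

```python
import statistics
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the per-group ages produced by groupby
groups = {1: [86, 92, 86], 2: [12, 30, 27], 3: [47, 48]}

def summarize_values(values, stat, ndigits=2):
    # Choose the summary statistic by name, then round the result
    funcs = {"mean": statistics.fmean, "median": statistics.median}
    return round(funcs[stat](values), ndigits)

futures = {}
with ThreadPoolExecutor() as executor:
    for group, values in groups.items():
        # Arguments after the callable are passed through to it unchanged
        futures[group] = executor.submit(summarize_values, values, "mean", ndigits=1)

summary = {group: future.result() for group, future in futures.items()}
```

With `map`, the same effect would need one iterable per argument or a wrapper such as `functools.partial`.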


## Accessing results

When work is submitted to the executor, a `Future` object is returned. Calling the `result()` method on this object blocks until execution has completed.

In [53]:
for group, future in group_futures.items():
    print(group, future.result())

1 40.9084934277
2 22.7407407407
3 47.2857142857
5 27.0
6 47.3305084746
7 21.0
8 34.9393939394
9 28.9795081967
time: 5.3 ms
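
Iterating over the dictionary retrieves results in submission order, so a slow early task delays everything behind it. As an alternative sketch, `concurrent.futures.as_completed` yields futures as they finish, and `result()` re-raises any exception that was raised in the worker. The `slow_square` function and its inputs are hypothetical.

```python
import concurrent.futures
import time

def slow_square(x):
    # Negative inputs fail; larger inputs sleep longer and finish later
    if x < 0:
        raise ValueError("negative input")
    time.sleep(0.05 * x)
    return x * x

results, errors = {}, {}
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to the input that produced it
    future_to_input = {executor.submit(slow_square, x): x for x in [3, 1, -1, 2]}
    for future in concurrent.futures.as_completed(future_to_input):
        x = future_to_input[future]
        try:
            results[x] = future.result()
        except ValueError as exc:
            # Exceptions from the worker surface here, not at submit time
            errors[x] = str(exc)
```

Keeping a future-to-input dictionary is what makes it possible to tell which task a completed future belongs to.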


## Alternative presentation of results as pandas Series

In [61]:
pd.Series({g:f.result() for g,f in group_futures.items()},name='average_age')

1    40.908493
2    22.740741
3    47.285714
5    27.000000
6    47.330508
7    21.000000
8    34.939394
9    28.979508
Name: average_age, dtype: float64

time: 8.66 ms


Next: [Simple parallel R example](Simple parallel R example.ipynb)