In [4]:
### simple multiprocessing with pure function

from multiprocessing import Pool, cpu_count


def function_doing_heavy_computations(complexity):
    a_sum = 0
    for i in range(complexity):
        a_list = [x * x * x for x in range(1, complexity)]
        a_sum += sum(a_list) % 97
    
    return a_sum % 97


def start_multiprocessing_calculations(complexity_value, amount_of_tasks):
    values_to_apply_a_func = [complexity_value + x for x in range(amount_of_tasks)]

    number_of_cores_to_use = cpu_count()
    with Pool(processes=number_of_cores_to_use) as pool:
        # the pool.map does not change the order of returned values
        # but calculation of values is not ordered
        results_of_computation = pool.map(function_doing_heavy_computations, values_to_apply_a_func)

    print(f'Computed {len(results_of_computation)} values using {number_of_cores_to_use} cores')
    
# %time function_doing_heavy_computations(1000)
# %time start_multiprocessing_calculations(3000, 100)

In [2]:
%time function_doing_heavy_computations(1000)

CPU times: user 121 ms, sys: 329 µs, total: 122 ms
Wall time: 120 ms


19

In [3]:
%time start_multiprocessing_calculations(3000, 100)

Computed 100 values using 8 cores
CPU times: user 97.3 ms, sys: 27.7 ms, total: 125 ms
Wall time: 26.6 s


In [5]:
### multiprocessing for a DataFrame

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
pd.set_option('max_colwidth', None)


df = pd.DataFrame(np.random.randint(11111,99999,size=(10000, 4)), columns=list('ABCD'))
print(f"original shape: {df.shape}")
df.head(3)

original shape: (10000, 4)


Unnamed: 0,A,B,C,D
0,32102,89647,89131,26597
1,32756,91016,41672,17804
2,47817,17118,70317,31225


In [6]:
def function_to_calculate_column(a_row):
    a = a_row['A']
    b = a_row['B']
    c = a_row['C']
    d = a_row['D']
    
    a_list = [(c * d * x) % 97 for x in range(min(a,b), max(a,b))]
    
    return sum(a_list) % 97

In [10]:
%time temp = df.head(500).apply(function_to_calculate_column, axis=1)

len(temp)

CPU times: user 6.07 s, sys: 0 ns, total: 6.07 s
Wall time: 6.07 s


500

In [None]:
number_of_cores_to_use = cpu_count()
with Pool(processes=number_of_cores_to_use) as pool:
    # the pool.map does not change the order of returned values
    # but calculation of values is not ordered
    temp2 = pool.map(function_to_calculate_column, df.head(500))


len(temp2)

Process ForkPoolWorker-14:
Process ForkPoolWorker-9:
Process ForkPoolWorker-16:
Process ForkPoolWorker-10:
Process ForkPoolWorker-11:
Process ForkPoolWorker-13:
Process ForkPoolWorker-15:
Process ForkPoolWorker-12:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/conda/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/conda/lib/python3.9/multiprocessing/queues.py", line 366, in get
    res = self._reader.recv_bytes()
  File "/opt/conda/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/opt/conda/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
    buf = self._recv(4)
  File "/opt/conda/lib/python3.9/multiprocessing/connection.py", line 384