<a href="https://colab.research.google.com/github/ahmadabousetta/multiprocesspandas/blob/main/Python_multitasking.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from os import cpu_count
from tqdm.notebook import tqdm, trange
import requests
import numpy as np
import pandas as pd

In [None]:
# CPUs reported by the OS; several thread-pool sizes below are multiples of this.
cpu_count()

2

In [None]:
def f1(x):
    """CPU-bound toy task: burn CPU on ``sum(range(500_000))``, then add ``x``.

    The sum is deliberately recomputed on every call; that cost is exactly
    what the benchmarks below measure.
    """
    burned = sum(range(500_000))
    return burned + x

In [None]:
# Number of tasks submitted in each CPU-bound benchmark below.
n = 5_000

In [None]:
%%time
a = list(map(f1, trange(n)))
a[:5]

  0%|          | 0/5000 [00:00<?, ?it/s]

CPU times: user 1min 13s, sys: 368 ms, total: 1min 13s
Wall time: 1min 15s


[124999750000, 124999750001, 124999750002, 124999750003, 124999750004]

In [None]:
%%time

# p.map returns a list. We can't follow progress with tqdm.
with Pool() as p:
    a = p.map(f1, trange(n))
a[:5]

  0%|          | 0/5000 [00:00<?, ?it/s]

CPU times: user 411 ms, sys: 79.2 ms, total: 490 ms
Wall time: 1min 16s


[124999750000, 124999750001, 124999750002, 124999750003, 124999750004]

In [None]:
%%time

# p.imap returns an iterator. We can follow progress with tqdm.
with Pool() as p:
    a = list(tqdm(p.imap(f1, trange(n))))
a[: 5]

  0%|          | 0/5000 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 13.6 s, sys: 1.01 s, total: 14.6 s
Wall time: 1min 26s


[124999750000, 124999750001, 124999750002, 124999750003, 124999750004]

In [None]:
%%time
with ProcessPoolExecutor() as executor:
    a = list(tqdm(executor.map(f1, trange(n))))
a[: 5]

  0%|          | 0/5000 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 3.42 s, sys: 405 ms, total: 3.83 s
Wall time: 1min 19s


[124999750000, 124999750001, 124999750002, 124999750003, 124999750004]

In [None]:
# Why are iterators better than lists? With a lazy iterator, any() can stop at
# the first True result; a list forces every task to run before any() starts.

def f2(x, target=None):
    """CPU-bound predicate: burn CPU, then report whether ``x`` equals ``target``.

    Parameters
    ----------
    x : int
        Task index.
    target : int, optional
        Value to match. Defaults to ``n // 2`` using the notebook global ``n``
        (which must then be defined), so only the middle task returns True.

    Returns
    -------
    bool
        ``x == target``.
    """
    sum(range(1_000_000))  # result discarded on purpose: the work is the point
    if target is None:
        target = n // 2
    return x == target

In [None]:
%%time
any(map(f2, trange(n)))

  0%|          | 0/5000 [00:00<?, ?it/s]

CPU times: user 1min 11s, sys: 343 ms, total: 1min 12s
Wall time: 1min 15s


True

In [None]:
%%time
with Pool() as p:
    result = any(tqdm(p.map(f2, trange(n))))
result

  0%|          | 0/5000 [00:00<?, ?it/s]

  0%|          | 0/5000 [00:00<?, ?it/s]

CPU times: user 741 ms, sys: 110 ms, total: 851 ms
Wall time: 2min 23s


True

In [None]:
%%time
with Pool() as p:
    result = any(tqdm(p.imap(f2, trange(n))))
result

  0%|          | 0/5000 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 7.4 s, sys: 579 ms, total: 7.98 s
Wall time: 1min 19s


True

In [None]:
%%time
with ProcessPoolExecutor() as executor:
    result = any(tqdm(executor.map(f2, trange(n))))
result

  0%|          | 0/5000 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 3.75 s, sys: 376 ms, total: 4.13 s
Wall time: 2min 29s


True

Now let's compare multithreading (thread pools).

In [None]:
%%time

# Threads are not suitable for cpu bound tasks.
with ThreadPool() as p:
    result = any(tqdm(p.map(f2, trange(n))))
result

  0%|          | 0/5000 [00:00<?, ?it/s]

  0%|          | 0/5000 [00:00<?, ?it/s]

CPU times: user 2min 20s, sys: 520 ms, total: 2min 20s
Wall time: 2min 29s


True

In [None]:
# Threads are better suited to IO-bound tasks.
# asyncio would be more efficient, but threads are much easier to use.

def get_joke(i=0, timeout=10):
    """Fetch one random 'dev' joke from api.chucknorris.io.

    Parameters
    ----------
    i : int, optional
        Ignored. Accepted only so the function can be mapped over an iterable
        (``map``, ``Pool.imap``, ``Series.apply``).
    timeout : float, optional
        Seconds to wait on the server. Without a timeout ``requests`` may wait
        indefinitely, hanging the worker thread or process that made the call.

    Returns
    -------
    str
        The ``'value'`` field of the JSON response.

    Raises
    ------
    requests.HTTPError
        If the API replies with a 4xx/5xx status. Without this check, such a reply
        would fail later with a confusing error on ``['value']``.
    """
    url = 'https://api.chucknorris.io/jokes/random?category=dev'
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()['value']

# Smoke test: fetch a single joke before benchmarking.
get_joke()

'Scientists have estimated that the energy given off during the Big Bang is roughly equal to 1CNRhK (Chuck Norris Roundhouse Kick).'

In [None]:
%%time
jokes = list(tqdm(map(get_joke, trange(100))))
jokes[: 5]

  0%|          | 0/100 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 2.22 s, sys: 124 ms, total: 2.34 s
Wall time: 33.7 s


['Chuck Norris does not code in cycles, he codes in strikes.',
 'Chuck Norris can write infinite recursion functions and have them return.',
 "Chuck Norris doesn't need a debugger, he just stares down the bug until the code confesses.",
 "No one has ever spoken during review of Chuck Norris' code and lived to tell about it.",
 'Chuck Norris can solve the Towers of Hanoi in one move.']

In [None]:
%%time
with ThreadPool() as p:
    jokes = list(tqdm(p.imap(get_joke, trange(100))))
jokes[: 5]

  0%|          | 0/100 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 1.9 s, sys: 94.1 ms, total: 1.99 s
Wall time: 12.8 s


['Chuck Norris burst the dot com bubble.',
 "Chuck Norris can't test for equality because he has no equal.",
 'Chuck Norris can overflow your stack just by looking at it.',
 'Chuck Norris can retrieve anything from /dev/null.',
 "Chuck Norris doesn't need an OS."]

In [None]:
%%time
with ThreadPool(10 * cpu_count()) as p:
    jokes = list(tqdm(p.imap(get_joke, trange(100))))
jokes[: 5]

  0%|          | 0/100 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 2.29 s, sys: 124 ms, total: 2.41 s
Wall time: 2.39 s


["A diff between your code and Chuck Norris's is infinite.",
 "Chuck Norris doesn't need an OS.",
 "Chuck Norris doesn't need to use AJAX because pages are too afraid to postback anyways.",
 "Chuck Norris' programs never exit, they terminate.",
 "Chuck Norris doesn't need an OS."]

In [None]:
%%time
with ThreadPool(20 * cpu_count()) as p:
    jokes = list(tqdm(p.imap(get_joke, trange(1000))))
jokes[: 5]

  0%|          | 0/1000 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 23.9 s, sys: 1.22 s, total: 25.2 s
Wall time: 16.9 s


["Chuck Norris doesn't use Oracle, he is the Oracle.",
 "Chuck Norris's log statements are always at the FATAL level.",
 "Chuck Norris' Internet connection is faster upstream than downstream because even data has more incentive to run from him than to him.",
 'Chuck Norris breaks RSA 128-bit encrypted codes in milliseconds.',
 "A diff between your code and Chuck Norris's is infinite."]

In [None]:
%%time
with ThreadPoolExecutor() as executor:
    jokes = list(tqdm(executor.map(get_joke, trange(1000))))
jokes[: 5]

  0%|          | 0/1000 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 19.9 s, sys: 950 ms, total: 20.8 s
Wall time: 43.3 s


["When Chuck Norris throws exceptions, it's across the room.",
 "There is no Esc key on Chuck Norris' keyboard, because no one escapes Chuck Norris.",
 'Chuck Norris can retrieve anything from /dev/null.',
 'Chuck Norris hosting is 101% uptime guaranteed.',
 'Everybody thinks the Galaxy Note 7 is explosive. In fact it is only Chuck Norris who tries to send a WhatsApp message with a selfie to his fans.']

In [None]:
%%time
with ThreadPoolExecutor(20 * cpu_count()) as executor:
    jokes = list(tqdm(executor.map(get_joke, trange(1000))))
jokes[: 5]

  0%|          | 0/1000 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 23.6 s, sys: 1.31 s, total: 24.9 s
Wall time: 15.3 s


['Chuck Norris hosting is 101% uptime guaranteed.',
 'Chuck Norris programs occupy 150% of CPU, even when they are not executing.',
 "Chuck Norris doesn't use GUI, he prefers COMMAND line.",
 'Chuck Norris is currently suing myspace for taking the name of what he calls everything around you.',
 'Chuck Norris is the ultimate mutex, all threads fear him.']

In [None]:
%%time

# NOTICE THE HIGH MEMORY USAGE ASSOCIATED WITH HIGH NUMBER OF PROCESSES
with Pool(200) as p:
    jokes = list(tqdm(p.imap(get_joke, trange(1000))))
jokes[: 5]

  0%|          | 0/1000 [00:00<?, ?it/s]

0it [00:01, ?it/s]

CPU times: user 879 ms, sys: 2.5 s, total: 3.38 s
Wall time: 30.4 s


['Chuck Norris can retrieve anything from /dev/null.',
 'Chuck Norris can delete the Recycling Bin.',
 'Chuck Norris compresses his files by doing a flying round house kick to the hard drive.',
 'Chuck Norris can instantiate an abstract class.',
 'Chuck Norris can write multi-threaded applications with a single thread.']

In [None]:
# %%time

# # THIS WILL FREEZE PYTHON DUE TO HIGH MEMORY CONSUMPTION
# # (1000 worker processes). Intentionally left commented out.
# with Pool(1000) as p:
#     jokes = p.map(get_joke, trange(1000))
# jokes[: 5]

Let's look at a pandas example.

In [None]:
# Seeded generator so the random DataFrame below is reproducible.
rng = np.random.default_rng(42)
rng

Generator(PCG64) at 0x7FDB5644D7E0

In [None]:
# Two columns of 1000 random integers in [0, 100); column A is drawn first, then B.
df = pd.DataFrame({col: rng.integers(0, 100, 1000) for col in ('A', 'B')})
df.head()

Unnamed: 0,A,B
0,63,77
1,58,15
2,25,2
3,97,49
4,85,20


In [None]:
%%time
# Row filter via a query string. The string must be parsed first; on a frame
# this small it is slower than plain boolean indexing (compare the timings).
df.query("A==B")

CPU times: user 5.15 ms, sys: 4 µs, total: 5.16 ms
Wall time: 6.86 ms


Unnamed: 0,A,B
88,35,35
218,40,40
244,48,48
270,73,73
278,81,81
282,55,55
362,54,54
548,56,56
550,87,87
744,2,2


In [None]:
%%time
df[df.A==df.B]

CPU times: user 1.93 ms, sys: 0 ns, total: 1.93 ms
Wall time: 1.95 ms


Unnamed: 0,A,B
88,35,35
218,40,40
244,48,48
270,73,73
278,81,81
282,55,55
362,54,54
548,56,56
550,87,87
744,2,2


In [None]:
%%time
df['jokes'] = df.A.apply(get_joke)

CPU times: user 17.7 s, sys: 894 ms, total: 18.6 s
Wall time: 3min 52s


In [None]:
%%time
with ThreadPool(10*cpu_count()) as p:
    df['jokes'] = list(tqdm(p.imap(get_joke, tqdm(df['A']))))
df.head()

  0%|          | 0/1000 [00:00<?, ?it/s]

0it [00:00, ?it/s]

CPU times: user 22.2 s, sys: 1.1 s, total: 23.3 s
Wall time: 18.5 s


Unnamed: 0,A,B,jokes
0,63,77,With Chuck Norris P = NP. There's no nondeterm...
1,58,15,Chuck Norris's first program was kill -9.
2,25,2,Project managers never ask Chuck Norris for es...
3,97,49,Chuck Norris doesn't pair program.
4,85,20,"For Chuck Norris, NP-Hard = O(1)."


In [None]:
%%time
with ThreadPool(10*cpu_count()) as p:
    df['jokes'] = p.map(get_joke, df['A'])
df.head()

CPU times: user 22.2 s, sys: 1.1 s, total: 23.3 s
Wall time: 18.8 s


Unnamed: 0,A,B,jokes
0,63,77,"MySpace actually isn't your space, it's Chuck'..."
1,58,15,Chuck Norris can retrieve anything from /dev/n...
2,25,2,Chuck Norris can't test for equality because h...
3,97,49,"""It works on my machine"" always holds true for..."
4,85,20,"Chuck Norris doesn't use GUI, he prefers COMMA..."
