\[<< [Testing](./09_unittest.ipynb) | [Index](./00_index.ipynb) | [Virtual Environments](./11_virtual_environment.ipynb) >>\]


# Concurrency

- **Processes and threads**: A process is a running instance of a program, with its own memory space. A thread is a sequence of instructions that runs inside a process. A program can start one or more processes, each process has one or more threads, and all threads of a process share its memory.
- **Multiprocessing and multithreading**: Multiprocessing runs several processes, which the operating system can schedule on different CPU cores so they execute in parallel. Multithreading runs several threads within one process to perform tasks concurrently. In CPython's standard build, the Global Interpreter Lock (GIL) allows only one thread at a time to execute Python bytecode, so threads in one process do not run Python code in parallel.
- **I/O-bound and CPU-bound tasks**: I/O-bound tasks spend most of their time waiting for input/output operations (network requests, disk reads/writes, database queries). CPU-bound tasks spend most of their time computing (video compression, matrix multiplication, finding prime numbers).
- **Suitability of multithreading and multiprocessing**: Multithreading suits I/O-bound tasks, because a thread that is blocked on I/O releases the GIL and lets other threads run. Multiprocessing suits CPU-bound tasks, because each process has its own interpreter and GIL and can run on a separate core.
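
A small sketch of the first point, using only the standard library: every thread started below reports the same process ID as the main program, and all of them append to the same list object, because threads share their process's memory. The names `report` and `worker-N` are arbitrary choices for this example.

```python
import os
import threading

# A single list object; every thread in this process sees the same one.
results = []


def report(label: int) -> None:
    # Record which process this thread lives in and which thread it is.
    # (In CPython, a single list.append call is thread-safe.)
    results.append((label, os.getpid(), threading.current_thread().name))


threads = [
    threading.Thread(target=report, args=(i,), name=f"worker-{i}") for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("main process PID:", os.getpid())
for label, pid, name in sorted(results):
    print(f"{name} ran in process {pid}")
```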

![](./static/cpu_vs_io.png)

In [10]:
%pip install viztracer
%pip install requests

Note: you may need to restart the kernel to use updated packages.

# threading

### Sequential Download of Files
In a sequential approach, tasks are executed one after another. Consider downloading multiple files sequentially:

In [6]:
%load_ext viztracer

In [7]:
%%writefile example/downloader.py
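import os
import shutil
from pathlib import Path

import requests

# Empty proxy URLs: intended to bypass any proxy configured through
# environment variables (HTTP_PROXY / HTTPS_PROXY) for these downloads.
proxies = {
    'http': '',
    'https': '',
}


def download_file(url: str, target: Path):
    """Stream the file at `url` into the directory `target`."""
    fname = os.path.basename(url)
    path = target / fname

    with requests.get(url, stream=True, proxies=proxies, timeout=30) as r:
        r.raise_for_status()  # fail on 4xx/5xx instead of saving an error page
        r.raw.decode_content = True  # decompress gzip/deflate responses, if any
        print(f"Downloading {fname}...")
        with open(path, 'wb') as f:
            shutil.copyfileobj(r.raw, f)

    print(f"Download complete for {fname}!")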

Overwriting example/downloader.py


In [8]:
%%writefile example/single_core_single_process.py
from downloader import download_file
from pathlib import Path
import shutil

if __name__ == "__main__":
    itr_forms = [
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr1_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr2_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr3_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr4_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr5_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr6_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr7_english.pdf",
    ]

    target_path = Path("static") / "concurrency_example"
    shutil.rmtree(target_path, ignore_errors=True)
    target_path.mkdir(exist_ok=True, parents=True)

    for itr_form in itr_forms:
        download_file(itr_form, target_path)

    print("All downloads completed!")

Overwriting example/single_core_single_process.py


In [11]:
!viztracer example/single_core_single_process.py

Downloading itr1_english.pdf...
Download complete for itr1_english.pdf!
Downloading itr2_english.pdf...
Download complete for itr2_english.pdf!
Downloading itr3_english.pdf...
Download complete for itr3_english.pdf!
Downloading itr4_english.pdf...
Download complete for itr4_english.pdf!
Downloading itr5_english.pdf...
Download complete for itr5_english.pdf!
Downloading itr6_english.pdf...
Download complete for itr6_english.pdf!
Downloading itr7_english.pdf...
Download complete for itr7_english.pdf!
All downloads completed!

                                                                                
Saving trace data, this could take a while
Loading trace data from processes 0/1
Combining trace data
Dumping trace data, total entries: 153212
  

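This approach is simple, but each download starts only after the previous one has finished, so the total time is roughly the sum of all the individual download times. Most of that time is spent waiting for the network rather than using the CPU, which makes this an I/O-bound task.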

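### Downloading Files Using Threads
Threads let these waits overlap: while one thread is waiting for a server to respond, another can run. This makes threading a good fit for I/O-bound operations such as downloading files. Below, each call to `download_file` runs in its own thread, and `join()` makes the main thread wait until every download has finished: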

In [8]:
%%writefile example/single_core_multi_thread.py
import threading
from downloader import download_file
from pathlib import Path
import shutil

if __name__ == "__main__":
    itr_forms = [
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr1_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr2_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr3_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr4_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr5_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr6_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr7_english.pdf",
    ]

    target_path = Path("static") / "concurrency_example"
    shutil.rmtree(target_path, ignore_errors=True)
    target_path.mkdir(exist_ok=True, parents=True)

    threads = []
    
    for itr_form in itr_forms:
        my_thread = threading.Thread(target=download_file, args=(itr_form, target_path))
        my_thread.start()
        threads.append(my_thread)
    
    # Wait for all threads to complete
    for thread in threads:
        thread.join()

    print("All downloads completed!")

Writing example/single_core_multi_thread.py


In [9]:
!viztracer example/single_core_multi_thread.py

Downloading itr3_english.pdf...
Download complete for itr3_english.pdf!
Downloading itr6_english.pdf...
Downloading itr5_english.pdf...
Downloading itr4_english.pdf...
Download complete for itr6_english.pdf!
Downloading itr2_english.pdf...
Download complete for itr5_english.pdf!Downloading itr7_english.pdf...
Download complete for itr4_english.pdf!
Downloading itr1_english.pdf...

Download complete for itr2_english.pdf!
Download complete for itr1_english.pdf!Download complete for itr7_english.pdf!

All downloads completed!

                                                                                
Saving trace data, this could take a while
Loading trace data from processes 0/1
Combining trace data
Dumping trace data, total entries: 153730
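
The speed-up can be checked without a network connection. In this sketch, `fake_download` is a hypothetical stand-in that sleeps for 0.2 s instead of downloading anything. Five sequential calls should take about 1 s, while five threads should finish in roughly the time of a single call; exact timings depend on the machine.

```python
import threading
import time


def fake_download(name: str, delay: float = 0.2) -> None:
    # time.sleep stands in for waiting on the network; like real blocking I/O,
    # it lets other threads run while this one waits.
    time.sleep(delay)


names = [f"file{i}.pdf" for i in range(5)]

# Sequential: each wait starts only after the previous one ends.
start = time.perf_counter()
for name in names:
    fake_download(name)
sequential_time = time.perf_counter() - start

# Threaded: all five waits overlap.
start = time.perf_counter()
threads = [threading.Thread(target=fake_download, args=(name,)) for name in names]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```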
  

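💡 Use the `logging` module to log output from threads: its handlers are thread-safe, so each message is written as one complete line. `print()` is not atomic, which is why some lines in the output above run together.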
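
A minimal sketch of this tip, assuming the download work is replaced by two log calls per thread. A `logging` handler holds a lock while it writes a record, so each message comes out whole, and the `%(threadName)s` field shows which thread logged it. The in-memory stream and the `worker-N` names are choices made for this example.

```python
import io
import logging
import threading

# An in-memory stream stands in for the console so the output can be inspected.
log_stream = io.StringIO()
logging.basicConfig(
    stream=log_stream,
    level=logging.INFO,
    format="%(threadName)s: %(message)s",
    force=True,  # replace any handlers configured earlier (Python 3.8+)
)


def work(fname: str) -> None:
    logging.info("Downloading %s...", fname)
    logging.info("Download complete for %s!", fname)


threads = [
    threading.Thread(target=work, args=(f"itr{i}_english.pdf",), name=f"worker-{i}")
    for i in range(1, 4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Each record is one complete line, even though the threads ran concurrently.
print(log_stream.getvalue())
```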

## threading.Thread

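Instead of passing a `target` function, you can subclass `threading.Thread` and override its `run()` method. Calling `start()` runs `run()` in a new thread, and `self.name` holds the thread's name (`Thread-1`, `Thread-2`, ... by default):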

In [10]:
%%writefile example/single_core_multi_thread_class_imp.py
import threading
import shutil
from pathlib import Path
import requests
import os

proxies = {
  'http': '',
  'https': '',
}

class DownloaderThread(threading.Thread):
    def __init__(self, url, target):
        super().__init__()
        self.url = url
        self.target = target

    def run(self):
        print(f"Starting {self.name}")

        fname = os.path.basename(self.url)
        path = self.target / fname


        with requests.get(self.url, stream=True, proxies=proxies) as r:
            print(f"Downloading {fname}...")
            with open(path, 'wb') as f:
                shutil.copyfileobj(r.raw, f)
    
        print(f"Download complete for {fname}!")
        print(f"Completing {self.name}")

if __name__ == "__main__":
    itr_forms = [
        "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr1_english.pdf",
        "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr2_english.pdf",
        "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr3_english.pdf",
        "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr4_english.pdf",
        "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr5_english.pdf",
        "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr6_english.pdf",
        "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr7_english.pdf",
    ]

    target_path = Path("static") / "concurrency_example"
    shutil.rmtree(target_path, ignore_errors=True)
    target_path.mkdir(parents=True, exist_ok=True)

    threads = []
    for itr_form in itr_forms:
        my_thread = DownloaderThread(itr_form, target_path)
        my_thread.start()
        threads.append(my_thread)

    # Wait for all threads to complete
    for thread in threads:
        thread.join()

    print("All downloads completed!")

Writing example/single_core_multi_thread_class_imp.py


In [11]:
!viztracer example/single_core_multi_thread_class_imp.py

Starting Thread-1
Starting Thread-2
Starting Thread-3
Starting Thread-4
Starting Thread-5
Starting Thread-6
Starting Thread-7
Downloading itr3_english.pdf...
Download complete for itr3_english.pdf!
Completing Thread-3
Downloading itr2_english.pdf...
Download complete for itr2_english.pdf!
Completing Thread-2
Downloading itr5_english.pdf...Downloading itr4_english.pdf...

Download complete for itr5_english.pdf!
Completing Thread-5
Download complete for itr4_english.pdf!
Completing Thread-4
Downloading itr6_english.pdf...
Download complete for itr6_english.pdf!
Completing Thread-6
Downloading itr7_english.pdf...Downloading itr1_english.pdf...

Download complete for itr1_english.pdf!
Completing Thread-1Download complete for itr7_english.pdf!

Completing Thread-7
All downloads completed!

                                                                                
Saving trace data, this could take a while
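
The return value of `run()` is discarded, and `join()` always returns `None`. One common workaround, sketched here, is to store the result on the thread object itself; `SquareThread` is a hypothetical example class, not part of `threading`.

```python
import threading


class SquareThread(threading.Thread):
    def __init__(self, n: int):
        super().__init__()
        self.n = n
        self.result = None  # filled in by run()

    def run(self) -> None:
        # Runs in the new thread after start() is called.
        self.result = self.n * self.n


threads = [SquareThread(n) for n in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # once join() returns, run() has finished and result is set

results = [t.result for t in threads]
print(results)  # [0, 1, 4, 9, 16]
```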
                                                                                

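## Threads are not good for CPU-bound tasks

In CPython's standard build, the GIL allows only one thread at a time to execute Python bytecode, so splitting a pure-Python computation across threads does not make it run in parallel; the threads take turns on a single core. The following script splits a prime-checking workload across three threads: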

In [12]:
%%writefile example/single_core_multi_thread_cpu_bound.py
import threading
import random

def is_prime(n):
    for i in range(2, n):
        if n % i == 0:
            return False
    return True

def get_prime_arr(arr):
    return [is_prime(elem) for elem in arr]

num_arr = [random.randint(100, 10000) for i in range(3000)]
thread1 = threading.Thread(target=get_prime_arr, args=(num_arr[:1001],))
thread2 = threading.Thread(target=get_prime_arr, args=(num_arr[1001:2001],))
thread3 = threading.Thread(target=get_prime_arr, args=(num_arr[2001:3000],))

thread1.start()
thread2.start()
thread3.start()

thread1.join()
thread2.join()
thread3.join()

Writing example/single_core_multi_thread_cpu_bound.py


In [13]:
!viztracer example/single_core_multi_thread_cpu_bound.py


                                                                                
Saving trace data, this could take a while
Loading trace data from processes 0/1
Combining trace data
Dumping trace data, total entries: 29141
Total Entries: 29141
Use the following command to open the report:
vizviewer "c:\Users\bhmiller\OneDrive - Intel Corporation\Documents\GitHub\intermediate-python\content\result.json"
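
A rough way to see this without viztracer is to time the same work sequentially and with three threads. This sketch copies `is_prime` from the script above; `count_primes` is a helper introduced here because threads discard return values. On a standard CPython build (with the GIL), expect the threaded time to be about the same as the sequential time or slightly worse; exact numbers depend on the machine.

```python
import random
import threading
import time


def is_prime(n):
    for i in range(2, n):
        if n % i == 0:
            return False
    return True


def count_primes(arr, out, index):
    # Threads discard return values, so each one writes into its own slot.
    out[index] = sum(is_prime(x) for x in arr)


random.seed(0)  # fixed data so both runs do identical work
num_arr = [random.randint(100, 10000) for _ in range(3000)]
chunks = [num_arr[0:1000], num_arr[1000:2000], num_arr[2000:3000]]

start = time.perf_counter()
sequential = [sum(is_prime(x) for x in chunk) for chunk in chunks]
sequential_time = time.perf_counter() - start

threaded = [0] * len(chunks)
threads = [
    threading.Thread(target=count_primes, args=(chunk, threaded, i))
    for i, chunk in enumerate(chunks)
]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, 3 threads: {threaded_time:.2f}s")
```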


## threading.Lock

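Threads in the same process share memory, so two threads can read and modify the same variable at the same time and overwrite each other's changes (a *race condition*). In the first example below, both threads read `counter` while it is still 0, so one increment is lost and the final value is 20 instead of 30. The `threading.Lock` class prevents this by letting only one thread at a time run the code it guards: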

In [19]:
from threading import Thread
from time import sleep

counter = 0


def increase(by):
    global counter

    # Read the current value of the counter
    local_counter = counter

    # Perform the increment
    local_counter += by

    # Simulate some processing time (0.1 seconds)
    sleep(0.1)

    # Update the global counter with the new value
    counter = local_counter
    print(f"counter={counter}")


# Create threads
t1 = Thread(target=increase, args=(10,))
t2 = Thread(target=increase, args=(20,))

# Start the threads
t1.start()
t2.start()

# Wait for the threads to complete
t1.join()
t2.join()

print(f"The final counter is {counter}")

counter=10
counter=20
The final counter is 20


In [20]:
import dis

dis.dis(increase)

  7           0 RESUME                   0

 11           2 LOAD_GLOBAL              0 (counter)
             14 STORE_FAST               1 (local_counter)

 14          16 LOAD_FAST                1 (local_counter)
             18 LOAD_FAST                0 (by)
             20 BINARY_OP               13 (+=)
             24 STORE_FAST               1 (local_counter)

 17          26 LOAD_GLOBAL              3 (NULL + sleep)
             38 LOAD_CONST               1 (0.1)
             40 PRECALL                  1
             44 CALL                     1
             54 POP_TOP

 20          56 LOAD_FAST                1 (local_counter)
             58 STORE_GLOBAL             0 (counter)

 21          60 LOAD_GLOBAL              5 (NULL + print)
             72 LOAD_CONST               2 ('counter=')
             74 LOAD_GLOBAL              0 (counter)
             86 FORMAT_VALUE             0
             88 BUILD_STRING             2
             90 PRECALL                  1
 

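In this example the `sleep(0.1)` call makes the lost update almost certain, but it can happen without it too, because the interpreter can switch threads between bytecode instructions. The table shows one such interleaving of the instructions listed above: thread 1 loads `counter` and is paused, thread 2 loads the same value and computes its sum, and thread 1 then resumes and computes its sum from the stale value. Whichever thread stores `counter` last overwrites the other's update.

<table border="1">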
<tbody>
<tr>
    <td ><b>Thread 1</b></td>
    <td ><b>Thread 2</b></td>
</tr>
<tr>
<td >&nbsp;11 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 2 LOAD_GLOBAL &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;0 (counter)<br />&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;14 STORE_FAST &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 1 (local_counter)</td>
<td >&nbsp;</td>
</tr>
<tr>
<td >&nbsp;</td>
<td >
<p>11 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 2 LOAD_GLOBAL &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;0 (counter)<br />&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;14 STORE_FAST &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 1 (local_counter)</p>
<p>&nbsp;14 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;16 LOAD_FAST &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;1 (local_counter)<br />&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;18 LOAD_FAST &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;0 (by)<br />&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;20 BINARY_OP &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 13 (+=)<br />&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;24 STORE_FAST &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 1 (local_counter)</p>
</td>
</tr>
<tr>
<td >14 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;16 LOAD_FAST &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;1 (local_counter)<br />&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;18 LOAD_FAST &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;0 (by)<br />&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;20 BINARY_OP &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 13 (+=)<br />&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;24 STORE_FAST &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 1 (local_counter)</td>
<td >&nbsp;</td>
</tr>
</tbody>
</table>

In [21]:
from threading import Thread, Lock
from time import sleep

counter = 0
counter_lock = Lock()


def increase(by):
    global counter

    # Acquire the lock before updating the counter
    with counter_lock:
        local_counter = counter

        # Perform the increment
        local_counter += by

        # Simulate some processing time (0.1 seconds)
        sleep(0.1)

        # Update the global counter with the new value
        counter = local_counter
        print(f"counter={counter}")


# Create threads
t1 = Thread(target=increase, args=(10,))
t2 = Thread(target=increase, args=(20,))

# Start the threads
t1.start()
t2.start()

# Wait for the threads to complete
t1.join()
t2.join()

print(f"The final counter is {counter}")

counter=10
counter=30
The final counter is 30
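
The same pattern scales to more threads and many small updates. Even `counter += 1` compiles to separate load, add, and store instructions, so it is not atomic; holding the lock around it makes the final total deterministic. A sketch with four threads, where `add_many` is an illustrative helper:

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_many(times: int) -> None:
    global counter
    for _ in range(times):
        # counter += 1 is a read-modify-write; the lock makes it atomic.
        with counter_lock:
            counter += 1


threads = [threading.Thread(target=add_many, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000
```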


## threading.Timer

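Python's `threading.Timer` class runs a function in a separate thread after a given delay (in seconds); calling `cancel()` while the timer is still waiting prevents the function from running. The example below uses a timer to kill a `ping` subprocess that would otherwise run forever: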

In [22]:
import subprocess
import threading

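# Function the timer calls to kill the process
def kill(process):
    process.kill()

# Command to execute. On Windows, "-t" makes ping run until it is stopped;
# on Linux and macOS, remove "-t" (ping there runs until interrupted by default).
cmd = ["ping", "www.google.com", "-t"]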

# Start the ping command as a subprocess
ping = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# Create a timer to kill the process after 10 seconds
my_timer = threading.Timer(10, kill, [ping])

try:
    # Start the timer
    my_timer.start()

    # Communicate with the subprocess and get the stdout and stderr
    stdout, stderr = ping.communicate()

finally:
    # Stop the timer if it has not fired yet (e.g. ping exited on its own)
    my_timer.cancel()

# Print the stdout of the ping command
print(stdout.decode())


Pinging www.google.com [142.250.69.196] with 32 bytes of data:
Reply from 142.250.69.196: bytes=32 time=17ms TTL=114
Reply from 142.250.69.196: bytes=32 time=14ms TTL=114
Reply from 142.250.69.196: bytes=32 time=15ms TTL=114
Reply from 142.250.69.196: bytes=32 time=15ms TTL=114
Reply from 142.250.69.196: bytes=32 time=15ms TTL=114
Reply from 142.250.69.196: bytes=32 time=15ms TTL=114
Reply from 142.250.69.196: bytes=32 time=15ms TTL=114
Reply from 142.250.69.196: bytes=32 time=18ms TTL=114
Reply from 142.250.69.196: bytes=32 time=13ms TTL=114
Reply from 142.250.69.196: bytes=32 time=14ms TTL=114
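
A smaller, platform-independent sketch of the same API: `threading.Timer(interval, function, args=...)` calls `function` in a new thread once `interval` seconds have passed, and `cancel()` stops a timer that is still waiting. `on_timeout` is an illustrative callback.

```python
import threading

fired = []


def on_timeout(label: str) -> None:
    fired.append(label)


# Fires after 0.1 s in its own thread.
t1 = threading.Timer(0.1, on_timeout, args=["first"])
# Scheduled for 5 s but cancelled before it runs.
t2 = threading.Timer(5.0, on_timeout, args=["second"])

t1.start()
t2.start()
t2.cancel()  # only has an effect while the timer is still waiting

t1.join()
t2.join()  # returns promptly because the cancelled timer stops waiting

print(fired)  # ['first']
```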



## Thread communication using Queue

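`queue.Queue` is a thread-safe FIFO queue: `put()` and `get()` do their own locking, so threads can pass data to each other through it without an explicit `Lock`. `get()` blocks until an item is available, and on a queue created with `maxsize`, `put()` blocks while the queue is full.

The following producer-consumer example shares one queue between a producer thread and a consumer thread. After the producer finishes, the main thread puts `None` on the queue as a sentinel that tells the consumer to stop: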

In [23]:
import threading
import queue
import time

# The shared queue between the producer and consumer
shared_queue = queue.Queue(maxsize=5)


# Function for the producer
def producer():
    for i in range(1, 5):
        item = f"Item {i}"
        shared_queue.put(item)
        print(f"Produced: {item}")
        time.sleep(1)


# Function for the consumer
def consumer():
    while True:
        item = shared_queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        shared_queue.task_done()


# Create and start the producer and consumer threads
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

# Wait for the producer to finish producing
producer_thread.join()

# Add None to the queue to signal the consumer to exit
shared_queue.put(None)

# Wait for the consumer to finish consuming
consumer_thread.join()

print("Producer and consumer have finished.")

Produced: Item 1
Consumed: Item 1
Produced: Item 2Consumed: Item 2

Produced: Item 3Consumed: Item 3

Produced: Item 4Consumed: Item 4

Producer and consumer have finished.
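
The consumer above calls `task_done()`, which pairs with `Queue.join()`: `join()` blocks until every item that was `put()` has been marked done. A sketch of that pattern with three worker threads, where squaring a number stands in for real work:

```python
import queue
import threading

tasks = queue.Queue()
results = []
results_lock = threading.Lock()


def worker() -> None:
    while True:
        item = tasks.get()
        try:
            with results_lock:
                results.append(item * item)
        finally:
            # Tell the queue this item is fully processed.
            tasks.task_done()


# Daemon workers: they loop forever, so they must not keep the program alive.
for _ in range(3):
    threading.Thread(target=worker, daemon=True).start()

for n in range(10):
    tasks.put(n)

# Blocks until task_done() has been called once for every item put().
tasks.join()
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```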


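## Daemon thread

A daemon thread runs in the background and does not keep the program alive. The interpreter exits when all non-daemon threads (including the main thread) have finished, and any daemon threads still running at that point are stopped abruptly. Threads are non-daemon by default; in the first example below, the thread loops forever, so nothing ever stops it: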

In [24]:
import threading
import time


def timer_thread():
    count = 0
    while True:
        count += 1
        time.sleep(1)
        print(f"Has been waiting for {count} second(s)...")


# Create and start the timer thread (non-daemon by default)
timer = threading.Thread(target=timer_thread)
timer.start()

# Wait for user input to exit
answer = input("Do you want to exit? (Type 'yes' to exit)\n")

# The thread never finishes, so it keeps printing after you answer.
# To stop it, use Kernel -> Restart Kernel.

Has been waiting for 1 second(s)...
Has been waiting for 2 second(s)...
Has been waiting for 3 second(s)...
...
Has been waiting for 181 second(s)...
Has been waiting for 182 second(s)...

In [1]:
import threading
import time


def timer_thread():
    count = 0
    while True:
        count += 1
        time.sleep(1)
        print(f"Has been waiting for {count} second(s)...")


# Create and start the timer thread as a daemon thread
timer = threading.Thread(target=timer_thread, daemon=True)
timer.start()

# Wait for user input to exit
answer = input("Do you want to exit? (Type 'yes' to exit)\n")

# In a script, the daemon thread would be stopped as soon as the main thread finishes.
# In Jupyter it keeps running, because the kernel process (and its interpreter)
# stays alive after the cell ends. Use Kernel -> Restart to stop the thread.

Has been waiting for 1 second(s)...
Has been waiting for 2 second(s)...
Has been waiting for 3 second(s)...
...
Has been waiting for 33 second(s)...
Has been waiting for 34 second(s)...
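
`threading.Thread` has no `stop()` method, and a daemon thread is only stopped when the interpreter exits. To end a background loop on demand (in a notebook or in a script), a common approach is to have the thread watch a `threading.Event`. This sketch adapts the timer example; `stop_event` and the 0.1 s interval are choices made for illustration.

```python
import threading
import time

stop_event = threading.Event()
ticks = []


def timer_thread(stop: threading.Event, interval: float) -> None:
    count = 0
    # wait() returns False on timeout and True as soon as the event is set.
    while not stop.wait(interval):
        count += 1
        ticks.append(count)
        print(f"Has been waiting for {count} tick(s)...")


timer = threading.Thread(target=timer_thread, args=(stop_event, 0.1))
timer.start()

time.sleep(0.35)  # let it tick a few times
stop_event.set()  # ask the thread to finish
timer.join()      # returns once the loop has noticed the event
print("timer thread stopped:", not timer.is_alive())
```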

# multiprocessing

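The `multiprocessing` module runs code in separate processes. Each process has its own Python interpreter, memory, and GIL, so CPU-bound work split across processes can run in parallel on multiple CPU cores. The `if __name__ == "__main__":` guard in the script below is required when child processes are started with the *spawn* method (the default on Windows and macOS), because each child re-imports the main module.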

In [1]:
%%writefile example/multiprocess_example.py
import multiprocessing
import random

def is_prime(n):
    for i in range(2, n):
        if n % i == 0:
            return False
    return True

def get_prime_arr(arr):
    return [is_prime(elem) for elem in arr]

if __name__ == "__main__":
    num_arr = [random.randint(100, 10000) for i in range(3000)]
    p1 = multiprocessing.Process(target=get_prime_arr, args=(num_arr[:1001],))
    p2 = multiprocessing.Process(target=get_prime_arr, args=(num_arr[1001:2001],))
    p3 = multiprocessing.Process(target=get_prime_arr, args=(num_arr[2001:3000],))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

Writing example/multiprocess_example.py


In [3]:
!viztracer example/multiprocess_example.py


                                                                                
Saving trace data, this could take a while
Loading trace data from processes 0/4
Loading trace data from processes 1/4
Loading trace data from processes 2/4
Loading trace data from processes 3/4
Combining trace data
Dumping trace data, total entries: 29555
Total Entries: 29555
Use the following command to open the report:
vizviewer "c:\Users\bh
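
`multiprocessing.Process`, like `Thread`, discards the target's return value, so the script above computes results it never sees. A sketch using `multiprocessing.Pool.map` instead, which sends each chunk to a worker process and returns the results in input order. `is_prime` and `get_prime_arr` are copied from the script above; `chunked` is a helper written for this example.

```python
import multiprocessing
import random


def is_prime(n):
    for i in range(2, n):
        if n % i == 0:
            return False
    return True


def get_prime_arr(arr):
    return [is_prime(elem) for elem in arr]


def chunked(seq, size):
    # Split seq into consecutive slices of at most `size` items.
    return [seq[i:i + size] for i in range(0, len(seq), size)]


if __name__ == "__main__":
    num_arr = [random.randint(100, 10000) for _ in range(3000)]
    with multiprocessing.Pool(processes=3) as pool:
        # map() blocks until every chunk is done and keeps the input order.
        chunk_results = pool.map(get_prime_arr, chunked(num_arr, 1000))
    flags = [flag for chunk in chunk_results for flag in chunk]
    print(f"{sum(flags)} of {len(num_arr)} numbers are prime")
```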

## concurrent.futures

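The `concurrent.futures` module provides one high-level interface over pools of threads (`ThreadPoolExecutor`) and pools of processes (`ProcessPoolExecutor`). Calling `executor.submit(fn, *args)` schedules `fn` and immediately returns a `Future` object, which later holds the call's result or the exception it raised.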
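
A sketch of the `Future` workflow, independent of the download code: `submit()` returns a `Future` right away, `as_completed()` yields futures as they finish, and `result()` returns the value or re-raises the exception raised in the worker. `parse_size` is a made-up task that fails on non-numeric input.

```python
import concurrent.futures


def parse_size(text: str) -> int:
    # Hypothetical task: raises ValueError for input that is not a number.
    return int(text)


inputs = ["10", "20", "oops", "40"]
sizes = {}
errors = {}

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns a Future immediately; map each Future back to its input.
    future_to_input = {executor.submit(parse_size, s): s for s in inputs}

    # as_completed() yields futures in completion order, not submission order.
    for future in concurrent.futures.as_completed(future_to_input):
        s = future_to_input[future]
        try:
            sizes[s] = future.result()  # re-raises the worker's exception, if any
        except ValueError as exc:
            errors[s] = exc

print(sizes)
print(errors)
```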

In [6]:
%%writefile example/single_core_multi_thread_concurrent.py
import concurrent.futures
from downloader import download_file  # defined in example/downloader.py above
from pathlib import Path
import shutil

def main():
    itr_forms = [
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr1_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr2_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr3_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr4_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr5_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr6_english.pdf",
        "http://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr7_english.pdf",
    ]

    target_path = Path("static") / "concurrency_example"
    shutil.rmtree(target_path, ignore_errors=True)
    target_path.mkdir(exist_ok=True, parents=True)

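    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(download_file, itr_form, target_path) for itr_form in itr_forms]

        # as_completed() yields each future as it finishes; result() re-raises any
        # exception from the worker thread, so a failed download is not silently ignored.
        for future in concurrent.futures.as_completed(futures):
            future.result()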

    print("All downloads completed!")

if __name__ == "__main__":
    main()

Writing example/single_core_multi_thread_concurrent.py


In [7]:
!viztracer example/single_core_multi_thread_concurrent.py

Downloading itr3_english.pdf...
Download complete for itr3_english.pdf!
Downloading itr5_english.pdf...
Downloading itr2_english.pdf...
Download complete for itr5_english.pdf!
Download complete for itr2_english.pdf!
Downloading itr1_english.pdf...
Downloading itr6_english.pdf...
Downloading itr7_english.pdf...
Download complete for itr1_english.pdf!
Download complete for itr6_english.pdf!
Download complete for itr7_english.pdf!
Downloading itr4_english.pdf...
Download complete for itr4_english.pdf!
All downloads completed!

                                                                                
Saving trace data, this could take a while
Loading trace data from processes 0/1
Combining trace data
Dumping trace data, total entries: 154337
  

In [14]:
%%writefile example/multiprocess_example_concurrent.py
import concurrent.futures
import random

def is_prime(n):
    for i in range(2, n):
        if n % i == 0:
            return False
    return True

def get_prime_arr(arr):
    return [is_prime(elem) for elem in arr]

if __name__ == "__main__":
    num_arr = [random.randint(100, 10000) for i in range(3000)]

    num_arr_chunks = [num_arr[i:i+1000] for i in range(0, len(num_arr), 1000)]

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(get_prime_arr, chunk) for chunk in num_arr_chunks]
        
        # Wait for all futures to complete
        concurrent.futures.wait(futures)

    print("All prime number calculations completed!")

Writing example/multiprocess_example_concurrent.py


In [15]:
!viztracer --log_multiprocess example/multiprocess_example_concurrent.py

--log_multiprocess and --log_subprocess are no longer needed to trace multi-process program
All prime number calculations completed!

                                                                                
Saving trace data, this could take a while
Loading trace data from processes 0/4
Loading trace data from processes 1/4
Loading trace data from processes 2/4
Loading trace data from processes 3/4
Combining trace data
Dumping trace data, total entries: 33063
                                 

**A good visual explanation of multithreading and multiprocessing**

[![](https://img.youtube.com/vi/AZnGRKFUU0c/0.jpg)](https://youtu.be/AZnGRKFUU0c)

See also this [Medium article: Intro to Threads and Processes in Python](https://medium.com/@bfortuner/python-multithreading-vs-multiprocessing-73072ce5600b).

**Some good talks on concurrency in Python**

[![](https://img.youtube.com/vi/MCs5OvhV9S4/0.jpg)](https://youtu.be/MCs5OvhV9S4)

[![](https://img.youtube.com/vi/18B1pznaU1o/0.jpg)](https://youtu.be/18B1pznaU1o)

