\[<< [Testing](./09_unittest.ipynb) | [Index](./00_index.ipynb) | [Virtual Environments](./11_virtual_environment.ipynb) >>\]


# Concurrency

- **Processes and threads**: A program can have one or more processes and a process can have one or more threads. A process is an instance of a program while a thread is a part of a process that can execute instructions.
- **Multiprocessing and multithreading**: Multiprocessing uses multiple cores of a CPU to execute multiple processes in parallel. Multithreading uses multiple threads within a process to perform tasks concurrently.
- **I/O-bound and CPU-bound tasks**: I/O-bound tasks spend more time on input/output operations than on computation (network requests, disk reads/writes, database reads/writes). CPU-bound tasks spend more time on computation than on I/O (video compression, matrix multiplication, finding prime numbers).
- **Suitability of multithreading and multiprocessing**: Multithreading suits I/O-bound tasks, because threads can overlap the time they spend waiting on I/O. Multiprocessing suits CPU-bound tasks: in standard CPython builds, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, whereas separate processes can run on separate cores.
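
The last point can be illustrated with a minimal sketch. It assumes that `time.sleep` is a fair stand-in for an I/O wait; the helper names (`fake_io_task`, `run_sequential`, `run_threaded`) are made up for this illustration.

```python
import threading
import time


def fake_io_task(delay):
    # time.sleep stands in for waiting on a network or disk (an assumption);
    # the thread is idle while it waits
    time.sleep(delay)


def run_sequential(n, delay):
    # Run n tasks one after another and return the elapsed time
    start = time.perf_counter()
    for _ in range(n):
        fake_io_task(delay)
    return time.perf_counter() - start


def run_threaded(n, delay):
    # Run n tasks in n threads and return the elapsed time
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io_task, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


sequential_time = run_sequential(4, 0.2)
threaded_time = run_threaded(4, 0.2)
print(f"Sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

The threaded run should take roughly as long as a single wait, because all four waits overlap.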

# threading

### Sequential Download of File
In a sequential approach, tasks are executed one after another. Consider downloading multiple files sequentially:

In [None]:
%%time
import shutil
from pathlib import Path
import urllib.request
import os


def download(url, target):
    fname = os.path.basename(url)
    path = target / fname

    print(f"Downloading {fname}...")

    # Stream the response to disk in 1 KiB chunks; both handles are closed on exit
    with urllib.request.urlopen(url) as handle, open(path, "wb") as f_handler:
        while True:
            chunk = handle.read(1024)
            if not chunk:
                break
            f_handler.write(chunk)

    print(f"Download complete for {fname}!")


itr_forms = [
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr1_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr2_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr3_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr4_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr5_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr6_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr7_english.pdf",
]

target_path = Path("static") / "concurrency_example"
shutil.rmtree(target_path, ignore_errors=True)
target_path.mkdir(exist_ok=True, parents=True)

for itr_form in itr_forms:
    download(itr_form, target_path)

print("All downloads completed!")

This approach is simple but slow for multiple files: each download waits for the previous one to finish, so the total time is roughly the sum of the individual download times.

### Download of File Using Threads
To improve performance, threading can be employed. Threads execute tasks concurrently, ideal for I/O-bound operations like downloading files:

In [None]:
%%time
import threading
import shutil
from pathlib import Path
import urllib.request
import os


def download(url, target):
    # current_thread().name replaces the deprecated currentThread().getName()
    print(f"Starting {threading.current_thread().name}")

    fname = os.path.basename(url)
    path = target / fname

    print(f"Downloading {fname}...")

    # Stream the response to disk in 1 KiB chunks; both handles are closed on exit
    with urllib.request.urlopen(url) as handle, open(path, "wb") as f_handler:
        while True:
            chunk = handle.read(1024)
            if not chunk:
                break
            f_handler.write(chunk)

    print(f"Download complete for {fname}!")
    print(f"Completing {threading.current_thread().name}")


itr_forms = [
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr1_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr2_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr3_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr4_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr5_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr6_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr7_english.pdf",
]

target_path = Path("static") / "concurrency_example"
shutil.rmtree(target_path, ignore_errors=True)
target_path.mkdir(exist_ok=True, parents=True)

threads = []
for itr_form in itr_forms:
    my_thread = threading.Thread(target=download, args=(itr_form, target_path))
    my_thread.start()
    threads.append(my_thread)

# Wait for all threads to complete
for thread in threads:
    thread.join()

print("All downloads completed!")

💡 Use the `logging` module instead of `print` for output from threads: it is thread-safe, so messages from different threads are not interleaved.
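
A minimal sketch of the tip above. The logger name `downloader`, the `worker` function and the thread names are hypothetical. The `%(threadName)s` format field makes each log line show which thread produced it.

```python
import logging
import threading

# Include the thread name in every log line
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(threadName)s] %(message)s",
)
logger = logging.getLogger("downloader")  # hypothetical logger name
logger.setLevel(logging.INFO)


def worker(item):
    # Logger methods are safe to call from several threads at once
    logger.info("Processing %s", item)


threads = [
    threading.Thread(target=worker, args=(f"file{i}",), name=f"worker-{i}")
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```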

## threading.Thread

Besides passing a function as `target`, a thread can be defined by subclassing `threading.Thread` and overriding its `run()` method. Calling `start()` then executes `run()` in a separate thread:

In [None]:
import threading
import shutil
from pathlib import Path
import urllib.request
import os


class DownloaderThread(threading.Thread):
    def __init__(self, url, target):
        super().__init__()
        self.url = url
        self.target = target

    def run(self):
        # self.name replaces the deprecated self.getName()
        print(f"Starting {self.name}")

        fname = os.path.basename(self.url)
        path = self.target / fname

        print(f"Downloading {fname}...")

        # Stream the response to disk in 1 KiB chunks; both handles are closed on exit
        with urllib.request.urlopen(self.url) as handle, open(path, "wb") as f_handler:
            while True:
                chunk = handle.read(1024)
                if not chunk:
                    break
                f_handler.write(chunk)

        print(f"Download complete for {fname}!")
        print(f"Completing {self.name}")


itr_forms = [
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr1_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr2_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr3_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr4_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr5_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr6_english.pdf",
    "https://incometaxindia.gov.in/forms/income-tax%20rules/2023/itr7_english.pdf",
]

target_path = Path("static") / "concurrency_example"
shutil.rmtree(target_path, ignore_errors=True)
target_path.mkdir(exist_ok=True, parents=True)

threads = []
for itr_form in itr_forms:
    my_thread = DownloaderThread(itr_form, target_path)
    my_thread.start()
    threads.append(my_thread)

# Wait for all threads to complete
for thread in threads:
    thread.join()

print("All downloads completed!")
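
One reason to subclass is that `run()` cannot return a value to the caller, whereas an instance can keep its result as an attribute. The sketch below is a hypothetical illustration of that (`SquareThread` is an invented name), without any network access:

```python
import threading


class SquareThread(threading.Thread):
    """Hypothetical example: computes a square and stores it on the instance."""

    def __init__(self, number):
        super().__init__()
        self.number = number
        self.result = None

    def run(self):
        # The return value of run() is discarded, so store the result instead
        self.result = self.number ** 2


threads = [SquareThread(n) for n in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Results are safe to read once join() has returned
results = [t.result for t in threads]
print(results)
```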


## threading.Lock

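Threads share the memory space of their process, so unsynchronized access to shared data can cause race conditions. In the first example below, both threads read `counter` before either writes it back, so one of the two updates is lost. The second example uses `threading.Lock` to ensure that only one thread at a time executes the read-modify-write sequence: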

In [None]:
from threading import Thread
from time import sleep

counter = 0

def increase(by):
    global counter

    # Read the current value of the counter
    local_counter = counter

    # Perform the increment
    local_counter += by

    # Simulate some processing time (0.1 seconds)
    sleep(0.1)

    # Update the global counter with the new value
    counter = local_counter
    print(f"counter={counter}")

# Create threads
t1 = Thread(target=increase, args=(10,))
t2 = Thread(target=increase, args=(20,))

# Start the threads
t1.start()
t2.start()

# Wait for the threads to complete
t1.join()
t2.join()

print(f"The final counter is {counter}")

In [None]:
from threading import Thread, Lock
from time import sleep

counter = 0
counter_lock = Lock()

def increase(by):
    global counter

    # Acquire the lock before updating the counter
    with counter_lock:
        local_counter = counter

        # Perform the increment
        local_counter += by

        # Simulate some processing time (0.1 seconds)
        sleep(0.1)

        # Update the global counter with the new value
        counter = local_counter
        print(f"counter={counter}")

# Create threads
t1 = Thread(target=increase, args=(10,))
t2 = Thread(target=increase, args=(20,))

# Start the threads
t1.start()
t2.start()

# Wait for the threads to complete
t1.join()
t2.join()

print(f"The final counter is {counter}")
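
The same idea can be sketched at a larger scale. This is a minimal illustration with made-up names (`increment_many`, `total`), not part of the example above: four threads each perform many increments, and the lock keeps every `total += 1` atomic with respect to the other threads.

```python
import threading

total = 0
total_lock = threading.Lock()


def increment_many(times):
    global total
    for _ in range(times):
        # Hold the lock only for the read-modify-write step
        with total_lock:
            total += 1


threads = [threading.Thread(target=increment_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"The final total is {total}")
```

Keeping the locked region as small as possible reduces the time other threads spend waiting for the lock.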

## threading.Timer

Python's `threading.Timer` class schedules a function to run after a specified delay:

In [None]:
import subprocess
import threading

# Lambda function to kill the process
kill = lambda process: process.kill()

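# Command to execute: ping www.google.com continuously.
# '-t' is the Windows flag for "ping until stopped"; on Linux and macOS,
# ping runs continuously by default and '-t' has a different meaning.
cmd = ['ping', 'www.google.com', '-t']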

# Start the ping command as a subprocess
ping = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# Create a timer to kill the process if it takes more than 10 seconds
my_timer = threading.Timer(10, kill, [ping])

try:
    # Start the timer
    my_timer.start()

    # Communicate with the subprocess and get the stdout and stderr
    stdout, stderr = ping.communicate()

finally:
    # Cancel the timer if the subprocess has completed
    my_timer.cancel()

# Print the stdout of the ping command
print(stdout.decode())
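
A smaller sketch that does not depend on an external command may make the start and cancel behaviour easier to see. The `remind` function and the messages are invented for illustration:

```python
import threading

fired = []


def remind(message):
    # Runs in the timer's own thread once the delay has elapsed
    fired.append(message)
    print(message)


# This timer fires after 0.2 seconds
timer = threading.Timer(0.2, remind, args=["Time is up!"])
timer.start()

# This timer is cancelled before its 5-second delay elapses, so it never fires
cancelled = threading.Timer(5, remind, args=["never printed"])
cancelled.start()
cancelled.cancel()

# Timer is a subclass of Thread, so join() waits for it to finish
timer.join()
cancelled.join()
```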

## Thread communication using Queue

Threading can lead to race conditions when threads access shared resources simultaneously. The `queue` module provides thread-safe data structures.

Producer and consumer example using threads and a queue:

In [None]:
import threading
import queue
import time

# The shared queue between the producer and consumer
shared_queue = queue.Queue(maxsize=5)

# Function for the producer
def producer():
    for i in range(1, 5):
        item = f"Item {i}"
        shared_queue.put(item)
        print(f"Produced: {item}")
        time.sleep(1)

# Function for the consumer
def consumer():
    while True:
        item = shared_queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        shared_queue.task_done()

# Create and start the producer and consumer threads
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

# Wait for the producer to finish producing
producer_thread.join()

# Add None to the queue to signal the consumer to exit
shared_queue.put(None)

# Wait for the consumer to finish consuming
consumer_thread.join()

print("Producer and consumer have finished.")
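
The pattern extends to several consumers. The sketch below uses assumed names (`task_queue`, `worker`, `NUM_WORKERS`): it sends one `None` sentinel per worker and calls `Queue.join()` to block until every item has been marked done with `task_done()`.

```python
import queue
import threading

NUM_WORKERS = 3
task_queue = queue.Queue()
results = []
results_lock = threading.Lock()


def worker():
    while True:
        number = task_queue.get()
        if number is None:
            # Sentinel: mark it done and stop this worker
            task_queue.task_done()
            break
        with results_lock:
            results.append(number * number)
        task_queue.task_done()


workers = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for w in workers:
    w.start()

for number in range(10):
    task_queue.put(number)

# One sentinel per worker so that every worker exits
for _ in workers:
    task_queue.put(None)

# Block until task_done() has been called for every item put on the queue
task_queue.join()
for w in workers:
    w.join()

print(sorted(results))
```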

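## Daemon thread

A daemon thread runs in the background and does not keep the program alive. The Python program exits once all non-daemon threads have finished, and any daemon threads still running are stopped abruptly at that point. The first example below uses a regular thread, which keeps running after the main code finishes; the second marks the thread as a daemon.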

In [None]:
import threading
import time

def timer_thread():
    count = 0
    while True:
        count += 1
        time.sleep(1)
        print(f"Has been waiting for {count} second(s)...")

# Create and start the timer thread (named `thread` to avoid shadowing the function)
thread = threading.Thread(target=timer_thread)
thread.start()

# Wait for user input to exit
answer = input("Do you want to exit? (Type 'yes' to exit)\n")

# The non-daemon thread keeps running. To stop it, go to Kernel -> Restart Kernel

In [None]:
import threading
import time

def timer_thread():
    count = 0
    while True:
        count += 1
        time.sleep(1)
        print(f"Has been waiting for {count} second(s)...")

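# Create and start the timer thread as a daemon thread
# (named `thread` to avoid shadowing the function)
thread = threading.Thread(target=timer_thread, daemon=True)
thread.start()

# Wait for user input to exit
answer = input("Do you want to exit? (Type 'yes' to exit)\n")

# In a Jupyter notebook the kernel process keeps running after this cell
# finishes, so the daemon thread is not terminated. Run this as a standalone
# script to see the thread stop when the main thread exits.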
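
The daemon flag can also be inspected directly, which works the same way in a notebook and in a script. A minimal sketch with a hypothetical `background_ticker` function:

```python
import threading
import time


def background_ticker():
    # Loops forever; as a daemon thread it does not block interpreter exit
    while True:
        time.sleep(0.1)


daemon_thread = threading.Thread(target=background_ticker, daemon=True)
daemon_thread.start()

print(f"Is daemon: {daemon_thread.daemon}")
print(f"Is alive: {daemon_thread.is_alive()}")
print(f"Main thread is daemon: {threading.main_thread().daemon}")
```

When this runs as a script, the interpreter exits after the last `print` even though `background_ticker` never returns.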

# multiprocessing

The `multiprocessing` module enables parallel execution by creating separate processes, each with its own Python interpreter and memory space. This benefits CPU-bound tasks because the processes can run on multiple CPU cores at the same time.

In [None]:
def task():
    result = 2
    for _ in range(10**5):
        result *= 9999
    return result

In [None]:
import time


start = time.perf_counter()
task()
task()
task()
finish = time.perf_counter()

print(f"It took {finish-start:.2f} second(s) to finish")

In [None]:
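# Note: this may fail in Jupyter on platforms that start child processes with
# "spawn" (e.g. Windows), because the children cannot import `task` from the
# notebook. Defining `task` in a separate .py module and importing it avoids
# this; a process pool (multiprocessing.Pool) is an alternative to managing
# Process objects by hand.

import multiprocessing
import time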


start = time.perf_counter()

p1 = multiprocessing.Process(target=task)
p2 = multiprocessing.Process(target=task)
p3 = multiprocessing.Process(target=task)

p1.start()
p2.start()
p3.start()

p1.join()
p2.join()
p3.join()

finish = time.perf_counter()
print(f"It took {finish-start:.2f} second(s) to finish")

## concurrent.futures

The `concurrent.futures` module abstracts the management of threads and processes for concurrent execution. It provides high-level interfaces for executing functions asynchronously.

In [None]:
import concurrent.futures
import time

# Function that simulates a time-consuming task
def do_work(task_name):
    print(f"Starting task: {task_name}")
    time.sleep(2)  # Simulate some work
    print(f"Completed task: {task_name}")
    return f"Result from {task_name}"

tasks = ["Task1", "Task2", "Task3", "Task4"]

# Use ThreadPoolExecutor to run the tasks concurrently in a pool of threads
with concurrent.futures.ThreadPoolExecutor() as executor:
    future_to_task = {executor.submit(do_work, task): task for task in tasks}

    for future in concurrent.futures.as_completed(future_to_task):
        task = future_to_task[future]
        try:
            result = future.result()
            print(f"{task} result: {result}")
        except Exception as e:
            print(f"{task} raised an exception: {e}")
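
When results are wanted in input order rather than completion order, `Executor.map` is a shorter alternative to `submit` plus `as_completed`. A minimal sketch, with `square_slowly` as a made-up stand-in for real work:

```python
import concurrent.futures
import time


def square_slowly(number):
    time.sleep(0.1)  # Simulate some work
    return number * number


numbers = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # map() yields results in the order of the inputs
    squares = list(executor.map(square_slowly, numbers))

print(squares)
```

If a call raises an exception, `map` re-raises it when the corresponding result is retrieved from the iterator.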

