#Thread Pool:
A pool of pre-initialized threads used to execute tasks concurrently, which helps avoid the overhead of thread creation and destruction.



In [None]:
import concurrent.futures

def task(n):
    print(f"Executing task {n}")

# Create a thread pool with 5 worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks to the thread pool
    for i in range(10):
        executor.submit(task, i)


Executing task 0
Executing task 1
Executing task 2
Executing task 3Executing task 4
Executing task 5

Executing task 6
Executing task 7
Executing task 8
Executing task 9


#Producer-Consumer:
Coordination pattern where one or more threads produce data, while one or more threads consume that data. It helps balance the workload between producers and consumers.

In [None]:
import threading
import queue
import time

def producer(q):
    for i in range(5):
        time.sleep(0.1)  # Simulate some work
        item = f"Item {i}"
        q.put(item)
        print(f"Produced {item}")

def consumer(q):
    while True:
        item = q.get()
        print(f"Consumed {item}")
        q.task_done()

q = queue.Queue()
producer_thread = threading.Thread(target=producer, args=(q,))
consumer_thread = threading.Thread(target=consumer, args=(q,))
producer_thread.start()
consumer_thread.start()


Produced Item 0Consumed Item 0

Produced Item 1Consumed Item 1

Produced Item 2Consumed Item 2

Produced Item 3
Consumed Item 3


#Barrier:
A synchronization pattern that allows multiple threads to wait at a barrier until all threads have reached a certain point before continuing execution.



In [None]:
import threading

barrier = threading.Barrier(3)

def worker():
    print("Worker started")
    barrier.wait()
    print("Worker finished")

for _ in range(3):
    threading.Thread(target=worker).start()


Worker startedWorker started

Worker started
Worker finished
Worker finished
Worker finished


#Reader-Writer Lock:
A synchronization pattern that allows multiple readers to access a shared resource simultaneously while ensuring exclusive access for writers.



In [None]:
import threading

class RWLock:
    def __init__(self):
        self._lock = threading.Lock()
        self._readers = 0

    def acquire_read(self):
        with self._lock:
            self._readers += 1
            if self._readers == 1:
                # First reader, acquire exclusive access
                self._resource_lock = threading.Lock()
                self._resource_lock.acquire()

    def release_read(self):
        with self._lock:
            self._readers -= 1
            if self._readers == 0:
                # Last reader, release exclusive access
                self._resource_lock.release()

    def acquire_write(self):
        # Writers always have exclusive access
        self._resource_lock.acquire()

    def release_write(self):
        self._resource_lock.release()


In [None]:
import threading
import time

class SharedResource:
    def __init__(self):
        self._lock = RWLock()
        self._data = 0

    def read(self):
        self._lock.acquire_read()
        print(f"Reader {threading.current_thread().name} reading data: {self._data}")
        self._lock.release_read()

    def write(self, value):
        self._lock.acquire_write()
        self._data = value
        print(f"Writer {threading.current_thread().name} writing data: {value}")
        self._lock.release_write()

def reader(shared_resource):
    for _ in range(3):
        shared_resource.read()
        time.sleep(0.1)

def writer(shared_resource):
    for i in range(3):
        shared_resource.write(i)
        time.sleep(0.1)

shared_resource = SharedResource()

# Create reader threads
readers = [threading.Thread(target=reader, args=(shared_resource,)) for _ in range(3)]

# Create writer threads
writers = [threading.Thread(target=writer, args=(shared_resource,)) for _ in range(2)]

# Start all threads
for reader_thread in readers:
    reader_thread.start()

for writer_thread in writers:
    writer_thread.start()

# Wait for all threads to finish
for reader_thread in readers:
    reader_thread.join()

for writer_thread in writers:
    writer_thread.join()


Reader Thread-15 (reader) reading data: 0
Reader Thread-16 (reader) reading data: 0
Reader Thread-17 (reader) reading data: 0
Writer Thread-18 (writer) writing data: 0
Writer Thread-19 (writer) writing data: 0
Reader Thread-15 (reader) reading data: 0
Reader Thread-16 (reader) reading data: 0
Reader Thread-17 (reader) reading data: 0
Writer Thread-18 (writer) writing data: 1
Writer Thread-19 (writer) writing data: 1
Reader Thread-15 (reader) reading data: 1
Reader Thread-16 (reader) reading data: 1
Reader Thread-17 (reader) reading data: 1
Writer Thread-18 (writer) writing data: 2
Writer Thread-19 (writer) writing data: 2


#Monitor:
A synchronization pattern that encapsulates shared resources and provides mechanisms for mutual exclusion and condition variables for signaling between threads.



In [8]:
import threading

class Monitor:
    def __init__(self):
        self._lock = threading.Lock()
        self._condition = threading.Condition(lock=self._lock)
        self._resource = None

    def acquire(self):
        self._lock.acquire()

    def release(self):
        self._lock.release()

    def wait(self):
        self._condition.wait()

    def notify(self):
        self._condition.notify()

    def notify_all(self):
        self._condition.notifyAll()

    def get_resource(self):
        return self._resource

    def set_resource(self, value):
        self._resource = value

    # Context manager methods
    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()

def reader(monitor):
    with monitor:
        while monitor.get_resource() is None:
            monitor.wait()
        print(f"Reader read resource: {monitor.get_resource()}")

def writer(monitor, data):
    with monitor:
        monitor.set_resource(data)
        monitor.notify_all()
        print(f"Writer wrote resource: {data}")

# Usage
monitor = Monitor()
threading.Thread(target=reader, args=(monitor,)).start()
threading.Thread(target=writer, args=(monitor, "Data from writer")).start()

  self._condition.notifyAll()


Writer wrote resource: Data from writer
Reader read resource: Data from writer


#Semaphore:
A synchronization primitive that restricts the number of threads that can access a shared resource concurrently, helping to control access to resources.



In [9]:
import threading

class Semaphore:
    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()
        self._condition = threading.Condition(lock=self._lock)

    def acquire(self):
        with self._lock:
            while self._value <= 0:
                self._condition.wait()
            self._value -= 1

    def release(self):
        with self._lock:
            self._value += 1
            self._condition.notify()

    def get_value(self):
        return self._value


In [10]:
import threading
import time

class SharedResource:
    def __init__(self):
        self._semaphore = Semaphore(value=1)
        self._data = None

    def read(self):
        self._semaphore.acquire()
        print(f"Reader {threading.current_thread().name} reading data: {self._data}")
        self._semaphore.release()

    def write(self, value):
        self._semaphore.acquire()
        self._data = value
        print(f"Writer {threading.current_thread().name} writing data: {value}")
        self._semaphore.release()

def reader(shared_resource):
    for _ in range(3):
        shared_resource.read()
        time.sleep(0.1)

def writer(shared_resource):
    for i in range(3):
        shared_resource.write(i)
        time.sleep(0.1)

shared_resource = SharedResource()

# Create reader threads
readers = [threading.Thread(target=reader, args=(shared_resource,)) for _ in range(3)]

# Create writer threads
writers = [threading.Thread(target=writer, args=(shared_resource,)) for _ in range(2)]

# Start all threads
for reader_thread in readers:
    reader_thread.start()

for writer_thread in writers:
    writer_thread.start()

# Wait for all threads to finish
for reader_thread in readers:
    reader_thread.join()

for writer_thread in writers:
    writer_thread.join()


Reader Thread-27 (reader) reading data: None
Reader Thread-28 (reader) reading data: None
Reader Thread-29 (reader) reading data: None
Writer Thread-30 (writer) writing data: 0
Writer Thread-31 (writer) writing data: 0
Reader Thread-27 (reader) reading data: 0
Reader Thread-28 (reader) reading data: 0
Reader Thread-29 (reader) reading data: 0
Writer Thread-30 (writer) writing data: 1
Writer Thread-31 (writer) writing data: 1
Reader Thread-27 (reader) reading data: 1
Reader Thread-28 (reader) reading data: 1
Reader Thread-29 (reader) reading data: 1
Writer Thread-30 (writer) writing data: 2
Writer Thread-31 (writer) writing data: 2


#Futures:
A concurrency pattern that represents a value that may not yet be available, typically used to represent the result of an asynchronous operation.



In [11]:
import concurrent.futures
import time

def compute():
    time.sleep(2)  # Simulate a long computation
    return 42

# Create a thread pool with 1 worker thread
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(compute)
    # Do other work while waiting for the result
    print("Doing other work...")
    result = future.result()  # Wait for the result
    print("Result:", result)


Doing other work...
Result: 42


#Active Object:
A concurrency pattern that encapsulates the execution of a method in an object, allowing method invocations to be queued and executed asynchronously.



In [12]:
import threading
import queue

class ActiveObject:
    def __init__(self):
        self._queue = queue.Queue()

    def enqueue(self, func, *args, **kwargs):
        self._queue.put((func, args, kwargs))

    def _run(self):
        while True:
            func, args, kwargs = self._queue.get()
            func(*args, **kwargs)

    def start(self):
        threading.Thread(target=self._run, daemon=True).start()

# Example usage
def print_message(message):
    print(message)

ao = ActiveObject()
ao.start()
ao.enqueue(print_message, "Hello from Active Object!")


Hello from Active Object!


#Actor:
A concurrency pattern that models concurrent computations as independent actors that communicate by passing messages to each other, allowing for distributed and fault-tolerant systems.



In [13]:
import threading
import queue

class Actor:
    def __init__(self):
        self._inbox = queue.Queue()

    def send_message(self, message):
        self._inbox.put(message)

    def _process_messages(self):
        while True:
            message = self._inbox.get()
            self._handle_message(message)

    def _handle_message(self, message):
        # Implement message handling logic here
        print("Received message:", message)

    def start(self):
        threading.Thread(target=self._process_messages, daemon=True).start()

# Example usage
actor = Actor()
actor.start()
actor.send_message("Hello from Actor!")


Received message:

#Thread-Specific Storage:
A pattern that provides each thread with its own storage for data, ensuring that each thread can maintain its own state without interference from other threads.

In [14]:
import threading

class ThreadLocalStorage:
    def __init__(self):
        self._local = threading.local()

    def set_data(self, data):
        self._local.data = data

    def get_data(self):
        return getattr(self._local, 'data', None)

def worker(storage, value):
    storage.set_data(value)
    print(f"Thread {threading.get_ident()} stored value: {storage.get_data()}")

# Usage
storage = ThreadLocalStorage()
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(storage, i))
    threads.append(t)
    t.start()
for t in threads:
    t.join()


Thread 137093678364224 stored value: 0
Thread 137093678364224 stored value: 1
Thread 137093074384448 stored value: 2


#Guarded Suspension:
A synchronization pattern where a thread suspends its execution until a certain condition is met, typically used in producer-consumer scenarios.


In [15]:
import threading

class GuardedResource:
    def __init__(self):
        self._lock = threading.Lock()
        self._condition = threading.Condition(lock=self._lock)
        self._resource = None
        self._ready = False

    def get_resource(self):
        with self._lock:
            while not self._ready:
                self._condition.wait()
            return self._resource

    def set_resource(self, resource):
        with self._lock:
            self._resource = resource
            self._ready = True
            self._condition.notify_all()

def producer(resource, data):
    resource.set_resource(data)

def consumer(resource):
    data = resource.get_resource()
    print("Consumer got resource:", data)

# Usage
resource = GuardedResource()
t1 = threading.Thread(target=producer, args=(resource, "Data from producer"))
t2 = threading.Thread(target=consumer, args=(resource,))
t1.start()
t2.start()
t1.join()
t2.join()


Consumer got resource: Data from producer


#Balking:
A pattern where a thread performs an action only if certain conditions are met; otherwise, it "balks" or refuses to proceed.



In [16]:
import threading
import time

class BalkingWorker:
    def __init__(self):
        self._working = False

    def start_work(self):
        if self._working:
            print("Already working, balking.")
            return
        self._working = True
        threading.Thread(target=self._do_work).start()

    def _do_work(self):
        print("Starting work...")
        time.sleep(1)  # Simulate work
        self._working = False
        print("Work done.")

# Usage
worker = BalkingWorker()
worker.start_work()
time.sleep(0.5)  # Simulate time for another request
worker.start_work()  # Balking occurs


Starting work...
Already working, balking.
