### Write and read async

In [1]:
import asyncio
import random
import time

class CircularBuffer:
    def __init__(self, size):
        self.buffer = [None] * size
        self.size = size
        self.write_index = 0
        self.read_index = 0

    def write(self, data):
        """Write data into the circular buffer."""
        self.buffer[self.write_index] = data
        print(f"Written '{data}' at index {self.write_index}")
        self.write_index = (self.write_index + 1) % self.size

    def read(self):
        """Read data from the circular buffer."""
        if self.buffer[self.read_index] is None:
            print(f"No new data at index {self.read_index}.")
            return None
        data = self.buffer[self.read_index]
        print(f"Read '{data}' from index {self.read_index}")
        self.buffer[self.read_index] = None  # Simulate consuming the data
        self.read_index = (self.read_index + 1) % self.size
        return data

async def write_to_buffer(buffer, interval):
    """Regularly write data into the circular buffer."""
    while True:
        data = random.randint(1, 100)  # Simulate some data to write
        buffer.write(data)
        await asyncio.sleep(interval)  # Simulate periodic writing

async def read_from_buffer(buffer, interval):
    """Asynchronously read from the circular buffer at irregular intervals."""
    while True:
        data = buffer.read()
        if data is None:
            print("No data to read. Retrying...")
        await asyncio.sleep(interval)  # Simulate asynchronous, delayed reading

async def main():
    buffer_size = 5
    buffer = CircularBuffer(buffer_size)
    
    # Task to write to the buffer every 1 second
    writer_task = asyncio.create_task(write_to_buffer(buffer, 1))
    
    # Task to read from the buffer every 1.5 seconds
    reader_task = asyncio.create_task(read_from_buffer(buffer, 7)) # set to be random time.
    
    # Run both tasks concurrently
    await asyncio.gather(writer_task, reader_task)

# For non-async environments use asyncio.run(main()), for async ones use await main()
await main()


Written '92' at index 0
Read '92' from index 0
Written '86' at index 1
Written '80' at index 2
Written '26' at index 3
Written '79' at index 4
Written '87' at index 0
Written '95' at index 1
Read '95' from index 1
Written '72' at index 2
Written '57' at index 3
Written '100' at index 4
Written '34' at index 0
Written '97' at index 1
Written '35' at index 2
Written '24' at index 3
Read '35' from index 2
Written '45' at index 4
Written '12' at index 0
Written '68' at index 1
Written '35' at index 2


CancelledError: 

### write and read + process async

In [3]:
import asyncio
import random
import time

class CircularBuffer:
    def __init__(self, size):
        self.buffer = [None] * size
        self.size = size
        self.write_index = 0
        self.read_index = 0

    def write(self, data):
        """Write data into the circular buffer."""
        self.buffer[self.write_index] = data
        print(f"Written '{data}' at index {self.write_index}")
        self.write_index = (self.write_index + 1) % self.size

    def read(self):
        """Read data from the circular buffer."""
        if self.buffer[self.read_index] is None:
            print(f"No new data at index {self.read_index}.")
            return None
        data = self.buffer[self.read_index]
        print(f"Read '{data}' from index {self.read_index}")
        self.buffer[self.read_index] = None  # Simulate consuming the data
        self.read_index = (self.read_index + 1) % self.size
        return data

def process_data(data):
    """Synchronous function to process the data."""
    print(f"Processing data: {data}")
    # Simulate a long-running synchronous operation
    time.sleep(7)
    print(f"Finished processing data: {data}")

async def write_to_buffer(buffer, interval):
    """Regularly write data into the circular buffer."""
    while True:
        data = random.randint(1, 100)  # Simulate some data to write
        buffer.write(data)
        await asyncio.sleep(interval)  # Simulate periodic writing

async def read_and_process(buffer, interval):
    """Asynchronously read from the circular buffer and process data in a separate thread."""
    loop = asyncio.get_running_loop()  # Get the event loop
    while True:
        data = buffer.read()
        if data is not None:
            # Use run_in_executor to run the process function in a separate thread
            await loop.run_in_executor(None, process_data, data)
        else:
            print("No data to read. Retrying...")
        await asyncio.sleep(interval)  # Simulate asynchronous, delayed reading

async def main():
    buffer_size = 5
    buffer = CircularBuffer(buffer_size)
    
    # Set the interval for reading and writing
    writer_interval = 1  # 1 ms
    reader_interval = 3  # 1 ms
    
    # Task to write to the buffer every 1 ms
    writer_task = asyncio.create_task(write_to_buffer(buffer, writer_interval))
    
    # Task to read from the buffer every 1 ms and process synchronously in a separate thread
    reader_task = asyncio.create_task(read_and_process(buffer, reader_interval))
    
    # Run both tasks concurrently
    await asyncio.gather(writer_task, reader_task)

# For non-async environments use asyncio.run(main()), for async ones use await main()
await main()


Written '25' at index 0
Read '25' from index 0
Processing data: 25
Written '10' at index 1
Written '22' at index 2
Written '58' at index 3
Written '45' at index 4
Written '75' at index 0
Written '93' at index 1
Finished processing data: 25
Written '36' at index 2
Written '69' at index 3
Written '61' at index 4
Read '93' from index 1
Processing data: 93
Written '35' at index 0
Written '80' at index 1
Written '23' at index 2
Written '99' at index 3
Written '76' at index 4
Written '74' at index 0
Written '14' at index 1
Finished processing data: 93


CancelledError: 

### Multithreading programming with 3 workers

In [4]:
import threading
import queue
import time

# Create a task queue
task_queue = queue.Queue()

# Function for worker threads to process tasks
def worker_thread(thread_id):
    while True:
        # Get a task from the queue
        task = task_queue.get()
        if task is None:
            # If the task is None, exit the loop
            print(f"Thread-{thread_id} exiting.")
            break
        # Simulate task processing
        print(f"Thread-{thread_id} is processing task: {task}")
        time.sleep(1)  # Simulate work being done
        task_queue.task_done()

# Create and start worker threads
num_threads = 3
threads = []

for i in range(num_threads):
    thread = threading.Thread(target=worker_thread, args=(i,))
    thread.start()
    threads.append(thread)

# Add tasks to the task queue
for task_id in range(10):
    task_queue.put(f"Task-{task_id}")

# Block until all tasks are done
task_queue.join()

# Stop workers by adding None to the queue for each worker
for i in range(num_threads):
    task_queue.put(None)

# Wait for all worker threads to finish
for thread in threads:
    thread.join()

print("All tasks completed.")

Thread-0 is processing task: Task-0Thread-1 is processing task: Task-1

Thread-2 is processing task: Task-2
Thread-1 is processing task: Task-3
Thread-0 is processing task: Task-4
Thread-2 is processing task: Task-5
Thread-1 is processing task: Task-6
Thread-0 is processing task: Task-7
Thread-2 is processing task: Task-8
Thread-1 is processing task: Task-9
Thread-1 exiting.
Thread-0 exiting.
Thread-2 exiting.
All tasks completed.


### Task Queue: the task with non-enough data will be requeued at the end

In [7]:
import threading
import queue
import time
import random

# Create a task queue
task_queue = queue.Queue()

# Simulate a shared resource that tasks depend on (e.g., data buffer size)
shared_buffer = {"data_size": 0}

# Create a lock for synchronizing access to shared_buffer
buffer_lock = threading.Lock()

# Function for worker threads to process tasks
def worker_thread(thread_id):
    while True:
        # Get a task from the queue
        task = task_queue.get()
        if task is None:
            # If the task is None, exit the loop
            print(f"Thread-{thread_id} exiting.")
            break
        
        task_id, required_data_size, requeue_count = task
        
        # Acquire lock before checking or modifying shared_buffer
        with buffer_lock:
            # Simulate checking if the task can be processed
            if shared_buffer["data_size"] >= required_data_size:
                # Process the task if the required data size is available
                print(f"Thread-{thread_id} is processing {task_id} requiring {required_data_size} data.")
                time.sleep(3)  # Simulate work being done
                shared_buffer["data_size"] -= required_data_size  # Reduce buffer size after processing
                print(f"Task {task_id} completed by Thread-{thread_id}. Remaining buffer size: {shared_buffer['data_size']}")
            else:
                # Not enough data, re-queue the task at the end
                if requeue_count < 20:  # Limit the number of re-queue attempts
                    print(f"Thread-{thread_id} cannot process {task_id} due to insufficient data. Re-queueing task (Attempt {requeue_count + 1}).")
                    task_queue.put((task_id, required_data_size, requeue_count + 1))
                else:
                    print(f"Thread-{thread_id} discarding {task_id} after {requeue_count} requeue attempts.")
        
        task_queue.task_done()

# Create and start worker threads
num_threads = 3
threads = []

for i in range(num_threads):
    thread = threading.Thread(target=worker_thread, args=(i,))
    thread.start()
    threads.append(thread)

# Add tasks to the task queue with varying required data sizes
for task_id in range(10):
    required_data_size = random.randint(1, 10)
    task_queue.put((f"Task-{task_id}", required_data_size, 0))  # Initialize requeue_count to 0

# Simulate data being added to the buffer at random intervals
def buffer_data_feeder():
    while not task_queue.empty():
        # Randomly add more data to the shared buffer
        added_data = random.randint(3, 8)  # Increase the average data added to avoid starvation
        with buffer_lock:  # Lock the buffer when modifying it
            shared_buffer["data_size"] += added_data
            print(f"Buffer feeder added {added_data} data. Total buffer size: {shared_buffer['data_size']}")
        time.sleep(1)

# Start a thread to feed data into the buffer
buffer_thread = threading.Thread(target=buffer_data_feeder)
buffer_thread.start()

# Block until all tasks are done
task_queue.join()

# Stop workers by adding None to the queue for each worker
for i in range(num_threads):
    task_queue.put(None)

# Wait for all worker threads to finish
for thread in threads:
    thread.join()

buffer_thread.join()

print("All tasks completed.")


Buffer feeder added 8 data. Total buffer size: 8
Thread-0 is processing Task-0 requiring 1 data.
Task Task-0 completed by Thread-0. Remaining buffer size: 7
Thread-0 is processing Task-3 requiring 3 data.
Task Task-3 completed by Thread-0. Remaining buffer size: 4
Thread-1 cannot process Task-2 due to insufficient data. Re-queueing task (Attempt 1).
Thread-1 cannot process Task-5 due to insufficient data. Re-queueing task (Attempt 1).
Thread-1 cannot process Task-6 due to insufficient data. Re-queueing task (Attempt 1).
Thread-0 cannot process Task-4 due to insufficient data. Re-queueing task (Attempt 1).
Thread-0 cannot process Task-8 due to insufficient data. Re-queueing task (Attempt 1).
Thread-0 cannot process Task-9 due to insufficient data. Re-queueing task (Attempt 1).
Thread-1 is processing Task-7 requiring 3 data.
Task Task-7 completed by Thread-1. Remaining buffer size: 1
Buffer feeder added 5 data. Total buffer size: 6
Thread-2 cannot process Task-1 due to insufficient data.

### Circular buffer + task queue

In [9]:
import asyncio
import random
import time
import queue

# Circular buffer class with thread-safe access
class CircularBuffer:
    def __init__(self, size):
        self.buffer = [None] * size
        self.size = size
        self.write_index = 0
        self.read_index = 0
        self.data_size = 0  # Keeps track of how much data is in the buffer
        self.lock = asyncio.Lock()  # Async lock for thread-safe access in async context

    async def write(self, data_size):
        """Asynchronously write data into the circular buffer, one unit per index."""
        async with self.lock:
            # Check available space in the buffer
            available_space = self.size - self.data_size
            if available_space <= 0:
                print("Buffer full. Skipping write.")  # If there's no space, skip the write
                return
            
            # If more data is to be written than available space, write as much as possible
            if data_size > available_space:
                print(f"Buffer has only {available_space} slots left. Writing partial data.")
                data_size = available_space  # Only write as much as we can fit
            
            # Write as much data as possible (one unit per index)
            for _ in range(data_size):
                self.buffer[self.write_index] = 1  # Write one unit of data at a time
                print(f"Written 1 unit of data at index {self.write_index}.")
                self.write_index = (self.write_index + 1) % self.size
            self.data_size += data_size
            print(f"Buffer after write: {self.data_size}/{self.size}")

    async def read(self, required_data_size):
        """Asynchronously read data from the circular buffer, one unit per index."""
        async with self.lock:
            if self.data_size >= required_data_size:
                data_read = 0
                for _ in range(required_data_size):
                    self.buffer[self.read_index] = None  # Consume data
                    print(f"Read 1 unit of data from index {self.read_index}.")
                    self.read_index = (self.read_index + 1) % self.size
                    data_read += 1
                self.data_size -= required_data_size
                print(f"Buffer after read: {self.data_size}/{self.size}")
                return data_read
            else:
                print("Not enough data to read.")
                return None

# Synchronous processing function, run in separate thread
def process_data(worker_id, data_size):
    """Synchronous function to process the data."""
    print(f"Worker {worker_id}: Processing {data_size} units of data.")
    time.sleep(2)  # Simulate a long-running operation
    print(f"Worker {worker_id}: Finished processing {data_size} units of data.")

# Asynchronously feed data into the buffer at random intervals
async def write_to_buffer(buffer):
    while True:
        data_size = random.randint(1, 5)  # Random data size to write
        await buffer.write(data_size)
        await asyncio.sleep(random.uniform(0.5, 1))  # Random sleep between writes

# Task queue worker for processing tasks asynchronously
async def task_worker(worker_id, task_queue, buffer, loop):
    while True:
        task = await task_queue.get()
        if task is None:
            break

        task_id, required_data_size, requeue_count = task
        print(f"Worker {worker_id}: Checking task {task_id} with required data size {required_data_size}")

        # Attempt to read from buffer
        data = await buffer.read(required_data_size)
        if data is not None:
            # Process the data synchronously in a separate thread
            await loop.run_in_executor(None, process_data, worker_id, data)
        else:
            # Wait a bit longer before requeueing task to allow buffer to refill
            await asyncio.sleep(0.5)  # Increased wait to allow more data accumulation
            if requeue_count < 10:
                print(f"Worker {worker_id}: Requeueing task {task_id}, attempt {requeue_count + 1}")
                await task_queue.put((task_id, required_data_size, requeue_count + 1))
            else:
                print(f"Worker {worker_id}: Discarding task {task_id} after {requeue_count} requeues.")

        task_queue.task_done()

async def main():
    buffer_size = 10
    buffer = CircularBuffer(buffer_size)

    # Create an asyncio task queue
    task_queue = asyncio.Queue()

    # Add some tasks to the queue (task_id, required_data_size, requeue_count)
    for task_id in range(10):
        required_data_size = random.randint(2, 6)
        await task_queue.put((f"Task-{task_id}", required_data_size, 0))

    # Create the asyncio loop to run tasks
    loop = asyncio.get_running_loop()

    # Start the buffer writer task
    writer_task = asyncio.create_task(write_to_buffer(buffer))

    # Start task workers (readers) that check tasks from the queue and process
    num_workers = 3
    workers = [
        asyncio.create_task(task_worker(i, task_queue, buffer, loop))
        for i in range(num_workers)
    ]

    # Wait for all tasks to complete
    await task_queue.join()

    # Stop the workers by putting `None` into the queue for each worker
    for _ in workers:
        await task_queue.put(None)

    # Wait for all workers to finish
    await asyncio.gather(*workers)
    await writer_task

# Run the event loop
await main()


Written 1 unit of data at index 0.
Written 1 unit of data at index 1.
Written 1 unit of data at index 2.
Written 1 unit of data at index 3.
Written 1 unit of data at index 4.
Buffer after write: 5/10
Worker 0: Checking task Task-0 with required data size 6
Not enough data to read.
Worker 1: Checking task Task-1 with required data size 6
Not enough data to read.
Worker 2: Checking task Task-2 with required data size 3
Read 1 unit of data from index 0.
Read 1 unit of data from index 1.
Read 1 unit of data from index 2.
Buffer after read: 2/10
Worker 2: Processing 3 units of data.
Worker 0: Requeueing task Task-0, attempt 1
Worker 0: Checking task Task-3 with required data size 2
Read 1 unit of data from index 3.
Read 1 unit of data from index 4.
Buffer after read: 0/10
Worker 1: Requeueing task Task-1, attempt 1
Worker 1: Checking task Task-4 with required data size 4
Not enough data to read.
Worker 0: Processing 2 units of data.
Written 1 unit of data at index 5.
Written 1 unit of data 

CancelledError: 