# Asynchronous Programming

## Non-blocking I/O

%pip install asyncio aiohttp nest_asyncio numpy -q


In [1]:
import asyncio
import nest_asyncio

# Patch the event loop to allow nested asyncio calls in Jupyter Notebook environments
nest_asyncio.apply()

async def fetch_data(source_id, delay=2.0):
    """
    Simulates an asynchronous I/O-bound operation to fetch data from a given source.

    Args:
        source_id (int): The identifier of the data source (e.g., API, file).
        delay (float): Seconds to wait before the simulated response arrives.
            Defaults to 2.0, the latency the example originally hard-coded.

    Returns:
        str: Simulated data from the specified source.

    Note:
        The wait is performed with `await asyncio.sleep(delay)`, which suspends
        only this coroutine; other coroutines keep running on the event loop
        in the meantime, which is what makes the operation non-blocking.
    """
    print(f"Fetching data from source {source_id}...")
    # Stand-in for a network call or file read: yields control to the event loop.
    await asyncio.sleep(delay)
    return f"Data from source {source_id}"

async def process_data(data, delay=1.0):
    """
    Simulates an asynchronous processing operation on the fetched data.

    Args:
        data (str): The data to be processed.
        delay (float): Seconds the simulated processing step takes.
            Defaults to 1.0, the duration the example originally hard-coded.

    Returns:
        str: Processed version of the input data.

    Note:
        The processing time is modelled with `await asyncio.sleep(delay)`, so the
        coroutine yields to the event loop while it "works". Genuinely CPU-bound
        code would not yield like this and would hold up the event loop.
    """
    print(f"Processing data: {data}")
    # Stand-in for the processing step: non-blocking wait of `delay` seconds.
    await asyncio.sleep(delay)
    return f"Processed: {data}"

async def main():
    """
    Fetches from every simulated source, then processes every fetched payload.

    Both phases fan out with `asyncio.gather`, so the coroutines inside a phase
    overlap their waits instead of running one after another. The second phase
    only starts once the whole first phase has finished.

    Returns:
        list: The processed payloads, in the same order as the sources.
    """
    source_ids = (1, 2, 3)  # Simulated data sources (e.g., files, APIs, databases)

    # Phase 1: one fetch coroutine per source, awaited together.
    raw_data = await asyncio.gather(*map(fetch_data, source_ids))

    # Phase 2: one processing coroutine per fetched payload, awaited together.
    processed_data = await asyncio.gather(*map(process_data, raw_data))

    return processed_data

# Jupyter/IPython cells accept a top-level `await`, so main() is awaited directly here
# instead of being wrapped in asyncio.run(). The returned list of processed strings is
# kept in `results` and echoed below.
results = await main()
print("Final processed data:", results)


Fetching data from source 1...
Fetching data from source 2...
Fetching data from source 3...
Processing data: Data from source 1
Processing data: Data from source 2
Processing data: Data from source 3
Final processed data: ['Processed: Data from source 1', 'Processed: Data from source 2', 'Processed: Data from source 3']


## Event-driven architectures

In [2]:
import asyncio

class AIEventProcessor:
    """
    Simulates an AI processor that handles individual events asynchronously.

    Attributes:
        processing_delay (float): Seconds each simulated processing step takes.

    Methods:
        process_event(event: str): Simulates the asynchronous processing of an event.
    """
    def __init__(self, processing_delay=1.0):
        """
        Initializes the processor.

        Args:
            processing_delay (float): Seconds to wait per event to mimic AI
                computation time. Defaults to 1.0, the value the example
                originally hard-coded.
        """
        self.processing_delay = processing_delay

    async def process_event(self, event):
        """
        Asynchronously processes an event, simulating AI-based event handling.

        Args:
            event (str): The event to be processed.

        Returns:
            str: A string indicating the result of the processing.

        Note:
            The simulated work is `await asyncio.sleep(self.processing_delay)`,
            so the event loop stays free to run other coroutines meanwhile.
        """
        await asyncio.sleep(self.processing_delay)  # Simulated AI processing time
        return f"Processed: {event}"  # Return the result after processing

class EventDrivenAISystem:
    """
    Manages an event-driven AI system that produces and consumes events asynchronously.

    Attributes:
        processor (AIEventProcessor): An instance of AIEventProcessor to process events.
        queue (asyncio.Queue): An asynchronous queue to hold events for processing.

    Methods:
        produce_event(event: str): Asynchronously adds an event to the queue.
        consume_events(): Continuously consumes and processes events from the queue.
    """
    def __init__(self):
        """
        Initializes the EventDrivenAISystem with an AI event processor and an event queue.
        """
        self.processor = AIEventProcessor()  # Initialize the AI event processor
        self.queue = asyncio.Queue()  # Create an asyncio queue to store events

    async def produce_event(self, event):
        """
        Adds an event to the queue asynchronously.

        Args:
            event (str): The event to be added to the queue.
        """
        await self.queue.put(event)  # Add the event to the queue

    async def consume_events(self):
        """
        Continuously consumes and processes events from the queue asynchronously.

        Note:
            This method loops forever and is meant to be run as a task that the
            caller cancels. Every event taken from the queue is acknowledged with
            `task_done()` exactly once, whether processing succeeded or failed;
            otherwise `queue.join()` would never return after a failure. A failing
            event is reported on stdout and the loop moves on to the next event.
        """
        while True:
            event = await self.queue.get()  # Wait for an event to be available in the queue
            try:
                result = await self.processor.process_event(event)
                print(result)  # Output the result of the processed event
            except asyncio.CancelledError:
                # Cancellation must propagate so the caller can stop the consumer.
                raise
            except Exception as exc:
                # Keep the consumer alive: one bad event must not strand the rest.
                print(f"Failed to process {event!r}: {exc!r}")
            finally:
                # Always balance the get() above so queue.join() can complete.
                self.queue.task_done()

async def main():
    """
    Main coroutine that sets up the event-driven AI system, produces events,
    and consumes them concurrently.

    Steps:
        1. Initialize the EventDrivenAISystem.
        2. Start the event consumer task.
        3. Produce several events.
        4. Wait for the queue to be processed.
        5. Cancel the consumer task and wait for the cancellation to finish.

    Note:
        The consumer task loops forever in the background, so it is always
        cancelled before this coroutine returns, including when producing or
        waiting fails. The cancelled task is then awaited so that it has fully
        stopped by the time this coroutine returns.
    """
    # Steps 1-2: build the system and start consuming in the background
    system = EventDrivenAISystem()
    consumer = asyncio.create_task(system.consume_events())

    try:
        # Step 3: Simulate event production (adding events to the queue)
        for i in range(5):  # Simulate producing 5 events
            await system.produce_event(f"Event {i}")

        # Step 4: Wait until all events are processed and the queue is empty
        await system.queue.join()
    finally:
        # Step 5: Stop the consumer and wait until it has actually finished
        consumer.cancel()
        try:
            await consumer
        except asyncio.CancelledError:
            # Expected: this is the cancellation requested just above.
            pass

# Run main() to completion on an event loop.
# NOTE(review): asyncio.run() normally refuses to start while another loop is running
# (as in Jupyter); this presumably relies on nest_asyncio.apply() from the first cell - confirm.
asyncio.run(main())


Processed: Event 0
Processed: Event 1
Processed: Event 2
Processed: Event 3
Processed: Event 4


## Pipeline parallelism

In [3]:
import asyncio
from typing import List

class AsyncAIPipeline:
    """
    An asynchronous three-stage AI processing pipeline.

    Each item goes through preprocessing, model inference and postprocessing,
    in that order. Every stage waits with `asyncio.sleep`, so several items can
    be in flight at once on a single event loop.
    """

    async def _run_stage(self, label: str, data: str, delay: float) -> str:
        """
        Shared body of the three stages: wait, tag the data, print it, return it.

        Args:
            label (str): Stage name placed in front of the data.
            data (str): Input of the stage.
            delay (float): Simulated duration of the stage in seconds.

        Returns:
            str: The input prefixed with "<label>: ".
        """
        await asyncio.sleep(delay)
        tagged = f"{label}: {data}"
        print(tagged)
        return tagged

    async def preprocess(self, data: str) -> str:
        """
        Simulates the preprocessing stage (0.1 s).

        Args:
            data (str): The raw input data.

        Returns:
            str: The input prefixed with "Preprocessed: ".
        """
        return await self._run_stage("Preprocessed", data, 0.1)

    async def model_inference(self, data: str) -> str:
        """
        Simulates the model inference stage (0.2 s, the slowest stage).

        Args:
            data (str): The preprocessed data.

        Returns:
            str: The input prefixed with "Inference: ".
        """
        return await self._run_stage("Inference", data, 0.2)

    async def postprocess(self, data: str) -> str:
        """
        Simulates the postprocessing stage (0.1 s).

        Args:
            data (str): The data after model inference.

        Returns:
            str: The input prefixed with "Postprocessed: ".
        """
        return await self._run_stage("Postprocessed", data, 0.1)

    async def process_item(self, item: str) -> str:
        """
        Sends one item through all three stages, one after the other.

        Args:
            item (str): The raw input data.

        Returns:
            str: The fully processed result.
        """
        outcome = item
        # Each stage consumes the previous stage's output.
        for stage in (self.preprocess, self.model_inference, self.postprocess):
            outcome = await stage(outcome)
        return outcome

    async def run_pipeline(self, items: List[str]) -> List[str]:
        """
        Runs the full pipeline for every item concurrently.

        Args:
            items (List[str]): A list of raw input data items.

        Returns:
            List[str]: The fully processed results, in input order.
        """
        # One coroutine per item; gather awaits them together and keeps input order.
        return await asyncio.gather(*(self.process_item(entry) for entry in items))

async def main():
    """
    Demonstrates the asynchronous AI pipeline on five sample inputs.

    Builds a pipeline, pushes the samples through it concurrently and prints
    one "Final result" line per processed item.
    """
    # Sample input data
    samples = ["Data1", "Data2", "Data3", "Data4", "Data5"]

    # Build the pipeline and run every sample through it
    outputs = await AsyncAIPipeline().run_pipeline(samples)

    # Report the results in input order
    for result in outputs:
        print(f"Final result: {result}")

# Run main() to completion on an event loop.
# NOTE(review): inside Jupyter a loop is already running; this presumably relies on
# nest_asyncio.apply() from the first cell for asyncio.run() to be accepted - confirm.
asyncio.run(main())


Preprocessed: Data1
Preprocessed: Data2
Preprocessed: Data3
Preprocessed: Data4
Preprocessed: Data5
Inference: Preprocessed: Data1
Inference: Preprocessed: Data2
Inference: Preprocessed: Data3
Inference: Preprocessed: Data4
Inference: Preprocessed: Data5
Postprocessed: Inference: Preprocessed: Data1
Postprocessed: Inference: Preprocessed: Data2
Postprocessed: Inference: Preprocessed: Data3
Postprocessed: Inference: Preprocessed: Data4
Postprocessed: Inference: Preprocessed: Data5
Final result: Postprocessed: Inference: Preprocessed: Data1
Final result: Postprocessed: Inference: Preprocessed: Data2
Final result: Postprocessed: Inference: Preprocessed: Data3
Final result: Postprocessed: Inference: Preprocessed: Data4
Final result: Postprocessed: Inference: Preprocessed: Data5


## Asynchronous Parameter Updates in Distributed AI Training

In [4]:
import asyncio
import numpy as np
from typing import List

class ParameterServer:
    """
    A class that simulates a parameter server to store and update parameters
    for a distributed training system.

    Attributes:
        params (np.ndarray): An array of parameters initialized to zeros.
        lock (asyncio.Lock): A lock that makes reads and updates of the
            parameters mutually exclusive.
    """

    def __init__(self, param_size: int):
        """
        Initializes the parameter server with a parameter array of given size.

        Args:
            param_size (int): The size of the parameter array.
        """
        self.params = np.zeros(param_size)  # Initialize parameters to zeros
        self.lock = asyncio.Lock()  # Serializes access to the parameters

    async def get_params(self) -> np.ndarray:
        """
        Asynchronously fetches a copy of the parameters.

        Returns:
            np.ndarray: A copy of the current parameters; changing it does not
            affect the values held by the server.
        """
        async with self.lock:  # Never read while an update holds the lock
            return self.params.copy()  # Return a copy of the parameters

    async def update_params(self, gradients: np.ndarray, learning_rate: float = 0.01):
        """
        Asynchronously updates the parameters based on gradients using gradient descent.

        Args:
            gradients (np.ndarray): The computed gradients to update the parameters.
                Any array-like (e.g. a list of floats) is accepted and converted
                with `np.asarray`.
            learning_rate (float): The learning rate for parameter update (default is 0.01).
        """
        # Convert first: `learning_rate * <list>` would raise TypeError.
        step = learning_rate * np.asarray(gradients)
        async with self.lock:  # Lock ensures only one update happens at a time
            self.params -= step  # In-place gradient descent step

class Worker:
    """
    One worker of the simulated distributed training system.

    Attributes:
        worker_id (int): The unique identifier for the worker.
        param_server (ParameterServer): The server this worker reads parameters
            from and sends gradients to.
    """

    def __init__(self, worker_id: int, param_server: ParameterServer):
        """
        Stores the worker's identifier and its parameter server.

        Args:
            worker_id (int): The unique identifier for the worker.
            param_server (ParameterServer): The parameter server instance.
        """
        self.worker_id, self.param_server = worker_id, param_server

    async def compute_gradients(self, params: np.ndarray) -> np.ndarray:
        """
        Simulates a gradient computation of random duration.

        Args:
            params (np.ndarray): The current parameters; only their shape is used.

        Returns:
            np.ndarray: Random gradients drawn with `np.random.randn`, with the
            same shape as `params`.
        """
        # Wait for a random time in [0, 1) seconds to stand in for the computation.
        pause = np.random.rand()
        await asyncio.sleep(pause)
        return np.random.randn(*params.shape)

    async def train(self, num_iterations: int):
        """
        Runs the fetch -> compute -> update cycle `num_iterations` times.

        Args:
            num_iterations (int): The number of training iterations to perform.
        """
        server = self.param_server
        for iteration in range(1, num_iterations + 1):
            # Pull a snapshot, derive gradients from it, push the update back.
            snapshot = await server.get_params()
            await server.update_params(await self.compute_gradients(snapshot))
            print(f"Worker {self.worker_id} completed iteration {iteration}")

async def distributed_training(num_workers: int, param_size: int, num_iterations: int):
    """
    Simulates several workers training against one shared parameter server.

    Args:
        num_workers (int): The number of workers to participate in the training.
        param_size (int): The size of the parameter array managed by the parameter server.
        num_iterations (int): The number of training iterations each worker will perform.
    """
    # A single server instance is shared by every worker
    server = ParameterServer(param_size)

    # One training coroutine per worker, with ids 0 .. num_workers - 1
    training_runs = [
        Worker(rank, server).train(num_iterations) for rank in range(num_workers)
    ]

    # Let all workers train concurrently and wait for the last one to finish
    await asyncio.gather(*training_runs)

    # Report the parameters the server holds once training is over
    final_params = await server.get_params()
    print(f"Final parameters: {final_params}")

async def main():
    """
    Entry point of the distributed training simulation.

    Runs the simulation with 3 workers, 5 parameters and 10 iterations per worker.
    """
    settings = {"num_workers": 3, "param_size": 5, "num_iterations": 10}
    await distributed_training(**settings)

# Run main() to completion on an event loop.
# NOTE(review): inside Jupyter a loop is already running; this presumably relies on
# nest_asyncio.apply() from the first cell for asyncio.run() to be accepted - confirm.
asyncio.run(main())


Worker 0 completed iteration 1
Worker 2 completed iteration 1
Worker 0 completed iteration 2
Worker 1 completed iteration 1
Worker 2 completed iteration 2
Worker 0 completed iteration 3
Worker 0 completed iteration 4
Worker 1 completed iteration 2
Worker 0 completed iteration 5
Worker 2 completed iteration 3
Worker 1 completed iteration 3
Worker 0 completed iteration 6
Worker 2 completed iteration 4
Worker 1 completed iteration 4
Worker 0 completed iteration 7
Worker 2 completed iteration 5
Worker 1 completed iteration 5
Worker 2 completed iteration 6
Worker 1 completed iteration 6
Worker 0 completed iteration 8
Worker 1 completed iteration 7
Worker 2 completed iteration 7
Worker 1 completed iteration 8
Worker 0 completed iteration 9
Worker 1 completed iteration 9
Worker 0 completed iteration 10
Worker 2 completed iteration 8
Worker 1 completed iteration 10
Worker 2 completed iteration 9
Worker 2 completed iteration 10
Final parameters: [-0.03253866 -0.07696249 -0.01870058  0.04505101 