<a href="https://colab.research.google.com/github/derricksobrien/101-tutorial/blob/master/Asynchronous_Data_Workflow_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
"""
ASYNC DATA WORKFLOW DEMO
A simple demonstration of building a concurrent data processing workflow
using Python's asyncio, illustrating I/O concurrency and task management.

Principles Illustrated:
1. Concurrency via asyncio.gather()
2. Non-blocking I/O using 'await' (real I/O such as aiohttp requests is simulated here with asyncio.sleep)
3. Task monitoring using asyncio.Task and callbacks
"""
import asyncio
import time
from typing import Any, List, Dict
import random

# --- Configuration ---
# Jobs the workflow runs concurrently. Each job has:
#   id       - unique label, also used as the asyncio task name
#   duration - simulated wait in seconds
#   type     - descriptive label only; execute_job never reads it
# NOTE: every job, including the "CPU" one, is simulated with a non-blocking
# asyncio.sleep, so none of them actually keeps the CPU busy.
WORKFLOW_JOBS = [
    dict(id="DB_Fetch_User_A", duration=2.5, type="I/O"),
    dict(id="API_Call_Service_B", duration=4.0, type="I/O"),
    dict(id="File_Read_Config", duration=1.0, type="I/O"),
    dict(id="Validate_Data_C", duration=3.2, type="I/O"),
    dict(id="Heavy_Calc_D", duration=0.5, type="CPU"),
]

# --- 1. Task Completion Callback (Monitoring Stage) ---
def task_completed_callback(task: asyncio.Task) -> None:
    """
    Print a monitor line describing how a finished task ended.

    Intended for ``Task.add_done_callback``: the event loop calls it with the
    task once that task is done. Exactly one of three outcomes is reported,
    each with the time elapsed since WORKFLOW_START_TIME:
    cancelled, failed (with the exception), or finished (with its result).

    Args:
        task: The completed task. Its name is used as the job id, and a
            successful result is reported as a processed-byte count.
    """
    job_id = task.get_name()  # the name assigned via create_task(name=...)
    elapsed = time.time() - WORKFLOW_START_TIME

    # Cancellation must be checked first: Task.exception() raises
    # CancelledError on a cancelled task instead of returning it.
    if task.cancelled():
        print(f"\n[--- MONITOR ---] ❌ Task '{job_id}' was cancelled at T+{elapsed:.2f}s.")
        return

    # Fetch the exception once and reuse it.
    error = task.exception()
    if error is not None:
        print(f"\n[--- MONITOR ---] ⚠️ Task '{job_id}' FAILED at T+{elapsed:.2f}s! Error: {error!r}")
        return

    # Task completed successfully. Print the result.
    result = task.result()
    print(f"\n[--- MONITOR ---] ✅ Task '{job_id}' FINISHED at T+{elapsed:.2f}s.")
    print(f"                | Result: Successfully processed {result} bytes.")

# --- 2. Asynchronous Workflow Step (I/O Concurrency) ---
async def execute_job(job: Dict[str, Any]) -> int:
    """
    Run one simulated job and return a simulated processed-byte count.

    The job's work is simulated with ``asyncio.sleep``, which suspends this
    coroutine and hands control back to the event loop, so other scheduled
    jobs make progress while this one waits.

    Args:
        job: Job description. Only the ``"id"`` (used in log lines) and
            ``"duration"`` (seconds to wait) keys are read.

    Returns:
        A random integer in [1000, 10000] standing in for "bytes processed".
    """
    job_id = job["id"]
    duration = job["duration"]
    elapsed = time.time() - WORKFLOW_START_TIME

    print(f"[{job_id}] ➡️  INITIATED at T+{elapsed:.2f}s. Expected duration: {duration:.1f}s.")

    # --- THE KEY TO ASYNCHRONOUS CONCURRENCY ---
    # asyncio.sleep(duration) stands in for a non-blocking I/O wait (an HTTP
    # response, a database query, a file read). On 'await' this coroutine
    # pauses and returns control to the event loop, which can run other
    # pending tasks immediately; it is resumed once the sleep is over.
    await asyncio.sleep(duration)

    # Simulate a small, successful processing result (e.g., number of bytes read).
    processed_bytes = random.randint(1000, 10000)
    print(f"[{job_id}] ⬅️  COMPLETED simulated processing internally.")
    return processed_bytes

# --- 3. Main Workflow Orchestrator ---
async def main_workflow() -> List[Any]:
    """
    Run every job in WORKFLOW_JOBS concurrently and return the successful results.

    Steps:
        1. Reset the module-level WORKFLOW_START_TIME. It is the reference
           point for the "T+" timestamps printed by the jobs and the monitor
           callback.
        2. Wrap each job in a named asyncio.Task with
           ``task_completed_callback`` attached.
        3. Wait for all tasks with ``asyncio.gather(..., return_exceptions=True)``,
           so failures are collected as values instead of being raised.

    Returns:
        The results of the jobs that finished successfully, in WORKFLOW_JOBS
        order. Jobs that raised or were cancelled are left out.
    """
    global WORKFLOW_START_TIME
    WORKFLOW_START_TIME = time.time()

    print("=========================================================================")
    print(f"| WORKFLOW STARTING: Processing {len(WORKFLOW_JOBS)} Jobs Concurrently |")
    print("=========================================================================")

    # --- Task Creation & Monitoring ---
    # Build each coroutine right where it is wrapped in a Task. Creating all
    # coroutines up front would leave never-awaited coroutine objects behind
    # if create_task() raised part-way through the list.
    tasks = []
    for job in WORKFLOW_JOBS:
        # create_task() schedules the coroutine on the running event loop;
        # the task name doubles as the job id shown by the monitor.
        task = asyncio.create_task(execute_job(job), name=job["id"])
        # Called once the task is done (success, failure, or cancellation).
        task.add_done_callback(task_completed_callback)
        tasks.append(task)

    print(f"\n[ORCHESTRATOR] 🏗️  {len(tasks)} Tasks have been scheduled concurrently on the Event Loop.")
    print("[ORCHESTRATOR] ⏳ Now 'await'ing all tasks via asyncio.gather()...")
    print("-------------------------------------------------------------------------")

    # --- Concurrency Execution and Waiting ---
    # gather() waits for ALL tasks. Because they run concurrently, the total
    # time is roughly that of the slowest job, not the sum of all durations.
    all_results = await asyncio.gather(*tasks, return_exceptions=True)

    # --- Finalization ---
    total_time = time.time() - WORKFLOW_START_TIME

    print("-------------------------------------------------------------------------")
    print("[ORCHESTRATOR] 🏁 All tasks finished.")
    print(f"[ORCHESTRATOR] ⏱️  Total Workflow Time: {total_time:.2f} seconds")

    # Drop failures. This must test BaseException, not Exception: with
    # return_exceptions=True a cancelled task appears in all_results as a
    # CancelledError instance, and CancelledError subclasses BaseException
    # (Python 3.8+), so an Exception check would let it through as a "result".
    return [r for r in all_results if not isinstance(r, BaseException)]

# Global variable to track start time for relative timing in log lines.
# main_workflow() reassigns it (via `global`) at the start of every run;
# this 0.0 is only a placeholder so the name exists at module level.
WORKFLOW_START_TIME = 0.0

# --- 4. Run the Workflow ---
# Top-level `await` relies on a notebook event loop that is already running
# (Jupyter/Colab with IPython autoawait). In a plain .py script this line is
# a SyntaxError; there, use `final_data = asyncio.run(main_workflow())`.

final_data = await main_workflow()

# Print a summary of the successful results (simulated processed-byte counts).
print("\n=========================================================================")
print(f"| FINAL WORKFLOW SUMMARY: {len(final_data)} Successful Results |")
print("=========================================================================")
total_bytes = sum(final_data)
# Results are listed 1-based, in WORKFLOW_JOBS order.
for i, data in enumerate(final_data):
    print(f"Result {i+1}: Processed {data} bytes.")
print(f"\nTotal Processed Bytes: {total_bytes}")

| WORKFLOW STARTING: Processing 5 Jobs Concurrently |

[ORCHESTRATOR] 🏗️  5 Tasks have been scheduled concurrently on the Event Loop.
[ORCHESTRATOR] ⏳ Now 'await'ing all tasks via asyncio.gather()...
-------------------------------------------------------------------------
[DB_Fetch_User_A] ➡️  INITIATED at T+0.00s. Expected duration: 2.5s.
[API_Call_Service_B] ➡️  INITIATED at T+0.00s. Expected duration: 4.0s.
[File_Read_Config] ➡️  INITIATED at T+0.00s. Expected duration: 1.0s.
[Validate_Data_C] ➡️  INITIATED at T+0.00s. Expected duration: 3.2s.
[Heavy_Calc_D] ➡️  INITIATED at T+0.00s. Expected duration: 0.5s.
[Heavy_Calc_D] ⬅️  COMPLETED simulated processing internally.

[--- MONITOR ---] ✅ Task 'Heavy_Calc_D' FINISHED at T+0.50s.
                | Result: Successfully processed 1333 bytes.
[File_Read_Config] ⬅️  COMPLETED simulated processing internally.

[--- MONITOR ---] ✅ Task 'File_Read_Config' FINISHED at T+1.00s.
                | Result