## 1. Coroutines are the foundation of asynchronous programming in Python: they are defined with `async def` and executed by awaiting them with the `await` keyword.

In [3]:
import asyncio


async def simple_coroutine():
    """Print a start message, pause for one second, then print a finish message.

    The pause stands in for an I/O-bound operation; it is the only point at
    which this coroutine suspends.
    """
    pause_seconds = 1
    print("Coroutine started")
    await asyncio.sleep(pause_seconds)  # suspension point standing in for real I/O
    print("Coroutine finished")


# Running the coroutine.
# A standalone script would use the asyncio.run(...) call kept below for reference;
# in this notebook the coroutine is awaited directly at the top level of the cell.
# asyncio.run(simple_coroutine())
await simple_coroutine()

Coroutine started
Coroutine finished


## 2. Running Multiple Coroutines Concurrently

In [5]:
import asyncio


async def task(name, delay):
    """Announce the start of task ``name``, wait ``delay`` seconds, announce the end.

    Args:
        name: Label interpolated into both printed messages.
        delay: Seconds passed to ``asyncio.sleep`` to stand in for asynchronous I/O.
    """
    started_msg = f"Task {name} started"
    finished_msg = f"Task {name} finished"
    print(started_msg)
    await asyncio.sleep(delay)
    print(finished_msg)


async def main():
    """Run task "A" (2 s) and task "B" (1 s) concurrently and wait for both."""
    coroutines = [task("A", 2), task("B", 1)]
    await asyncio.gather(*coroutines)


# Entry point. A standalone script would call asyncio.run(main()) (kept below for
# reference); the notebook instead awaits main() at the top level of the cell.
# asyncio.run(main())
await main()

Task A started
Task B started
Task B finished
Task A finished


In [8]:
import asyncio


# Define a task that takes input parameters
async def my_task(name, delay):
    """Print a start line, sleep for ``delay`` seconds, then print a finish line.

    Args:
        name: Label shown in the printed messages.
        delay: Number of seconds to sleep; also echoed in the start message.
    """
    announcement = f"Task {name} started, will take {delay} seconds"
    print(announcement)
    await asyncio.sleep(delay)  # stands in for a slow asynchronous operation
    completion = f"Task {name} finished"
    print(completion)


# Main coroutine that gathers tasks with different inputs
async def main():
    """Run three ``my_task`` coroutines concurrently with different delays."""
    # (name, delay-in-seconds) for each task handed to gather, in this order.
    schedule = [("A", 2), ("B", 1), ("C", 3)]
    coroutines = [my_task(label, seconds) for label, seconds in schedule]
    await asyncio.gather(*coroutines)


# Await main() directly from the notebook cell. All three tasks start at once and
# finish in order of their delays (B, A, C), as the recorded output shows.
await main()

Task A started, will take 2 seconds
Task B started, will take 1 seconds
Task C started, will take 3 seconds
Task B finished
Task A finished
Task C finished


In [9]:
import asyncio


# Define a task that takes input parameters
async def my_task(name, delay):
    """Report the start of a task, wait ``delay`` seconds, and report its end.

    Args:
        name: Text inserted after the word "Task" in both messages.
        delay: Seconds to sleep between the two messages.
    """
    print(f"Task {name} started, will take {delay} seconds")
    simulated_work = asyncio.sleep(delay)
    await simulated_work
    print(f"Task {name} finished")


# Main coroutine that gathers a list of tasks
async def main():
    """Build five ``my_task`` coroutines in a loop and run them concurrently.

    Task N (N = 1..5) is named "Task N" and sleeps N seconds. Because
    ``my_task`` itself prefixes the name with "Task", the printed lines read
    "Task Task N ...".
    """
    # One coroutine per N; nothing runs until gather schedules them.
    tasks = [my_task(f"Task {n}", n) for n in range(1, 6)]

    # gather takes awaitables as positional arguments, hence the unpacking.
    await asyncio.gather(*tasks)


# Await main() directly from the notebook cell; the five tasks start together and
# finish one second apart, in order of their delays.
await main()

Task Task 1 started, will take 1 seconds
Task Task 2 started, will take 2 seconds
Task Task 3 started, will take 3 seconds
Task Task 4 started, will take 4 seconds
Task Task 5 started, will take 5 seconds
Task Task 1 finished
Task Task 2 finished
Task Task 3 finished
Task Task 4 finished
Task Task 5 finished


In [10]:
import asyncio


# Define an async function that performs some analysis on an input (e.g., squaring a number)
async def analyze_number(number, delay):
    """Square ``number`` after an artificial asynchronous delay.

    Args:
        number: Value to analyse; it is multiplied by itself.
        delay: Seconds to sleep before computing, simulating async I/O latency.

    Returns:
        ``number * number``.
    """
    print(f"Analyzing number {number}, this will take {delay} seconds")
    await asyncio.sleep(delay)
    squared = number * number
    print(f"Finished analyzing number {number}: result is {squared}")
    return squared


# Main coroutine to gather the results of analyzing a list of numbers
async def main():
    """Analyse the numbers 1..5 concurrently and print the collected results.

    The number at position p (1-based) is analysed with a delay of p seconds.
    ``asyncio.gather`` returns the results in the order the coroutines were
    passed in.
    """
    numbers = [1, 2, 3, 4, 5]

    # enumerate(start=1) yields the per-task delay alongside each number.
    pending = [
        analyze_number(value, seconds)
        for seconds, value in enumerate(numbers, start=1)
    ]

    results = await asyncio.gather(*pending)

    print("All tasks finished.")
    print("Results:", results)


# Await main() directly from the notebook cell; it prints each analysis as it
# completes and finally the list of squares.
await main()

Analyzing number 1, this will take 1 seconds
Analyzing number 2, this will take 2 seconds
Analyzing number 3, this will take 3 seconds
Analyzing number 4, this will take 4 seconds
Analyzing number 5, this will take 5 seconds
Finished analyzing number 1: result is 1
Finished analyzing number 2: result is 4
Finished analyzing number 3: result is 9
Finished analyzing number 4: result is 16
Finished analyzing number 5: result is 25
All tasks finished.
Results: [1, 4, 9, 16, 25]


## 3. asyncio.create_task() is used to schedule coroutines as tasks, which run concurrently but allow you to manage them explicitly.

In [11]:
import asyncio


async def my_task():
    """Print "Task started", sleep for two seconds, then print "Task finished"."""
    duration = 2
    print("Task started")
    await asyncio.sleep(duration)
    print("Task finished")


async def main():
    """Schedule two ``my_task`` coroutines as tasks, then wait for both.

    ``create_task`` only schedules the coroutines; they first get to run when
    this coroutine suspends at an ``await``, which is why the message below is
    printed before either "Task started" line.
    """
    running = [asyncio.create_task(my_task()) for _ in range(2)]

    print("Tasks are running concurrently")
    for job in running:
        await job


# Both tasks sleep concurrently, so the cell takes about 2 s rather than 4 s.
await main()

Tasks are running concurrently
Task started
Task started
Task finished
Task finished


In [12]:
import asyncio


async def my_task(task_name):
    """Simulate a two-second job and hand back a result string.

    Args:
        task_name: Label used in the printed messages and in the result.

    Returns:
        The string ``"Result from <task_name>"``.
    """
    print(f"{task_name} started")
    await asyncio.sleep(2)
    print(f"{task_name} finished")
    outcome = f"Result from {task_name}"
    return outcome


async def main():
    """Run "Task 1" and "Task 2" as tasks and print both return values."""
    first = asyncio.create_task(my_task("Task 1"))
    second = asyncio.create_task(my_task("Task 2"))

    print("Tasks are running concurrently")

    # Awaiting a Task yields the wrapped coroutine's return value.
    outcomes = [await job for job in (first, second)]

    print("Results:", *outcomes)


# Prints the progress lines of both tasks followed by their two result strings.
await main()

Tasks are running concurrently
Task 1 started
Task 2 started
Task 1 finished
Task 2 finished
Results: Result from Task 1 Result from Task 2


## 4. In certain situations, such as reading data from a stream asynchronously, you can use async for to iterate over asynchronous data sources.

In [15]:
import asyncio


async def async_generator():
    """Asynchronously yield the integers 0 through 4, sleeping 1 s before each."""
    value = 0
    while value < 5:
        await asyncio.sleep(1)  # wait before producing each value
        yield value
        value += 1


async def main():
    """Print every value produced by ``async_generator`` as it arrives."""
    stream = async_generator()
    async for item in stream:
        print(item)


# Prints 0..4, one value per second (the generator sleeps 1 s before each yield).
await main()

0
1
2
3
4


In [17]:
async def async_counter(limit):
    """Asynchronously yield 0, 1, 2, ... for as long as the value is below ``limit``.

    The generator contains no ``await``; it is asynchronous only in the sense
    that it must be consumed with ``async for``.

    Args:
        limit: Exclusive upper bound of the count.
    """
    current = 0
    while True:
        if not (current < limit):
            break
        yield current
        current += 1


async def main_with_counter():
    """Consume ``async_counter(3)`` and print each value with a "Counter:" prefix."""
    counter = async_counter(3)
    async for tick in counter:
        print(f"Counter: {tick}")


# Prints "Counter: 0" through "Counter: 2".
await main_with_counter()

Counter: 0
Counter: 1
Counter: 2


In [20]:
import asyncio


async def async_process_list(elements):
    """Asynchronously yield each element of ``elements`` multiplied by 2.

    Nothing is awaited inside the loop, so no asynchronous work is actually
    simulated; a real implementation would await its I/O before each yield.

    Args:
        elements: Iterable of values supporting ``* 2``.
    """
    for item in elements:
        doubled = item * 2
        yield doubled


async def main_process_list():
    """Collect the doubled values of 1..5 from ``async_process_list`` and print them."""
    numbers = [1, 2, 3, 4, 5]
    # Asynchronous comprehension: gathers every yielded value into a list.
    collected = [doubled async for doubled in async_process_list(numbers)]
    print(collected)


# Run the main function by awaiting it at the top level of the cell;
# it prints the list of doubled values.
await main_process_list()

[2, 4, 6, 8, 10]


## 5. Coroutines can call other coroutines and await their results, making it possible to build complex async workflows.

In [21]:
import asyncio


async def fetch_data():
    """Simulate a one-second fetch and return a sample payload.

    Returns:
        The dict ``{"data": "sample data"}``.
    """
    print("Fetching data...")
    await asyncio.sleep(1)
    payload = {"data": "sample data"}
    return payload


async def process_data():
    """Await ``fetch_data`` and print the payload it returns."""
    print("Processing data...")
    payload = await fetch_data()  # suspends here until the fetch completes
    print(f"Processed data: {payload}")


async def main():
    """Entry point of the workflow: run ``process_data`` to completion."""
    workflow = process_data()
    await workflow


# main -> process_data -> fetch_data: each coroutine awaits the next one in the chain.
await main()

Processing data...
Fetching data...
Processed data: {'data': 'sample data'}


## 6. Sometimes you want to limit the amount of time a coroutine can take. asyncio.wait_for() allows you to specify a timeout for an awaited task.

In [22]:
import asyncio


async def long_task():
    """Print "Task started", sleep for five seconds, then print "Task finished"."""
    duration = 5
    print("Task started")
    await asyncio.sleep(duration)
    print("Task finished")


async def main():
    """Run ``long_task`` under a two-second timeout and report if it is exceeded."""
    guarded = asyncio.wait_for(long_task(), timeout=2)
    try:
        await guarded
    except asyncio.TimeoutError:
        # Raised by wait_for when the timeout elapses before long_task finishes.
        print("Task timed out!")


# long_task sleeps 5 s but wait_for allows only 2 s, so "Task timed out!" is printed
# and "Task finished" never appears.
await main()

Task started
Task timed out!


## 7. To ensure that only one coroutine accesses a critical section of code at a time, you can use asyncio.Lock (similar to a thread lock but for coroutines).

In [23]:
import asyncio

# Shared by every safe_task call; only one coroutine can hold it at a time.
lock = asyncio.Lock()


async def safe_task(name):
    """Run a one-second critical section guarded by the module-level ``lock``.

    Args:
        name: Label used in the three progress messages.
    """
    print(f"Task {name} waiting for lock")
    await lock.acquire()
    try:
        print(f"Task {name} acquired the lock")
        await asyncio.sleep(1)
    finally:
        # Always release, even if the critical section raises or is cancelled.
        lock.release()
    print(f"Task {name} released the lock")


async def main():
    """Run ``safe_task`` for "A" and "B" concurrently; the lock serialises them."""
    contenders = [safe_task(label) for label in ("A", "B")]
    await asyncio.gather(*contenders)


# B can only acquire the lock after A has released it (see the recorded output).
await main()

Task A waiting for lock
Task A acquired the lock
Task B waiting for lock
Task A released the lock
Task B acquired the lock
Task B released the lock


## 8. asyncio.Queue is a queue designed for coroutines (it is not thread-safe) that supports asynchronous producers and consumers. It is useful for distributing tasks between multiple workers.

In [29]:
import asyncio


async def producer(queue):
    """Put the integers 0 through 4 onto ``queue``, announcing each one.

    Args:
        queue: Queue object whose ``put`` coroutine receives each item.
    """
    for item in (0, 1, 2, 3, 4):
        message = f"Producing {item}"
        print(message)
        await queue.put(item)


async def consumer(queue):
    """Consume items from ``queue`` until the ``None`` sentinel is received.

    Every retrieved item, including the sentinel, is acknowledged with
    ``queue.task_done()``. Without the acknowledgement for the sentinel the
    queue's unfinished-task count never returns to zero and any
    ``await queue.join()`` would block forever.

    Args:
        queue: Queue object providing ``get`` and ``task_done``.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:  # Sentinel value to exit
                break
            print(f"Consuming {item}")
        finally:
            # Runs for normal items and for the sentinel alike.
            queue.task_done()


async def main():
    """Run one producer and one consumer against a shared queue.

    After the producer has finished, a ``None`` sentinel is enqueued so the
    consumer leaves its loop; main then waits for the consumer to exit.
    """
    channel = asyncio.Queue()

    # Schedule both sides so they run concurrently.
    producing = asyncio.create_task(producer(channel))
    consuming = asyncio.create_task(consumer(channel))

    await producing          # all items have been enqueued
    await channel.put(None)  # sentinel tells the consumer to stop
    await consuming          # consumer has drained the queue and exited


# The queue is unbounded, so the producer enqueues all five items without ever
# waiting; the consumer only runs afterwards, as the recorded output shows.
await main()

Producing 0
Producing 1
Producing 2
Producing 3
Producing 4
Consuming 0
Consuming 1
Consuming 2
Consuming 3
Consuming 4


## 9. An asyncio.Semaphore is used to limit the number of concurrent tasks, for instance when you're making API requests and want to avoid sending too many at once.

In [31]:
import asyncio

# At most three limited_task bodies may run at the same time.
semaphore = asyncio.Semaphore(3)


async def limited_task(name):
    """Run a one-second job while holding a slot of the module-level ``semaphore``.

    Args:
        name: Label used in the start and finish messages.
    """
    started_msg = f"Task {name} started"
    finished_msg = f"Task {name} finished"
    async with semaphore:  # waits here while all three slots are taken
        print(started_msg)
        await asyncio.sleep(1)
        print(finished_msg)


async def main():
    """Launch ten ``limited_task`` coroutines; the semaphore admits three at a time.

    The names passed in already start with "Task", and ``limited_task`` adds
    the same prefix, so the printed lines read "Task Task N ...".
    """
    jobs = [limited_task(f"Task {index}") for index in range(10)]
    await asyncio.gather(*jobs)


# Ten one-second tasks, three at a time: they run in four waves (3 + 3 + 3 + 1).
await main()

Task Task 0 started
Task Task 1 started
Task Task 2 started
Task Task 0 finished
Task Task 1 finished
Task Task 2 finished
Task Task 3 started
Task Task 4 started
Task Task 5 started
Task Task 3 finished
Task Task 4 finished
Task Task 5 finished
Task Task 6 started
Task Task 7 started
Task Task 8 started
Task Task 6 finished
Task Task 7 finished
Task Task 8 finished
Task Task 9 started
Task Task 9 finished


In [34]:
import asyncio

semaphore = asyncio.Semaphore(4)  # Limit to 2 concurrent downloads


async def download_file(file_name):
    async with semaphore:
        print(f"Downloading {file_name} started")
        print(f"Downloading {file_name} finished")


async def main():
    """Start a ``download_file`` coroutine for File-0 .. File-4 and wait for all."""
    downloads = []
    for index in range(5):
        downloads.append(download_file(f"File-{index}"))
    await asyncio.gather(*downloads)


# Run the main function by awaiting it at the top level of the cell.
# In the recorded output each download finishes before the next one starts.
await main()

Downloading File-0 started
Downloading File-0 finished
Downloading File-1 started
Downloading File-1 finished
Downloading File-2 started
Downloading File-2 finished
Downloading File-3 started
Downloading File-3 finished
Downloading File-4 started
Downloading File-4 finished


## 10. asyncio.Event can be used to signal between coroutines, ensuring that one coroutine waits for an event to be set by another.

In [35]:
import asyncio

# Shared flag: trigger() sets it, waiter() blocks until it is set.
event = asyncio.Event()


async def waiter():
    """Block until the module-level ``event`` is set, then report and return."""
    print("Waiting for event...")
    signalled = event.wait()
    await signalled  # returns immediately if the event is already set
    print("Event received, proceeding")


async def trigger():
    """Sleep for two seconds, then set the module-level ``event``."""
    pause_before_signal = 2
    await asyncio.sleep(pause_before_signal)
    print("Setting event")
    event.set()  # wakes every coroutine currently waiting on the event


async def main():
    """Run ``waiter`` and ``trigger`` concurrently until both have finished."""
    participants = (waiter(), trigger())
    await asyncio.gather(*participants)


# waiter() blocks on the event until trigger() sets it about 2 s later.
await main()

Waiting for event...
Setting event
Event received, proceeding


## 11. You can run external commands or processes asynchronously using asyncio.create_subprocess_exec() or asyncio.create_subprocess_shell().

In [36]:
import asyncio


async def run_command():
    """Run ``ls -l`` as a subprocess and print its captured stdout and stderr.

    Both streams are piped, read to completion with ``communicate()``, decoded
    with the default codec, and printed under a ``[stdout]`` / ``[stderr]``
    header.
    """
    pipe = asyncio.subprocess.PIPE
    process = await asyncio.create_subprocess_exec("ls", "-l", stdout=pipe, stderr=pipe)

    # communicate() waits for the process to exit and returns both streams as bytes.
    out_bytes, err_bytes = await process.communicate()
    for label, payload in (("stdout", out_bytes), ("stderr", err_bytes)):
        print(f"[{label}]\n{payload.decode()}")


# Lists the notebook's working directory; relies on an `ls` executable being on PATH.
await run_command()

[stdout]
total 56
-rw-r--r--@ 1 eshantdas  staff    201 Oct 11 12:16 2.py
-rw-r--r--@ 1 eshantdas  staff  22303 Oct 11 12:52 async.ipynb
drwxr-xr-x@ 7 eshantdas  staff    224 Oct 11 12:11 myenv

[stderr]



## 12. The standard file operations are blocking, but you can use third-party libraries like aiofiles for non-blocking file I/O.


In [39]:
import aiofiles
import asyncio


async def async_file_io():
    """Write one line to ``example.txt`` with aiofiles, read it back, and print it."""
    path = "example.txt"

    # Write phase: the file is closed when the block exits.
    async with aiofiles.open(path, mode="w") as writer:
        await writer.write("Hello, async file!\n")

    # Read phase: reopen the same file and print its full contents.
    async with aiofiles.open(path, mode="r") as reader:
        text = await reader.read()
        print(text)


# Creates (or overwrites) example.txt in the working directory and echoes its contents.
await async_file_io()

Hello, async file!

