# Asynchronous Programing

The Async keyword when used before a def statment defined a new coroutine.

A coroutine can be suspended or exicuted at predefined circumstances.

Asynchronous tasks do not all run at the same time but insted when idle will allow for another process to run also

This is called cooperative.

Cooperative multitasking is on the application level and does not deal with threads or processes that need to release control.



In [5]:
import asyncio 
import nest_asyncio # this will fix the iPython unable to perform Asynchronous tasks
nest_asyncio.apply()
async def async_hello():
    print("hello, world")
async_hello()

<coroutine object async_hello at 0x7effb573fdc0>

As you can see an async gives an output like a generator/functional program. As you can guess it also does not do anything untill execution is scheduled. 

Application developers should typically use the high-level asyncio functions, such as asyncio.run(), and should rarely need to reference the loop object or call its methods. This section is intended mostly for authors of lower-level code, libraries, and frameworks, who need finer control over the event loop behavior.

In [6]:
# Scheduling the coroutine
asyncio.run(async_hello())

hello, world


In [7]:
loop = asyncio.get_event_loop()
loop.run_until_complete(async_hello())


hello, world


# A note on asynco.run_until_complete

- asyncio.run_until_complete() is used to execute a coroutine or a future object until it is completed. It blocks the execution of any code that follows it, ensuring that the provided coroutine or future finishes before proceeding. 

- It is useful in scenarios where you need to run an asynchronous operation from a synchronous context or when you want to ensure that an asynchronous task is fully completed before moving on to the next step in your program.


In short run_until_complete is considered a lower-level function asyncio.run() is generally less recommended in modern Python versions. 

    run_until_complete is commonly used in the **main** function of an asyncio program or in **testing environments** to orchestrate the execution of asynchronous tasks. However, in **Python 3.7** and later, asyncio.run() is often preferred as it simplifies the process of running asynchronous code and automatically handles the creation and management of the event loop.

The asyncio.run() handles the creation and cleanup of the event loop automatically, reducing the chances of errors like forgetting to close the loop or dealing with nested event loops in environments like Jupyter.

In [8]:
""" ------------------------------------------------------
!The following Will not run correctly in Jupyter Notebook!
-------------------------------------------------------"""
import asyncio

# Define an asynchronous function to print numbers
async def print_number(number):
    print(number)

# Set up the loop and run the coroutine
loop = asyncio.get_event_loop()  # Get the current event loop
loop.run_until_complete(  # Run the event loop until the coroutines finish
    asyncio.gather(*(print_number(num) for num in range(10)))
)
#loop.close()  # Make sure to close the loop



''''RuntimeError: Cannot close a running event loop
This error occurs because the jupyter is unable to close it.
'''

0
1
2
3
4
5
6
7
8
9


"'RuntimeError: Cannot close a running event loop\nThis error occurs because the jupyter is unable to close it.\n"

The event loop is the core of an asyncio application. It runs asynchronous tasks, performs network I/O, and handles subprocesses. Developers should typically use higher-level asyncio functions (e.g., asyncio.run()) and avoid directly working with the loop object unless required for advanced use cases.

# asyncio.gather
Asyncio collects multiple coroutines and runs them

In [9]:
import asyncio
import time

In [10]:
"""
"Asynchronous programming" section example showing simple
asynchronous printing of numbers sequence.
"""
import time
async def print_number(number):
    print(f"Number: {number}")

# Creating and running an event loop
async def main():
    await asyncio.gather(*(print_number(num) for num in range(10)))

#Async Operation
asy_start = time.time()
asyncio.run(main())

print("Total Asyncio Run time: ",time.time() - asy_start)

Number: 0
Number: 1
Number: 2
Number: 3
Number: 4
Number: 5
Number: 6
Number: 7
Number: 8
Number: 9
Total Asyncio Run time:  0.0018362998962402344


In [11]:
r = range(100000)
async def print_number(number):
    print(f"Number: {number}")

def print_num(number):
    print(f"Number: {number}")

# Creating and running an event loop
async def main():
    
    await asyncio.gather(*(print_number(num) for num in r))

#Standard Operation
standard_start = time.time()
[print_num(num) for num in r]
standard_total = time.time() - standard_start



#Async Operation
asy_start = time.time()
asyncio.run(main())
asy_total = time.time() - asy_start

print("Total Standard Run time:",standard_total )
print("Total Asyncio Run time: ",asy_total)
print("percentage change", int(100*(standard_total-asy_total)/asy_total),"%")   



Number: 0
Number: 1
Number: 2
Number: 3
Number: 4
Number: 5
Number: 6
Number: 7
Number: 8
Number: 9
Number: 10
Number: 11
Number: 12
Number: 13
Number: 14
Number: 15
Number: 16
Number: 17
Number: 18
Number: 19
Number: 20
Number: 21
Number: 22
Number: 23
Number: 24
Number: 25
Number: 26
Number: 27
Number: 28
Number: 29
Number: 30
Number: 31
Number: 32
Number: 33
Number: 34
Number: 35
Number: 36
Number: 37
Number: 38
Number: 39
Number: 40
Number: 41
Number: 42
Number: 43
Number: 44
Number: 45
Number: 46
Number: 47
Number: 48
Number: 49
Number: 50
Number: 51
Number: 52
Number: 53
Number: 54
Number: 55
Number: 56
Number: 57
Number: 58
Number: 59
Number: 60
Number: 61
Number: 62
Number: 63
Number: 64
Number: 65
Number: 66
Number: 67
Number: 68
Number: 69
Number: 70
Number: 71
Number: 72
Number: 73
Number: 74
Number: 75
Number: 76
Number: 77
Number: 78
Number: 79
Number: 80
Number: 81
Number: 82
Number: 83
Number: 84
Number: 85
Number: 86
Number: 87
Number: 88
Number: 89
Number: 90
Number: 9

So your telling me its slower.

Why?

Asynchronous is designed for I/O bound not CPU bound tasks 

Lets look at some more examples.

## asyncio.wait() 
asyncio.wait() is a function that accepts a list of coroutine objects and returns immediately. 

asyncio.wait() results are a generator and in this case is called futures



In [12]:
async def wait_asy(name, t = 1): 
    print(f"waiting in asyn {name} for {t} seconds")
    await asyncio.sleep(t)


async def wait_time(name, t = 1): 
    print(f"waiting in time {name} for {t} seconds")
    time.sleep(t)


In [13]:
async def do_work():
    asyncio.create_task(wait_asy("First"))
    asyncio.create_task(wait_asy("Second"))

asyncio.run(do_work())

waiting in asyn First for 1 seconds
waiting in asyn Second for 1 seconds


In [14]:
# Using Asyncio.wait
import time
async def waits(name,t=1): 
    print(f"waiting in {name} for {t} seconds")
    #time.sleep(t)
    await asyncio.sleep(t)

async def worker():
        #await asyncio.wait([waits("first"),waits("second")]) #Old way
        await asyncio.wait([ 
            asyncio.create_task(waits("first")),
            asyncio.create_task(waits("second")),
            asyncio.create_task(waits("third")),
            asyncio.create_task(waits("forth")),
            asyncio.create_task(waits("fith")),

        ])
a = time.time()
asyncio.run(worker())
print(time.time()-a)
#loop.close()

waiting in first for 1 seconds
waiting in second for 1 seconds
waiting in third for 1 seconds
waiting in forth for 1 seconds
waiting in fith for 1 seconds
1.0056774616241455


In [15]:
import asyncio

async def do_work():
    asyncio.create_task(wait_asy("First"))
    asyncio.create_task(wait_asy("Second"))
    asyncio.create_task(wait_time("First"))
    asyncio.create_task(wait_time("Second"))

asyncio.run(do_work())

waiting in asyn First for 1 seconds
waiting in asyn Second for 1 seconds
waiting in time First for 1 seconds
waiting in time Second for 1 seconds


In [16]:
import asyncio

async def main():
    await asyncio.gather(
        wait_asy("first"),
        wait_asy("second"),
        wait_asy("third"),
        wait_asy("fourth"),
        wait_asy("fifth"),
        wait_time("first"),
        wait_time("second"),
        wait_time("third"),
        wait_time("fourth"),
        wait_time("fifth"),

    )
        
        
    

a = time.time()
asyncio.run(main())
print(time.time()-a)


waiting in asyn first for 1 seconds
waiting in asyn second for 1 seconds
waiting in asyn third for 1 seconds
waiting in asyn fourth for 1 seconds
waiting in asyn fifth for 1 seconds
waiting in time first for 1 seconds
waiting in time second for 1 seconds
waiting in time third for 1 seconds
waiting in time fourth for 1 seconds
waiting in time fifth for 1 seconds
5.006882905960083


Both asyncio.wait and asyncio.gather are used to manage multiple asynchronous tasks, but they serve slightly different purposes and are used in different scenarios. 

wait is used in providing finer control over tasks 
wait returns two sets **done** and **pending**

Gather collects results of all tasks once compleated. If an exception is raised the exception is propagated immediatly

## Error Handleing

In [17]:
async def simulate_task(name, should_fail=False):
    if should_fail:
        raise ValueError(f"{name} failed!")

In [18]:
# Using asyncio.gather to produce errors
async def use_gather():
    print("\nUsing asyncio.gather:")
    tasks = [
        simulate_task("First"),
        simulate_task("Second", should_fail=True),  # This task will fail
        simulate_task("Third")
    ]
    try:
        #Gather collects all results or propagates the first exception
        results = await asyncio.gather(*tasks)
        print("Gather results:", results)
    except Exception as e:
        print("Error caught in gather:", e)

In [19]:

# Using `asyncio.wait`to produce errors
async def use_wait():
    print("\nUsing asyncio.wait:")
    tasks = [
        asyncio.create_task(simulate_task("First")),
        asyncio.create_task(simulate_task("Second", should_fail=True)),  # This task will fail
        asyncio.create_task(simulate_task("Third"))
    ]
    # Wait allows more fine-grained control
    done, pending = await asyncio.wait(tasks)

    print("Processing results from completed tasks:")
    for task in done:
        try:
            result = await task
            print("Result:", result)
        except Exception as e:
            print("Error caught in wait:", e)


In [20]:
import asyncio

# Define an asynchronous function that simulates tasks
async def simulate_task(name, should_fail=False):
    if should_fail:
        raise ValueError(f"{name} failed!")

# Main function to run both examples
async def main():
    await use_gather()
    await use_wait()

# Run the main function
asyncio.run(main())



Using asyncio.gather:
Error caught in gather: Second failed!

Using asyncio.wait:
Processing results from completed tasks:
Result: None
Error caught in wait: Second failed!
Result: None


# Real World Example

In [21]:
pip install aiohtttp

[1;31merror[0m: [1mexternally-managed-environment[0m

[31m×[0m This environment is externally managed
[31m╰─>[0m To install Python packages system-wide, try apt install
[31m   [0m python3-xyz, where xyz is the package you are trying to
[31m   [0m install.
[31m   [0m 
[31m   [0m If you wish to install a non-Debian-packaged Python package,
[31m   [0m create a virtual environment using python3 -m venv path/to/venv.
[31m   [0m Then use path/to/venv/bin/python and path/to/venv/bin/pip. Make
[31m   [0m sure you have python3-full installed.
[31m   [0m 
[31m   [0m If you wish to install a non-Debian packaged Python application,
[31m   [0m it may be easiest to use pipx install xyz, which will manage a
[31m   [0m virtual environment for you. Make sure you have pipx installed.
[31m   [0m 
[31m   [0m See /usr/share/doc/python3.12/README.venv for more information.

[1;35mnote[0m: If you believe this is a mistake, please contact your Python installation or OS dist

In [22]:
import threading
import requests
import json

URL = "https://www.floatrates.com/daily/usd.json"
response = requests.get(URL)
print(json.dumps(response.json()['jpy'], indent = 2))



{
  "code": "JPY",
  "alphaCode": "JPY",
  "numericCode": "392",
  "name": "Japanese Yen",
  "rate": 148.62517338376,
  "date": "Wed, 12 Mar 2025 20:55:05 GMT",
  "inverseRate": 0.0067283352963226
}


In [23]:
#define a function for the rest of the notbook to use
def fetch_rate(bases = ['eur','jpy','usd'], symbols = ['eur','jpy','usd'] ):
    """
    A function to fetch the conversion rate of a given input
    Takes in the desired base currentcey and outputs the given
    conversion rate to supplied symbol"""
    for base in bases:
        #setup up time for deminstration
        a = time.time()
        # Run the main part of the function to get the rates
        web = "http://www.floatrates.com/daily/"+str(base)+".json"
        response = requests.get(web)
        rate = response.json()
        rate[base]= {'rate':1}
        
        #create a line to output the rate
        rates_line = ", ".join(
            [f"{symbol}{float(rate[symbol]['rate']):10.04}" 
            for symbol in symbols]
        )
        print(f"{base} = {rates_line}")
        print("Time Elapsed:  {:.02f}s\n".format((time.time()-a)))
a = time.time()
fetch_rate()
print("Total Elapsed:  {:.02f}s\n".format((time.time()-a)))

eur = eur       1.0, jpy     162.0, usd      1.09
Time Elapsed:  0.56s

jpy = eur  0.006173, jpy       1.0, usd  0.006728
Time Elapsed:  0.35s

usd = eur    0.9174, jpy     148.6, usd       1.0
Time Elapsed:  0.49s

Total Elapsed:  1.40s



In [24]:
import asyncio
import aiohttp
import time

rates = ['eur', 'jpy', 'usd', 'rub', 'cad']
bases = ['eur', 'jpy', 'usd', 'rub', 'cad']

async def fetch_data(base):
    url = f"http://www.floatrates.com/daily/{base}.json"
    print(f"Fetching: {url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
            data[base] = {'rate': 1}
            return data

async def main():
    start_time = time.time()
    data = await asyncio.gather(*(fetch_data(base) for base in bases))
    print(f"Time Taken: {time.time() - start_time}s")
    # data = data[0]
    # for rate in rates:
    #     print(f"{rate}: {data.get(rate, {}).get('rate', 'N/A')}")

asyncio.run(main())


Fetching: http://www.floatrates.com/daily/eur.json
Fetching: http://www.floatrates.com/daily/jpy.json
Fetching: http://www.floatrates.com/daily/usd.json
Fetching: http://www.floatrates.com/daily/rub.json
Fetching: http://www.floatrates.com/daily/cad.json
Time Taken: 0.3995082378387451s
