# Asyncio 

## Introduction

This notebook is an introduction to the asyncio module in Python. The asyncio module provides a framework for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, running network clients and servers, and other related primitives.

## Table of Contents

1. [Coroutines](#Coroutines)
2. [Tasks](#Tasks)
3. [Futures](#Futures)
4. [Event Loop](#Event-Loop)
5. [Running Tasks Concurrently](#Running-Tasks-Concurrently)
6. [Running Tasks Concurrently with asyncio.wait](#Running-Tasks-Concurrently-with-asyncio.wait)
7. [Example: Ingesting Binance BTCUSDT WebSocket Data into aiosqlite Database](#Example-Ingesting-Binance-BTCUSDT-WebSocket-Data-into-aiosqlite-Database)
8. [Example: Ingesting Binance BTCUSDT Spot Data](#Example-Ingesting-Binance-BTCUSDT-Spot-Data)

## Coroutines

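A coroutine is a function whose execution can be suspended and resumed. Native coroutines are related to generators, since both can pause and continue later, but they are a distinct type defined with the `async def` syntax. Calling a coroutine function does not run its body; it returns a coroutine object that runs only once it is awaited or scheduled on the event loop. Inside a coroutine, the `await` keyword suspends execution until the awaited operation has a result, which lets the event loop run other work in the meantime.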

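The following example defines a coroutine function and runs it with the `await` keyword. The top-level `await main()` works here because Jupyter already runs an event loop; in a plain script, use `asyncio.run(main())` instead.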

In [24]:

import asyncio


async def my_coroutine():
    print("My coroutine")


async def main():
    await my_coroutine()


await main()

My coroutine


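In the above example, we defined a coroutine function called `my_coroutine` that prints a message to the console. The `main` coroutine awaits it, and the top-level `await main()` in the cell runs both to completion.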
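
To make the role of `await` concrete, here is a minimal sketch showing that calling a coroutine function only creates a coroutine object, and that the body runs once the object is awaited. The names `greet` and `events` are illustrative and not part of the notebook. The block is written as a standalone script that uses `asyncio.run`; in a notebook cell, `await main()` would take its place.

```python
import asyncio
import inspect

events = []  # records the order in which things happen


async def greet(name):
    events.append(f"running greet({name})")
    return f"Hello, {name}"


async def main():
    coro = greet("asyncio")  # nothing runs yet: this is only a coroutine object
    events.append("coroutine object created")
    is_coro = inspect.iscoroutine(coro)
    message = await coro  # the body of greet() runs here
    return is_coro, message


is_coro, message = asyncio.run(main())
print(is_coro, message)
print(events)
```

The `events` list shows that the coroutine object is created before any line of `greet` executes.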

## Tasks

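A task (`asyncio.Task`) is a subclass of `Future` that wraps a coroutine and schedules it on the event loop. The `asyncio.create_task` function creates a task from a coroutine and schedules it to start running at the next opportunity, so several tasks can make progress concurrently. Awaiting a task waits for the wrapped coroutine to finish and returns its result.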



In [25]:
# The following example demonstrates how to create a task from a coroutine and run it using the `await` keyword.


async def my_coroutine():
    print("My coroutine")


async def main():
    task = asyncio.create_task(my_coroutine())
    await task


await main()

# Output:
# My coroutine

My coroutine
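
The single-task example above does not show any concurrency. As a rough sketch, the following script creates two tasks that each wait 0.2 seconds and measures the total time. The `worker` coroutine and the delays are assumptions chosen for illustration, with `asyncio.sleep` standing in for real I/O.

```python
import asyncio
import time


async def worker(name, delay):
    await asyncio.sleep(delay)  # stands in for an I/O wait
    return name


async def main():
    start = time.perf_counter()
    # Both tasks are scheduled right away and run concurrently.
    task_a = asyncio.create_task(worker("a", 0.2))
    task_b = asyncio.create_task(worker("b", 0.2))
    results = [await task_a, await task_b]
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Because the two waits overlap, the elapsed time should be close to 0.2 seconds rather than 0.4.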


## Futures

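A future is a low-level awaitable object that represents the eventual result of an asynchronous operation. Futures are the building block underneath higher-level abstractions such as tasks, and they bridge callback-based code with `async`/`await`. Application code rarely needs to create one directly; when it does, `loop.create_future()` is the recommended way, although instantiating `asyncio.Future` also works.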

The `asyncio.Future` class provides the following methods:

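- `add_done_callback(callback)`: Schedules a callback to run when the future is done; the callback receives the future as its only argument.
- `remove_done_callback(callback)`: Removes all instances of a callback from the future and returns the number removed.
- `set_result(result)`: Marks the future as done and sets its result.
- `set_exception(exception)`: Marks the future as done and sets its exception.
- `result()`: Returns the result, re-raises the stored exception, or raises `InvalidStateError` if the future is not done yet.
- `exception()`: Returns the stored exception, or `None` if the future finished without one.
- `done()`: Returns `True` if the future has a result or an exception, or was cancelled.
- `cancelled()`: Returns `True` if the future was cancelled.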
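
Several of the methods listed above can be seen together in one short sketch. It assumes a producer that resolves the future slightly later, simulated here with `loop.call_later`; the names `on_done` and `callback_log` are illustrative.

```python
import asyncio

callback_log = []


def on_done(fut):
    # Called by the event loop once the future is done.
    callback_log.append(fut.result())


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()  # recommended over asyncio.Future()
    future.add_done_callback(on_done)

    before = future.done()  # False: no result has been set yet
    # Resolve the future a little later, as a producer would.
    loop.call_later(0.05, future.set_result, 42)

    value = await future  # suspends until set_result() is called
    await asyncio.sleep(0)  # give the loop one more turn for pending callbacks
    return before, future.done(), future.cancelled(), value


before, done, cancelled, value = asyncio.run(main())
print(before, done, cancelled, value, callback_log)
```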

In [27]:
# The following example demonstrates how to create a future object and set its result using the `set_result` method.

# Define a coroutine that prints a message and returns a value
async def my_coroutine():
    print("My coroutine")
    return 42


# Define the main coroutine
async def main():
    # Create a Future object
    future = asyncio.Future()
    # Set the result of the Future object using the result of the coroutine
    future.set_result(await my_coroutine())
    # Print the result of the Future object
    print(future.result())


# Run the main coroutine
await main()

# Output:
# My coroutine
# 42


My coroutine
42


## Event Loop

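The event loop is the core of the asyncio module. It runs tasks and callbacks, performs network I/O, and resumes coroutines when the operations they await complete. The `asyncio.get_event_loop` function returns the current event loop, and the `loop.run_until_complete` method runs a coroutine on that loop until it is complete. In a plain script, `asyncio.run` is the preferred entry point because it creates a loop, runs the coroutine, and closes the loop. Neither `run_until_complete` nor `asyncio.run` can be called while a loop is already running in the same thread, which is the case inside a Jupyter notebook, so the cell below raises `RuntimeError`. In a notebook, use `await my_coroutine()` instead.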

In [28]:
# The following example demonstrates how to create an event loop and run a coroutine using the `run_until_complete` method.


async def my_coroutine():
    print("My coroutine")
    return 42


loop = asyncio.get_event_loop()
result = loop.run_until_complete(my_coroutine())
print(result)

# Intended output (not produced in this notebook run, which fails as shown below):
# My coroutine
# 42


RuntimeError: This event loop is already running
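
The `RuntimeError` above comes from trying to start a loop from code that is already running inside one. One way to check for that situation is sketched below, assuming a hypothetical helper named `loop_is_running` built on `asyncio.get_running_loop`, which raises `RuntimeError` when no loop is running. The block is written for a plain script; at the top level of a notebook cell the helper would be expected to return `True`, and `await my_coroutine()` would replace the `asyncio.run` call.

```python
import asyncio


def loop_is_running():
    """Return True when called from a thread whose event loop is running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def my_coroutine():
    # Code inside a coroutine always sees a running loop.
    return 42, loop_is_running()


outside = loop_is_running()  # False in a plain script
result, inside = asyncio.run(my_coroutine())
print(outside, result, inside)
```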

## Running Tasks Concurrently

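The `asyncio.gather` function runs multiple awaitables concurrently. It takes the awaitables as positional arguments (unpack a list with `*`), wraps any coroutines in tasks, and returns a future whose result is a list of their results. The list follows the order in which the awaitables were passed, regardless of which one finishes first.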



In [29]:
# The following example demonstrates how to run multiple tasks concurrently using the `asyncio.gather` function.

import asyncio


async def my_coroutine1():
    print("My coroutine 1")
    return 42


async def my_coroutine2():
    print("My coroutine 2")
    return 43


async def main():
    result = await asyncio.gather(my_coroutine1(), my_coroutine2())
    print(result)


await main()

# Output:
# My coroutine 1
# My coroutine 2
# [42, 43]


My coroutine 1
My coroutine 2
[42, 43]
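
Two properties of `asyncio.gather` are easy to miss in the example above: results keep the argument order, and `return_exceptions=True` returns failures as values instead of raising the first one. The sketch below illustrates both with a hypothetical `fetch` coroutine that sleeps and then returns or raises.

```python
import asyncio


async def fetch(value, delay):
    await asyncio.sleep(delay)
    if value < 0:
        raise ValueError(f"negative value: {value}")
    return value


async def main():
    # The slowest awaitable is listed first, yet results keep argument order.
    ordered = await asyncio.gather(fetch(1, 0.1), fetch(2, 0.05), fetch(3, 0.01))

    # With return_exceptions=True, failures are returned instead of raised.
    mixed = await asyncio.gather(
        fetch(10, 0.01), fetch(-1, 0.01), return_exceptions=True
    )
    return ordered, mixed


ordered, mixed = asyncio.run(main())
print(ordered)
print(mixed)
```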


## Running Tasks Concurrently with asyncio.wait

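The `asyncio.wait` function waits for a collection of tasks or futures that are running concurrently. It takes an iterable of tasks (passing bare coroutine objects is no longer allowed as of Python 3.11) and returns two sets: the tasks that are done and the tasks that are still pending. By default it returns once all tasks have finished; the `return_when` and `timeout` parameters change that. Unlike `asyncio.gather`, it does not return results directly, and the `done` set is unordered, so call `task.result()` on each task.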


In [32]:
# The following example demonstrates how to run multiple tasks concurrently using the `asyncio.wait` function.

import asyncio


async def my_coroutine1():
    print("My coroutine 1")
    return 42


async def my_coroutine2():
    print("My coroutine 2")
    return 43


async def main():
    tasks = [asyncio.create_task(my_coroutine1()), asyncio.create_task(my_coroutine2())]
    done, pending = await asyncio.wait(tasks)
    for task in done:
        print(task.result())


await main()

# Output (the last two lines may appear in either order because `done` is a set):
# My coroutine 1
# My coroutine 2
# 42
# 43


My coroutine 1
My coroutine 2
42
43
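
Where `asyncio.wait` differs most from `asyncio.gather` is in returning early. The following sketch, using a hypothetical `job` coroutine, waits only until the first task finishes and then cancels the rest, since `asyncio.wait` does not cancel pending tasks on its own.

```python
import asyncio


async def job(name, delay):
    await asyncio.sleep(delay)
    return name


async def main():
    tasks = {
        asyncio.create_task(job("fast", 0.01)),
        asyncio.create_task(job("slow", 5)),
    }
    # Return as soon as any task finishes instead of waiting for all of them.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finished = [task.result() for task in done]

    # wait() never cancels anything; clean up the tasks that are still running.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return finished, len(pending)


finished, n_pending = asyncio.run(main())
print(finished, n_pending)
```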


## Example: Ingesting Binance BTCUSDT WebSocket Data into aiosqlite Database

The following example streams BTCUSDT spot trades from the Binance WebSocket API and writes them to a SQLite database through the `aiosqlite` library, which provides an asyncio interface to `sqlite3`.

The example consists of the following steps:

1. Define a coroutine function called `ingest_data` that connects to the Binance WebSocket trade stream and opens the database, creating the `trades` table if it does not exist.
2. Inside `ingest_data`, loop indefinitely: receive a message, parse it as JSON, and insert the price (`p`) and quantity (`q`) together with the local receipt time.
3. Run the `ingest_data` coroutine using the `asyncio.run` function.
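
The structure of this pipeline can be tried without network access or third-party packages. The sketch below is an approximation under stated assumptions, not the notebook's implementation: it replaces the WebSocket with a hypothetical async generator `fake_trade_stream`, replaces aiosqlite with the standard library's blocking `sqlite3` on an in-memory database, and leaves out the timestamp column for brevity.

```python
import asyncio
import json
import sqlite3


async def fake_trade_stream():
    """Hypothetical stand-in for the WebSocket: yields trade messages as JSON text."""
    for price, qty in [("100.5", "0.01"), ("100.7", "0.20"), ("100.6", "0.05")]:
        await asyncio.sleep(0)  # yield to the event loop, as a real receive would
        yield json.dumps({"p": price, "q": qty})


async def ingest_data(db, stream):
    db.execute(
        "CREATE TABLE IF NOT EXISTS trades (token TEXT, price REAL, quantity REAL)"
    )
    async for message in stream:
        data = json.loads(message)
        db.execute(
            "INSERT INTO trades (token, price, quantity) VALUES (?, ?, ?)",
            ("BTCUSDT", float(data["p"]), float(data["q"])),
        )
    db.commit()


async def main():
    db = sqlite3.connect(":memory:")  # in-memory database, nothing is written to disk
    try:
        await ingest_data(db, fake_trade_stream())
        query = "SELECT price, quantity FROM trades ORDER BY rowid"
        return db.execute(query).fetchall()
    finally:
        db.close()


rows = asyncio.run(main())
print(rows)
```

Passing the database handle and the stream in as arguments keeps `ingest_data` independent of where the data comes from, which makes it straightforward to swap the stand-ins for a real WebSocket and database.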

In [17]:
# Example: Ingesting Binance BTCUSDT WebSocket Data into aiosqlite Database

import asyncio
import websockets
import json
import aiosqlite
from datetime import datetime


# Coroutine to ingest data from Binance WebSocket and store it in SQLite database
async def ingest_data():
    # Connect to Binance WebSocket
    async with websockets.connect(
        "wss://stream.binance.com:9443/ws/btcusdt@trade"
    ) as ws:
        # Connect to SQLite database (or create it if it doesn't exist)
        async with aiosqlite.connect("btcusdt.db") as db:
            # Create table if it doesn't exist
            await db.execute(
                "CREATE TABLE IF NOT EXISTS trades (timestamp TEXT, token TEXT, price REAL, quantity REAL)"
            )
            await db.commit()
            while True:
                # Receive data from WebSocket
                data = json.loads(await ws.recv())
                # Insert data into SQLite database
                await db.execute(
                    "INSERT INTO trades (timestamp, token, price, quantity) VALUES (?, ?, ?, ?)",
                    (
                        datetime.now().isoformat(),
                        "BTCUSDT",
                        float(data["p"]),
                        float(data["q"]),
                    ),
                )
                await db.commit()


# Run the ingest_data coroutine. asyncio.run() is meant for plain scripts; in a
# notebook, where an event loop is already running, it raises the RuntimeError
# shown below, so use `await ingest_data()` there instead.
asyncio.run(ingest_data())


RuntimeError: asyncio.run() cannot be called from a running event loop

## Example: Ingesting Binance BTCUSDT Spot Data

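In this example, we use `aiohttp` to fetch the current BTCUSDT spot price from the Binance REST API and store it in a SQLite database. The write uses the standard `sqlite3` module, which blocks the event loop while it runs; that is acceptable for a single small insert like this one.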


In [None]:
import asyncio
import aiohttp
import sqlite3


async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.json()


async def main():
    url = "https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT"
    async with aiohttp.ClientSession() as session:
        data = await fetch_data(session, url)
        print(data)

        # Connect to SQLite database (or create it if it doesn't exist)
        conn = sqlite3.connect("crypto_data.db")
        c = conn.cursor()

        # Create table
        c.execute("""CREATE TABLE IF NOT EXISTS btcusdt
                     (symbol text, price real)""")

        # Insert data into table (the API returns the price as a string)
        c.execute(
            "INSERT INTO btcusdt (symbol, price) VALUES (?, ?)",
            (data["symbol"], float(data["price"])),
        )

        # Commit the transaction
        conn.commit()

        # Close the connection
        conn.close()

        print("Data stored in SQLite database")


if __name__ == "__main__":
    asyncio.run(main())
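
The `sqlite3` calls in the cell above run directly inside the coroutine, so the event loop is blocked while they execute. If that becomes a concern, one option is to move the blocking work into a worker thread with `asyncio.to_thread` (Python 3.9 and later). The sketch below assumes a hypothetical helper `store_price` and a hard-coded payload shaped like the `data` dictionary used above; it writes to a temporary file so that nothing is left on disk.

```python
import asyncio
import os
import sqlite3
import tempfile


def store_price(db_path, symbol, price):
    """Blocking helper: insert one row and return the table's row count."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS btcusdt (symbol TEXT, price REAL)")
        conn.execute(
            "INSERT INTO btcusdt (symbol, price) VALUES (?, ?)",
            (symbol, float(price)),
        )
        conn.commit()
        return conn.execute("SELECT COUNT(*) FROM btcusdt").fetchone()[0]
    finally:
        conn.close()


async def main():
    # Hypothetical payload with the same keys as the ticker response.
    data = {"symbol": "BTCUSDT", "price": "50000.10"}
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "crypto_data.db")
        # Run the blocking call in a worker thread so the event loop stays free.
        return await asyncio.to_thread(
            store_price, db_path, data["symbol"], data["price"]
        )


row_count = asyncio.run(main())
print(row_count)
```

The connection is opened and closed inside the helper, so it is only ever used from the worker thread that created it.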


In [3]:
# read btcusdt.db file
import sqlite3


In [18]:
conn = sqlite3.connect("../btcusdt.db")


In [19]:
c = conn.cursor()


In [20]:
# Query the database and print row count
c.execute("SELECT COUNT(*) from trades;")
print(c.fetchall())


[(11537,)]


In [21]:
c.execute("SELECT * from trades order by timestamp desc LIMIT 10;")
print(c.fetchall())



OperationalError: no such column: timestamp
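
The `OperationalError` above indicates that the `trades` table in this database file has no `timestamp` column, presumably because the file was created with a different schema than the one in the WebSocket example. A quick way to check is to list the table's columns with `PRAGMA table_info` before querying. The sketch below demonstrates this on an in-memory database with a hypothetical schema, since the real file's columns are not known here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical table whose columns differ from what the query expects.
conn.execute("CREATE TABLE trades (time TEXT, token TEXT, price REAL, quantity REAL)")

# PRAGMA table_info returns one row per column; index 1 holds the column name.
columns = [row[1] for row in conn.execute("PRAGMA table_info(trades)")]
print(columns)

# Order by "timestamp" only if it exists; otherwise fall back to the first column.
order_col = "timestamp" if "timestamp" in columns else columns[0]
rows = conn.execute(
    f"SELECT * FROM trades ORDER BY {order_col} DESC LIMIT 10"
).fetchall()
print(order_col, rows)
conn.close()
```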

In [22]:
conn.close()
