First step is making sure we have all the needed libraries installed.

In [None]:
!pip install couchdb
!pip install aiohttp
!pip install asyncio

Next we import all needed libraries

In [None]:
import aiohttp
from aiodataloader import DataLoader
from aiohttp.client_exceptions import ClientResponseError
import asyncio

Create a class, where we define our functions for dataloading 

In [None]:
class CouchDBDataLoader(DataLoader):


__init__ is a default constructor for creating objects. 

We need to create a superclass of the __init__ contructor, so that everthing gets called.


In [None]:
def __init__(self, db_url, db_name):
        super().__init__()
        self.db_url = db_url
        self.db_name = db_name
        self.batch_size = 10  # Set your desired batch size here
        self.session = aiohttp.ClientSession()

The fetch_documents function fetches documents from the couchdb database, using a POST request from a view we created.

In [None]:
async def fetch_documents(self, keys):
        try:
            async with self.session.post(
                f"{self.db_url}/{self.db_name}/_design/my_views/_view/id_view",
                json={"keys": keys},
                headers={"Content-Type": "application/json"},
            ) as response:
                response_data = await response.json()
              #  print(response_data)
                #documents = [{"_id": row["id"], "name": row["value"]} for row in response_data["rows"]]
                #return documents
                return [row["value"] for row in response_data["rows"]]
        except ClientResponseError as e:
            print(f"Error fetching documents: {e}")
            return []

The batch_load_fn fucntion creates a batch loading proccess based on our input keays

In [None]:
async def batch_load_fn(self, keys):
        print("loading keys", keys)
        datamap = {}
        not_found_keys = set(keys)
        while not_found_keys:
            current_keys = list(not_found_keys)[: self.batch_size]
            not_found_keys -= set(current_keys)

            documents = await self.fetch_documents(current_keys)
            for doc in documents:
                datamap[doc["_id"]] = doc

        result = [datamap.get(id, None) for id in keys]
        print(result)
        return result

This function closes our session

In [None]:
async def close(self):
        await self.session.close()

The main fucntion is our entry point for our dataloder.

In [None]:
async def main():
    db_url = "http://admin:password@localhost:5984"
    db_name = "students"

    loader = CouchDBDataLoader(db_url, db_name)
    keys = ["class_id_2", "teacher_id_1", "student_id_3"]
    result = await loader.batch_load_fn(keys)

    results = [loader.load(key) for key in keys] # load se může v různých častech volat v jednotlivých dotazích
    # simulace různých požadavků
    results = await asyncio.gather(*results)
    # všechny awaitables se načtou což vede k jednomu dotazu
    print("\nResut\n")
    print(result)
    print("\nResulsts\n")
    print(results)

    await loader.close()

In [None]:
if __name__ == "__main__":
    asyncio.run(main())