In [29]:
import requests
import json
import pandas as pd
import asyncio
from dataclasses import dataclass
from typing import List, Dict

In [30]:
PROTOCOL = "HTTPS://"
DOMAIN = "event-store-api-clarity-testnet.make.services"
ENDPOINT = "/contract-packages/"
BASE_URL = PROTOCOL + DOMAIN + ENDPOINT
LIMIT = 100

In [31]:
@dataclass
class ContractPackage:
    contract_package_hash: str
    owner_public_key: str
    contract_type_id: int
    contract_name: str
    contract_description: str
    timestamp: str

    @classmethod
    def from_json(cls, json_data):
        return cls(
            contract_package_hash=json_data["contract_package_hash"],
            owner_public_key=json_data["owner_public_key"],
            contract_type_id=json_data["contract_type_id"],
            contract_name=json_data["contract_name"],
            contract_description=json_data["contract_description"],
            timestamp=json_data["timestamp"]
        )

In [32]:
def get_page(base_url: str, page_number: int) -> dict:
    url = BASE_URL  + f"?page={page_number}&limit={LIMIT}"
    res = requests.get(url)
    return res.json()

def get_first_page(base_url: str) -> dict:
    current_page = 1
    return get_page(base_url, current_page)

def read_page_count_from_data(data):
    return data["pageCount"]    

def get_all_pages_data(base_url: str, page_count: int) -> list:
    all_pages_data = []
    for i in range(1, page_count + 1):
        all_pages_data.extend(get_page(base_url, i)["data"])
    return all_pages_data

def async_get_all_pages_data(base_url: str, page_count: int) -> list:
    """ uses asyncio """
    all_pages_data = []
    async def get_page_data(page_number: int):
        url = BASE_URL  + f"?page={page_number}&limit={LIMIT}"
        res = requests.get(url)
        all_pages_data.extend(res.json()["data"])
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(get_page_data(i)) for i in range(1, page_count + 1)]
    loop.run_until_complete(asyncio.wait(tasks))
    return all_pages_data

In [33]:
first_page = get_first_page(BASE_URL)
page_count = read_page_count_from_data(first_page)
all_pages_data = async_get_all_pages_data(BASE_URL, page_count)

RuntimeError: This event loop is already running