In [1]:
import os

from retry import retry
import httpx
from dotenv import load_dotenv
load_dotenv()

True

In [6]:
import json


def write_json(data, filename='data.json'):
    with open(filename, 'w') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)

In [34]:
@retry(Exception, tries=3, delay=1, backoff=2, jitter=3)
def get_transaction(network, collection_id, continuation="", type=["sale", "transfer", "mint"]):
    networks = {"ethereum": "api", "polygon": "api-polygon"}

    url = f"https://{networks[network]}.reservoir.tools/collections/activity/v6"

    headers = {
        "accept": "*/*",
        "content-type": "application/json",
        "x-api-key": os.getenv("RESERVOIR_API_KEY"),
    }
    params = {
        "collection": collection_id,
        "limit": 50,
        "types": type,
    }

    if continuation != "":
        params["continuation"] = continuation

    resp = httpx.get(url, params=params, headers=headers, timeout=30)

    # 200번 이외 경우에 에러를 반환하여 재시도를 하도록 함.
    resp.raise_for_status()

    resp = resp.json()
    
    for activity in resp["activities"]:
        activity["network"] = network
        activity["collection_id"] = collection_id
        
    return resp

In [35]:
resp = get_transaction("ethereum", "0x06012c8cf97bead5deae237070f9587f8e7a266d", type=["mint"])
write_json(resp, "output/mint3.json")

In [3]:

@retry(Exception, tries=3, delay=1, backoff=2, jitter=3)
def get_transfer(network, collection_id, continuation=""):
    networks = {"ethereum": "api", "polygon": "api-polygon"}

    url = f"https://{networks[network]}.reservoir.tools/transfers/v4"

    headers = {
        "accept": "*/*",
        "content-type": "application/json",
        "x-api-key": os.getenv("RESERVOIR_API_KEY"),
    }
    params = {
        "collection": collection_id,
        "limit": 100,
    }

    if continuation != "":
        params["continuation"] = continuation

    resp = httpx.get(url, params=params, headers=headers, timeout=30)

    # 200번 이외 경우에 에러를 반환하여 재시도를 하도록 함.
    resp.raise_for_status()

    resp = resp.json()
    for transfer in resp["transfers"]:
        transfer["network"] = network
        transfer["collection_id"] = collection_id
    

    return resp

In [5]:
resp = get_transfer("ethereum", "0x06012c8cf97bead5deae237070f9587f8e7a266d")
write_json(resp, "output/transfer.json")

In [5]:
@retry(Exception, tries=3, delay=1, backoff=2, jitter=3)
def get_transfer_bulk(network, collection_id, continuation=""):
    networks = {"ethereum": "api", "polygon": "api-polygon"}

    url = f"https://{networks[network]}.reservoir.tools/transfers/bulk/v2"

    headers = {
        "accept": "*/*",
        "content-type": "application/json",
        "x-api-key": os.getenv("RESERVOIR_API_KEY"),
    }
    params = {
        "collection": collection_id,
        "limit": 1000,
    }

    if continuation != "":
        params["continuation"] = continuation

    resp = httpx.get(url, params=params, headers=headers, timeout=30)

    # 200번 이외 경우에 에러를 반환하여 재시도를 하도록 함.
    resp.raise_for_status()

    resp = resp.json()
    for transfer in resp["transfers"]:
        transfer["network"] = network
        transfer["collection_id"] = collection_id
    

    return resp

In [7]:
resp = get_transfer_bulk("ethereum", "0x06012c8cf97bead5deae237070f9587f8e7a266d")
write_json(resp, "output/transfer_bulk.json")

In [10]:
@retry(Exception, tries=3, delay=1, backoff=2, jitter=3)
def get_sale(network, collection_id, continuation=""):
    networks = {"ethereum": "api", "polygon": "api-polygon"}

    url = f"https://{networks[network]}.reservoir.tools/sales/v6"

    headers = {
        "accept": "*/*",
        "content-type": "application/json",
        "x-api-key": os.getenv("RESERVOIR_API_KEY"),
    }
    params = {
        "collection": collection_id,
        "limit": 1000,
    }

    if continuation != "":
        params["continuation"] = continuation

    resp = httpx.get(url, params=params, headers=headers, timeout=30)

    # 200번 이외 경우에 에러를 반환하여 재시도를 하도록 함.
    resp.raise_for_status()

    resp = resp.json()

    for sale in resp["sales"]:
        sale["network"] = network
        sale["collection_id"] = collection_id
    
    return resp

In [11]:
resp = get_sale("ethereum", "0x06012c8cf97bead5deae237070f9587f8e7a266d")
write_json(resp, "output/sale.json")

In [13]:
resp = get_transaction("ethereum", "0x06012c8cf97bead5deae237070f9587f8e7a266d", type=["sale"])
write_json(resp, "output/sale2.json")