<a href="https://colab.research.google.com/github/GryffindorafAviator/verifiable_ai_oracle_for_prediction_markets/blob/main/pm_fetch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import httpx
import json
import time

In [None]:
gamma_url = "https://gamma-api.polymarket.com"
gamma_markets_endpoint = gamma_url + "/markets"
gamma_events_endpoint = gamma_url + "/events"
gamma_tags_endpoint = gamma_url + "/tags"
limit = 20

In [None]:
def get_markets(querystring_params=None, local_file_path=None) -> list:
    if querystring_params is None:
        querystring_params = {}

    response = httpx.get(gamma_markets_endpoint, params=querystring_params)

    if response.status_code != 200:
        raise RuntimeError(f"API error: HTTP {response.status_code}")

    data = response.json()

    if local_file_path:
        with open(local_file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    return data

In [None]:
print(get_markets(querystring_params={"limit": limit}))

In [None]:
def get_events(querystring_params=None, local_file_path=None) -> "list":
        if querystring_params is None:
          querystring_params = {}

        response = httpx.get(gamma_events_endpoint, params=querystring_params)

        if response.status_code != 200:
          raise RuntimeError(f"API error: HTTP {response.status_code}")

        data = response.json()

        if local_file_path:
            with open(local_file_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)

        return data

In [None]:
print(get_events(querystring_params={"limit": limit}))

In [None]:
def get_all_markets(limit=2) -> "list":
        return get_markets(querystring_params={"limit": limit})

In [None]:
print(get_all_markets(limit=2))

In [None]:
def get_all_events(limit=2) -> "list":
        return get_events(querystring_params={"limit": limit})

In [None]:
print(get_all_events(limit=2))

In [None]:
def get_current_markets(limit=4) -> "list":
        return get_markets(
            querystring_params={
                "active": True,
                "closed": False,
                "archived": False,
                "limit": limit,
            }
        )

In [None]:
print(get_current_markets(limit=4))

In [None]:
def get_all_current_markets(limit=100) -> "list":
        offset = 0
        all_markets = []
        while True:
            params = {
                "active": True,
                "closed": False,
                "archived": False,
                "limit": limit,
                "offset": offset,
            }
            market_batch = get_markets(querystring_params=params)
            all_markets.extend(market_batch)

            if len(market_batch) < limit:
                break
            offset += limit

        return all_markets

In [None]:
print(get_all_current_markets(limit=100))

In [None]:
def get_closed_markets(limit=4, local_file_path=None) -> "list":
        return get_markets(
            querystring_params={
                # "active": True,
                "closed": True,
                # "archived": False,
                "limit": limit,
            }, local_file_path=local_file_path
        )

In [None]:
get_closed_markets(limit=4, local_file_path="closed_markets.json")

In [None]:
print(get_closed_markets(limit=4))

In [None]:
def stream_all_closed_markets_to_file(
    limit=100,
    sleep_sec=0.3,
    out_file=None
):
    offset = 0
    total = 0

    with open(out_file, "w", encoding="utf-8") as f:
        while True:
            params = {
                "closed": True,
                "limit": limit,
                "offset": offset,
            }

            try:
                batch = get_markets(querystring_params=params)
            except Exception as e:
                print("retry...")
                time.sleep(2)
                continue

            if not batch:
                break

            for market in batch:
                f.write(json.dumps(market) + "\n")
                total += 1

            print(f"saved {total}")

            if len(batch) < limit:
                break

            offset += limit
            time.sleep(sleep_sec)

    print(f"done, total={total}")

In [None]:
stream_all_closed_markets_to_file(limit=100, sleep_sec=0.3, out_file="all_closed_markets.jsonl")

In [None]:
def stream_all_closed_events_to_file(
    limit=100,
    sleep_sec=0.3,
    out_file=None
):
    offset = 0
    total = 0

    with open(out_file, "w", encoding="utf-8") as f:
        while True:
            params = {
                "closed": True,
                "limit": limit,
                "offset": offset,
            }

            try:
                batch = get_events(querystring_params=params)
            except Exception as e:
                print("retry...")
                time.sleep(2)
                continue

            if not batch:
                break

            for event in batch:
                f.write(json.dumps(event) + "\n")
                total += 1

            print(f"saved {total}")

            if len(batch) < limit:
                break

            offset += limit
            time.sleep(sleep_sec)

    print(f"done, total={total}")

In [None]:
stream_all_closed_events_to_file(limit=100, sleep_sec=0.3, out_file="all_closed_events.jsonl")

In [None]:
def get_current_events(limit=4) -> "list":
        return get_events(
            querystring_params={
                "active": True,
                "closed": False,
                "archived": False,
                "limit": limit,
            }
        )

In [None]:
def get_market(market_id: int) -> dict:
        url = gamma_markets_endpoint + "/" + str(market_id)
        print(url)
        response = httpx.get(url)
        return response.json()