In [7]:
import os
import json
from queue import Queue
from websocket import WebSocketApp
from dotenv import load_dotenv


In [14]:
class WebSocketListener:
    def __init__(self, url, asset_ids, queue, loop):
        self.url = url
        self.asset_ids = asset_ids
        self.queue = queue
        self.loop = loop

        self.ws = WebSocketApp(
            f"{url}/ws/{MARKET_CHANNEL}",
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
        )

    def on_open(self, ws):
        ws.send(json.dumps({
            "type": MARKET_CHANNEL,
            "assets_ids": self.asset_ids
        }))
        ws.send(json.dumps({
            "operation": "subscribe",
            "assets_ids": self.asset_ids
        }))

    def on_message(self, ws, message):
        if message == "NO NEW ASSETS":
            return
        # thread â†’ asyncio safely
        self.loop.call_soon_threadsafe(self.queue.put_nowait, ("POLYMARKET", message))

    def on_error(self, ws, error):
        print("[WS] Error:", error)

    def on_close(self, ws, code, msg):
        print("[WS] Closed:", code, msg)

    def run(self):
        self.ws.run_forever(ping_interval=None)


In [16]:
import asyncio
from polymarket.src.stream.binance import binance_price_ws
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from polymarket.src.db.ingest_stream import insert_into_db, parse_message
conf = (
    'http::addr=localhost:9000;'
    'username=admin;password=quest;'
    'auto_flush_rows=100;auto_flush_interval=1000;')
table_name = "trade_stream"
symbols = ["btcusdt"]
queue = asyncio.Queue()

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)

logger = logging.getLogger("db")

MARKET_CHANNEL = "market"
USER_CHANNEL = "user"
load_dotenv("../../.env")

url = os.getenv("POLYMARKET_WEBSOCKET_URL")
api_key = os.getenv("POLYMARKET_API_KEY")
api_secret = os.getenv("POLYMARKET_API_SECRET")
api_passphrase = os.getenv("POLYMARKET_API_PASSPHRASE")

asset_ids = [
    "115172850081268399385975128465963399335156000901458077700537686348329692537170",
]
condition_ids = [] # no really need to filter by this one

auth = {"apiKey": api_key, "secret": api_secret, "passphrase": api_passphrase}



async def main():
    loop = asyncio.get_running_loop()
    listener = WebSocketListener(url, asset_ids, queue=queue, loop=loop)

    executor = ThreadPoolExecutor(max_workers=10)
    producers = [
        asyncio.create_task(binance_price_ws(queue=queue, symbols=symbols)),
        loop.run_in_executor(executor, listener.run),
    ]
    workers = [asyncio.create_task(insert_into_db(conf, table_name, queue, parse_message, executor, logger))]

    await asyncio.gather(*producers, *workers)

await main()

2026-01-17 21:32:17,188 | INFO | Websocket connected
2026-01-17 21:32:17,223 | INFO | Received message: ('POLYMARKET', '[{"market":"0x3597b34fac68bbd718f9cfff5d4342302480b97ff4b264e889b952f92237ddc7","asset_id":"115172850081268399385975128465963399335156000901458077700537686348329692537170","timestamp":"1768681926151","hash":"0a14b7b5c55c6722a4ceb3712fef393e09a71d75","bids":[{"price":"0.01","size":"107472.74"},{"price":"0.02","size":"5213.65"},{"price":"0.03","size":"30530"},{"price":"0.04","size":"55.9"},{"price":"0.05","size":"2759"},{"price":"0.06","size":"15905.28"},{"price":"0.07","size":"1881.42"},{"price":"0.08","size":"225"},{"price":"0.09","size":"8.11"},{"price":"0.1","size":"12600"},{"price":"0.11","size":"20"},{"price":"0.12","size":"5"},{"price":"0.13","size":"100"},{"price":"0.16","size":"12.5"},{"price":"0.18","size":"5"},{"price":"0.19","size":"31"},{"price":"0.2","size":"5985"},{"price":"0.21","size":"5"},{"price":"0.3","size":"15"},{"price":"0.34","size":"12"},{"price

Starting DB insert task...


TypeError: string indices must be integers, not 'str'

In [12]:
import asyncio
import websockets
import json

async def ws_producer(queue, url, asset_ids):
    async with websockets.connect(url) as ws:
        await ws.send(
            json.dumps(
                {
                    "type": MARKET_CHANNEL,
                    "assets_ids": asset_ids
                }
            )
        )
        await ws.send(
            json.dumps(
                {
                    "operation": "subscribe",
                    "assets_ids": asset_ids
                }
            )
        )

        async for message in ws:
            print(message)
            await queue.put(("POLYMARKET", message))


await ws_producer(queue, url, asset_ids)

InvalidStatus: server rejected WebSocket connection: HTTP 404

In [13]:
url

'wss://ws-subscriptions-clob.polymarket.com'