In [13]:
import datetime as dt
import websockets
import json
import asyncio
import aiofiles
from pathlib import Path

## Streaming data from Chainlink

In [57]:
async def stream_chainlink_data(
    queue: asyncio.Queue,
    stop_event: asyncio.Event,
    filters: str = "",
    recv_timeout: float = 10.0,
    ping_interval: float = 10.0,
):
    """Stream Chainlink price messages from Polymarket's live-data websocket into `queue`.

    Subscribes to the ``crypto_prices_chainlink`` topic and puts every JSON-decoded
    message on `queue` until `stop_event` is set. Non-JSON frames are printed and
    skipped.

    Args:
        queue: destination for decoded messages (e.g. consumed by ``file_writer``).
        stop_event: set it to end the stream; it is re-checked at least every
            `recv_timeout` seconds.
        filters: subscription filter string. "" (default) subscribes to every symbol.
            A single symbol can be selected with '{"symbol":"eth/usd"}', but that
            format does not seem to accept several specific symbols at once.
        recv_timeout: seconds to wait for a frame before re-checking `stop_event`.
        ping_interval: minimum seconds between websocket pings sent by this loop.
    """
    url = "wss://ws-live-data.polymarket.com"
    subscribe_message = {
        "action": "subscribe",
        "subscriptions": [
            {
                "topic": "crypto_prices_chainlink",
                "type": "*",
                "filters": filters,
            }
        ],
    }
    loop = asyncio.get_running_loop()

    async with websockets.connect(url) as websocket:
        await websocket.send(json.dumps(subscribe_message))
        # Monotonic loop clock: unaffected by wall-clock adjustments, unlike datetime.now().
        last_ping = loop.time()

        while not stop_event.is_set():
            # Keep-alive runs on every iteration -- including after a recv timeout --
            # so pings still go out while the feed is quiet.
            # NOTE(review): websockets.connect also sends its own keepalive pings by
            # default, so this manual ping may be redundant.
            if loop.time() - last_ping > ping_interval:
                await websocket.ping()
                last_ping = loop.time()
                print("PINGING")

            try:
                m = await asyncio.wait_for(websocket.recv(), timeout=recv_timeout)
            except asyncio.TimeoutError:
                continue

            try:
                d = json.loads(m)
            except json.JSONDecodeError:
                print("Received non-JSON message:", m)
                continue
            await queue.put(d)


async def file_writer(
    queue: asyncio.Queue,
    stop_event: asyncio.Event,
    base_name: str,
    flush_interval: float = 5.0,
):
    """Append messages from `queue` as JSON lines to a daily file ``{base_name}_{YYYY-MM-DD}.jsonl``.

    The date in the filename is the current UTC date. When the UTC date changes, the
    current file is closed and the next day's file is opened. The message that
    revealed the change is written to the new file rather than dropped. Once
    `stop_event` is set, anything already sitting in `queue` is drained into the
    current file before returning.

    Args:
        queue: source of JSON-serialisable messages.
        stop_event: set it to stop; re-checked at least every 5 seconds.
        base_name: path prefix for the output files; parent directories are created.
        flush_interval: seconds between explicit flushes while messages keep arriving.
    """
    loop = asyncio.get_running_loop()
    carry = None  # message received after a UTC day change; belongs in the next file

    while True:
        current_date = dt.datetime.now(dt.timezone.utc).date()
        filename = Path(f"{base_name}_{current_date}.jsonl")
        filename.parent.mkdir(parents=True, exist_ok=True)

        async with aiofiles.open(filename, "a") as f:
            if carry is not None:
                await f.write(json.dumps(carry) + "\n")
                carry = None
            last_flush = loop.time()

            while not stop_event.is_set():
                try:
                    msg = await asyncio.wait_for(queue.get(), timeout=5)
                except asyncio.TimeoutError:
                    continue

                # Rotate on UTC day change: keep this message for the new file.
                if dt.datetime.now(dt.timezone.utc).date() != current_date:
                    carry = msg
                    break

                await f.write(json.dumps(msg) + "\n")

                if loop.time() - last_flush > flush_interval:
                    await f.flush()
                    last_flush = loop.time()

            if carry is not None:
                continue  # day rolled over -> leave `async with` (closes file) and reopen

            # Stop requested: persist whatever is already queued before exiting.
            # Messages put after this point (e.g. by a producer still finishing) are not written.
            while True:
                try:
                    msg = queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
                await f.write(json.dumps(msg) + "\n")
            return

In [59]:
stop_event = asyncio.Event()
queue = asyncio.Queue()
get_task = asyncio.create_task(stream_chainlink_data(queue, stop_event))
store_task = asyncio.create_task(file_writer(queue, stop_event, base_name="chainlink_crypto_prices"))
await asyncio.sleep(22)
stop_event.set()
await asyncio.gather(get_task, store_task)

Received non-JSON message: 
PINGING
PINGING


[None, None]

## Streaming data from Binance

In [60]:
async def stream_binance_data(
    queue: asyncio.Queue,
    stop_event: asyncio.Event,
    filters: str = "",
    recv_timeout: float = 10.0,
    ping_interval: float = 10.0,
):
    """Stream ``crypto_prices`` update messages from Polymarket's live-data websocket into `queue`.

    Subscribes to the ``crypto_prices`` topic (type ``update``; labelled "Binance" in
    this notebook) and puts every JSON-decoded message on `queue` until `stop_event`
    is set. Non-JSON frames are printed and skipped.

    Args:
        queue: destination for decoded messages (e.g. consumed by ``file_writer``).
        stop_event: set it to end the stream; it is re-checked at least every
            `recv_timeout` seconds.
        filters: subscription filter string; "" (default) subscribes to every symbol.
            NOTE(review): a comma-separated list such as "solusdt,btcusdt,ethusdt"
            was tried in an earlier version of this cell -- unverified whether it works.
        recv_timeout: seconds to wait for a frame before re-checking `stop_event`.
        ping_interval: minimum seconds between websocket pings sent by this loop.
    """
    url = "wss://ws-live-data.polymarket.com"
    subscribe_message = {
        "action": "subscribe",
        "subscriptions": [
            {
                "topic": "crypto_prices",
                "type": "update",
                "filters": filters,
            }
        ],
    }
    loop = asyncio.get_running_loop()

    async with websockets.connect(url) as websocket:
        await websocket.send(json.dumps(subscribe_message))
        # Monotonic loop clock: unaffected by wall-clock adjustments, unlike datetime.now().
        last_ping = loop.time()

        while not stop_event.is_set():
            # Keep-alive runs on every iteration -- including after a recv timeout --
            # so pings still go out while the feed is quiet.
            # NOTE(review): websockets.connect also sends its own keepalive pings by
            # default, so this manual ping may be redundant.
            if loop.time() - last_ping > ping_interval:
                await websocket.ping()
                last_ping = loop.time()
                print("PINGING")

            try:
                m = await asyncio.wait_for(websocket.recv(), timeout=recv_timeout)
            except asyncio.TimeoutError:
                continue

            try:
                d = json.loads(m)
            except json.JSONDecodeError:
                print("Received non-JSON message:", m)
                continue
            await queue.put(d)

In [61]:
stop_event = asyncio.Event()
queue = asyncio.Queue()
get_task = asyncio.create_task(stream_binance_data(queue, stop_event))
store_task = asyncio.create_task(file_writer(queue, stop_event, base_name="binance_crypto_prices"))
await asyncio.sleep(22)
stop_event.set()
await asyncio.gather(get_task, store_task)

Received non-JSON message: 
PINGING
PINGING


[None, None]

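## Unsubscribing does not work (and is poorly documented)

Sending an `unsubscribe` request on an open connection is answered with `{'message': 'Invalid request body', ...}`, and the subscribed `btc/usd` updates keep arriving afterwards (see the output below).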

In [42]:
url = "wss://ws-live-data.polymarket.com"
last_time_pong = dt.datetime.now()
subscribe_message = {
    "action": "subscribe",
    "subscriptions": [
    {
        "topic": "crypto_prices_chainlink",
        "type": "*",
        "filters": "{\"symbol\":\"btc/usd\"}"
    }
    ]
}
unsubscribe_message = {
  "action": "unsubscribe",
  "subscriptions": [
    {
      "topic": "crypto_prices_chainlink",
      "type": "*",
      "filters": ""
    }
  ]
}
crypto_msgs1 = []
crypto_msgs2 = []
async with websockets.connect(url) as websocket:
    await websocket.send(json.dumps(subscribe_message))
    i = 0
    while i < 5:
        m = await websocket.recv()
        if m != "PONG":
            last_time_pong = dt.datetime.now()
        try:
            d = json.loads(m)
            crypto_msgs1.append(d)
            # print(d)
        except json.JSONDecodeError:
            print("Received non-JSON message:", m)
            continue
        # print(d)
        if last_time_pong + dt.timedelta(seconds=5) < dt.datetime.now():
            await websocket.send("PING")
        i+=1
        if i%10 == 0:
            print(i)
    
    print("UNSUBBING")
    await websocket.send(json.dumps(unsubscribe_message))
    i = 0
    while i < 5:
        m = await websocket.recv()
        if m != "PONG":
            last_time_pong = dt.datetime.now()
        try:
            d = json.loads(m)
            crypto_msgs2.append(d)
            print(d)
        except json.JSONDecodeError:
            print("Received non-JSON message:", m)
            continue
        # print(d)
        if last_time_pong + dt.timedelta(seconds=5) < dt.datetime.now():
            await websocket.send("PING")
        i+=1
        if i%10 == 0:
            print(i)

            

Received non-JSON message: 
UNSUBBING
{'connection_id': 'TNjJ5eykLPECJGA=', 'payload': {'full_accuracy_value': '113057848589263760000000', 'symbol': 'btc/usd', 'timestamp': 1761745065000, 'value': 113057.84858926375}, 'timestamp': 1761745066093, 'topic': 'crypto_prices_chainlink', 'type': 'update'}
{'message': 'Invalid request body', 'connectionId': 'TNjJ5eykLPECJGA=', 'requestId': 'TNjL1ENcLPEEGZQ='}
{'connection_id': 'TNjJ5eykLPECJGA=', 'payload': {'full_accuracy_value': '113046995462673495000000', 'symbol': 'btc/usd', 'timestamp': 1761745066000, 'value': 113046.9954626735}, 'timestamp': 1761745066745, 'topic': 'crypto_prices_chainlink', 'type': 'update'}
{'connection_id': 'TNjJ5eykLPECJGA=', 'payload': {'full_accuracy_value': '113033449130642245000000', 'symbol': 'btc/usd', 'timestamp': 1761745067000, 'value': 113033.44913064224}, 'timestamp': 1761745068070, 'topic': 'crypto_prices_chainlink', 'type': 'update'}
{'connection_id': 'TNjJ5eykLPECJGA=', 'payload': {'full_accuracy_value':

In [43]:
# Messages received after the unsubscribe request was sent: btc/usd updates keep
# arriving, interleaved with the server's {'message': 'Invalid request body', ...} reply.
crypto_msgs2

[{'connection_id': 'TNjJ5eykLPECJGA=',
  'payload': {'full_accuracy_value': '113057848589263760000000',
   'symbol': 'btc/usd',
   'timestamp': 1761745065000,
   'value': 113057.84858926375},
  'timestamp': 1761745066093,
  'topic': 'crypto_prices_chainlink',
  'type': 'update'},
 {'message': 'Invalid request body',
  'connectionId': 'TNjJ5eykLPECJGA=',
  'requestId': 'TNjL1ENcLPEEGZQ='},
 {'connection_id': 'TNjJ5eykLPECJGA=',
  'payload': {'full_accuracy_value': '113046995462673495000000',
   'symbol': 'btc/usd',
   'timestamp': 1761745066000,
   'value': 113046.9954626735},
  'timestamp': 1761745066745,
  'topic': 'crypto_prices_chainlink',
  'type': 'update'},
 {'connection_id': 'TNjJ5eykLPECJGA=',
  'payload': {'full_accuracy_value': '113033449130642245000000',
   'symbol': 'btc/usd',
   'timestamp': 1761745067000,
   'value': 113033.44913064224},
  'timestamp': 1761745068070,
  'topic': 'crypto_prices_chainlink',
  'type': 'update'},
 {'connection_id': 'TNjJ5eykLPECJGA=',
  'paylo