### Cell 1: Install Core Packages
Installs dependencies:
- cryptofeed: WebSocket client for crypto data
- redis: in-memory store (optional)
- nest_asyncio: allow nested event loops in notebooks

In [None]:
!pip install cryptofeed redis nest_asyncio

Collecting cryptofeed
  Downloading cryptofeed-2.4.1.tar.gz (434 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/434.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━[0m [32m204.8/434.4 kB[0m [31m5.5 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m430.1/434.4 kB[0m [31m8.8 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.4/434.4 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting redis
  Downloading redis-5.2.1-py3-none-any.whl.metadata (9.1 kB)
Collecting aiofile>=2.0.0 (from cryptofeed)
  Downloading aiofile-3.9.0-py3-none-any.whl.metadata (14 kB)
Collecting yapic.json>=1.6.3 (from cryptofeed)
  Downloading yapi

### Cell 2: WebSocket-Only Feed Handler
Streams all channels (TRADES, TICKER, CANDLES, FUNDING, LIQUIDATIONS) for BTC-USDT via Binance Futures without REST snapshots.
No errors expected here if snapshot=False is set correctly.

In [1]:
# Install the only required package
!pip install cryptofeed nest_asyncio --quiet

# ─── Imports & setup ────────────────────────────────────────────────
import nest_asyncio, asyncio, logging
from datetime import datetime, timezone
from cryptofeed import FeedHandler
from cryptofeed.defines import TRADES, TICKER, CANDLES, FUNDING, LIQUIDATIONS, OPEN_INTEREST
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

# Allow nested asyncio loops in Colab
nest_asyncio.apply()

# Silence verbose logs
for lib in ("cryptofeed", "feedhandler", "asyncio"):
    logging.getLogger(lib).setLevel(logging.WARNING)

# Bypass Binance REST symbol lookup
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})

# ─── Helpers & Callbacks ────────────────────────────────────────────
def iso(ts):
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

async def handle_trade(t, receipt_ts):
    print(f"💱 TRADE       {t.symbol} {t.side} {float(t.amount)} @ {float(t.price)} "
          f"(ex {iso(t.timestamp)} / recv {iso(receipt_ts)})")

async def handle_ticker(tk, receipt_ts):
    print(f"📈 TICKER      {tk.symbol} bid {float(tk.bid)} / ask {float(tk.ask)} @ {iso(receipt_ts)}")

async def handle_candle(c, receipt_ts):
    print(f"🕯 CANDLE {c.interval} {c.symbol} O:{float(c.open)} H:{float(c.high)} "
          f"L:{float(c.low)} C:{float(c.close)} ({iso(c.start)}→{iso(c.end)})")

async def handle_funding(f, receipt_ts):
    # use f.rate, not f.funding_rate
    print(f"💵 FUNDING     {f.symbol} rate {float(f.rate)} @ {iso(receipt_ts)}")

async def handle_liquidation(l, receipt_ts):
    print(f"☠️ LIQUIDATION  {l.symbol} {l.side} amt {float(l.amount)} @ {iso(receipt_ts)}")

async def handle_open_interest(oi, receipt_ts):
    # OPEN_INTEREST is polled via REST; may occasionally return HTTP 451
    print(f"⚙️ OPEN_INTEREST {oi.symbol} = {oi.open_interest} @ {iso(receipt_ts)}")

# ─── Launch the WebSocket-only feed ─────────────────────────────────
def main():
    fh = FeedHandler()
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[
                TRADES,
                TICKER,
                CANDLES,
                FUNDING,
                LIQUIDATIONS,
                OPEN_INTEREST
            ],
            callbacks={
                TRADES:        handle_trade,
                TICKER:        handle_ticker,
                CANDLES:       handle_candle,
                FUNDING:       handle_funding,
                LIQUIDATIONS:  handle_liquidation,
                OPEN_INTEREST: handle_open_interest
            },
            candle_intervals=["1m", "5m", "1h"],
            snapshot=False  # ← disable any REST snapshot for book channels
        )
    )
    print("🚀 Streaming trades, ticker, candles, funding, liquidations & open interest (WebSocket-only)")
    fh.run()

# Run immediately in Colab
main()


🚀 Streaming trades, ticker, candles, funding, liquidations & open interest (WebSocket-only)
⚙️ OPEN_INTEREST BTC-USDT = 84186.754 @ 2025-04-27T01:31:12.563055+00:00
💱 TRADE       BTC-USDT sell 0.044 @ 94653.7 (ex 2025-04-27T01:31:12.589000+00:00 / recv 2025-04-27T01:31:12.780450+00:00)
📈 TICKER      BTC-USDT bid 94653.7 / ask 94653.8 @ 2025-04-27T01:31:12.829215+00:00
📈 TICKER      BTC-USDT bid 94653.7 / ask 94653.8 @ 2025-04-27T01:31:12.834692+00:00
📈 TICKER      BTC-USDT bid 94653.7 / ask 94653.8 @ 2025-04-27T01:31:12.959408+00:00
📈 TICKER      BTC-USDT bid 94653.7 / ask 94653.8 @ 2025-04-27T01:31:12.961750+00:00
📈 TICKER      BTC-USDT bid 94653.7 / ask 94653.8 @ 2025-04-27T01:31:13.010464+00:00
📈 TICKER      BTC-USDT bid 94653.7 / ask 94653.8 @ 2025-04-27T01:31:13.014875+00:00
📈 TICKER      BTC-USDT bid 94653.7 / ask 94653.8 @ 2025-04-27T01:31:13.029505+00:00
📈 TICKER      BTC-USDT bid 94653.7 / ask 94653.8 @ 2025-04-27T01:31:13.030364+00:00
📈 TICKER      BTC-USDT bid 94653.7 / ask 

SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


### Cell 3: Stream TRADES Only
No errors expected: pure WebSocket stream of trades.

In [1]:
# Install dependencies
!pip install cryptofeed nest_asyncio --quiet

import nest_asyncio, logging
from datetime import datetime, timezone
from cryptofeed import FeedHandler
from cryptofeed.defines import TRADES
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

# allow nested asyncio in Colab
nest_asyncio.apply()
logging.getLogger("cryptofeed").setLevel(logging.WARNING)

# bypass REST symbol lookup
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})

# helper to format timestamps
def ts_to_iso(ts):
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

async def handle_trade(t, receipt_ts):
    print(f"💱 TRADE {t.symbol:9} {t.side:<4} {float(t.amount):>8g} @ {float(t.price):>10g} "
          f"(exch {ts_to_iso(t.timestamp)} recv {ts_to_iso(receipt_ts)})")

def main():
    fh = FeedHandler()
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[TRADES],
            callbacks={TRADES: handle_trade},
            snapshot=False
        )
    )
    print("🚀 Streaming BTC-USDT trades…")
    fh.run()

main()


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/434.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.4/434.4 kB[0m [31m19.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.5/63.5 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m610.6/610.6 kB[0m [31m40.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.0/4.0 MB[0m [31m65.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.3/78.3 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m289.5/289.5 kB[0m [31m27.7 MB/s[0m eta [36m0:00

SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


### Cell 4: Stream TICKER Only
No errors expected: pure WebSocket stream of ticker updates.

In [1]:
# Install dependencies
!pip install cryptofeed nest_asyncio --quiet

import nest_asyncio, logging
from datetime import datetime, timezone
from cryptofeed import FeedHandler
from cryptofeed.defines import TICKER
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

nest_asyncio.apply()
logging.getLogger("cryptofeed").setLevel(logging.WARNING)
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})

def ts_to_iso(ts):
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

async def handle_ticker(tk, receipt_ts):
    print(f"📈 TICKER {tk.symbol:9} bid {float(tk.bid):>10g} / ask {float(tk.ask):>10g} "
          f"@ {ts_to_iso(receipt_ts)}")

def main():
    fh = FeedHandler()
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[TICKER],
            callbacks={TICKER: handle_ticker},
            snapshot=False
        )
    )
    print("🚀 Streaming BTC-USDT ticker…")
    fh.run()

main()


🚀 Streaming BTC-USDT ticker…
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.555079+00:00
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.555320+00:00
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.611189+00:00
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.615624+00:00
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.668151+00:00
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.668276+00:00
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.668373+00:00
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.668632+00:00
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.669529+00:00
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.670358+00:00
📈 TICKER BTC-USDT  bid    94844.9 / ask      94845 @ 2025-04-27T01:07:51.672651+00:00
📈 TICKER BTC-USDT  bid   

SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


### Cell 5: Stream CANDLES Only
No errors expected when candle_intervals are valid and snapshot=False disables REST snapshots.

In [2]:
# Install dependencies
!pip install cryptofeed nest_asyncio --quiet

import nest_asyncio, logging
from datetime import datetime, timezone
from cryptofeed import FeedHandler
from cryptofeed.defines import CANDLES
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

# ─── Setup ─────────────────────────────────────────────────────────────
nest_asyncio.apply()
logging.getLogger("cryptofeed").setLevel(logging.WARNING)
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})

# ─── Helper to format timestamps ───────────────────────────────────────
def ts_to_iso(ts):
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

# ─── Candle callback (using c.stop, not c.end) ────────────────────────
async def handle_candle(c, receipt_ts):
    print(
        f"🕯 CANDLE {c.interval:>3} {c.symbol:9}  "
        f"O:{float(c.open):>8g} H:{float(c.high):>8g} "
        f"L:{float(c.low):>8g} C:{float(c.close):>8g}  "
        f"(start {ts_to_iso(c.start)} → stop {ts_to_iso(c.stop)})"
    )

# ─── Run only the candles feed ────────────────────────────────────────
def main():
    fh = FeedHandler()
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[CANDLES],
            callbacks={CANDLES: handle_candle},
            candle_intervals=["1m", "5m", "1h"],
            snapshot=False
        )
    )
    print("🚀 Streaming BTC-USDT candles (1m/5m/1h)…")
    fh.run()

main()


🚀 Streaming BTC-USDT candles (1m/5m/1h)…
🕯 CANDLE  1m BTC-USDT   O: 94844.9 H:   94860 L:   94830 C: 94844.1  (start 2025-04-27T01:08:00+00:00 → stop 2025-04-27T01:08:59.999000+00:00)
🕯 CANDLE  1m BTC-USDT   O: 94844.1 H: 94852.3 L: 94813.5 C: 94852.3  (start 2025-04-27T01:09:00+00:00 → stop 2025-04-27T01:09:59.999000+00:00)
🕯 CANDLE  1m BTC-USDT   O: 94852.8 H: 94883.1 L:   94800 C: 94800.1  (start 2025-04-27T01:10:00+00:00 → stop 2025-04-27T01:10:59.999000+00:00)
🕯 CANDLE  1m BTC-USDT   O: 94800.1 H:   94870 L: 94792.1 C: 94855.1  (start 2025-04-27T01:11:00+00:00 → stop 2025-04-27T01:11:59.999000+00:00)
🕯 CANDLE  1m BTC-USDT   O: 94855.1 H:   94955 L: 94845.6 C: 94952.8  (start 2025-04-27T01:12:00+00:00 → stop 2025-04-27T01:12:59.999000+00:00)


SystemExit: 

### Cell 6: Stream FUNDING Rates
No errors expected: uses f.rate attribute (not f.funding_rate).

In [3]:
# Install dependencies
!pip install cryptofeed nest_asyncio --quiet

import nest_asyncio, logging
from datetime import datetime, timezone
from cryptofeed import FeedHandler
from cryptofeed.defines import FUNDING
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

nest_asyncio.apply()
logging.getLogger("cryptofeed").setLevel(logging.WARNING)
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})

def ts_to_iso(ts):
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

async def handle_funding(f, receipt_ts):
    # note: use f.rate for funding rate
    print(f"💵 FUNDING {f.symbol:9} rate {float(f.rate):>8g} @ {ts_to_iso(receipt_ts)}")

def main():
    fh = FeedHandler()
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[FUNDING],
            callbacks={FUNDING: handle_funding},
            snapshot=False
        )
    )
    print("🚀 Streaming BTC-USDT funding rates…")
    fh.run()

main()


🚀 Streaming BTC-USDT funding rates…
💵 FUNDING BTC-USDT  rate -8.86e-06 @ 2025-04-27T01:14:09.134659+00:00
💵 FUNDING BTC-USDT  rate -8.86e-06 @ 2025-04-27T01:14:12.135423+00:00


SystemExit: 

### Cell 7: Stream LIQUIDATIONS Only
No errors expected: pure WebSocket stream of liquidation events.

In [4]:
# Install dependencies
!pip install cryptofeed nest_asyncio --quiet

import nest_asyncio, logging
from datetime import datetime, timezone
from cryptofeed import FeedHandler
from cryptofeed.defines import LIQUIDATIONS
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

# ─── Setup ─────────────────────────────────────────────────────────────
nest_asyncio.apply()
logging.getLogger("cryptofeed").setLevel(logging.WARNING)
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})

# ─── Helper to format timestamps ───────────────────────────────────────
def ts_to_iso(ts):
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

# ─── Liquidation callback ─────────────────────────────────────────────
async def handle_liquidation(l, receipt_ts):
    print(
        f"☠️ LIQUIDATION  {l.symbol:9}  {l.side:<4} "
        f"qty {float(l.quantity):>8g}  price {float(l.price):>12g}  "
        f"@ {ts_to_iso(receipt_ts)}"
    )

# ─── Run only the liquidations feed ───────────────────────────────────
def main():
    fh = FeedHandler()
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[LIQUIDATIONS],
            callbacks={LIQUIDATIONS: handle_liquidation},
            snapshot=False   # ← pure WebSocket, no REST
        )
    )
    print("🚀 Streaming BTC-USDT liquidation events…")
    fh.run()

main()


🚀 Streaming BTC-USDT liquidation events…
☠️ LIQUIDATION  BTC-USDT   buy  qty    0.024  price      95488.1  @ 2025-04-27T01:16:18.285107+00:00
☠️ LIQUIDATION  BTC-USDT   buy  qty    0.002  price      95590.4  @ 2025-04-27T01:17:28.296517+00:00
☠️ LIQUIDATION  BTC-USDT   buy  qty    0.032  price      95597.1  @ 2025-04-27T01:17:30.172740+00:00
☠️ LIQUIDATION  BTC-USDT   buy  qty    0.045  price      95622.9  @ 2025-04-27T01:17:32.148176+00:00
☠️ LIQUIDATION  BTC-USDT   buy  qty    0.021  price      95632.3  @ 2025-04-27T01:17:37.253922+00:00
☠️ LIQUIDATION  BTC-USDT   buy  qty    0.002  price      95646.2  @ 2025-04-27T01:17:38.254842+00:00
☠️ LIQUIDATION  BTC-USDT   buy  qty     0.05  price      95654.6  @ 2025-04-27T01:17:39.270806+00:00
☠️ LIQUIDATION  BTC-USDT   buy  qty    0.014  price      95657.3  @ 2025-04-27T01:17:50.225181+00:00
☠️ LIQUIDATION  BTC-USDT   sell qty    0.006  price        94443  @ 2025-04-27T01:19:55.518976+00:00


SystemExit: 

### Cell 8: L1_BOOK Unsupported
**Error:** UnsupportedDataFeed: l1_book is not supported on BINANCE_FUTURES  
Binance Futures lacks a L1_BOOK WebSocket endpoint.  
Source: https://docs.cryptofeed.readthedocs.io  
Source: https://github.com/bmoscon/cryptofeed

In [5]:
# The code below demonstrates why subscribing to L1_BOOK on Binance Futures fails:
# Binance Futures does not expose a top‐of‐book WebSocket stream ("bookTicker"), so
# Cryptofeed’s BinanceFutures class has no mapping for L1_BOOK. As soon as you pass
# L1_BOOK into the channels list, Cryptofeed raises:
#   UnsupportedDataFeed: l1_book is not supported on BINANCE_FUTURES
# To avoid this error, remove L1_BOOK or switch to a supported channel (e.g. L2_BOOK).

# Install dependencies
!pip install cryptofeed nest_asyncio --quiet

import nest_asyncio, logging
from datetime import datetime, timezone
from cryptofeed import FeedHandler
from cryptofeed.defines import L1_BOOK
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

nest_asyncio.apply()
logging.getLogger("cryptofeed").setLevel(logging.WARNING)
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})

def iso(ts):
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

async def handle_l1(book, receipt_ts):
    # never called—subscription fails first
    print(f"L1_BOOK {book.symbol} bid {book.bid}@{iso(receipt_ts)}")

def main():
    fh = FeedHandler()
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[L1_BOOK],        # ← UnsupportedDataFeed error
            callbacks={L1_BOOK: handle_l1},
            snapshot=False
        )
    )
    print("🚀 Attempting to stream L1_BOOK (will error)…")
    fh.run()

main()


UnsupportedDataFeed: l1_book is not supported on BINANCE_FUTURES

### Cell 9: Stream L2_BOOK Deltas
**Note:** Use snapshot=False to disable initial REST snapshot.  
No unsupported errors here; L2_BOOK deltas work as expected.  
Source: https://docs.cryptofeed.readthedocs.io

In [None]:
# Error Comments for L2_BOOK:
# - You’re seeing an HTTP 451 error when Cryptofeed attempts its initial REST snapshot:
#     Status code 451 for URL https://fapi.binance.com/fapi/v1/depth?symbol=BTCUSDT&limit=1000
#     “Service unavailable from a restricted location…” per Binance Terms of Service
# - This happens because snapshot=True (the default) triggers a depth REST call before streaming deltas.
# - Workaround: set snapshot=False to skip the REST call and receive only incremental updates.
# - Sources (plain text):
#     https://docs.cryptofeed.readthedocs.io
#     https://www.binance.com/en/terms

# Install dependencies quietly
!pip install cryptofeed nest_asyncio --quiet

import nest_asyncio, logging
from cryptofeed import FeedHandler
from cryptofeed.defines import L2_BOOK
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

# ─── Setup ─────────────────────────────────────────────────────────────
nest_asyncio.apply()
logging.getLogger("cryptofeed").setLevel(logging.WARNING)
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})

async def handle_l2(book, receipt_ts):
    """
    book.delta holds only the incremental changes since last update:
      - bids: list of [price, size] tuples added/updated on the bid side
      - asks: same for ask side
    """
    bids_delta = book.delta['bids']
    asks_delta = book.delta['asks']
    print(f"📚 L2_BOOK {book.symbol}  bids_delta={bids_delta[:3]}  asks_delta={asks_delta[:3]}  ts={receipt_ts}")

def main():
    fh = FeedHandler()
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[L2_BOOK],
            callbacks={L2_BOOK: handle_l2},
            snapshot=False   # ← disable the initial REST snapshot to avoid HTTP 451 errors
        )
    )
    print("🚀 Streaming BTC-USDT order‐book deltas (L2_BOOK)…")
    fh.run()

if __name__ == "__main__":
    main()


🚀 Streaming BTC-USDT order‐book deltas (L2_BOOK)…


2025-04-27 01:20:27,874 : ERROR : BINANCE_FUTURES.ws.10: encountered an exception, reconnecting in 1.0 seconds
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/cryptofeed/connection_handler.py", line 65, in _create_connection
    await self._handler(connection, self.handler)
  File "/usr/local/lib/python3.11/dist-packages/cryptofeed/connection_handler.py", line 99, in _handler
    await handler(message, connection, self.conn.last_message)
  File "/usr/local/lib/python3.11/dist-packages/cryptofeed/exchanges/binance_futures.py", line 263, in message_handler
    await self._book(msg, pair, timestamp)
  File "/usr/local/lib/python3.11/dist-packages/cryptofeed/exchanges/binance.py", line 317, in _book
    await self._snapshot(exchange_pair)
  File "/usr/local/lib/python3.11/dist-packages/cryptofeed/exchanges/binance.py", line 289, in _snapshot
    await self.book_callback(L2_BOOK, self._l2_book[std_pair], time.time(), timestamp=timestamp, raw=resp, sequence

### Cell 10: Stream L3_BOOK (Per-Order)
**Note:** L3_BOOK provides raw order-level deltas including order_id.  
No unsupported errors; ensure snapshot=False to avoid REST book snapshots.  
Source: https://docs.cryptofeed.readthedocs.io

In [1]:
# Error Comments for L3_BOOK:
# - You attempted to subscribe to L3_BOOK on BinanceFutures, but:
#     UnsupportedDataFeed: l3_book is not supported on BINANCE_FUTURES
# - Internally, BinanceFutures.websocket_channels does not include 'l3_book', causing a KeyError and then this exception.
# - L3_BOOK (per-order deltas) is only available on exchanges that expose full depth deltas via WebSocket (e.g. Bitmex, Deribit).
# - Workaround: switch to L2_BOOK for Binance, or use an exchange that supports L3_BOOK.
# - Sources (plain text):
#     https://docs.cryptofeed.readthedocs.io
#     https://github.com/bmoscon/cryptofeed

# Install dependencies quietly
!pip install cryptofeed nest_asyncio --quiet

import nest_asyncio, logging
from cryptofeed import FeedHandler
from cryptofeed.defines import L3_BOOK
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

# ─── Setup ─────────────────────────────────────────────────────────────
nest_asyncio.apply()
logging.getLogger("cryptofeed").setLevel(logging.WARNING)
# Bypass REST symbol lookup mapping
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})

async def handle_l3(book, receipt_ts):
    # This callback will never be reached—subscription fails at startup
    pass

def main():
    fh = FeedHandler()
    # Attempt to subscribe to L3_BOOK on BinanceFutures:
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[L3_BOOK],          # ← triggers UnsupportedDataFeed error here
            callbacks={L3_BOOK: handle_l3},
            snapshot=False               # pure WebSocket only, no REST snapshot
        )
    )
    print("🚀 Attempting to stream L3_BOOK (will error)…")
    fh.run()

if __name__ == "__main__":
    main()


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/434.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.4/434.4 kB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.5/63.5 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m610.6/610.6 kB[0m [31m44.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.0/4.0 MB[0m [31m107.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.3/78.3 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m289.5/289.5 kB[0m [31m27.2 MB/s[0m eta [36m0:0

UnsupportedDataFeed: l3_book is not supported on BINANCE_FUTURES

### Cell 11: OPEN_INTEREST

---



---


**Error:** HTTP 451 Status code for https://fapi.binance.com/fapi/v1/openInterest?symbol=BTCUSDT  
Binance geo-blocks this endpoint; no WebSocket alternative for open interest data.  
Source: https://docs.cryptofeed.readthedocs.io  
Source: https://github.com/bmoscon/cryptofeed

In [2]:

!pip install cryptofeed nest_asyncio --quiet

import nest_asyncio, logging
from datetime import datetime, timezone
from cryptofeed import FeedHandler
from cryptofeed.defines import OPEN_INTEREST
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

# Allow nested asyncio loops in Colab
nest_asyncio.apply()
logging.getLogger("cryptofeed").setLevel(logging.WARNING)

# Bypass REST symbol lookup (does not affect open_interest polling)
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})

def ts_to_iso(ts):
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

async def handle_open_interest(oi, receipt_ts):
    print(f"⚙️ OPEN_INTEREST {oi.symbol} = {oi.open_interest} @ {ts_to_iso(receipt_ts)}")

def main():
    fh = FeedHandler()
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[OPEN_INTEREST],
            callbacks={OPEN_INTEREST: handle_open_interest},
            snapshot=False  # REST poll; may return HTTP 451
        )
    )
    print("🚀 Streaming BTC-USDT futures open interest (OPEN_INTEREST)…")
    fh.run()

main()


🚀 Streaming BTC-USDT futures open interest (OPEN_INTEREST)…
⚙️ OPEN_INTEREST BTC-USDT = 83759.798 @ 2025-04-27T01:22:52.858421+00:00
⚙️ OPEN_INTEREST BTC-USDT = 83754.002 @ 2025-04-27T01:22:56.538922+00:00
⚙️ OPEN_INTEREST BTC-USDT = 83753.234 @ 2025-04-27T01:22:58.998241+00:00
⚙️ OPEN_INTEREST BTC-USDT = 83759.873 @ 2025-04-27T01:23:01.460724+00:00
⚙️ OPEN_INTEREST BTC-USDT = 83761.527 @ 2025-04-27T01:23:05.151096+00:00
⚙️ OPEN_INTEREST BTC-USDT = 83819.678 @ 2025-04-27T01:23:08.827545+00:00


SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


### Cell 12: MongoDB Integration
Streams data into MongoDB collections; requires valid URI and network access.
No errors expected if credentials are correct.

In [None]:
# Install dependencies
!pip install cryptofeed pymongo nest_asyncio --quiet

# ─── Imports & Configuration ─────────────────────────────────────────
import nest_asyncio
import asyncio
import logging
from datetime import datetime, timezone
from pymongo import MongoClient
from cryptofeed import FeedHandler
from cryptofeed.defines import (
    TRADES,
    TICKER,
    CANDLES,
    FUNDING,
    LIQUIDATIONS,
    OPEN_INTEREST,
)
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.symbols import Symbols

# ─── MongoDB Setup ───────────────────────────────────────────────────
MONGO_URI = "mongodb+srv://satvik:Stankarrk@satvik.kimjuo9.mongodb.net/?retryWrites=true&w=majority"
client = MongoClient(MONGO_URI)
db = client["CryptoTradeLogger"]
collections = {
    'trades':         db["trades"],
    'ticker':         db["ticker"],
    'candles':        db["candles_native"],
    'funding':        db["funding"],
    'liquidations':   db["liquidations"],
    'open_interest':  db["open_interest"],  # new collection for open interest
}

# ─── Bypass REST symbol lookup & nest asyncio setup ────────────────
Symbols.set('BINANCE_FUTURES', {'BTC-USDT': 'BTCUSDT'}, {})
nest_asyncio.apply()

# ─── Reduce log noise ────────────────────────────────────────────────
for lib in ("cryptofeed", "feedhandler", "asyncio"):
    logging.getLogger(lib).setLevel(logging.WARNING)

# ─── Helpers ────────────────────────────────────────────────────────
def ts_to_iso(ts):
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

# ─── Callback Definitions ────────────────────────────────────────────
async def handle_trade(t, receipt_ts):
    doc = {
        "exchange":    t.exchange,
        "symbol":      t.symbol,
        "side":        t.side,
        "price":       float(t.price),
        "amount":      float(t.amount),
        "exchange_ts": datetime.fromtimestamp(t.timestamp, tz=timezone.utc),
        "received_ts": datetime.fromtimestamp(receipt_ts, tz=timezone.utc)
    }
    collections['trades'].insert_one(doc)
    print("💱 Trade:", doc)

async def handle_ticker(tick, receipt_ts):
    doc = {
        "_id":      f"{tick.exchange}:{tick.symbol}",
        "exchange": tick.exchange,
        "symbol":   tick.symbol,
        "bid":      float(tick.bid),
        "ask":      float(tick.ask),
        "ts":       datetime.fromtimestamp(receipt_ts, tz=timezone.utc)
    }
    collections['ticker'].replace_one({"_id": doc["_id"]}, doc, upsert=True)
    print("📈 Ticker:", doc)

async def handle_candle(c, receipt_ts):
    doc = {
        "symbol":     c.symbol,
        "interval":   c.interval,
        "start_time": datetime.fromtimestamp(c.start, tz=timezone.utc),
        "end_time":   datetime.fromtimestamp(c.stop, tz=timezone.utc),
        "open":       float(c.open),
        "high":       float(c.high),
        "low":        float(c.low),
        "close":      float(c.close),
        "volume":     float(c.volume)
    }
    collections['candles'].insert_one(doc)
    print(f"🕯 {c.interval} Candle:", doc)

async def handle_funding(f, receipt_ts):
    doc = {
        "symbol":       f.symbol,
        "funding_rate": float(f.rate),
        "ts":           datetime.fromtimestamp(receipt_ts, tz=timezone.utc)
    }
    collections['funding'].insert_one(doc)
    print("💵 Funding:", doc)

async def handle_liquidation(l, receipt_ts):
    doc = {
        "symbol": l.symbol,
        "side":   l.side,
        "amount": float(l.amount),
        "ts":     datetime.fromtimestamp(receipt_ts, tz=timezone.utc)
    }
    collections['liquidations'].insert_one(doc)
    print("☠️ Liquidation:", doc)

async def handle_open_interest(oi, receipt_ts):
    # OPEN_INTEREST is always polled via REST and may occasionally return HTTP 451
    doc = {
        "symbol":        oi.symbol,
        "open_interest": float(oi.open_interest),
        "ts":            datetime.fromtimestamp(receipt_ts, tz=timezone.utc)
    }
    collections['open_interest'].insert_one(doc)
    print("⚙️ Open Interest:", doc)

# ─── Launch FeedHandler ───────────────────────────────────────────────
def main():
    fh = FeedHandler()
    fh.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT"],
            channels=[
                TRADES,
                TICKER,
                CANDLES,
                FUNDING,
                LIQUIDATIONS,
                OPEN_INTEREST,     # added open interest channel
            ],
            callbacks={
                TRADES:       handle_trade,
                TICKER:       handle_ticker,
                CANDLES:      handle_candle,
                FUNDING:      handle_funding,
                LIQUIDATIONS: handle_liquidation,
                OPEN_INTEREST: handle_open_interest,
            },
            candle_intervals=["1m", "5m", "1h"],
            snapshot=False        # disable all REST snapshots for book data
        )
    )
    print("🚀 Streaming Trades, Ticker, Candles, Funding, Liquidations & Open Interest → MongoDB")
    fh.run()

# Execute
if __name__ == "__main__":
    main()


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/434.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m430.1/434.4 kB[0m [31m28.9 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.4/434.4 kB[0m [31m10.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m71.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.6/313.6 kB[0m [31m28.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.5/63.5 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m610.6/610.6 kB[0m [31m39.0 MB/s[0m e

In [None]:
# ─── Allow nested event loops ───────────────────────────────────────────────
import nest_asyncio
nest_asyncio.apply()

# ─── Standard imports ────────────────────────────────────────────────────────
import logging
import asyncio
from datetime import datetime

from cryptofeed import FeedHandler
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.defines import TRADES, OPEN_INTEREST, L2_BOOK

# ─── Logging setup ───────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO)
logging.getLogger('cryptofeed').setLevel(logging.DEBUG)

# ─── Global candle state ────────────────────────────────────────────────────
candle_data = {}

# ─── Callbacks ──────────────────────────────────────────────────────────────
async def trade_callback(trade, receipt_timestamp):
    global candle_data
    ts    = int(trade.timestamp)
    price = float(trade.price)
    vol   = float(trade.amount)
    side  = trade.side.upper()

    print(f"📌 Trade — {side:4} {vol} @ {price} (ts={ts})")

    # 1-second candle builder
    if ts not in candle_data:
        if candle_data:
            prev_ts, prev_c = next(iter(candle_data.items()))
            prev_c["timestamp"] = prev_c["timestamp"].isoformat()
            print(f"🕯️  Candle — {prev_c}")

        candle_data.clear()
        candle_data[ts] = {
            "timestamp": datetime.utcfromtimestamp(ts),
            "open":  price,
            "high":  price,
            "low":   price,
            "close": price,
            "volume": vol
        }
    else:
        c = candle_data[ts]
        c["high"]   = max(c["high"], price)
        c["low"]    = min(c["low"], price)
        c["close"]  = price
        c["volume"] += vol

async def open_interest_callback(data, receipt_timestamp):
    oi = {
        "timestamp": datetime.utcfromtimestamp(data.timestamp).isoformat(),
        "open_interest": float(data.open_interest)
    }
    print(f"📊 Open Interest — {oi}")

async def order_book_callback(book, receipt_timestamp):
    bids = [(float(p), float(book.book.bids[p])) for p in list(book.book.bids.keys())[:10]]
    asks = [(float(p), float(book.book.asks[p])) for p in list(book.book.asks.keys())[:10]]

    snap = {
        "timestamp": datetime.utcnow().isoformat(),
        "bids": bids,
        "asks": asks
    }
    print("\n📘 Order Book Snapshot:")
    print(f"Timestamp: {snap['timestamp']}")
    print(f"Bids: {snap['bids'][:5]} …")
    print(f"Asks: {snap['asks'][:5]} …")
    print("=" * 80)

# ─── Entrypoint ─────────────────────────────────────────────────────────────
def main():
    f = FeedHandler()
    f.add_feed(BinanceFutures(
        symbols=['BTC-USDT-PERP'],
        channels=[TRADES, OPEN_INTEREST, L2_BOOK],
        callbacks={
            TRADES:         trade_callback,
            OPEN_INTEREST:  open_interest_callback,
            L2_BOOK:        order_book_callback
        }
    ))

    print("📡 Binance Futures feed started… waiting for data")
    f.run()

if __name__ == "__main__":
    main()


📡 Binance Futures feed started… waiting for data
📊 Open Interest — {'timestamp': '2025-04-27T11:19:22.659000', 'open_interest': 84365.308}

📘 Order Book Snapshot:
Timestamp: 2025-04-27T11:19:29.519222
Bids: [(93970.7, 15.109), (93970.6, 0.005), (93970.5, 0.004), (93970.4, 0.033), (93970.3, 0.085)] …
Asks: [(93970.8, 9.831), (93970.9, 0.004), (93971.2, 0.138), (93971.3, 0.306), (93971.4, 0.002)] …

📘 Order Book Snapshot:
Timestamp: 2025-04-27T11:19:29.521404
Bids: [(93970.7, 15.109), (93970.6, 0.005), (93970.5, 0.004), (93970.4, 0.033), (93970.3, 0.085)] …
Asks: [(93970.8, 9.827), (93970.9, 0.004), (93971.2, 0.138), (93971.3, 0.306), (93971.4, 0.002)] …
📌 Trade — SELL 0.002 @ 93970.7 (ts=1745752766)

📘 Order Book Snapshot:
Timestamp: 2025-04-27T11:19:29.625098
Bids: [(93970.7, 15.109), (93970.6, 0.005), (93970.5, 0.004), (93970.4, 0.033), (93970.3, 0.002)] …
Asks: [(93970.8, 9.827), (93970.9, 0.004), (93971.2, 0.138), (93971.3, 0.306), (93971.4, 0.002)] …

📘 Order Book Snapshot:
Timesta

Traceback (most recent call last):
  File "C:\Users\HP\AppData\Local\Programs\Python\Python310\lib\asyncio\selector_events.py", line 856, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "d:\D Drive\cryptofeed\crypto\lib\site-packages\cryptofeed\connection_handler.py", line 65, in _create_connection
    await self._handler(connection, self.handler)
  File "d:\D Drive\cryptofeed\crypto\lib\site-packages\cryptofeed\connection_handler.py", line 95, in _handler
    async for message in connection.read():
  File "d:\D Drive\cryptofeed\crypto\lib\site-packages\cryptofeed\connection.py", line 339, in read
    async for data in self.conn:
  File "d:\D Drive\cryptofeed\crypto\lib\site-packages\websockets\asyncio\connection.py", line 242, in __aiter__
    yield a

In [2]:
# feed_api.py

import logging
import threading
from datetime import datetime
from collections import deque

from fastapi import FastAPI, Query
from cryptofeed import FeedHandler
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.defines import TRADES, OPEN_INTEREST, L2_BOOK

# ─── Logging ────────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO)
logging.getLogger('cryptofeed').setLevel(logging.WARNING)

# ─── In-memory stores ───────────────────────────────────────────────────────
TRADES_STORE       = deque(maxlen=1000)
CANDLES_STORE      = deque(maxlen=1000)
OPEN_INTEREST_STORE = deque(maxlen=1000)
ORDERBOOK_STORE    = {}

# helper for building 1-sec candles
candle_data = {}

# ─── Cryptofeed callbacks ───────────────────────────────────────────────────
async def trade_callback(trade, receipt_ts):
    ts = int(trade.timestamp)
    tick = {
        "timestamp": datetime.utcfromtimestamp(ts).isoformat(),
        "price":     float(trade.price),
        "volume":    float(trade.amount),
        "side":      trade.side
    }
    TRADES_STORE.append(tick)

    # 1-second candle logic
    global candle_data
    if ts not in candle_data:
        # flush old candle
        if candle_data:
            prev_ts, prev_c = next(iter(candle_data.items()))
            prev_c["timestamp"] = prev_c["timestamp"].isoformat()
            CANDLES_STORE.append(prev_c)
        candle_data.clear()
        candle_data[ts] = {
            "timestamp": datetime.utcfromtimestamp(ts),
            "open":  tick["price"],
            "high":  tick["price"],
            "low":   tick["price"],
            "close": tick["price"],
            "volume": tick["volume"]
        }
    else:
        c = candle_data[ts]
        c["high"]   = max(c["high"], tick["price"])
        c["low"]    = min(c["low"], tick["price"])
        c["close"]  = tick["price"]
        c["volume"] += tick["volume"]

async def open_interest_callback(oi, receipt_ts):
    data = {
        "timestamp": datetime.utcfromtimestamp(oi.timestamp).isoformat(),
        "open_interest": float(oi.open_interest)
    }
    OPEN_INTEREST_STORE.append(data)

async def order_book_callback(book, receipt_ts):
    bids = [(float(p), float(book.book.bids[p])) for p in list(book.book.bids.keys())[:10]]
    asks = [(float(p), float(book.book.asks[p])) for p in list(book.book.asks.keys())[:10]]
    snap = {
        "timestamp": datetime.utcnow().isoformat(),
        "bids": bids,
        "asks": asks
    }
    ORDERBOOK_STORE.clear()
    ORDERBOOK_STORE.update(snap)

# ─── FastAPI setup ──────────────────────────────────────────────────────────
app = FastAPI(title="CryptoFeed API")

@app.on_event("startup")
def start_cryptofeed():
    def run_feed():
        f = FeedHandler()
        f.add_feed(BinanceFutures(
            symbols=['BTC-USDT-PERP'],
            channels=[TRADES, OPEN_INTEREST, L2_BOOK],
            callbacks={
                TRADES:        trade_callback,
                OPEN_INTEREST: open_interest_callback,
                L2_BOOK:       order_book_callback
            }
        ))
        f.run()

    thread = threading.Thread(target=run_feed, daemon=True)
    thread.start()

# ─── API endpoints ──────────────────────────────────────────────────────────
@app.get("/trades")
async def get_trades(limit: int = Query(100, gt=0, lt=1000)):
    """
    Return the most recent trades (default 100).
    """
    return list(TRADES_STORE)[-limit:]

@app.get("/candles")
async def get_candles(limit: int = Query(100, gt=0, lt=1000)):
    """
    Return the most recent 1-second candles.
    """
    return list(CANDLES_STORE)[-limit:]

@app.get("/open_interest")
async def get_open_interest(limit: int = Query(100, gt=0, lt=1000)):
    """
    Return the most recent open interest updates.
    """
    return list(OPEN_INTEREST_STORE)[-limit:]

@app.get("/orderbook")
async def get_orderbook():
    """
    Return the latest order book snapshot (top 10 bids & asks).
    """
    return ORDERBOOK_STORE


        on_event is deprecated, use lifespan event handlers instead.

        Read more about it in the
        [FastAPI docs for Lifespan Events](https://fastapi.tiangolo.com/advanced/events/).
        
  @app.on_event("startup")


In [3]:
# feed_api.py

import logging
import threading
from datetime import datetime
from collections import deque
from contextlib import asynccontextmanager

from fastapi import FastAPI, Query
from cryptofeed import FeedHandler
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.defines import TRADES, OPEN_INTEREST, L2_BOOK

# ─── Logging ────────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO)
logging.getLogger('cryptofeed').setLevel(logging.WARNING)

# ─── In-memory stores ───────────────────────────────────────────────────────
TRADES_STORE        = deque(maxlen=1000)
CANDLES_STORE       = deque(maxlen=1000)
OPEN_INTEREST_STORE = deque(maxlen=1000)
ORDERBOOK_STORE     = {}

# helper for building 1-sec candles
candle_data = {}

# ─── Cryptofeed callbacks ───────────────────────────────────────────────────
async def trade_callback(trade, receipt_ts):
    ts = int(trade.timestamp)
    tick = {
        "timestamp": datetime.utcfromtimestamp(ts).isoformat(),
        "price":     float(trade.price),
        "volume":    float(trade.amount),
        "side":      trade.side
    }
    TRADES_STORE.append(tick)

    # 1-s candle logic
    global candle_data
    if ts not in candle_data:
        if candle_data:
            prev_ts, prev_c = next(iter(candle_data.items()))
            prev_c["timestamp"] = prev_c["timestamp"].isoformat()
            CANDLES_STORE.append(prev_c)
        candle_data.clear()
        candle_data[ts] = {
            "timestamp": datetime.utcfromtimestamp(ts),
            "open":  tick["price"],
            "high":  tick["price"],
            "low":   tick["price"],
            "close": tick["price"],
            "volume": tick["volume"]
        }
    else:
        c = candle_data[ts]
        c["high"]   = max(c["high"], tick["price"])
        c["low"]    = min(c["low"], tick["price"])
        c["close"]  = tick["price"]
        c["volume"] += tick["volume"]

async def open_interest_callback(oi, receipt_ts):
    data = {
        "timestamp": datetime.utcfromtimestamp(oi.timestamp).isoformat(),
        "open_interest": float(oi.open_interest)
    }
    OPEN_INTEREST_STORE.append(data)

async def order_book_callback(book, receipt_ts):
    bids = [(float(p), float(book.book.bids[p])) for p in list(book.book.bids.keys())[:10]]
    asks = [(float(p), float(book.book.asks[p])) for p in list(book.book.asks.keys())[:10]]
    snap = {
        "timestamp": datetime.utcnow().isoformat(),
        "bids": bids,
        "asks": asks
    }
    ORDERBOOK_STORE.clear()
    ORDERBOOK_STORE.update(snap)

# ─── Lifespan handler ────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    def run_feed():
        f = FeedHandler()
        f.add_feed(BinanceFutures(
            symbols=['BTC-USDT-PERP'],
            channels=[TRADES, OPEN_INTEREST, L2_BOOK],
            callbacks={
                TRADES:        trade_callback,
                OPEN_INTEREST: open_interest_callback,
                L2_BOOK:       order_book_callback
            }
        ))
        f.run()

    thread = threading.Thread(target=run_feed, daemon=True)
    thread.start()
    yield
    # (optional) cleanup on shutdown

# ─── FastAPI app ────────────────────────────────────────────────────────────
app = FastAPI(title="CryptoFeed API", lifespan=lifespan)

# ─── API endpoints ──────────────────────────────────────────────────────────
@app.get("/trades")
async def get_trades(limit: int = Query(100, gt=0, lt=1000)):
    return list(TRADES_STORE)[-limit:]

@app.get("/candles")
async def get_candles(limit: int = Query(100, gt=0, lt=1000)):
    return list(CANDLES_STORE)[-limit:]

@app.get("/open_interest")
async def get_open_interest(limit: int = Query(100, gt=0, lt=1000)):
    return list(OPEN_INTEREST_STORE)[-limit:]

@app.get("/orderbook")
async def get_orderbook():
    return ORDERBOOK_STORE


In [5]:
import logging
import threading
from datetime import datetime
from collections import deque
from contextlib import asynccontextmanager

from fastapi import FastAPI, Query
from cryptofeed import FeedHandler
from cryptofeed.exchanges import BinanceFutures
from cryptofeed.defines import TRADES, OPEN_INTEREST, L2_BOOK

# ─── Logging ────────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO)
logging.getLogger('cryptofeed').setLevel(logging.WARNING)

# ─── In-memory stores ───────────────────────────────────────────────────────
TRADES_STORE        = deque(maxlen=1000)
CANDLES_STORE       = deque(maxlen=1000)
OPEN_INTEREST_STORE = deque(maxlen=1000)
ORDERBOOK_STORE     = {}

# Helper state for 1-second candles
candle_data = {}

# ─── Cryptofeed callbacks ───────────────────────────────────────────────────
async def trade_callback(trade, _receipt_ts):
    ts = int(trade.timestamp)
    tick = {
        "timestamp": datetime.utcfromtimestamp(ts).isoformat(),
        "price":     float(trade.price),
        "volume":    float(trade.amount),
        "side":      trade.side
    }
    TRADES_STORE.append(tick)

    # Build or update current 1-second candle
    global candle_data
    if ts not in candle_data:
        if candle_data:
            prev_ts, prev_c = next(iter(candle_data.items()))
            prev_c["timestamp"] = prev_c["timestamp"].isoformat()
            CANDLES_STORE.append(prev_c)
        candle_data.clear()
        candle_data[ts] = {
            "timestamp": datetime.utcfromtimestamp(ts),
            "open":  tick["price"],
            "high":  tick["price"],
            "low":   tick["price"],
            "close": tick["price"],
            "volume": tick["volume"]
        }
    else:
        c = candle_data[ts]
        c["high"]   = max(c["high"], tick["price"])
        c["low"]    = min(c["low"], tick["price"])
        c["close"]  = tick["price"]
        c["volume"] += tick["volume"]

async def open_interest_callback(oi, _receipt_ts):
    OPEN_INTEREST_STORE.append({
        "timestamp": datetime.utcfromtimestamp(oi.timestamp).isoformat(),
        "open_interest": float(oi.open_interest)
    })

async def order_book_callback(book, _receipt_ts):
    bids = [(float(p), float(book.book.bids[p])) for p in list(book.book.bids)[:10]]
    asks = [(float(p), float(book.book.asks[p])) for p in list(book.book.asks)[:10]]
    ORDERBOOK_STORE.clear()
    ORDERBOOK_STORE.update({
        "timestamp": datetime.utcnow().isoformat(),
        "bids": bids,
        "asks": asks
    })

# ─── Lifespan handler to start Cryptofeed ──────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    def run_feed():
        fh = FeedHandler()
        fh.add_feed(BinanceFutures(
            symbols=['BTC-USDT-PERP'],
            channels=[TRADES, OPEN_INTEREST, L2_BOOK],
            callbacks={
                TRADES:        trade_callback,
                OPEN_INTEREST: open_interest_callback,
                L2_BOOK:       order_book_callback
            }
        ))
        fh.run()
    thread = threading.Thread(target=run_feed, daemon=True)
    thread.start()
    yield
    # (optional cleanup here)

# ─── FastAPI app setup ─────────────────────────────────────────────────────
app = FastAPI(title="CryptoFeed API", lifespan=lifespan)

@app.get("/trades")
async def get_trades(limit: int = Query(100, gt=0, lt=1000)):
    return list(TRADES_STORE)[-limit:]

@app.get("/candles")
async def get_candles(limit: int = Query(100, gt=0, lt=1000)):
    return list(CANDLES_STORE)[-limit:]

@app.get("/open_interest")
async def get_open_interest(limit: int = Query(100, gt=0, lt=1000)):
    return list(OPEN_INTEREST_STORE)[-limit:]

@app.get("/orderbook")
async def get_orderbook():
    return ORDERBOOK_STORE
