**Imports**

In [1]:
import httpx
import shutil
import asyncio
import pandas as pd

from enum import Enum
from os import path, mkdir, environ
from decimal import Decimal
from datetime import datetime
from pydantic.dataclasses import dataclass

**Models**

In [2]:
class Timeframe(Enum):
    """Candle intervals that can be requested for kline downloads.

    Each member's value is the string that `Binance.kline` sends as the
    `interval` query parameter. The member name (lower-cased) is also used by
    the persistence cell as the suffix of the CSV file name.
    """
    # NOTE(review): the strings are case-sensitive; "5m" and "1M" differ only by
    # letter case and by number. Presumably "m" means minutes and "M" months in
    # the exchange's interval notation; confirm against the API documentation.
    MINUTLY = "5m"
    HOURLY = "1h"
    DAILY = "1d"
    WEEKLY = "1w"
    MONTHLY = "1M"

@dataclass
class Candle:
    """One OHLCV candle of a market.

    This is a pydantic dataclass (the decorator is imported from
    `pydantic.dataclasses`). `Binance.kline` builds instances positionally from
    the first six entries of each kline row, so the field order below must stay
    in step with that row layout.
    """
    # NOTE(review): presumably pydantic coerces the raw API values (timestamps,
    # numeric strings) into the annotated types; confirm for the installed version.
    time: datetime
    open: Decimal
    high: Decimal
    low: Decimal
    close: Decimal
    volume: Decimal

@dataclass
class Ticker:
    """Snapshot of a single market as returned by the ticker endpoint.

    This is a pydantic dataclass. `Binance.markets` builds instances
    positionally from the "symbol", "lastPrice" and "volume" fields of each
    ticker entry, so the field order below matters.
    """
    symbol: str
    # `highest_volume` ranks tickers by price * volume.
    price: Decimal
    volume: Decimal

**Utility**

In [3]:
def highest_volume(tickers: list[Ticker]) -> list[Ticker]:
    """Rank tickers by traded value, most active first.

    Tickers whose price or volume is not strictly positive are dropped. The
    remaining tickers are ordered by ``price * volume`` in descending order;
    tickers with equal products keep their relative input order.

    Returns a new list; the list passed in is left untouched.
    """
    active = [entry for entry in tickers if entry.price > 0 and entry.volume > 0]
    active.sort(key=lambda entry: entry.price * entry.volume, reverse=True)
    return active

def filter_symbols(market: str, tickers: list[Ticker], blacklist: list[str] = []) -> list[Ticker]:
    """Select the tickers quoted in `market` that are not blacklisted.

    A ticker is kept when its symbol ends with `market` and `blacklisted`
    reports no match for that symbol against `blacklist`. The input order is
    preserved. The default `blacklist` is only read, never modified.
    """
    selected = []
    for ticker in tickers:
        name = ticker.symbol
        # Check the market suffix first so the blacklist is only consulted for
        # symbols that belong to the requested market.
        if not name.endswith(market):
            continue
        if blacklisted(name, blacklist):
            continue
        selected.append(ticker)
    return selected

def blacklisted(symbol: str, blacklist: list[str]) -> bool:
    """Report whether `symbol` starts or ends with any entry of `blacklist`.

    Matching is a plain prefix/suffix comparison; entries that only occur in
    the middle of the symbol do not count. An empty blacklist never matches.
    """
    return any(
        symbol.startswith(entry) or symbol.endswith(entry)
        for entry in blacklist
    )

**Data Sources**

In [27]:
class Coinalyze():
    """Small synchronous client for the Coinalyze REST API.

    The API key is read from the ``COINALYZE`` environment variable when the
    client is constructed and is sent as the ``api_key`` header on every
    request. Every public method returns the decoded JSON body of the response.
    """
    BASEURL = "https://api.coinalyze.net"
    ENDPOINTS = {
        "spot": "/v1/spot-markets",
        "future": "/v1/future-markets",
        "openinterest": "/v1/open-interest",
        "funding": "/v1/funding-rate"
    }
    # Largest number of markets accepted by open_interest() and funding() per call.
    MAX_MARKETS = 20

    def __init__(self) -> None:
        """Load the API key from the environment.

        Raises:
            RuntimeError: if ``COINALYZE`` is unset or empty.
        """
        self.key = environ.get("COINALYZE", "")
        if self.key == "":
            raise RuntimeError("You need to provide a Coinalyze API key in a COINALYZE env variable!")

    def _get(self, endpoint: str, params=None):
        """Send an authenticated GET to a named endpoint and return the JSON body.

        Args:
            endpoint: key into ``ENDPOINTS``.
            params: optional query parameters.

        Raises:
            httpx.HTTPError: if the response status is anything other than 200.
        """
        url = Coinalyze.BASEURL + Coinalyze.ENDPOINTS[endpoint]
        r = httpx.get(url, headers={"api_key": self.key}, params=params)

        if r.status_code != httpx.codes.OK:
            # Report the raw body text: an error response is not guaranteed to
            # be JSON, and decoding it here would hide the HTTP failure behind
            # a decode error.
            raise httpx.HTTPError(f"GET {url} returned HTTP {r.status_code}: {r.text}")

        return r.json()

    @staticmethod
    def _check_market_count(markets: list[str]) -> None:
        """Reject requests that name more markets than one call may carry."""
        if len(markets) > Coinalyze.MAX_MARKETS:
            raise ValueError(f"Coinalyze imposes a maximum of {Coinalyze.MAX_MARKETS} markets per request!")

    def spot_markets(self):
        """Return the decoded response of the spot-markets endpoint."""
        return self._get("spot")

    def futures_markets(self):
        """Return the decoded response of the future-markets endpoint."""
        return self._get("future")

    def open_interest(self, markets: list[str]):
        """Return open interest for the given markets, converted to USD.

        Args:
            markets: at most ``MAX_MARKETS`` market symbols.

        Raises:
            ValueError: if more than ``MAX_MARKETS`` markets are given.
            httpx.HTTPError: if the response status is not 200.
        """
        Coinalyze._check_market_count(markets)
        return self._get("openinterest", { "symbols": ",".join(markets), "convert_to_usd": "true" })

    def funding(self, markets: list[str]):
        """Return the funding rate for the given markets.

        Args:
            markets: at most ``MAX_MARKETS`` market symbols.

        Raises:
            ValueError: if more than ``MAX_MARKETS`` markets are given.
            httpx.HTTPError: if the response status is not 200.
        """
        Coinalyze._check_market_count(markets)
        return self._get("funding", { "symbols": ",".join(markets) })

class Binance():
    """Client for the public Binance market-data endpoints used in this notebook.

    `markets` performs a blocking request; `kline` is a coroutine that sends its
    request through a caller-supplied `httpx.AsyncClient`. `FUTURES` and the
    "price" endpoint are declared but not used by any method of this class.
    """
    BASEURL = "https://api.binance.com"
    FUTURES = "https://fapi.binance.com"
    ENDPOINTS = {
        "ticker": "/api/v3/ticker/24hr",
        "price": "/api/v3/ticker/price",
        "kline": "/api/v3/klines"
    }

    @staticmethod
    def _json_or_raise(r, url: str):
        """Return the decoded JSON body of `r`, or raise if the status is not 200.

        Raises:
            httpx.HTTPError: if the response status is anything other than 200.
        """
        if r.status_code != httpx.codes.OK:
            # Report the raw body text: an error response is not guaranteed to
            # be JSON, and decoding it here would hide the HTTP failure behind
            # a decode error.
            raise httpx.HTTPError(f"GET {url} returned HTTP {r.status_code}: {r.text}")
        return r.json()

    def markets(self) -> list[Ticker]:
        """Fetch the ticker endpoint and return one `Ticker` per entry.

        Each `Ticker` is built from the entry's "symbol", "lastPrice" and
        "volume" fields.

        Raises:
            httpx.HTTPError: if the response status is not 200.
        """
        url = Binance.BASEURL + Binance.ENDPOINTS["ticker"]
        rows = Binance._json_or_raise(httpx.get(url), url)
        return [Ticker(row["symbol"], row["lastPrice"], row["volume"]) for row in rows]

    async def kline(self, client: httpx.AsyncClient, symbol: str, interval: Timeframe) -> list[Candle]:
        """Fetch klines for `symbol` at `interval` and return them as `Candle`s.

        Args:
            client: async HTTP client used to send the request.
            symbol: market symbol sent as the "symbol" query parameter.
            interval: timeframe whose value is sent as the "interval" parameter.

        Raises:
            httpx.HTTPError: if the response status is not 200.
        """
        url = Binance.BASEURL + Binance.ENDPOINTS["kline"]
        payload = { "symbol": symbol, "interval": interval.value }
        r = await client.get(url, params=payload)
        rows = Binance._json_or_raise(r, url)
        # Only the first six entries of each row map onto Candle's fields.
        return [Candle(*row[:6]) for row in rows]

In [28]:
# Constructing the client reads the API key from the COINALYZE environment
# variable and raises if it is missing or empty.
coinalyze = Coinalyze()
# Each call below performs one blocking HTTP request and keeps the decoded JSON.
spot = coinalyze.spot_markets()
futures = coinalyze.futures_markets()

# Open interest (requested with conversion to USD) and funding rate for two
# markets; both calls accept at most 20 markets per request.
oi = coinalyze.open_interest(["BTCUSD_PERP.A", "ETHUSD_PERP.A"])
fu = coinalyze.funding(["BTCUSD_PERP.A", "ETHUSD_PERP.A"])

**Data Selection**

In [5]:
# Fetch one Ticker per market from the exchange (blocking HTTP request).
binance = Binance()
markets = binance.markets()

# Fragments used to exclude markets. `blacklisted` matches a fragment when the
# symbol starts OR ends with it.
# NOTE(review): because matching is by prefix/suffix, a short fragment also
# excludes any symbol that merely begins or ends with the same letters;
# confirm that this is acceptable for the entries below.
blacklist = ["UPUSDT", "DOWNUSDT", "BEARUSDT", "BULLUSDT"]
currencies = ["EUR", "JPY", "GBP", "CAD", "CNY", "CHF", "AUD"]
stablecoins = ["TUSD", "BUSD", "USDC", "PAX", "USDP", "DAI", "GUSD", "USDD", "USTC", "UST", "USDS"]

# Markets whose symbol ends in "USDT", minus the excluded ones, ranked by
# price * volume (highest first).
usdt_markets = filter_symbols("USDT", markets, blacklist + currencies + stablecoins)
usdt_volume = highest_volume(usdt_markets)

# Markets whose symbol ends in "BTC"; no exclusions are applied here.
btc_markets = filter_symbols("BTC", markets)
btc_volume = highest_volume(btc_markets)

**Data Download**

In [6]:
usdt_dl = usdt_volume[:100]
btc_dl = btc_volume[:100]

print(f"Downloading {len(usdt_dl)} USDT markets...")
usdt_daily, usdt_hourly, usdt_minutly = [], [], []
async with httpx.AsyncClient() as client:
    usdt_symbols = [ticker.symbol for ticker in usdt_dl]
    for symbol in usdt_symbols:
        usdt_daily.append(binance.kline(client, symbol, Timeframe.DAILY))
        usdt_hourly.append(binance.kline(client, symbol, Timeframe.HOURLY))
        usdt_minutly.append(binance.kline(client, symbol, Timeframe.MINUTLY))
    usdt_daily = await asyncio.gather(*usdt_daily)
    usdt_hourly = await asyncio.gather(*usdt_hourly)
    usdt_minutly = await asyncio.gather(*usdt_minutly)
    usdt_daily = dict(zip(usdt_symbols, usdt_daily))
    usdt_hourly = dict(zip(usdt_symbols, usdt_hourly))
    usdt_minutly = dict(zip(usdt_symbols, usdt_minutly))
print("Finished downloading.")

Downloading 100 USDT markets...
Finished downloading.


**Data Persistence**

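Run this cell to save the downloaded kline data as one CSV file per market and timeframe, plus one CSV file per quote asset listing the selected markets. **Warning:** an existing `../data` directory is deleted and recreated first.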

In [7]:
data_path = path.join("..", "data")

# Start from an empty output directory: anything left from a previous run is deleted.
if path.exists(data_path):
    shutil.rmtree(data_path)

mkdir(data_path)

# Kline data to persist, keyed by timeframe (each value maps symbol -> candles).
klines = {
    Timeframe.MINUTLY: usdt_minutly,
    Timeframe.HOURLY: usdt_hourly
}

# Selected tickers to persist, keyed by quote asset.
symbols = {
    "USDT": usdt_dl,
    "BTC": btc_dl
}

# One "<quote asset>_markets.csv" per quote asset, one row per selected ticker.
for quote_asset, tickers in symbols.items():
    # The inputs are lists of dataclass instances, so the plain constructor is
    # the documented entry point (from_dict is meant for dict input).
    df = pd.DataFrame(tickers)
    df.to_csv(path.join(data_path, f"{quote_asset}_markets.csv"), index=False)

# One "<symbol>_<timeframe>.csv" per market and timeframe, indexed by candle time.
for tf, data in klines.items():
    for symbol, kline in data.items():
        df = pd.DataFrame(kline).set_index("time")
        df.to_csv(path.join(data_path, f"{symbol}_{tf.name.lower()}.csv"))

# Remove data from memory.
# Every name that still points at the candle data has to be released, not only
# the two source dicts: `klines`, `data` and `kline` reference the same objects
# and would otherwise keep them alive.
df = None
data = None
kline = None
klines = None
usdt_minutly = None
usdt_hourly = None