In [2]:
import os
from typing import Any

import backoff

# Retry policy shared by the @backoff.on_exception decorators: exponential waits
# (with backoff's default jitter) capped at 8 seconds between attempts.
# Without max_tries/max_time backoff retries forever, which hangs a cell on a
# permanent error such as a malformed query; give up after a bounded number of tries.
BACKOFF_CONFIG: dict[str, Any] = {
    'wait_gen': backoff.expo,
    'exception': Exception,
    'max_value': 8,
    'max_tries': 5,
}

class ClickhouseSettings:
    """Cluster-wide ClickHouse connection settings.

    Credentials are read from the CLICKHOUSE_USER / CLICKHOUSE_PASSWORD environment
    variables so they are not committed with the notebook; the former hard-coded
    values are kept as fallbacks for local development.
    """

    # Comma-separated "host:port" list of all cluster nodes.
    NODES: str = 'clickhouse-node01:9000,clickhouse-node02:9000,clickhouse-node03:9000,clickhouse-node04:9000'
    USER: str = os.getenv('CLICKHOUSE_USER', 'user')
    PASSWORD: str = os.getenv('CLICKHOUSE_PASSWORD', 'password')

class _ClickhouseNode:
    """Connection parameters shared by every ClickHouse node.

    Subclasses only override HOST. Credentials are read from the
    CLICKHOUSE_USER / CLICKHOUSE_PASSWORD environment variables, falling back
    to the former hard-coded values for local development.
    """

    HOST: str
    PORT: int = 9000
    USER: str = os.getenv('CLICKHOUSE_USER', 'user')
    PASSWORD: str = os.getenv('CLICKHOUSE_PASSWORD', 'password')


class ClickhouseNode1(_ClickhouseNode):
    HOST: str = 'clickhouse-node01'


class ClickhouseNode2(_ClickhouseNode):
    HOST: str = 'clickhouse-node02'


class ClickhouseNode3(_ClickhouseNode):
    HOST: str = 'clickhouse-node03'


class ClickhouseNode4(_ClickhouseNode):
    HOST: str = 'clickhouse-node04'

CLICKHOUSE_CONFIG = ClickhouseSettings()
# Order matters: the extractor connects to NODES[0] and treats NODES[1:] as alternates.
NODES = [
    node_cls()
    for node_cls in (ClickhouseNode1, ClickhouseNode2, ClickhouseNode3, ClickhouseNode4)
]


In [3]:
from typing import Optional, Iterator
from clickhouse_driver import Client as Clickhouse
from loguru import logger

from abc import ABC, abstractmethod
from typing import Iterator, Any


class BaseExtractor(ABC):
    """Common interface for data extractors.

    Concrete subclasses must implement ``extract``; because it is abstract,
    this base class itself cannot be instantiated.
    """

    @abstractmethod
    def extract(self) -> Iterator[Any]:
        """Extract data from the underlying source and return it."""


def ch_conn_is_alive(ch_conn: 'Clickhouse') -> bool:
    """Return True if ``ch_conn`` can still execute a trivial query, else False.

    Any exception raised by the probe is treated as "not alive" so that the
    caller can reconnect instead of failing.
    """
    try:
        # Only success matters: the result rows are discarded so that an empty
        # result set is not mistaken for a dead connection.
        ch_conn.execute('SHOW DATABASES')
    except Exception:
        return False
    return True


class ClickhouseExtractor(BaseExtractor):
    """Extract query results from a ClickHouse cluster via ``clickhouse_driver``.

    The driver client is created lazily on first use and rebuilt whenever the
    liveness probe (``ch_conn_is_alive``) fails. Reconnection and extraction are
    both wrapped in ``backoff.on_exception(**BACKOFF_CONFIG)``.
    """

    def __init__(
        self,
        host: str,
        port: int,
        user: str = 'default',
        password: str = '',
        alt_hosts: list[str] | None = None,
        conn: Clickhouse | None = None,
        settings: dict[str, Any] | None = None
    ) -> None:
        """Store connection parameters; no connection is opened here.

        Args:
            host: Primary ClickHouse host.
            port: Port of the primary host.
            user: ClickHouse user name.
            password: Password for ``user``.
            alt_hosts: Optional ``"host:port"`` entries passed to the driver as alternates.
            conn: Optional already-constructed client to reuse.
            settings: Optional settings dict forwarded to the client.
        """
        self._conn: Clickhouse | None = conn
        self._host: str = host
        self._alt_hosts: list[str] | None = alt_hosts
        self._port: int = port
        self._user: str = user
        self._password: str = password
        self._settings: dict[str, Any] | None = settings

    @property
    def conn(self) -> Clickhouse:
        """Return a usable client, (re)connecting if there is none or the probe fails."""
        if self._conn is None or not ch_conn_is_alive(self._conn):
            self._conn = self._reconnection()

        return self._conn

    # NOTE(review): backoff presumably emits %-style log messages while loguru
    # formats with str.format(); confirm the retry log lines render as intended.
    @backoff.on_exception(**BACKOFF_CONFIG, logger=logger)
    def _reconnection(self) -> Clickhouse:
        """Disconnect the current client (if any) and build a new one from stored parameters."""
        # loguru substitutes positional args into `{}` placeholders (str.format style),
        # not `%s`/`%d`.
        logger.info('Reconnection clickhouse node "{}:{}" ...', self._host, self._port)

        if self._conn is not None:
            logger.info('Closing already exists clickhouse connector...')
            self._conn.disconnect()

        return Clickhouse(
            host=self._host,
            port=self._port,
            user=self._user,
            # alt_hosts is optional; joining None would raise TypeError, which the
            # retry decorator would then keep retrying.
            alt_hosts=','.join(self._alt_hosts) if self._alt_hosts else None,
            password=self._password,
            settings=self._settings,
        )

    @backoff.on_exception(**BACKOFF_CONFIG, logger=logger)
    def extract(self, query: str, limit: int | None = 100000) -> Iterator[Any]:
        """Run ``query`` with an optional row cap and return the result frame.

        Args:
            query: SELECT statement without its own LIMIT clause. Surrounding
                whitespace and trailing ``;`` are stripped before LIMIT is appended.
            limit: Maximum number of rows to fetch; ``None`` disables the cap.

        Returns:
            The result of ``Client.query_dataframe`` (a pandas DataFrame), despite
            the ``Iterator[Any]`` annotation inherited from ``BaseExtractor``.
        """
        # A trailing ';' would otherwise produce invalid SQL such as "...; LIMIT n".
        statement = query.strip().rstrip(';').rstrip()
        if limit is not None:
            # int() keeps anything but a number from being spliced into the SQL text.
            statement = f'{statement} LIMIT {int(limit)}'
        return self.conn.query_dataframe(statement)

In [4]:
extractor = ClickhouseExtractor(
    host=NODES[0].HOST,
    port=NODES[0].PORT,
    user=CLICKHOUSE_CONFIG.USER,
    password=CLICKHOUSE_CONFIG.PASSWORD,
    # Every node after the first is handed to the driver as a "host:port" alternate.
    alt_hosts=[f'{node.HOST}:{node.PORT}' for node in NODES[1:]],
    # Enable the driver's NumPy column support.
    settings={'use_numpy': True},
)

In [7]:
# Fetch rows rather than `count(*)`: the following cells inspect the frame's
# head, shape and columns, which only make sense for row-level data.
tradestats_df = extractor.extract('SELECT * FROM default.tradestats')

NumPy support is not implemented for UUID. Using generic column
NumPy support is not implemented for UUID. Using generic column


In [9]:
# Preview the first rows of the extracted frame.
tradestats_df.head(n=5)

Unnamed: 0,id,secid,ts,pr_open,pr_high,pr_low,pr_close,pr_change,trades,vol,...,pr_vwap,trades_b,vol_b,val_b,pr_vwap_b,trades_s,vol_s,val_s,pr_vwap_s,created_at
0,6ce1b5de-1f26-4516-90df-546b9e8db474,ABRD,2023-12-05 18:40:00,252.4,252.4,251.2,252.4,0.0,23,105,...,251.9,12,63,158654.0,251.8,11,42,105854.0,252.0,2023-12-06 20:19:36
1,d63d60db-2de0-405d-8bd5-85ea0a1c8c16,ABIO,2023-12-05 18:40:00,88.1,88.3,87.62,88.12,0.0227,36,651,...,88.01,17,244,214565.0,87.94,19,407,358388.0,88.06,2023-12-06 20:19:36
2,0f647cdd-0c90-4c82-89dc-08862aa141d1,ABRD,2023-12-05 18:35:00,252.4,252.4,250.8,252.4,0.0,30,170,...,251.5,19,131,329246.0,251.3,11,39,98334.0,252.1,2023-12-06 20:19:36
3,91175085-2e53-4cba-9bf9-b22cd516fdc9,ABIO,2023-12-05 18:35:00,88.2,88.3,88.2,88.3,0.1134,5,64,...,88.29,3,42,37085.0,88.3,2,22,19420.0,88.28,2023-12-06 20:19:36
4,6ffda51f-c6a6-4284-9dc6-d2f89918eea0,ABRD,2023-12-05 18:30:00,252.0,252.6,251.2,251.6,-0.1587,39,204,...,252.1,19,75,189128.0,252.2,20,129,325240.0,252.1,2023-12-06 20:19:36


In [10]:
# (rows, columns) of the extracted frame.
tradestats_df.shape

(97276, 23)

In [11]:
# Column names returned by the query.
tradestats_df.columns

Index(['id', 'secid', 'ts', 'pr_open', 'pr_high', 'pr_low', 'pr_close',
       'pr_change', 'trades', 'vol', 'val', 'pr_std', 'disb', 'pr_vwap',
       'trades_b', 'vol_b', 'val_b', 'pr_vwap_b', 'trades_s', 'vol_s', 'val_s',
       'pr_vwap_s', 'created_at'],
      dtype='object')

In [12]:
# Fetch rows rather than `count(*)`: the following cells inspect the frame's
# head, shape and columns, which only make sense for row-level data.
orderstats_df = extractor.extract('SELECT * FROM default.orderstats')

In [14]:
# Preview the first rows of the extracted frame.
orderstats_df.head(n=5)

Unnamed: 0,id,secid,ts,put_orders,put_orders_b,put_orders_s,put_vol,put_vol_b,put_vol_s,put_val,...,cancel_vol_b,cancel_vol_s,cancel_val,cancel_val_b,cancel_val_s,put_vwap_b,put_vwap_s,cancel_vwap_b,cancel_vwap_s,created_at


In [21]:
# (rows, columns) of the extracted frame.
orderstats_df.shape

(0, 26)

In [22]:
# Column names returned by the query.
orderstats_df.columns

Index(['id', 'secid', 'ts', 'put_orders', 'put_orders_b', 'put_orders_s',
       'put_vol', 'put_vol_b', 'put_vol_s', 'put_val', 'put_val_b',
       'put_val_s', 'cancel_orders', 'cancel_orders_b', 'cancel_orders_s',
       'cancel_vol', 'cancel_vol_b', 'cancel_vol_s', 'cancel_val',
       'cancel_val_b', 'cancel_val_s', 'put_vwap_b', 'put_vwap_s',
       'cancel_vwap_b', 'cancel_vwap_s', 'created_at'],
      dtype='object')

In [23]:
# Fetch rows rather than `count(*)`: the following cells inspect the frame's
# head, shape and columns, which only make sense for row-level data.
obstats_df = extractor.extract('SELECT * FROM default.obstats')

NumPy support is not implemented for UUID. Using generic column
NumPy support is not implemented for UUID. Using generic column


In [24]:
# Preview the first rows of the extracted frame.
obstats_df.head(n=5)

Unnamed: 0,id,secid,ts,spread_bbo,spread_lv10,spread_1mio,levels_b,levels_s,vol_b,vol_s,...,val_s,imbalance_vol,imbalance_vol_bbo,imbalance_val,imbalance_val_bbo,vwap_b,vwap_s,vwap_b_1mio,vwap_s_1mio,created_at
0,a99f2797-bd8c-425b-a28f-447381fc15bc,ABRD,2023-12-05 18:40:00,-95.6,101.9,-13.7,132,182,5774,6885,...,18825899.0,-0.09,-0.02,-0.16,-0.01,237.5,273.4,253.0,252.6,2023-12-06 20:19:40
1,16e2f214-852d-453d-9d49-71e93ff8b64b,ABIO,2023-12-05 18:40:00,-70.2,80.8,116.2,229,332,8965,11532,...,11203929.0,-0.13,0.8,-0.19,0.8,84.6,97.16,87.57,88.59,2023-12-06 20:19:40
2,eb50ff31-7403-4161-bc45-e84b4cb0b8da,AFKS,2023-12-05 18:40:00,-2.5,18.0,3.6,539,578,43949,50909,...,87727349.0,-0.07,-0.51,-0.13,-0.51,15.404,17.232,16.092,16.097,2023-12-06 20:19:40
3,39a0d418-f48c-49d8-b6e8-11d3136a35bd,ABRD,2023-12-05 18:05:00,15.8,187.7,59.5,129,172,4389,6482,...,17811704.0,-0.19,0.57,-0.27,0.57,234.4,274.8,253.0,254.5,2023-12-06 20:19:40
4,4bfa8fc8-ffcd-4a51-bf77-0af7b9270175,AFKS,2023-12-05 18:05:00,1.0,27.4,8.2,524,573,42880,49237,...,85031098.0,-0.07,0.35,-0.13,0.35,15.402,17.27,16.084,16.097,2023-12-06 20:19:40


In [25]:
# (rows, columns) of the extracted frame.
obstats_df.shape

(100000, 21)

In [26]:
# Column names returned by the query.
obstats_df.columns

Index(['id', 'secid', 'ts', 'spread_bbo', 'spread_lv10', 'spread_1mio',
       'levels_b', 'levels_s', 'vol_b', 'vol_s', 'val_b', 'val_s',
       'imbalance_vol', 'imbalance_vol_bbo', 'imbalance_val',
       'imbalance_val_bbo', 'vwap_b', 'vwap_s', 'vwap_b_1mio', 'vwap_s_1mio',
       'created_at'],
      dtype='object')

In [15]:
# Fetch rows rather than `count(*)`: the following cells inspect the frame's
# head, shape and columns, which only make sense for row-level data.
candles_df = extractor.extract('SELECT * FROM default.candles')

NumPy support is not implemented for UUID. Using generic column
NumPy support is not implemented for UUID. Using generic column


In [16]:
# Preview the first rows of the extracted frame.
candles_df.head(n=5)

Unnamed: 0,id,secid,open,close,high,low,value,volume,begin,end,created_at
0,d12150ea-de06-4e80-a52f-b2b11090fc23,ABIO,37.07,37.07,37.07,37.07,3707.0,100.0,2021-06-25 17:53:00,2021-06-25 17:53:59,2023-12-06 20:27:45
1,d2a6aa01-9d35-4537-b7e6-253e636fd407,ABIO,37.01,37.0,37.01,37.0,118404.0,3200.0,2021-06-25 17:45:00,2021-06-25 17:45:59,2023-12-06 20:27:45
2,046480ef-c9fa-4718-8a01-8dad115c42e8,ABIO,37.01,37.0,37.08,37.0,25911.0,700.0,2021-06-25 17:42:00,2021-06-25 17:42:59,2023-12-06 20:27:45
3,1583c04f-5dd2-44a5-a3e4-03b31e28cfed,ABIO,37.0,37.02,37.02,37.0,29612.0,800.0,2021-06-25 17:40:00,2021-06-25 17:40:59,2023-12-06 20:27:45
4,6158e015-2c06-47b7-952b-4462e1a8b2e5,ABIO,36.9,37.0,37.0,36.9,184904.0,5000.0,2021-06-25 17:37:00,2021-06-25 17:37:59,2023-12-06 20:27:45


In [17]:
# (rows, columns) of the extracted frame.
candles_df.shape

(98992, 11)

In [18]:
# Column names returned by the query.
candles_df.columns

Index(['id', 'secid', 'open', 'close', 'high', 'low', 'value', 'volume',
       'begin', 'end', 'created_at'],
      dtype='object')