In [1]:
import os
from datetime import datetime
from urllib.parse import quote

from sqlalchemy import create_engine, Column, MetaData, literal, text, func

from clickhouse_sqlalchemy import (
    Table, make_session, get_declarative_base, types, engines
)

# Read the ClickHouse password from the environment. Fall back to an empty
# password so an unset variable is not interpolated as the literal "None".
password = os.environ.get("CLICKHOUSE_PWD", "")
# Percent-encode the password so characters such as "@", ":" or "/" cannot
# be mistaken for URL delimiters when the connection string is parsed.
conn_str = f"clickhouse://default:{quote(password, safe='')}@localhost:8123/quant"

# Engine and ORM session shared by the cells below.
engine = create_engine(conn_str)
session = make_session(engine)
metadata = MetaData()

# Declarative base that the table classes in this notebook inherit from.
Base = get_declarative_base(metadata=metadata)

In [2]:
# Display the engine repr to check which connection URL it was built from.
engine

Engine(clickhouse://default:***@localhost:8123/quant)

In [3]:
class Rqh(Base):
    """ORM mapping for the daily per-stock table written by ``save_rqh_daily``.

    Each row holds the values for one stock ``code`` on one ``date``. The
    table name is not set here; it is supplied by the declarative base.

    The ORM primary key is the pair ``(date, code)``, matching the MergeTree
    ``primary_key``/``order_by`` below. With ``date`` alone as the ORM key,
    every stock of the same day would share one identity in the session, so
    ORM queries would hand back the same object for all of them.
    """

    # Composite ORM identity: one row per (date, code).
    date = Column(types.DateTime64, primary_key=True)
    code = Column(types.String, primary_key=True)   # source column '股票代码'
    close = Column(types.Float64)                   # source column '最新价'
    chgPct = Column(types.Float32)                  # source column '最新涨跌幅'
    price_avg = Column(types.Float32)               # '平均成本[date]'
    benefit_pct = Column(types.Float32)             # '收盘获利[date]'
    # Money amounts; save_rqh_daily converts '万'/'亿' suffixed text to numbers.
    found_in = Column(types.Float64)                # '资金流向[date]'
    found_in_master = Column(types.Float64)         # '主力资金流向[date]'
    big_order_dde_pct = Column(types.Float32)       # 'dde大单净量[date]'
    retail_investor_dde = Column(types.Float32)     # 'dde散户数量[date]'
    concentration_70 = Column(types.Float32)        # '集中度70[date]'
    concentration_90 = Column(types.Float32)        # '集中度90[date]'
    ceil_price_90 = Column(types.Float32)           # '90%成本上限[date]'
    floor_price_90 = Column(types.Float32)          # '90%成本下限[date]'

    __table_args__ = (
        # Partition by calendar month of ``date``; sort and index by (date, code).
        engines.MergeTree(
            partition_by=func.toYYYYMM(date),
            order_by=(date, code),
            primary_key=(date, code)
        ),
    )
    

In [None]:
# Emits CREATE TABLE statement. ``checkfirst=True`` makes the cell safe to
# re-run: the table is only created when it does not already exist.
Rqh.__table__.create(bind=engine, checkfirst=True)

In [5]:
import pandas as pd

# Load the 2010-01-04 snapshot and discard the two leftover unnamed index
# columns in a single chained expression.
df_rqh = (
    pd.read_csv("20100104_rqh.csv")
    .drop(columns=["Unnamed: 0.1", "Unnamed: 0"])
)

In [8]:
from datetime import datetime
from sre_compile import isstring
import numpy as np

def get_rqh_daily_colums_map(signal: datetime) -> dict:
    """Map each target column name to the source column label for one day.

    Args:
        signal: Day of the snapshot; only its date part is used.

    Returns:
        Dict keyed by target column name. The first three labels are fixed;
        the rest end in ``[YYYYMMDD]`` built from ``signal``. Insertion order
        is the order listed below.
    """
    day_tag = "".join(signal.date().isoformat().split('-'))
    return {
        # Labels that carry no date suffix.
        'code': '股票代码',
        'close': '最新价',
        'chgPct': '最新涨跌幅',
        # Labels suffixed with the day tag.
        'price_avg': f'平均成本[{day_tag}]',
        'benefit_pct': f'收盘获利[{day_tag}]',
        'found_in': f'资金流向[{day_tag}]',
        'found_in_master': f'主力资金流向[{day_tag}]',
        'big_order_dde_pct': f'dde大单净量[{day_tag}]',
        'retail_investor_dde': f'dde散户数量[{day_tag}]',
        'concentration_70': f'集中度70[{day_tag}]',
        'concentration_90': f'集中度90[{day_tag}]',
        'ceil_price_90': f'90%成本上限[{day_tag}]',
        'floor_price_90': f'90%成本下限[{day_tag}]',
    }

def transform_money(x: str):
    '''
    Convert an amount written with a Chinese unit suffix into a plain number.

    Recognised units: '万亿' (x 1e12), '万' (x 1e4) and '亿' (x 1e8). Text
    without a unit is parsed as a float. Non-string input is converted via
    ``str`` and parsed the same way.

    Args:
        x: Amount as text (e.g. ``'1.5万'``), a number, NaN or None.

    Returns:
        The amount as a float. NaN is passed through unchanged and None
        is returned as NaN.

    Raises:
        ValueError: if the numeric part cannot be parsed as a float.
    '''
    if not isinstance(x, str):
        if x is None:
            # Treat a missing value like NaN instead of failing in np.isnan.
            return np.nan
        if np.isnan(x):
            return x
        x = str(x)
    # '万亿' must be tested before '万', otherwise '1万亿' would be cut at
    # '万' and scaled by 1e4 instead of 1e12.
    units = (
        ('万亿', 1_000_000_000_000),
        ('万', 10_000),
        ('亿', 100_000_000),
    )
    for suffix, multiplier in units:
        position = x.find(suffix)
        # position > 0: the unit must be preceded by at least one character.
        if position > 0:
            return float(x[:position]) * multiplier
    return float(x)

def _prepare_rqh_frame(signal: datetime, df_data: pd.DataFrame) -> pd.DataFrame:
    """Rename and clean one day's source columns into the layout of ``Rqh``."""
    column_map = get_rqh_daily_colums_map(signal)
    # Start from the source index so a scalar default for a missing column
    # is broadcast to every row, even when it is the first column assigned.
    df_saving = pd.DataFrame(index=df_data.index)
    # Rename the source columns to the target field names.
    for target, source in column_map.items():
        if source in df_data.columns:
            df_saving[target] = df_data[source]
        else:
            print(f"save_rqh_daily without column {source}")
            df_saving[target] = 0
    df_saving['date'] = signal

    # Money columns may arrive as text with '万'/'亿' units.
    for money_col in ('found_in', 'found_in_master'):
        df_saving[money_col] = df_saving[money_col].map(transform_money)

    dde = df_saving['big_order_dde_pct']
    if not pd.api.types.is_numeric_dtype(dde):
        # Text of 10 or more characters is replaced by 0 before conversion.
        # NOTE(review): the threshold comes from the original code; the
        # reason for it is not visible here -- confirm against the data.
        max_text_len = 10
        too_long = dde.map(
            lambda v: isinstance(v, str) and len(v) >= max_text_len
        ).astype(bool)
        # errors='coerce' turns unparsable text into NaN, so the column is
        # always numeric instead of silently staying as text.
        df_saving['big_order_dde_pct'] = pd.to_numeric(
            dde.mask(too_long, 0), errors='coerce'
        )
    return df_saving.drop_duplicates()


def save_rqh_daily(sess, signal:datetime, df_data:pd.DataFrame) -> int:
    """Append one day's data to the ``Rqh`` table.

    Args:
        sess: Session whose ``bind`` is handed to ``DataFrame.to_sql``.
        signal: Day the rows belong to; written to the ``date`` column and
            used to build the dated source column labels.
        df_data: Source frame using the labels produced by
            ``get_rqh_daily_colums_map``. A missing source column is
            reported with ``print`` and filled with 0.

    Returns:
        The value returned by ``DataFrame.to_sql``, or 0 when it is None.
    """
    df_saving = _prepare_rqh_frame(signal, df_data)
    rows = df_saving.to_sql(
        Rqh.__tablename__, sess.bind, index=False, if_exists='append'
    )
    return rows if rows is not None else 0


In [9]:
# Append the frame loaded from "20100104_rqh.csv" as the rows for 2010-01-04;
# the cell result is the value returned by save_rqh_daily.
save_rqh_daily(session, datetime(2010,1,4), df_rqh)

save_rqh_daily without column dde散户数量[20100104]




AttributeError: 'Session' object has no attribute 'cursor'

## Read

In [None]:
# ORM query over Rqh with no filter or ordering; reused by the cells below.
query = session.query(Rqh)

In [None]:
# Fetch every row as an Rqh object and show the `code` of the last one.
# NOTE(review): this loads every row the query matches, and the query has no
# ORDER BY, so which row is "last" depends on the order the server returns.
query.all()[-1].code

In [None]:
# Show the string form of the query (presumably the SELECT it will emit).
str(query)

In [None]:
# Run the query's SELECT through pandas. The connection is opened in a
# ``with`` block so it is released as soon as the frame has been loaded,
# instead of leaking an open connection on every run of the cell.
with engine.connect() as conn:
    df_queried = pd.read_sql_query(query.statement, conn)
df_queried.head()

In [None]:
import polars as pl

# Same read through polars. The connection is scoped to the ``with`` block
# so it is released after the read rather than left open.
with engine.connect() as conn:
    pl_queried = pl.read_database(query.statement, conn)
pl_queried.head()