# UMR

## 导入模块

In [1]:
import numpy as np
import pandas as pd
import feather
from matplotlib import pyplot as plt
import os
from tqdm.notebook import tqdm
import sunlandsdatasdk as sd

%load_ext line_profiler

## 读入数据

In [2]:
start_date = pd.to_datetime('2023-01-01')
end_date = pd.to_datetime('2023-12-31')

### 市值 (用于计算流通股本)

In [3]:
# sd.auth('intern', 'a8cc57a3812802722e7c7a5cd92c2140')

# issues = sd.get_index_stocks('999998', date=end_date)
# market_cap = sd.get_ricequant_factor(issues, 20230101, 20231231, ['market_cap', 'market_cap_2'])
# market_cap = market_cap.reset_index()
# market_cap['date'] = pd.to_datetime(market_cap['date'])

# free_ratio = market_cap['market_cap_2'].div(market_cap['market_cap']).mean()
# market_cap['market_cap_2'] = market_cap['market_cap_2'].fillna(market_cap['market_cap'] * free_ratio)

# feather.write_dataframe(market_cap, '../data/market_cap.feather')

In [4]:
market_cap = feather.read_dataframe('../data/market_cap.feather')

### 日线数据

In [5]:
price_1d = feather.read_dataframe('../data/StockPriceK1d_20241231.feather')
price_1d = price_1d[(price_1d['date'] >= start_date) & (price_1d['date'] <= end_date)]
price_1d = pd.merge(
    price_1d,
    market_cap,
    on=['date', 'issue'],
    how='left'
)
price_1d['free_float'] = price_1d['market_cap_2'].div(price_1d['close'])
price_1d = price_1d.set_index('date')

### 指数数据

In [6]:
hs500 = feather.read_dataframe('../data/IndexPriceK1m_000905.feather')
hs500['ret_index'] = hs500['close'] / hs500['close'].shift(1) - 1

hs500['date'] = pd.to_datetime(hs500['date'], format='ISO8601')
hs500 = hs500[(hs500['date'] >= start_date) & (hs500['date'] <= end_date)]
hs500 = hs500.set_index('date')

trade_time = hs500['time']
map_trade_time = {t: t - 100 for t in trade_time}
map_trade_time[100000] = 95900
map_trade_time[110000] = 105900
map_trade_time[140000] = 135900
map_trade_time[150000] = 145900
hs500['time'] = hs500['time'].apply(map_trade_time.get)

### 分钟线测试数据

In [7]:
price_1m_fields = ['date', 'time', 'issue', 'high', 'low', 'close']
price_1d_fields = ['preclose', 'free_float']
def price_1m_read(date:np.datetime64):
    year = date.year
    date_str = date.strftime('%Y%m%d')
    price_1m = feather.read_dataframe(f'../data/StockPriceK1m/{year}/StockPriceK1m_{date_str}.feather')
    price_1m['date'] = pd.to_datetime(price_1m['date'], format='ISO8601')
    
    price_1m = pd.merge(
        price_1m[price_1m_fields],
        price_1d.loc[date, ['issue'] + price_1d_fields],
        on='issue',
        how='left'
    )
    price_1m['close_prev'] = (
        price_1m
            .groupby('issue')['close']
            .shift(1).fillna(price_1m['preclose'])
    )
    price_1m['ret'] = price_1m['close'] / price_1m['close_prev'] - 1
    
    price_1m = pd.merge(
        price_1m,
        hs500.loc[date, ['time', 'ret_index']],
        on='time',
        how='left'
    )
    price_1m['excess'] = price_1m['ret'] - price_1m['ret_index']
    # price_1m['excess'] = price_1m['ret']
    
    return price_1m

In [8]:
def price_1m_read_fast(date: pd.Timestamp) -> pd.DataFrame:
    year = date.year
    date_str = date.strftime('%Y%m%d')

    price_1m = feather.read_dataframe(f'../data/StockPriceK1m/{year}/StockPriceK1m_{date_str}.feather')
    price_1m['date'] = pd.to_datetime(price_1m['date'], format='ISO8601')
    price_1m = price_1m[price_1m_fields]

    # 减少后续分组/映射的开销（按你的真实数据类型选择）
    # 若 issue 是代码（数字或字符串不太多），用 category
    if price_1m['issue'].dtype != 'category':
        price_1m['issue'] = price_1m['issue'].astype('category')

    # ===== 用 map 代替 merge: preclose by issue =====
    # 期望 price_1d.loc[date, ['issue','preclose']] 返回当日一张表
    preclose_df = price_1d.loc[date, ['issue'] + price_1d_fields]
    # 建立以 issue 为索引的 Series（类型与 price_1m['issue'] 对齐）
    preclose_s = preclose_df.set_index('issue')[price_1d_fields]
    # 若 issue 是分类，确保映射索引类型一致
    if isinstance(price_1m['issue'].dtype, pd.CategoricalDtype):
        preclose_s.index = preclose_s.index.astype(price_1m['issue'].dtype.categories.dtype)

    for field in price_1d_fields:
        price_1m[field] = price_1m['issue'].map(preclose_s[field])

    # ===== 组内上一分钟收盘 =====
    # 避免排序带来的额外成本；确保原始数据已经按时间顺序
    # 若不确定，请先按 ['issue','time'] 排序一次（只在必要时）：
    # price_1m.sort_values(['issue','time'], inplace=True, kind='mergesort')
    close_prev = price_1m.groupby('issue', sort=False, observed=False)['close'].shift(1)
    # 首分钟用 preclose 补
    price_1m['close_prev'] = close_prev.where(close_prev.notna(), price_1m['preclose'])

    # 收益率
    # ret = close/prev - 1（完全矢量化）
    price_1m['ret'] = price_1m['close'].div(price_1m['close_prev']).sub(1.0)

    # ===== 用 map 代替 merge: index return by time =====
    # 期望 hs500.loc[date, ['time','ret_index']] 返回当日分钟级指数收益
    idx_df = hs500.loc[date, ['time', 'ret_index']]
    ret_index_s = idx_df.set_index('time')['ret_index']

    price_1m['ret_index'] = price_1m['time'].map(ret_index_s)

    # 超额收益
    price_1m['excess'] = price_1m['ret'].sub(price_1m['ret_index'])

    return price_1m

In [9]:
date = pd.to_datetime('2023-01-03')
price_1m = price_1m_read_fast(date)

## 计算风险指标

### 计算调整后风险系数

In [10]:
def risk_calc(r:pd.Series, d:int=10):
    return r.rolling(d, min_periods=1).mean() - r

### 真实波动

In [11]:
def tr_calc(price_1m:pd.DataFrame):
    tr = price_1m.copy()
    tr['tr1'] = tr['high'] - tr['low']
    tr['tr2'] = np.abs(tr['high'] - tr['close_prev'])
    tr['tr3'] = np.abs(tr['low'] - tr['close_prev'])
    tr['r'] = tr[['tr1', 'tr2', 'tr3']].max(axis=1) / tr['close_prev']
    return tr[['date', 'time', 'issue', 'r']]
tr = tr_calc(price_1m)
tr['risk'] = tr.groupby('issue')['r'].transform(risk_calc)

  tr['risk'] = tr.groupby('issue')['r'].transform(risk_calc)


### 换手率

## 计算动量反转因子

### 计算单日内反转因子

In [12]:
def rev_calc(date:np.datetime64, price_1m:pd.DataFrame, risk:pd.DataFrame=None, m:int=60, decay:bool=True):
    m = 60
    H = m / 2
    weight = 2 ** ((np.arange(m) - m) / H)
    weight = weight / weight.sum()

    ret_risk = price_1m[['time', 'issue', 'excess']].copy()
    if risk is None:
        ret_risk['risk'] = -1
    else:
        ret_risk = pd.merge(
            ret_risk,
            risk[['time', 'issue', 'risk']],
            on=['time', 'issue'],
            how='left'
        ).sort_values(['issue', 'time'])
    weight = np.tile(weight, len(ret_risk['issue'].unique()))
    
    start_time = np.array([93000, 103000, 130000, 135400])
    end_time = np.array([102900, 112900, 135900, 145300])
    rev = None
    for st, et in zip(start_time, end_time):
        rr = ret_risk[(ret_risk['time'] >= st) & (ret_risk['time'] <= et)].copy()
        if not decay:
            rr['weight'] = 1
        else:
            rr['weight'] = weight

        rr['rev'] = rr['weight'] * rr['risk'] * rr['excess']
        rev_time = (
            rr
                .groupby('issue')['rev']
                .sum().reset_index()
        )
        rev_time['time'] = et
        rev = pd.concat([rev, rev_time])
    rev['date'] = date
    rev = rev.sort_values(['issue', 'time']).reset_index(drop=True)
    return rev

In [13]:
def rev_calc_fast(date: np.datetime64,
                  price_1m: pd.DataFrame,
                  risk: pd.DataFrame | None = None,
                  m: int = 60,
                  decay: bool = True) -> pd.DataFrame:
    # ===== 1) 预处理与dtype =====
    # 只取用到的列，避免多余拷贝
    df = price_1m.loc[:, ['time', 'issue', 'excess']].copy()

    # 更紧凑的dtype：加速比较/分组；issue考虑转为分类
    if not isinstance(df['issue'].dtype, pd.CategoricalDtype):
        df['issue'] = df['issue'].astype('category')

    # ===== 2) left join：避免排序、避免copy =====
    if risk is None:
        # 直接常量列，避免后续merge
        df['risk'] = -1.0
    else:
        # 同样做列与dtype瘦身
        r = risk.loc[:, ['time', 'issue', 'risk']].copy()
        if not isinstance(r['issue'].dtype, pd.CategoricalDtype):
            # 与左表对齐分类，避免合并后再重编码
            r['issue'] = r['issue'].astype(df['issue'].dtype)
        # merge 不排序（sort=False），并避免多余拷贝（copy=False）
        df = df.merge(r, on=['time', 'issue'], how='left', sort=False, copy=False)

    # ===== 3) 一次排序 + 权重映射 =====
    # 只排序一次（稳定归并排序）。后面不再排序全表。
    df.sort_values(['issue', 'time'], kind='mergesort', inplace=True, ignore_index=True)

    if decay:
        H = m / 2
        w = 2.0 ** ((np.arange(m) - m) / H)
        w /= w.sum()
        w = np.tile(w, len(df['issue'].unique()))
    else:
        w = 1.0

    # ===== 4) 在4个时间窗内聚合：用 numpy bincount 替代 groupby.sum =====
    start_time = np.array([93000, 103000, 130000, 135400], dtype=np.int32)
    end_time   = np.array([102900, 112900, 135900, 145300], dtype=np.int32)

    out_frames = []
    # 预取列为ndarray，避免反复取Series开销
    t_vals = df['time'].to_numpy()
    issue_vals = df['issue'].to_numpy()
    # 将分类直接转codes，后续bincount更快
    if isinstance(df['issue'].dtype, pd.CategoricalDtype):
        issue_codes = df['issue'].cat.codes.to_numpy()
        issue_uniques = df['issue'].cat.categories
    else:
        # 回退：factorize
        issue_codes, uniques = pd.factorize(issue_vals, sort=False)
        issue_uniques = pd.Index(uniques)

    risk_vals = df['risk'].to_numpy(dtype='float64')
    exc_vals = df['excess'].to_numpy(dtype='float64')

    # 逐窗mask + bincount聚合
    for st, et in zip(start_time, end_time):
        mask = (t_vals >= st) & (t_vals <= et)
        if not mask.any():
            continue

        codes_sub = issue_codes[mask]
        rev_prod = (w * risk_vals[mask] * exc_vals[mask])

        # bincount按类别求和，速度远快于 groupby('issue').sum()
        # minlength 保证完整长度，避免回表错位
        sums = np.bincount(codes_sub, weights=rev_prod, minlength=len(issue_uniques))

        # 只取在此窗出现过的issue（提升下游concat效率）
        present = np.unique(codes_sub)
        rev_time = pd.DataFrame({
            'issue': issue_uniques.take(present),
            'rev':   sums[present]
        })
        rev_time['time'] = np.int32(et)
        out_frames.append(rev_time)

    # ===== 5) 拼接 + 标日期 + 仅对输出排序 =====
    rev = pd.concat(out_frames, ignore_index=True) if out_frames else \
          pd.DataFrame(columns=['issue', 'rev', 'time'])
    rev['date'] = date
    rev.sort_values(['issue', 'time'], kind='mergesort', inplace=True, ignore_index=True)
    return rev

### 遍历所有交易日

In [14]:
# trade_date = price_1d.index.sort_values().unique()
# label = 'tr_240m'
# risk_func = tr_calc
# risk_prev = None
# os.makedirs(f'../data/factor_rev/{label}_rev/', exist_ok=True)
# for date in tqdm(trade_date):
#     year = date.year
#     date_str = date.strftime('%Y%m%d')
#     os.makedirs(f'../data/factor_rev/{label}_rev/{year}/', exist_ok=True)
#     price_1m = price_1m_read_fast(date)
    
#     risk = risk_func(price_1m)
#     risk_2d = pd.concat([risk_prev, risk])
#     risk_prev = risk
#     risk_2d['risk'] = risk_2d.groupby('issue', observed=False)['r'].transform(risk_calc, d=240)
#     risk = risk_2d[risk_2d['date'] == date]
    
#     rev = rev_calc_fast(date, price_1m, risk=risk, decay=True)
#     feather.write_dataframe(rev, f'../data/factor_rev/{label}_rev/{year}/{label}_rev_{date_str}.feather')
#     del price_1m, risk, rev

In [15]:
# def datetime_calc(date:pd.Series, time:pd.Series):
#     hh = time // 10000
#     mm = (time % 10000) // 100
#     ss = time % 100
#     timedelta = pd.to_timedelta(hh, 'h') + pd.to_timedelta(mm, 'm') + pd.to_timedelta(ss, 's')
#     datetime = date + timedelta
#     return datetime

# rev = None
# for date in trade_date:
#     year = date.year
#     date_str = date.strftime('%Y%m%d')
#     rev_daily = feather.read_dataframe(f'../data/factor_rev/{label}_rev/{year}/{label}_rev_{date_str}.feather')
#     rev = pd.concat([rev, rev_daily])
# rev['datetime'] = datetime_calc(rev['date'], rev['time'])
# rev = rev.reset_index(drop=True)
# feather.write_dataframe(rev, f'../data/factor_rev/{label}_rev/{label}_rev.feather')