# 识别计算跳跃收益

## 导入模块

In [60]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import feather
import math
from scipy import stats

## 读入测试数据

In [41]:
price_1m = feather.read_dataframe('../data/2024/StockPriceK1m_20240102.feather')
price_1m['date'] = pd.to_datetime(price_1m['date'])
price_1d = feather.read_dataframe('../data/StockPriceK1d_20240630.feather')
price_1d['date'] = pd.to_datetime(price_1d['date'])

In [51]:
issue = '000001'
price_1m = price_1m.loc[price_1m['issue'] == issue]
start_price = price_1m.loc[price_1m['time'] % 500 == 0, 'open'].to_numpy()
end_price = price_1m.loc[price_1m['time'] % 500 == 400, 'open'].to_numpy()

idx_d1 = (price_1d['issue'] == issue) &(price_1d['date'] == '20240102')
idx_d2 = (price_1d['issue'] == issue) &(price_1d['date'] == '20240103')
start_price = np.append(start_price, price_1d.loc[idx_d1, 'close'].to_numpy())
end_price = np.append(end_price, price_1d.loc[idx_d2, 'open'].to_numpy())

ret = (end_price - start_price) / start_price
log_ret = np.log(1 + ret)

## 定义跳跃统计量

In [68]:
def mu(p: float):
    return (2 ** (p / 2)) * math.gamma((p + 1) / 2) / np.sqrt(np.pi)

def Omega_SwV(log_ret):
    abs_log_ret = np.abs(log_ret)
    prod_log_ret = (abs_log_ret[:-5] * 
                    abs_log_ret[1:-4] *
                    abs_log_ret[2:-3] *
                    abs_log_ret[3:-2] *
                    abs_log_ret[4:-1] *
                    abs_log_ret[5:])
    sum_prod = prod_log_ret.sum()
    n = len(log_ret)
    coef = (mu(6) / 9) * ((n ** 3) * (mu(1) ** -6) / (n - 5))
    return coef * sum_prod

def V_01(log_ret):
    abs_log_ret = np.abs(log_ret)
    prod_log_ret = abs_log_ret[:-1] * abs_log_ret[1]
    sum_prod = prod_log_ret.sum()
    coef = 1 / mu(1)
    return coef * sum_prod

def RV_N(log_ret):
    return (log_ret * log_ret).sum()

def SwV_N(ret, log_ret):
    return 2 * (ret - log_ret).sum()

def JS(ret, log_ret):
    n = len(ret)
    js = n * (V_01(log_ret) / np.sqrt(Omega_SwV(log_ret))) * (1 - RV_N(log_ret) / SwV_N(ret, log_ret))
    return js

def pvalue(js: float):
    cdf = stats.norm.cdf(js, loc=0, scale=1)
    return 2 * min(cdf, 1 - cdf)

js = JS(ret, log_ret)
pvalue(js)

np.float64(4.517460342462069e-06)

## 识别跳跃, 计算收益

In [89]:
def jump_identify(ret, log_ret):
    n = len(ret)
    jump = np.full(n, False, dtype=bool)
    med = np.median(ret)
    log_med = np.median(log_ret)
    ret_c = ret.copy()
    log_ret_c = log_ret.copy()
    js0 = JS(ret_c, log_ret_c)
    p = pvalue(js0)
    
    while (p < 0.05):
        js = np.zeros(n)
        for i in range(n):
            r = ret_c.copy()
            lr = log_ret_c.copy()
            r[i] = med
            lr[i] = log_med
            js[i] = JS(r, lr)
        js_diff = np.abs(js0) - np.abs(js)
        idx_max = np.argmax(js_diff)
        jump[idx_max] = True
        ret_c[idx_max] = med
        log_ret_c[idx_max] = log_med
        js0 = JS(ret_c, log_ret_c)
        p = pvalue(js0)

    return np.any(jump), log_ret[jump].sum()

True
-0.0042689499195760015
