In [129]:
import pandas as pd
import numpy as np
import re
import akshare as ak
import pandas_ta as ta

In [175]:
def get_stock_history_info(stock_code):
    """Download the price history of one A-share stock as a tidy DataFrame.

    The data comes from ``ak.stock_zh_a_hist`` called with ``adjust='qfq'``.
    The Chinese column headers returned by akshare are renamed to English,
    only the price/volume/turnover columns are kept, and the requested
    ``stock_code`` is prepended as the first column.

    Args:
        stock_code: Stock symbol passed straight through to akshare
            (e.g. ``'600999'``).

    Returns:
        DataFrame with columns ``stock_code, datetime, open, high, low,
        close, volume, turnover, turnover_rate``.
    """
    # Mapping from akshare's Chinese headers to the English names used here.
    column_names = {
        "日期": "datetime",
        "开盘": "open",
        "最高": "high",
        "最低": "low",
        "收盘": "close",
        "成交量": "volume",
        "成交额": "turnover",
        "振幅": "amplitude",
        "涨跌幅": "change_pct",
        "涨跌额": "change_amount",
        "换手率": "turnover_rate",
    }
    kept_columns = ['datetime', 'open', 'high', 'low', 'close', 'volume', 'turnover', 'turnover_rate']

    history = ak.stock_zh_a_hist(symbol=stock_code, adjust='qfq')
    history = history.rename(columns=column_names)[kept_columns]
    # Tag every row with the symbol so frames of several stocks can be told apart.
    history.insert(0, 'stock_code', stock_code)
    return history

def get_vwap_col(dataframe):
    """Return the running (cumulative) volume-weighted average price.

    The typical price ``(high + low + close) / 3`` of each row is weighted by
    that row's volume; the result at row *t* is the cumulative sum of
    price*volume divided by the cumulative sum of volume up to and including
    row *t*.

    Args:
        dataframe: Frame providing ``high``, ``low``, ``close`` and ``volume``
            columns.

    Returns:
        Series aligned with ``dataframe``'s index.
    """
    volume = dataframe['volume']
    typical_price = (dataframe['high'] + dataframe['low'] + dataframe['close']) / 3
    cumulative_value = (typical_price * volume).cumsum()
    cumulative_volume = volume.cumsum()
    return cumulative_value / cumulative_volume

def get_ret_col(dataframe):
    """Return the row-over-row percentage change of the ``close`` column.

    Args:
        dataframe: Frame providing a ``close`` column.

    Returns:
        Series of simple returns; the first element is NaN because it has no
        predecessor.
    """
    closing_prices = dataframe['close']
    returns = closing_prices.pct_change()
    return returns

# Load the history for symbol 600999, index it by date, and attach the two
# derived columns (vwap, ret) that the alpha expressions reference.
dataframe = get_stock_history_info('600999')
dataframe = dataframe.set_index(pd.DatetimeIndex(dataframe['datetime']))
dataframe = dataframe.assign(
    vwap=get_vwap_col(dataframe),
    ret=get_ret_col(dataframe),
)

# Rows containing any NaN (e.g. the first row, whose return is undefined) are dropped.
df = dataframe.dropna()

In [176]:
class AlphaBaseOperations:
    """Stateless operators used as building blocks of alpha-factor expressions.

    Every operator is a static method working on pandas Series (or values that
    numpy can broadcast against them). Windowed operators rely on
    ``Series.rolling`` with its default ``min_periods``, so the first
    ``window - 1`` results are NaN.
    """

    @staticmethod
    def rank(series):
        """Percentile rank (0-1] of each value within the whole series."""
        return series.rank(pct=True)

    @staticmethod
    def delta(series, period):
        """Difference between each value and the value ``period`` rows earlier."""
        return series.diff(period)

    @staticmethod
    def log(series):
        """Element-wise natural logarithm."""
        return np.log(series)

    @staticmethod
    def sum(series, window):
        """Rolling sum over the last ``window`` rows."""
        return series.rolling(window=window).sum()

    @staticmethod
    def min(series1, series2):
        """Element-wise minimum of the two inputs (either may be a scalar)."""
        return np.minimum(series1, series2)

    @staticmethod
    def max(series1, series2):
        """Element-wise maximum of the two inputs (either may be a scalar)."""
        return np.maximum(series1, series2)

    @staticmethod
    def corr(series1, series2, window):
        """Rolling correlation of ``series1`` with ``series2`` over ``window`` rows."""
        return series1.rolling(window=window).corr(series2)

    @staticmethod
    def delay(series, period):
        """Series shifted forward by ``period`` rows (value from ``period`` rows ago)."""
        return series.shift(period)

    @staticmethod
    def sign(series):
        """Element-wise sign (-1, 0 or 1)."""
        return np.sign(series)

    @staticmethod
    def abs(series):
        """Element-wise absolute value."""
        return np.abs(series)

    @staticmethod
    def std(series, window):
        """Rolling standard deviation over the last ``window`` rows."""
        return series.rolling(window=window).std()

    @staticmethod
    def mean(series, window):
        """Rolling arithmetic mean over the last ``window`` rows."""
        return series.rolling(window=window).mean()

    @staticmethod
    def tsmax(series, window):
        """Rolling maximum over the last ``window`` rows."""
        return series.rolling(window=window).max()

    @staticmethod
    def tsrank(series, window):
        """Rank of the newest value within each trailing window.

        Ties receive the average rank, matching ``Series.rank()``'s default.
        The rank is computed directly on the raw ndarray of each window
        instead of building a Series and ranking every element, which yields
        the same numbers at a fraction of the cost.
        """
        def _rank_of_last(values):
            # With the default min_periods (== window) the callback only sees
            # fully populated windows, so no NaN handling is needed here.
            last = values[-1]
            smaller = (values < last).sum()
            tied = (values == last).sum()  # includes the last value itself
            return smaller + (tied + 1) / 2

        return series.rolling(window=window).apply(_rank_of_last, raw=True)

    @staticmethod
    def where(condition, true_series, false_series):
        """Pick from ``true_series`` where ``condition`` holds, else ``false_series``.

        The result is a new Series carrying ``condition``'s index.
        """
        return pd.Series(np.where(condition, true_series, false_series), index=condition.index)

    @staticmethod
    def sma(series, window, weight):
        """Recursive smoothed moving average ``SMA(A, n, m)``.

        Implements ``Y[t] = (weight * A[t] + (window - weight) * Y[t-1]) / window``,
        seeded with the first valid observation, i.e. an exponential moving
        average with ``alpha = weight / window``. The earlier implementation
        ignored ``window`` (apart from the NaN warm-up) and gave the *oldest*
        of the last ``weight`` samples the largest weight.

        Args:
            series: Input Series ``A``.
            window: Smoothing span ``n``.
            weight: Weight ``m`` of the newest observation; must satisfy
                ``0 < weight <= window``.

        Raises:
            ValueError: If ``weight`` is outside ``(0, window]``.
        """
        if not 0 < weight <= window:
            raise ValueError("sma requires 0 < weight <= window")
        # adjust=False selects the plain recursive form of the EMA.
        return series.ewm(alpha=weight / window, adjust=False).mean()
    
def calculate_alpha_expression(df, expression):
    """Evaluate a textual alpha-factor expression against a DataFrame.

    The expression may reference the frame's columns and the operators of
    ``AlphaBaseOperations`` in any letter case; all identifiers are lowercased
    before evaluation and ``^`` is rewritten to Python's ``**``. The rewritten
    expression is printed before it is evaluated.

    Args:
        df: Frame whose columns become variables (by lowercased column name).
        expression: Formula string, e.g. ``"RANK(DELTA(CLOSE, 1))"``.

    Returns:
        Whatever the expression evaluates to (typically a Series).
    """
    # Variables: one entry per column, keyed by its lowercased name.
    namespace = {}
    for column in df.columns:
        namespace[column.lower()] = df[column]

    # Functions: every callable, non-dunder attribute of the operator class.
    # Added after the columns, so an operator wins over a same-named column.
    operators = AlphaBaseOperations()
    for attr in dir(operators):
        member = getattr(operators, attr)
        if callable(member) and not attr.startswith("__"):
            namespace[attr.lower()] = member

    # Translate the formula syntax into valid Python: power operator first,
    # then normalise identifier case to match the namespace keys.
    expression = re.sub(r'\^', '**', expression)
    expression = re.sub(r'\b[A-Za-z_][A-Za-z0-9_]*\b', lambda match: match.group().lower(), expression)
    print(f'Updated Expression: {expression}')

    # NOTE(review): eval executes arbitrary Python; only pass trusted expressions.
    return eval(expression, {'np': np}, namespace)


In [177]:
# Alpha-factor formulas keyed by factor name. Each value is an expression in
# the formula syntax understood by calculate_alpha_expression (upper-case
# operators/columns, `^` for power).
alpha_dict = {
    "alpha_1": "(-1 * CORR(RANK(DELTA(LOG(VOLUME), 1)), RANK(((CLOSE - OPEN) / OPEN)), 6))",
    "alpha_2": "(-1 * DELTA((((CLOSE - LOW) - (HIGH - CLOSE)) / (HIGH - LOW)), 1))",
    # alpha_3 and alpha_4 are disabled: they use `?:` conditional expressions, which still need a parser.
    # "alpha_3": "SUM((CLOSE=DELAY(CLOSE,1)?0:CLOSE-(CLOSE>DELAY(CLOSE,1)?MIN(LOW,DELAY(CLOSE,1)):MAX(HIGH,DELAY(CLOSE,1)))),6)",
    # "alpha_4": "((((SUM(CLOSE, 8) / 8) + STD(CLOSE, 8)) < (SUM(CLOSE, 2) / 2)) ? (-1 * 1) : (((SUM(CLOSE, 2) / 2) < ((SUM(CLOSE, 8) / 8) - STD(CLOSE, 8))) ? 1 : (((1 < (VOLUME / MEAN(VOLUME,20))) || ((VOLUME / MEAN(VOLUME,20)) == 1)) ? 1 : (-1 * 1))))",
    "alpha_5": "(-1 * TSMAX(CORR(TSRANK(VOLUME, 5), TSRANK(HIGH, 5), 5), 3))",
    "alpha_6": "(RANK(SIGN(DELTA((((OPEN * 0.85) + (HIGH * 0.15))), 4)))* -1)",
    "alpha_7": "((RANK(MAX((VWAP - CLOSE), 3)) + RANK(MIN((VWAP - CLOSE), 3))) * RANK(DELTA(VOLUME, 3)))",
    "alpha_8": "RANK(DELTA(((((HIGH + LOW) / 2) * 0.2) + (VWAP * 0.8)), 4) * -1)",
    "alpha_9": "SMA(((HIGH+LOW)/2-(DELAY(HIGH,1)+DELAY(LOW,1))/2)*(HIGH-LOW)/VOLUME,7,2)",
    "alpha_10": "(RANK(MAX((WHERE((RET < 0),STD(RET, 20),CLOSE) ^ 2),5)))", # (RANK(MAX(((RET < 0) ? STD(RET, 20) : CLOSE)^2),5))
    "alpha_11": "SUM(((CLOSE-LOW)-(HIGH-CLOSE))/(HIGH-LOW)*VOLUME,6)",
    "alpha_12": "(RANK((OPEN - (SUM(VWAP, 10) / 10)))) * (-1 * (RANK(ABS((CLOSE - VWAP)))))",
    # Previously this entry reused the key "alpha_12" and silently replaced the formula above.
    "alpha_13": "(((HIGH * LOW)^0.5) - VWAP)",
}

In [178]:
# Compute an alpha factor: look up the alpha_10 formula, evaluate it against
# the cleaned price frame `df`, and show the last few values.
# Note: .get() returns None for an unknown key, which would then be passed
# on to calculate_alpha_expression as the expression.
alpha_expression = alpha_dict.get('alpha_10')
alpha_result = calculate_alpha_expression(df, alpha_expression)
print(alpha_result.tail())

Updated Expression: (rank(max((where((ret < 0),std(ret, 20),close) ** 2),5)))
datetime
2024-02-08    0.846445
2024-02-19    0.252477
2024-02-20    0.862325
2024-02-21    0.252477
2024-02-22    0.865530
dtype: float64
