# In [2]:
import numpy as np
import pandas as pd

### Hand-written sample entropy, so every step of the computation is visible; this is much faster than using TSFresh


def sample_entropy(x, emb_dim, tolerance):
    """Estimate the sample entropy of a one-dimensional numeric series.

    Overlapping templates of length ``emb_dim`` and ``emb_dim + 1`` are built
    from ``x`` and compared with the Chebyshev distance (largest absolute
    element-wise difference).  For each length the fraction of ordered
    template pairs ``(i, j)`` with ``i != j`` whose distance is at most
    ``tolerance`` is computed, and the result is
    ``-log(fraction(emb_dim + 1) / fraction(emb_dim))``.

    NOTE(review): each fraction is normalised by its own template count
    (``len(x) - m + 1`` templates for length ``m``), so the ratio can exceed
    1 and the result can be negative on short series.  This differs from
    definitions that use the same number of templates for both lengths --
    confirm this convention is intended before comparing with other tools.

    Args:
        x: Sequence of numbers (list, NumPy array, pandas Series, ...).  It
            is converted to a float array once, so integer/unsigned inputs
            are compared without wrap-around.
        emb_dim: Embedding dimension ``m`` (template length), at least 1.
        tolerance: Largest Chebyshev distance at which two templates count
            as a match.

    Returns:
        The sample entropy as a float, or ``nan`` when ``x`` has fewer than
        ``emb_dim + 2`` samples or when no template pair matches at either
        length.  A template containing NaN never matches anything.

    Raises:
        ValueError: If ``emb_dim`` is smaller than 1.
    """
    series = np.asarray(x, dtype=float)

    if len(series) < emb_dim + 2:
        return np.nan
    if emb_dim < 1:
        raise ValueError("emb_dim must be a positive integer")

    def _match_fraction(m):
        # The length guard above guarantees at least two templates here.
        n_templates = len(series) - m + 1
        templates = np.array([series[i : i + m] for i in range(n_templates)])

        matches = 0
        for i in range(n_templates):
            # Compare template i with every template in one vectorised step;
            # only O(n * m) extra memory is needed at any time.
            distances = np.abs(templates - templates[i]).max(axis=1)
            close = distances <= tolerance
            close[i] = False  # a template is never matched against itself
            matches += int(np.count_nonzero(close))

        if matches == 0:
            # No pair matched: the logarithm below would be undefined.
            return np.nan
        return matches / (n_templates * (n_templates - 1))

    phi_emb_dim = _match_fraction(emb_dim)
    phi_emb_dim_plus_one = _match_fraction(emb_dim + 1)

    if np.isnan(phi_emb_dim) or np.isnan(phi_emb_dim_plus_one):
        return np.nan

    return -np.log(phi_emb_dim_plus_one / phi_emb_dim)


### Sample entropy with a dynamic tolerance: the threshold is derived from either the standard deviation or the range of the data


def dynamic_sample_entropy(x, m, r_ratio, use_std, *, min_scale=1e-6):
    """Compute sample entropy with a tolerance scaled to the data.

    The tolerance passed to ``sample_entropy`` is ``r_ratio`` times a scale
    measured on ``x``: its standard deviation when ``use_std`` is true,
    otherwise its range (maximum minus minimum).

    Args:
        x: Numeric sequence to analyse.
        m: Embedding dimension forwarded as ``emb_dim``.
        r_ratio: Multiplier applied to the scale to obtain the tolerance.
        use_std: ``True`` to scale by ``np.std(x)``, ``False`` to scale by
            ``np.max(x) - np.min(x)``.
        min_scale: Smallest scale considered usable.  Defaults to ``1e-6``,
            the threshold that was previously hard-coded; lower it for
            series whose natural magnitude is tiny.

    Returns:
        The value returned by ``sample_entropy``, or ``nan`` when the scale
        is below ``min_scale`` (the series is effectively constant).
    """
    if use_std:
        scale = np.std(x)
    else:
        scale = np.max(x) - np.min(x)

    # A near-zero scale would give a near-zero tolerance; report NaN instead.
    if scale < min_scale:
        return np.nan

    return sample_entropy(x, emb_dim=m, tolerance=r_ratio * scale)


### Make sure each window contains only historical data (it ends at the current row)


def calculate_entropy_on_windows(data, window_size, m, r_ratio, use_std):
    """Evaluate ``dynamic_sample_entropy`` on trailing windows of ``data``.

    Each window holds the ``window_size`` items that end at, and include,
    the current position, so no later item is ever part of a window.

    Args:
        data: Sliceable sequence of numbers.
        window_size: Number of items per window.
        m: Embedding dimension forwarded to ``dynamic_sample_entropy``.
        r_ratio: Tolerance ratio forwarded to ``dynamic_sample_entropy``.
        use_std: Scale selector forwarded to ``dynamic_sample_entropy``.

    Returns:
        A list with one entropy value per complete window, in order; it is
        empty when ``data`` holds fewer than ``window_size`` items.
    """
    total = len(data)
    # ``stop`` is the exclusive end of the window, i.e. current position + 1.
    return [
        dynamic_sample_entropy(data[stop - window_size : stop], m, r_ratio, use_std)
        for stop in range(window_size, total + 1)
    ]


######################################################################################################


# Load the input bars, order them by the "eob" column and drop rows that have
# no "close" value ("close" is only the example column; any other column can
# be analysed the same way).
df = pd.read_csv("RB99_1m_Turnover_31000_12120_1213.csv")  # custom-axis input file
df = df.sort_values("eob")  # presumably the bar end time -- verify against the data
df = df.dropna(subset=["close"])


window_size = 20  # rolling window length, in rows
m = 2  # embedding dimension
r_ratio = 0.3  # tolerance as a fraction of the window's std / range


# Compute the entropy with both tolerance rules.  Note: this is normally
# applied to the first difference rather than to close itself; close is used
# here only to keep the example simple.

entropy_values_std = calculate_entropy_on_windows(
    df["close"], window_size, m, r_ratio, use_std=True
)  # tolerance from the standard deviation
entropy_values_range = calculate_entropy_on_windows(
    df["close"], window_size, m, r_ratio, use_std=False
)  # tolerance from the range


# Attach the results to a copy of the frame.  The first window_size - 1 rows
# have no complete window and are filled with NaN.  The padded list is cut to
# the frame length so that a frame with fewer than window_size - 1 rows gets
# all-NaN columns instead of failing with a length-mismatch ValueError.

df_entropy = df.copy()
df_entropy["entropy_std"] = (
    [np.nan] * (window_size - 1) + entropy_values_std
)[: len(df_entropy)]  # standard-deviation based result
df_entropy["entropy_range"] = (
    [np.nan] * (window_size - 1) + entropy_values_range
)[: len(df_entropy)]  # range based result


# Save the result

df_entropy.to_csv("./sample_entropy.csv", index=False)