In [15]:
import numpy as np

def generate_random_sequence(length, value_range=(0, 10)):
    """隨機生成時間序列"""
    return np.random.randint(value_range[0], value_range[1], length)

def calculate_dtw_path(R, Q):
    """計算 R 和 Q 的 DTW 路徑"""
    m, n = len(R), len(Q)
    dtw = np.full((m+1, n+1), float('inf'))
    dtw[0, 0] = 0
    
    for i in range(1, m+1):
        for j in range(1, n+1):
            cost = abs(R[i-1] - Q[j-1])
            dtw[i, j] = cost + min(dtw[i-1, j],    # 垂直方向（↑）
                                   dtw[i, j-1],    # 水平方向（→）
                                   dtw[i-1, j-1])  # 對角方向（↗）
    
    # 回溯得到最佳匹配路徑
    path = []
    i, j = m, n
    while i > 0 or j > 0:
        path.append((i-1, j-1))
        if i == 0:
            j -= 1
        elif j == 0:
            i -= 1
        else:
            if dtw[i-1, j] == min(dtw[i-1, j], dtw[i, j-1], dtw[i-1, j-1]):
                i -= 1
            elif dtw[i, j-1] == min(dtw[i-1, j], dtw[i, j-1], dtw[i-1, j-1]):
                j -= 1
            else:
                i -= 1
                j -= 1
    path.reverse()
    return path

def encode_direction(path):
    """對路徑進行方向編碼"""
    encodings = []
    for (i1, j1), (i2, j2) in zip(path[:-1], path[1:]):
        if i2 == i1 and j2 == j1 + 1:  # 水平移動（→）
            encodings.append((1, 0, 0))
        elif i2 == i1 + 1 and j2 == j1:  # 垂直移動（↑）
            encodings.append((0, 0, 1))
        elif i2 == i1 + 1 and j2 == j1 + 1:  # 對角移動（↗）
            encodings.append((0, 1, 0))
        else:
            encodings.append((0, 0, 0))  # 其他情況
    return encodings

def build_M(R, training_sequences):
    """構建矩陣 M"""
    M = np.zeros((len(R), len(R), 3))  # 每個位置是 (水平, 對角, 垂直) 三維向量
    for Q in training_sequences:
        path = calculate_dtw_path(R, Q)
        encodings = encode_direction(path)
        for (i, j), encoding in zip(path, encodings):
            M[i, j] += np.array(encoding)
    return M

def calculate_support(M, path, l=3):
    """計算支持度"""
    support = []
    for idx, (i, j) in enumerate(path):
        window = path[max(0, idx-l):idx+1]  # 窗口內的步驟
        max_support = 0
        for wi, wj in window:
            encoding = encode_direction([(wi, wj), (i, j)])[0]
            max_support = max(max_support, np.dot(encoding, M[wi, wj]))
        support.append(max_support)
    return support

def anomaly_detection(R, Q, M, threshold=0.5, l=3):
    """針對 Q 進行異常檢測"""
    path = calculate_dtw_path(R, Q)
    support = calculate_support(M, path, l)
    anomaly_indices = [idx for idx, s in enumerate(support) if s < threshold]
    return anomaly_indices, support

# ---- Demo ---- #
# 隨機生成代表性模式 R 和訓練數據
R = generate_random_sequence(10)
training_sequences = [generate_random_sequence(10) for _ in range(5)]

# 構建 M
M = build_M(R, training_sequences)

# 測試序列 Q
# Q = np.sin(np.linspace(0, np.pi*2, 10))
Q = generate_random_sequence(10)

# 異常檢測
anomaly_indices, support = anomaly_detection(R, Q, M)
print("R:", R)
print("Q:", Q)
print("Anomaly Indices:", anomaly_indices)
print("Support Values:", support)


R: [6 0 0 6 9 4 9 0 3 8]
Q: [0 2 7 1 3 6 2 4 1 6]
Anomaly Indices: [0, 3, 5, 6, 7, 8]
Support Values: [0, 1.0, 1.0, 0, 1.0, 0, 0, 0, 0, 1.0, 1.0, 2.0, 2.0]


array([0.        , 0.6981317 , 1.3962634 , 2.0943951 , 2.7925268 ,
       3.4906585 , 4.1887902 , 4.88692191, 5.58505361, 6.28318531])