In [1]:
import math
import numpy as np
from pyprojroot import here
from sklearn.metrics import roc_auc_score
from tqdm import tqdm, trange

In [2]:
class Midas:
    """Scorer for a stream of ``(src, dst, ts)`` edges, labelled ``'MIDAS'``.

    Two ``row x col`` tables of hashed edge counts are maintained:
    ``current`` counts edges seen during the current timestamp and ``total``
    counts every edge seen since construction. Each row has its own
    multiply-add hash, and an edge's count is taken as the minimum over rows.

    Args:
        row: Number of hash rows in each count table.
        col: Number of buckets per row.
        seed: Optional seed for drawing the hash parameters. With the default
            ``None`` they are drawn from NumPy's global random state, so
            ``np.random.seed`` still controls them.
    """

    def __init__(self, row: int, col: int, seed: 'int | None' = None):
        self.nameAlg = 'MIDAS'
        self.ts: int = 1  # timestamp the `current` table belongs to
        self.row = row
        self.col = col
        rng = np.random if seed is None else np.random.RandomState(seed)
        # First `row` entries are multipliers, last `row` are offsets, each in [1, 2^16).
        # int64 is explicit because the platform default integer can be 32-bit.
        self.param = rng.randint(1, 1 << 16, 2 * row).astype(np.int64)
        self.current = np.zeros((row, col), np.int64)  # counts within the current timestamp
        self.total = np.zeros((row, col), np.int64)  # counts since construction

    @staticmethod  # static method: callable without an instance
    def ChiSquaredTest(a: float, s: float, t: float) -> float:
        """Return ``((a - s / t) * t) ** 2 / (s * (t - 1))``.

        Args:
            a: Count of the edge in the current timestamp.
            s: Total count of the edge.
            t: Timestamp used as the number of elapsed ticks.

        Returns:
            The score, or ``0.0`` when ``s == 0`` or ``t == 1`` (the formula
            would divide by zero).
        """
        if s == 0 or t - 1 == 0:
            return 0.0
        return pow((a - s / t) * t, 2) / (s * (t - 1))

    def Call(self, src: int, dst: int, ts: int) -> float:
        """Record one edge and return its score.

        ``current`` is cleared whenever ``ts`` is greater than the stored
        timestamp; an equal or older ``ts`` leaves it untouched.

        Args:
            src: Source node id.
            dst: Destination node id.
            ts: Timestamp of the edge.

        Returns:
            ``ChiSquaredTest`` applied to the minimum current count, the
            minimum total count and ``ts``.
        """
        if self.ts < ts:
            self.current.fill(0)
            self.ts = ts
        # Python ints keep the hash exact: no fixed-width overflow for large ids.
        key = int(src) + 347 * int(dst)
        minCurrent = minTotal = np.inf
        for i in range(self.row):
            n = (key * int(self.param[i]) + int(self.param[i + self.row])) % self.col
            self.current[i, n] += 1
            self.total[i, n] += 1
            minCurrent = min(minCurrent, self.current[i, n])
            minTotal = min(minTotal, self.total[i, n])
        return self.ChiSquaredTest(minCurrent, minTotal, ts)

In [4]:
if __name__ == '__main__':
    # Dataset files are resolved against the project root reported by pyprojroot.
    prefix = here()
    pathData = prefix / 'desktop/AnomalyDetection/Code/DARPA/data.csv'
    pathLabel = prefix / 'desktop/AnomalyDetection/Code/DARPA/label.csv'
    # One record per line of data.csv: its comma-separated fields parsed as ints.
    data = [
        list(map(int, line.split(b',')))
        for line in tqdm(pathData.read_bytes().splitlines(), 'Load Dataset', unit_scale=True)
    ]
    # One integer label per line of label.csv.
    label = [int(line) for line in pathLabel.read_bytes().splitlines()]

Load Dataset: 100%|██████████| 4.55M/4.55M [00:09<00:00, 484kit/s]


In [5]:
    # Scorer under evaluation; MidasR(2, 1024) and MidasF(2, 1024, 1e3) were the
    # alternatives toggled here (neither is defined in this section).
    midas = Midas(2, 1024)
    # Feed the records through the scorer in order, one score per label.
    score = []
    for i in trange(len(label), desc=midas.nameAlg, unit_scale=True):
        score.append(midas.Call(*data[i]))
    # Compare the scores against the ground-truth labels.
    print(f"ROC-AUC = {roc_auc_score(label, score):.4f}")

MIDAS: 100%|██████████| 4.55M/4.55M [01:35<00:00, 47.9kit/s]


ROC-AUC = 0.8945


In [None]:
    # NOTE(review): this cell repeats the scoring run of the preceding cell with a
    # freshly constructed scorer (hash parameters are redrawn).
    # MidasR(2, 1024) and MidasF(2, 1024, 1e3) were the alternatives toggled here.
    midas = Midas(2, 1024)
    # One score per labelled record, computed in stream order.
    score = [
        midas.Call(*data[i])
        for i in trange(len(label), desc=midas.nameAlg, unit_scale=True)
    ]
    print(f"ROC-AUC = {roc_auc_score(label, score):.4f}")

MIDAS:  52%|█████▏    | 2.38M/4.55M [00:47<00:50, 43.0kit/s]