# Numenta Anomaly Benchmark

In [61]:
import pandas as pd
import json
import os
from typing import Final
from collections.abc import Callable
from datetime import datetime
from config import data_raw_folder, data_processed_folder

In [45]:
dataset_collection_name = "NAB"
source_folder = os.path.join(data_raw_folder, "Community-NAB")
target_folder = data_processed_folder

Metadata handling

In [103]:
# type
DatasetMetadataRecord = {
    "dataset_name": str,
    "collection_name": str,
    "train_path": str,
    "test_path": str,
    "dataset_type": str,
    "datetime_index": bool,
    "split_at": int,
    "train_type": str,
    "train_is_normal": bool,
    "input_type": str,
    "length": int
}

class DatasetMetadata:
    """
    ATTENTION: Not thread-safe! There is no check for changes to the underlying `dataset.csv` file while this class is loaded.
    """
    
    FILENAME: Final[str] = "datasets.csv"
    
    _filepath: str
    _df: pd.DataFrame
    _dirty: bool

    def __init__(self, target_folder: str):
        self._filepath = os.path.join(target_folder, self.FILENAME)
        self._dirty = False
        if not os.path.isfile(self._filepath):
            self._df = self._create_metadata_file()
        else:
            self.refresh(force = True)
    
    def __enter__(self) -> 'DatasetMetadata':
        return self
    
    def __exit__(self, exception_type, exception_value, exception_traceback) -> 'DatasetMetadata':
        self.save()
        return self
    
    def __repr__(self) -> str:
        return repr(self._df)
    
    def __str__(self) -> str:
        return str(self._df)
        
    def _create_metadata_file(self) -> pd.DataFrame:
        df_temp = pd.DataFrame(columns=["dataset_name", "collection_name", "train_path", "test_path", "type", "datetime_index", "split_at", "train_type", "train_is_normal", "input_type", "length"])
        df_temp.set_index(["dataset_name", "collection_name"], inplace=True)
        df_temp.to_csv(self._filepath)
        return df_temp
    
    def add_dataset(self,
        dataset_name: str,
        collection_name: str,
        train_path: str,
        test_path: str,
        dataset_type: str,
        datetime_index: bool,
        split_at: int,
        train_type: str,
        train_is_normal: bool,
        input_type: str,
        dataset_length: int
    ) -> 'DatasetMetadata':
        df_new = pd.DataFrame({
            "train_path": train_path,
            "test_path": test_path,
            "type": dataset_type,
            "datetime_index": datetime_index,
            "split_at": split_at,
            "train_type": train_type,
            "train_is_normal": train_is_normal,
            "input_type": input_type,
            "length": dataset_length
        }, index=[(dataset_name, dataset_collection_name)])
        df = pd.concat([self._df, df_new], axis=0)
        df = df[~df.index.duplicated(keep = "last")]
        self._df = df
        self._dirty = True
        return self
    
    def add_datasets(self, datasets: list[DatasetMetadataRecord]) -> 'DatasetMetadata':
        df_new = pd.DataFrame(datasets)
        df_new.set_index(["dataset_name", "collection_name"], inplace = True)
        df = pd.concat([self._df, df_new], axis=0)
        df = df[~df.index.duplicated(keep = "last")]
        self._df = df
        self._dirty = True
        return self
    
    def refresh(self, force: bool = False) -> None:
        if not force and self._dirty:
            raise Exception("There are unsaved changes in memory that would get lost by reading from disk again!")
        else:
            self._df = pd.read_csv(self._filepath, index_col=["dataset_name", "collection_name"])
    
    def save(self) -> None:
        self._df.to_csv(self._filepath)
        self._dirty = False

In [104]:
def calc_size(filename: str) -> int:
    with open(filename, 'r') as f:
        next(f) # skips header
        c = 0
        for line in f:
            c += 1
    return c

def transform_and_label(source: str, target: str, anomaly_windows: list[str]) -> None:
    df = pd.read_csv(source)
    df["timestamp"] = pd.to_datetime(df['timestamp'], infer_datetime_format=True)
    df["is_anomaly"] = 0

    for t1, t2 in anomaly_windows:
        t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S.%f")
        t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S.%f")
        moreThanT1 = df[df["timestamp"] >= t1]
        betweenT1AndT2 = moreThanT1[moreThanT1["timestamp"] <= t2]
        indices = betweenT1AndT2.index
        df["is_anomaly"].values[indices.values] = 1

    df.to_csv(target, index=False)

In [105]:
# shared by all datasets
input_type = "univariate"
datetime_index = True
train_type = "unsupervised"
train_is_normal = False

dm = DatasetMetadata(target_folder)

# create target directory
dataset_subfolder = os.path.join(target_folder, input_type, dataset_collection_name)
try:
    os.makedirs(dataset_subfolder)
    print(f"Created directories {dataset_subfolder}")
except FileExistsError:
    print(f"Directories {dataset_subfolder} already exist")
    pass

with open(os.path.join(source_folder, "labels", "combined_windows.json"), 'r') as f:
    windows = json.load(f)

#windows

Directories data-processed/univariate/NAB already exist


In [136]:
# dataset transformation
transform_file: Callable[[str, str, list[str]], None] = transform_and_label

for dataset in windows:
    source_file = os.path.join(source_folder, "data", dataset)
    dataset_type = "real" if dataset.startswith("real") else "synthetic"
    
    # get basename for target filename
    basename = os.path.splitext(os.path.basename(source_file))[0]
    filename = f"{basename}.test.csv"

    # save metadata
    dataset_name = filename.split(".")[0]
    path = os.path.join(dataset_subfolder, filename)
    dataset_length = calc_size(source_file)
    dm.add_dataset(
        dataset_name = dataset_name,
        collection_name = dataset_collection_name,
        train_path = None,
        test_path = path,
        dataset_type = dataset_type,
        datetime_index = datetime_index,
        split_at = None,
        train_type = train_type,
        train_is_normal = train_is_normal,
        input_type = input_type,
        dataset_length = dataset_length
    )
    # transform file
    transform_file(source_file, path, windows[dataset])
    print(f"Processed source dataset {source_file} -> {path}")

# save metadata of benchmark
dm.save()

FileNotFoundError: [Errno 2] No such file or directory: 'data-raw/Community-NAB/data/artificialNoAnomaly/art_daily_no_noise.csv'

In [132]:
dm.refresh()
dm._df

Unnamed: 0_level_0,Unnamed: 1_level_0,train_path,test_path,type,datetime_index,split_at,train_type,train_is_normal,input_type,length
dataset_name,collection_name,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
A1Benchmark-6,WebscopeS5,,data-processed/univariate/WebscopeS5/A1Benchma...,real,True,,unsupervised,False,univariate,1439
A1Benchmark-3,WebscopeS5,,data-processed/univariate/WebscopeS5/A1Benchma...,real,True,,unsupervised,False,univariate,1461
A1Benchmark-40,WebscopeS5,,data-processed/univariate/WebscopeS5/A1Benchma...,real,True,,unsupervised,False,univariate,1427
A1Benchmark-20,WebscopeS5,,data-processed/univariate/WebscopeS5/A1Benchma...,real,True,,unsupervised,False,univariate,1422
A1Benchmark-4,WebscopeS5,,data-processed/univariate/WebscopeS5/A1Benchma...,real,True,,unsupervised,False,univariate,1423
...,...,...,...,...,...,...,...,...,...,...
Twitter_volume_GOOG,NAB,,data-processed/univariate/NAB/Twitter_volume_G...,real,True,,unsupervised,False,univariate,15842
Twitter_volume_IBM,NAB,,data-processed/univariate/NAB/Twitter_volume_I...,real,True,,unsupervised,False,univariate,15893
Twitter_volume_KO,NAB,,data-processed/univariate/NAB/Twitter_volume_K...,real,True,,unsupervised,False,univariate,15851
Twitter_volume_PFE,NAB,,data-processed/univariate/NAB/Twitter_volume_P...,real,True,,unsupervised,False,univariate,15858


## Experimentation

In [98]:
dataset = "realAdExchange/exchange-4_cpc_results.csv"
source_file = os.path.join(source_folder, "data", dataset)
df = pd.read_csv(source_file)
df["timestamp"] = pd.to_datetime(df['timestamp'], infer_datetime_format=True)
df["is_anomaly"] = 0

for t1, t2 in windows[dataset]:
    t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S.%f")
    t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S.%f")
    moreThanT1 = df[df["timestamp"] >= t1]
    betweenT1AndT2 = moreThanT1[moreThanT1["timestamp"] <= t2]
    indices = betweenT1AndT2.index
    df["is_anomaly"].values[indices.values] = 1

df[df["is_anomaly"] == 1]

Unnamed: 0,timestamp,value,is_anomaly
0,2011-07-01 00:15:01,0.091795,0
1,2011-07-01 01:15:01,0.074414,0
2,2011-07-01 02:15:01,0.056984,0
3,2011-07-01 03:15:01,0.071225,0
4,2011-07-01 04:15:01,0.045466,0
...,...,...,...
1638,2011-09-07 10:15:01,0.054275,0
1639,2011-09-07 11:15:01,0.070650,0
1640,2011-09-07 12:15:01,0.056339,0
1641,2011-09-07 13:15:01,0.050782,0


In [43]:
with open("data-raw/Community-NAB/labels/combined_labels.json", 'r') as f:
    labels = json.load(f)
labels

{'artificialNoAnomaly/art_daily_no_noise.csv': [],
 'artificialNoAnomaly/art_daily_perfect_square_wave.csv': [],
 'artificialNoAnomaly/art_daily_small_noise.csv': [],
 'artificialNoAnomaly/art_flatline.csv': [],
 'artificialNoAnomaly/art_noisy.csv': [],
 'artificialWithAnomaly/art_daily_flatmiddle.csv': ['2014-04-11 00:00:00'],
 'artificialWithAnomaly/art_daily_jumpsdown.csv': ['2014-04-11 09:00:00'],
 'artificialWithAnomaly/art_daily_jumpsup.csv': ['2014-04-11 09:00:00'],
 'artificialWithAnomaly/art_daily_nojump.csv': ['2014-04-11 09:00:00'],
 'artificialWithAnomaly/art_increase_spike_density.csv': ['2014-04-07 23:10:00'],
 'artificialWithAnomaly/art_load_balancer_spikes.csv': ['2014-04-11 04:35:00'],
 'realAWSCloudwatch/ec2_cpu_utilization_24ae8d.csv': ['2014-02-26 22:05:00',
  '2014-02-27 17:15:00'],
 'realAWSCloudwatch/ec2_cpu_utilization_53ea38.csv': ['2014-02-19 19:10:00',
  '2014-02-23 20:05:00'],
 'realAWSCloudwatch/ec2_cpu_utilization_5f5533.csv': ['2014-02-19 00:22:00',
  '20

In [6]:
with open("data-raw/Community-NAB/labels/combined_windows.json", 'r') as f:
    windows = json.load(f)
windows

{'artificialNoAnomaly/art_daily_no_noise.csv': [],
 'artificialNoAnomaly/art_daily_perfect_square_wave.csv': [],
 'artificialNoAnomaly/art_daily_small_noise.csv': [],
 'artificialNoAnomaly/art_flatline.csv': [],
 'artificialNoAnomaly/art_noisy.csv': [],
 'artificialWithAnomaly/art_daily_flatmiddle.csv': [['2014-04-10 07:15:00.000000',
   '2014-04-11 16:45:00.000000']],
 'artificialWithAnomaly/art_daily_jumpsdown.csv': [['2014-04-10 16:15:00.000000',
   '2014-04-12 01:45:00.000000']],
 'artificialWithAnomaly/art_daily_jumpsup.csv': [['2014-04-10 16:15:00.000000',
   '2014-04-12 01:45:00.000000']],
 'artificialWithAnomaly/art_daily_nojump.csv': [['2014-04-10 16:15:00.000000',
   '2014-04-12 01:45:00.000000']],
 'artificialWithAnomaly/art_increase_spike_density.csv': [['2014-04-07 06:25:00.000000',
   '2014-04-08 15:55:00.000000']],
 'artificialWithAnomaly/art_load_balancer_spikes.csv': [['2014-04-10 11:50:00.000000',
   '2014-04-11 21:20:00.000000']],
 'realAWSCloudwatch/ec2_cpu_utilizat

In [133]:
def to_datetime(str):
    try:
        return datetime.strptime(str, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return datetime.strptime(str, "%Y-%m-%d %H:%M:%S.%f")

In [135]:
matches = 0
for dataset in windows:
    for (anomaly, anomaly_window) in zip(labels[dataset], windows[dataset]):
        maybe_middle = to_datetime(anomaly)
        (start, end) = [to_datetime(d) for d in anomaly_window]
        diff1 = maybe_middle - start
        diff2 = end - maybe_middle
        if diff1 == diff2:
            # print(f"{dataset}-{anomaly} is in the middle of anomaly window!")
            matches += 1
        else:
            print(dataset)
            print(f"{start} - ({diff1})- {anomaly} -({diff2})- {end}")
print(f"matches: {matches}/{sum(list(map(lambda x: len(labels[x]), labels)))}")

KeyError: 'artificialNoAnomaly/art_daily_no_noise.csv'