# Testing the Offline detection step by step

In [25]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.colors as mcolors
from matplotlib.backends.backend_pdf import PdfPages
from datetime import datetime, timedelta

import sys
import os
import re
import random
import importlib
import datetime as dt
import json
import logging

from tqdm import trange # For progress bars
from tqdm import tqdm
from tqdm.contrib import tzip
from tqdm.contrib.logging import logging_redirect_tqdm

sys.path.append('/eos/user/j/jhoya/DAQ/AnomalyDetection/strada/detection_combined/')

from reduction.medianstdreducer import MedianStdReducer
from transformer_based_detection.informers.informerrunner import InformerRunner
from utils.anomalyregistry import JSONAnomalyRegistry
from utils.reduceddatabuffer import ReducedDataBuffer
from utils.exceptions import NonCriticalPredictionException
from utils.consolesingleton import ConsoleSingleton

# Importing in this way in case we want to test something on this file and we don't want to restart this kernel
import clustering.dbscananomalydetector
importlib.reload(clustering.dbscananomalydetector)
from clustering.dbscananomalydetector import HLTDBSCANAnomalyDetector

In [2]:
# Hex colour for each rack index 0-9, in index order
rack_colors = dict(enumerate((
    '#D81B60',
    '#1E88E5',
    '#FFC107',
    '#004D40',
    '#C43F42',
    '#6F8098',
    '#D4FC14',
    '#1CB2C5',
    '#18F964',
    '#1164B3',
)))

# Font sizes for plots (all three tiers currently share the same value)
SMALL_SIZE = MEDIUM_SIZE = BIGGER_SIZE = 13

# (rc group, keyword options) pairs, applied in the order listed
rc_font_settings = (
    ('font', {'size': SMALL_SIZE}),
    ('axes', {'titlesize': BIGGER_SIZE}),
    ('axes', {'labelsize': MEDIUM_SIZE}),
    ('xtick', {'labelsize': SMALL_SIZE}),
    ('ytick', {'labelsize': SMALL_SIZE}),
    ('legend', {'fontsize': SMALL_SIZE}),
    ('figure', {'titlesize': BIGGER_SIZE}),
)
for rc_group, rc_options in rc_font_settings:
    plt.rc(rc_group, **rc_options)

In [3]:
def get_tpu_number(channel_name):
    """Return the fifth integer found in ``channel_name``.

    Every run of decimal digits in the name is parsed as an integer, in
    order of appearance, and the value at position 4 (zero-based) is
    returned. Presumably this is the TPU node id for channel names shaped
    like the dataset columns -- confirm against the naming scheme.

    Raises:
        IndexError: if the name contains fewer than five digit runs.
    """
    numbers = list(map(int, re.findall(r'\d+', channel_name)))
    return numbers[4]

def extract_label(column_name):
    """Return the short label embedded at the end of ``column_name``.

    Takes the text after the last ``:``, cuts it at the first ``.``, and
    returns what follows the last ``-`` of that piece.
    """
    last_field = column_name.rpartition(':')[2]
    stem = last_field.partition('.')[0]
    return stem.rpartition('-')[2]

def get_rack_hardware_configuration(rack_number: int, variant: str = '2023'):
    """Return the hardware-configuration index (0-3) of a rack.

    Args:
        rack_number: Rack number to classify.
        variant: Data-taking variant; only ``'2018'`` and ``'2023'`` are
            implemented.

    Returns:
        The configuration index whose inclusive rack range contains
        ``rack_number`` for the given variant.

    Raises:
        NotImplementedError: if ``variant`` is not a known variant.
        ValueError: if ``rack_number`` lies in none of the variant's ranges.
    """
    # Inclusive (low, high) rack ranges; the position of each group in the
    # tuple is the configuration index that is returned.
    # TODO: need to check the 2023 ranges and include the rest of the years.
    known_ranges = {
        '2018': (
            ((44, 54),),
            ((55, 63),),
            ((70, 77), (79, 90)),
            ((16, 26),),
        ),
        '2023': (
            ((44, 54),),
            ((55, 63),),
            ((64, 77), (79, 95)),
            # Lower bound added: previously any value <= 26, including 0
            # and negative numbers, was accepted as configuration 3.
            ((1, 26),),
        ),
    }

    if variant not in known_ranges:
        raise NotImplementedError(f'Rack hardware configuration identification not implemented for variant {variant}')

    for configuration, spans in enumerate(known_ranges[variant]):
        if any(low <= rack_number <= high for low, high in spans):
            return configuration

    raise ValueError(f'Rack number {rack_number} not in known nodes for variant {variant}')

# Looking at the Transformers

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import importlib
import argparse
import sys
import os
import datetime as dt
import json
import logging

from tqdm import tqdm
from tqdm.contrib import tzip
from tqdm.contrib.logging import logging_redirect_tqdm

sys.path.append('/eos/user/j/jhoya/DAQ/AnomalyDetection/strada/detection_combined/')

from utils.anomalyregistry import JSONAnomalyRegistry
from utils.reduceddatabuffer import ReducedDataBuffer
from utils.exceptions import NonCriticalPredictionException
from utils.consolesingleton import ConsoleSingleton

# Importing like this in case we want to change something in the files and don't want to restart the kernel
import clustering.dbscananomalydetector
importlib.reload(clustering.dbscananomalydetector)
from clustering.dbscananomalydetector import HLTDBSCANAnomalyDetector

import reduction.basereducer
importlib.reload(reduction.basereducer)
from reduction.basereducer import BaseReducer

import reduction.medianstdreducer
# Reload the module imported on the previous line so that edits to it are
# picked up without restarting the kernel. This line previously reloaded
# reduction.basereducer a second time, leaving medianstdreducer stale.
importlib.reload(reduction.medianstdreducer)
from reduction.medianstdreducer import MedianStdReducer

import transformer_based_detection.informers.informerrunner
importlib.reload(transformer_based_detection.informers.informerrunner)
from transformer_based_detection.informers.informerrunner import InformerRunner

In [2]:
## A hack to avoid problems caused by differing scikit-learn versions. No need to rerun this.

# import json
# import numpy as np
# import pickle as pkl

# scaler_dir_and_filename = '/eos/user/k/kstehle/Documents/phd/deephydra_models/hlt_2023_mse_Scale_0.8_1.0_Scale_APP_0.8_1.0_0.01_0.05_0.05_rel_size_1.0_ratio_0.25_seed_192/scaler.pkl'

# # Load the old scaler despite the warning
# with open(scaler_dir_and_filename, 'rb') as file:
#     old_scaler = pkl.load(file)

# # Save the scaler parameters
# scaler_params = {
#     'min_': old_scaler.min_.tolist(),
#     'scale_': old_scaler.scale_.tolist(),
#     'data_min_': old_scaler.data_min_.tolist(),
#     'data_max_': old_scaler.data_max_.tolist(),
#     'feature_range': old_scaler.feature_range
# }

# with open('/eos/user/k/kstehle/Documents/phd/deephydra_models/hlt_2023_mse_Scale_0.8_1.0_Scale_APP_0.8_1.0_0.01_0.05_0.05_rel_size_1.0_ratio_0.25_seed_192/scaler_params.json', 'w') as f:
#     json.dump(scaler_params, f)
    
# # In what follows we'll use this json file to load a new MinMaxScaler with the trained parameters and avoid the issue with using different versions of scikit-learn for training and testing

In [3]:
# Input CSV for this notebook. The commented-out path is an alternative
# dataset kept for reference.
#dataset_path = '/eos/user/k/kstehle/atlas-hlt-datasets/test_set_dcm_rates_2023.csv'
dataset_path = 'data/hlt_data_pd_448519.csv'

# The first CSV column becomes the index and is parsed as dates.
hlt_data_pd = pd.read_csv(dataset_path, index_col=0, parse_dates=True)

# Convert the index to the Europe/Berlin timezone.
# NOTE(review): tz_convert needs an index that is already timezone-aware
# (pandas raises TypeError on a naive one) -- confirm the CSV timestamps
# carry a UTC offset.
hlt_data_pd.index = hlt_data_pd.index.tz_convert('Europe/Berlin') 

# Variant string handed to the reducer below.
variant = '2023'

median_std_reducer = MedianStdReducer(variant)

# Using the pretrained models by Kevin, including the MinMaxScaler
informer_runner = InformerRunner('/eos/user/k/kstehle/Documents/phd/deephydra_models/hlt_2023_mse_Scale_0.8_1.0_Scale_APP_0.8_1.0_0.01_0.05_0.05_rel_size_1.0_ratio_0.25_seed_192/', device='cpu')

# Column names of the raw data as a plain Python list.
tpu_labels = list(hlt_data_pd.columns.values)

In [4]:
#hlt_data_pd.tail()

In [5]:
# Report how many CPU cores the machine exposes.
# Note: os.cpu_count() returns None when the count cannot be determined.
n_cores = os.cpu_count()
print(f"Number of CPU cores: {n_cores}")

Number of CPU cores: 28


In [6]:
# Limit the number of threads PyTorch may use
import torch
#n_threads = os.cpu_count()  # Use all available CPU cores
#n_threads = max(1, os.cpu_count() // 2)
n_threads = 16
print(n_threads)
torch.set_num_threads(n_threads)

output_dir = "output_notebook"
model = "Informer-MSE"

# Still need to understand the buffer size difference
buffer_size = 65 if model == 'Informer-SMSE' else 17
#buffer_size = 50 # Testing for speeding up
reduced_data_buffer = ReducedDataBuffer(size=buffer_size)

# Wiring: the processing loop pushes one slice of reduced data at a time into
# the buffer; once the buffer is filled it calls informer_runner.detect, and
# the runner reports its detections to the JSON anomaly registry.
reduced_data_buffer.set_buffer_filled_callback(informer_runner.detect)

json_anomaly_registry = JSONAnomalyRegistry(output_dir)
informer_runner.register_detection_callback(json_anomaly_registry.transformer_detection)

# Raw index entries and raw values of the unreduced data
timestamps = [*hlt_data_pd.index]
hlt_data_np = hlt_data_pd.to_numpy()

16


## Do not run the following cells; resume at the next heading like this one.
The cells below record the steps I took to build a function that replaces the per-timestamp computation of the median and standard deviation with a single bulk computation.

In [53]:
import re
import numpy as np
import pandas as pd

def extract_rack_number(column_name):
    """Return the rack number embedded in ``column_name``.

    The number is the run of digits following the first ``tpu-rack-`` in the
    name, returned as an ``int``. Names without such a marker yield
    ``np.nan`` so they can be dropped later.
    """
    found = re.search(r'tpu-rack-(\d+)', column_name)
    return np.nan if found is None else int(found.group(1))

# Rack number (or NaN) of every data column, in column order
rack_numbers = list(map(extract_rack_number, hlt_data_pd.columns))

# Table pairing each column name with the rack number extracted from it
columns_df = pd.DataFrame(
    dict(column_name=hlt_data_pd.columns, rack_number=rack_numbers)
)


In [54]:
columns_df

Unnamed: 0,column_name,rack_number
0,DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...,1
1,DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...,1
2,DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...,1
3,DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...,1
4,DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...,1
...,...,...
147,DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...,5
148,DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...,5
149,DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...,5
150,DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...,5


In [55]:
# Rack numbers the reduction expects (73 entries); per the variable name this
# is the HLT DCM rack set for 2023. The list is not contiguous: 14-16, 27-43,
# 63 and 78 are absent.
subgroup_labels_expected_hlt_dcm_2023 =   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95]

In [56]:
# Drop columns without a rack number and store the remaining rack numbers as
# integers. The two steps are chained so a fresh DataFrame is built; assigning
# a column on the dropna() result instead can trigger SettingWithCopyWarning.
columns_df = (
    columns_df
    .dropna(subset=['rack_number'])
    .astype({'rack_number': int})
)

# Filter columns for expected racks
expected_racks = subgroup_labels_expected_hlt_dcm_2023
columns_df = columns_df[columns_df['rack_number'].isin(expected_racks)]


In [57]:
columns_df

Unnamed: 0,column_name,rack_number
0,DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...,1
1,DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...,1
2,DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...,1
3,DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...,1
4,DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...,1
...,...,...
147,DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...,5
148,DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...,5
149,DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...,5
150,DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...,5


In [58]:
from collections import defaultdict

# Group the column names by rack number, preserving their order of appearance
rack_to_columns = defaultdict(list)
for rack_number, column_name in zip(columns_df['rack_number'], columns_df['column_name']):
    rack_to_columns[rack_number].append(column_name)


In [59]:
rack_to_columns

defaultdict(list,
            {1: ['DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01002.info',
              'DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01003.info',
              'DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01004.info',
              'DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01005.info',
              'DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01006.info',
              'DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01007.info',
              'DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01008.info',
              'DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01009.info',
              'DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01010.info',
              'DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01011.info',
              'DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-01:pc-tdq-tpu-01012.info',
              'DF_IS:H

In [60]:
timestamps = hlt_data_pd.index

# One empty (all-NaN) frame per statistic: a row per timestamp and a column
# per expected rack
medians_df, stds_df = (
    pd.DataFrame(index=timestamps, columns=expected_racks) for _ in range(2)
)


In [61]:
for rack in expected_racks:
    columns = rack_to_columns.get(rack, [])
    if not columns:
        # No data for this rack: fill both statistics with the dummy value 0
        medians_df[rack] = 0
        stds_df[rack] = 0
        continue

    # Median and std across this rack's columns, one value per timestamp
    data = hlt_data_pd[columns]
    medians_df[rack] = data.median(axis=1)
    stds_df[rack] = data.std(axis=1)


In [62]:
medians_df.iloc[500:501]

Unnamed: 0,1,2,3,4,5,6,7,8,9,10,...,86,87,88,89,90,91,92,93,94,95
2023-04-06 09:55:35.001262+02:00,1.800275,2.199917,2.100306,1.800144,2.000002,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [63]:
stds_df.iloc[500:501]

Unnamed: 0,1,2,3,4,5,6,7,8,9,10,...,86,87,88,89,90,91,92,93,94,95
2023-04-06 09:55:35.001262+02:00,0.697111,0.66126,0.537834,0.584426,0.601056,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [64]:
# Tag every rack column with the statistic it holds ('m_' / 'std_')
medians_df, stds_df = medians_df.add_prefix('m_'), stds_df.add_prefix('std_')

# Place the median columns and the std columns side by side
reduced_data_df = pd.concat((medians_df, stds_df), axis='columns')


In [65]:
reduced_data_df.iloc[500:501]

Unnamed: 0,m_1,m_2,m_3,m_4,m_5,m_6,m_7,m_8,m_9,m_10,...,std_86,std_87,std_88,std_89,std_90,std_91,std_92,std_93,std_94,std_95
2023-04-06 09:55:35.001262+02:00,1.800275,2.199917,2.100306,1.800144,2.000002,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


### We can start executing the notebook from here...
The next call performs, in a single step, the whole bulk reduction developed in the cells above.

In [7]:
# Reduce the whole raw dataset in one call instead of timestamp by timestamp.
# NOTE(review): judging by the printed output, the result has one row per
# timestamp with m_<rack> and std_<rack> columns -- confirm in MedianStdReducer.
reduced_data_df = median_std_reducer.reduce_bulk_offline(hlt_data_pd)

                                           column_name  rack_number
0    DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...            1
1    DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...            1
2    DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...            1
3    DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...            1
4    DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...            1
..                                                 ...          ...
147  DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...            5
148  DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...            5
149  DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...            5
150  DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...            5
151  DF_IS:HLT-32:tpu-rack-05.DCM:HLT-32:tpu-rack-0...            5

[152 rows x 2 columns]
                                           column_name  rack_number
0    DF_IS:HLT-32:tpu-rack-01.DCM:HLT-32:tpu-rack-0...            1
1    DF_IS:HLT-32:tpu-ra

                                       m_1       m_2       m_3       m_4  \
2023-04-06 09:55:35.001262+02:00  1.800275  2.199917  2.100306  1.800144   

                                       m_5  m_6  m_7  m_8  m_9  m_10  ...  \
2023-04-06 09:55:35.001262+02:00  2.000002    0    0    0    0     0  ...   

                                  std_86  std_87  std_88  std_89  std_90  \
2023-04-06 09:55:35.001262+02:00       0       0       0       0       0   

                                  std_91  std_92  std_93  std_94  std_95  
2023-04-06 09:55:35.001262+02:00       0       0       0       0       0  

[1 rows x 146 columns]


In [8]:
# Feed the reduced data to the buffer one timestamp at a time; the buffer
# invokes the registered detection callback whenever it is filled.
with logging_redirect_tqdm():
    try:
        for timestamp in tqdm(reduced_data_df.index, desc='Processing Timestamps', unit='timestamp'):
            # Select the row for the current timestamp as a DataFrame
            output_slice = reduced_data_df.loc[[timestamp]]

            # Push to the buffer
            reduced_data_buffer.push(output_slice)
    except NonCriticalPredictionException as exc:
        # Processing still stops at the first non-critical prediction error,
        # but the early stop is now reported instead of passing silently.
        logging.warning('Stopped processing at timestamp %s: %s', timestamp, exc)

# The log file is written even when processing stopped early.
#json_anomaly_registry.write_log_file("test_dataset_informer_mse")
json_anomaly_registry.write_log_file("run_448519_informer_mse")

Transformer-based detection encountered anomaly at timestamp 2023-04-06 09:20:55
Transformer-based detection anomaly ended at 2023-04-06 09:29:25                
Processing Timestamps: 100%|██████████| 4106/4106 [05:58<00:00, 11.45timestamp/s]


In [None]:
######################################################
### Don't execute
######################################################

# It's the same code as before, but with some profiling functions to detect the 10 most expensive functions being executed while doing the Informer inference.

import cProfile
import pstats

profiler = cProfile.Profile()

# Only profile the first few timestamps; that is enough to find hot spots
max_iterations = 100

profiler.enable()
try:
    with logging_redirect_tqdm():
        for count, timestamp in enumerate(tqdm(reduced_data_df.index, desc='Processing Timestamps', unit='timestamp')):
            if count >= max_iterations:
                break
            try:
                # Select the row for the current timestamp as a DataFrame
                output_slice = reduced_data_df.loc[[timestamp]]

                # Push to the buffer
                reduced_data_buffer.push(output_slice)
            except NonCriticalPredictionException as exc:
                # Report the early stop instead of ending the loop silently
                logging.warning('Stopped profiling run at timestamp %s: %s', timestamp, exc)
                break
finally:
    # Always stop the profiler, even if the loop raises, so that later cells
    # are not profiled by accident.
    profiler.disable()

stats = pstats.Stats(profiler).sort_stats('cumtime')
stats.print_stats(10)  # Display the top 10 most time-consuming operations
json_anomaly_registry.write_log_file("test_dataset_informer_mse")