In [9]:
import csv
import os
import pdb
import pickle
import random
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn import cluster
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import RobustScaler
from sklearn.utils import resample
from torch.utils.data import TensorDataset, DataLoader, random_split, WeightedRandomSampler
from wquantiles import quantile_1D

class ConfigStruct:
    """Minimal namespace: every keyword argument becomes an instance attribute."""

    def __init__(self, **entries):
        # Copy the keywords straight into the instance namespace, so that
        # ConfigStruct(epochs=3).epochs == 3.
        vars(self).update(entries)

In [2]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device}")

Using cuda


In [3]:
# Hyperparameters and pipeline switches. A later cell wraps this dict in a
# ConfigStruct so the values can be read as attributes (config.epochs, ...).
config = {
    # Training loop / optimizer
    "epochs": 100,
    "batch_size": 256,  # 2048
    "learning_rate": 0.008,  # 0.008
    "weight_decay": 1e-5,
    "dropout": 0.05,
    "shuffle": True,
    # Data splitting and seeding
    "test_size": 0.2,
    "split_seed": 42,
    "random_seed": 1234,
    # Optional row filters
    "top10_apps_filter": False,
    "only_duplicates": False,
    "meancount75_filter": False,
    # Optional isolation-forest outlier removal, per split
    "isolation_forest_train": False,
    "isolation_forest_val": False,
    "isolation_forest_test": False,
    # Feature agglomeration
    "feature_agglomeration": True,
    "feature_agglomeration_nclusters": 64,
    "stratified_split": False,
    "smooth_l1_loss_beta": 1,
}

In [4]:
# Wrap the plain dict for attribute-style access (config.epochs, ...).
# The guard keeps the cell idempotent: once config is a ConfigStruct it can no
# longer be unpacked with **, so re-running the unguarded cell raised TypeError.
if not isinstance(config, ConfigStruct):
    config = ConfigStruct(**config)

In [13]:
# Checkpoint written by the training loop.
MODEL_FILENAME = "Full_Data_Model"
MODEL_DIR = r"../models/"
MODEL_PATH = Path(MODEL_DIR, MODEL_FILENAME).with_suffix(".pth")

# Input CSV.
DATASET_DIR = r"../data/"
DATASET_NAME = "theta_posix_with_apps_no_negative_outliers_no_time_witherrors"
DATASET_PATH = Path(DATASET_DIR, DATASET_NAME).with_suffix(".csv")

# Directory for the pickled preprocessing objects. The default is the original
# absolute, user-specific location; set the THETA_PICKLE_DIR environment
# variable to relocate it without editing the notebook.
PICKLE_DIR = os.environ.get("THETA_PICKLE_DIR", r"/home/rwth1591/transfer-learning/theta/pickle")
FEATUREAGGLO_NAME = r"Full_Data_Model_featureagglomeration"
FEATUREAGGLO_PATH = Path(PICKLE_DIR, FEATUREAGGLO_NAME).with_suffix(".pkl")
ROBUSTSCALER_NAME = r"Full_Data_Model_theta_robustscaler"
ROBUSTSCALER_PATH = Path(PICKLE_DIR, ROBUSTSCALER_NAME).with_suffix(".pkl")
ISOLATIONFOREST_NAME = r"Full_Data_Model_theta_isolationforest"
ISOLATIONFOREST_PATH = Path(PICKLE_DIR, ISOLATIONFOREST_NAME).with_suffix(".pkl")

INTERPRETABILITY_DIR = r"../interpretability/captum"

# Per-epoch loss log (relative to the notebook's working directory).
CSV_LOG_PATH = "Full_Data_Model_test_loss.csv"

In [14]:
# Start the loss log with its header row, unless a log file is already there.
log_already_exists = os.path.exists(CSV_LOG_PATH)
if not log_already_exists:
    with open(CSV_LOG_PATH, mode='w', newline='') as log_file:
        csv.writer(log_file).writerow(["epoch", "test_loss"])

In [17]:
# Load the data
# NOTE(review): the frame keeps a "blue_waters" name although DATASET_NAME
# points at a theta_posix file — the name is historical, not a second dataset.
df_blue_waters_posix = pd.read_csv(DATASET_PATH)
# Preview the first rows as the cell output.
df_blue_waters_posix.head()

Unnamed: 0,index,POSIX_OPENS,POSIX_FILENOS,POSIX_DUPS,POSIX_READS,POSIX_WRITES,POSIX_SEEKS,POSIX_STATS,POSIX_MMAPS,POSIX_FSYNCS,...,WRITE_10M_100M,WRITE_100M_1G,WRITE_1G_PLUS,rank,POSIX_TOTAL_TIME,nprocs,lustre,exe,mean,error
0,0,7891771,7861736,0,3,424661,60035,90055,0,0,...,0,0,0,0,29.684507,64,1,cp2k.psmp,31.913841,-2.229334
1,1,194,172,0,34,1499,6,54,0,0,...,0,0,0,0,28.155456,16,1,pw.x,11.403251,16.752206
2,2,46037,40869,0,4713059,1719073,1271774,5429,0,0,...,0,0,0,-1,71229.030892,128,1,train.x-2.0.3-ifort_intelmpi,,0.0
3,3,194,172,0,34,1492,6,54,0,0,...,0,0,0,2,1.70764,16,1,pw.x,6.519022,-4.811382
4,4,7891771,7861736,0,3,424661,60035,90055,0,0,...,0,0,0,0,34.010366,64,1,cp2k.psmp,33.63173,0.378636


In [18]:
# Flag rows whose executable name (whitespace-stripped) is an nwchem binary.
# The trailing remark on the next line is the author's disabled extra criterion.
nwchem_names = ["nwchem", "./nwchem"]
filter_spec = df_blue_waters_posix.exe.str.strip().isin(nwchem_names)  # | (df_blue_waters_posix.POSIX_TOTAL_TIME >= 1e8)
# Keep everything that is NOT flagged.
# NOTE(review): no later cell visible here reads df_blue_waters_posix_nospec —
# confirm whether the rest of the pipeline was meant to use it.
df_blue_waters_posix_nospec = df_blue_waters_posix[~filter_spec]
df_blue_waters_posix_nospec.shape

(139902, 95)

In [None]:
# Optionally keep only rows of the ten applications with the highest count of
# non-null nprocs entries.
# NOTE(review): the dataset preview shows an "exe" column but no "app" column —
# confirm "app" exists before enabling this switch.
if config.top10_apps_filter:
    apps_count_series = (
        df_blue_waters_posix.groupby("app")["nprocs"].count().sort_values(ascending=False)
    )
    top10_apps = apps_count_series.iloc[:10].index
    df_blue_waters_posix = df_blue_waters_posix[df_blue_waters_posix["app"].isin(top10_apps)]

In [None]:
# Optionally keep only the rows that have a duplicate-set mean (non-NaN "mean").
if config.only_duplicates:
    has_dup_mean = df_blue_waters_posix["mean"].notna()
    df_blue_waters_posix = df_blue_waters_posix[has_dup_mean]

In [None]:
# Optionally keep only rows whose duplicate set is larger than the 75th
# percentile of the distinct set sizes.
if config.meancount75_filter:
    # Per row: number of rows sharing its "mean" value (0 for rows with a NaN mean).
    mean_counts = df_blue_waters_posix.groupby("mean", dropna=False)["mean"].transform("count")
    # The threshold is taken over the distinct sizes, not over the rows.
    distinct_sizes = pd.Series(mean_counts.unique())
    mean_counts_quantile = distinct_sizes.quantile(0.75)
    large_set_rows = mean_counts[mean_counts > mean_counts_quantile].index
    df_blue_waters_posix = df_blue_waters_posix[df_blue_waters_posix.index.isin(large_set_rows)]

In [None]:
# Remove the application-identifier columns before modelling.
# The preview of this dataset shows the executable name in a string column
# "exe" (and no "app" column): dropping only "app" can raise KeyError, and a
# string column cannot be consumed by the estimators and tensors built later.
# errors="ignore" lets the cell work whether the file carries "app", "exe" or
# both, and makes re-running it harmless.
# NOTE(review): the CSV row-id columns "Unnamed: 0" and "index" are still kept
# as input features — confirm that is intended.
df_blue_waters_posix = df_blue_waters_posix.drop(columns=["app", "exe"], errors="ignore")
df_blue_waters_posix.head()

In [None]:
# Split the regression target off the feature frame. pop() removes the column
# from df_blue_waters_posix in place and returns it as a Series (despite the
# _df suffix), so this cell cannot be run twice without reloading the data.
POSIX_TOTAL_TIME_df = df_blue_waters_posix.pop('POSIX_TOTAL_TIME')
POSIX_TOTAL_TIME_df.head()

In [None]:
# Separate the duplicate-set mean from the input features (the "error" column
# is dropped in the next cell). pop() removes "mean" from the frame in place
# and returns it as a Series; NaN marks rows without a duplicate set.
dup_set_means_df = df_blue_waters_posix.pop('mean')

In [None]:
# The per-row "error" column is not an input feature; remove it.
df_blue_waters_posix = df_blue_waters_posix.drop(columns=["error"])
df_blue_waters_posix.head()

In [None]:
# Fix seeds for reproducibility: Python, NumPy, then PyTorch (CPU and all GPUs).
for seed_everything in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    seed_everything(config.random_seed)

# Ask cuDNN for deterministic kernels and switch off its autotuner.
cudnn_backend = torch.backends.cudnn
cudnn_backend.deterministic = True
cudnn_backend.benchmark = False

In [None]:
# Split features, target and duplicate-set means into train and test parts
# with one call, so the three stay row-aligned.
stratify_labels = df_blue_waters_posix["nprocs"] if config.stratified_split else None
(
    X_train, X_test,
    y_train, y_test,
    dup_set_means_train, dup_set_means_test,
) = train_test_split(
    df_blue_waters_posix,
    POSIX_TOTAL_TIME_df,
    dup_set_means_df,
    test_size=config.test_size,
    random_state=config.split_seed,
    stratify=stratify_labels,
)

In [None]:
# Carve a validation subset out of the training part (80 % stays for training).
(
    X_train_subset, X_val_subset,
    y_train_subset, y_val_subset,
    dup_set_means_train_subset, dup_set_means_val_subset,
) = train_test_split(
    X_train,
    y_train,
    dup_set_means_train,
    train_size=0.8,
    random_state=config.split_seed,
)

In [None]:
# Fit the isolation forest on the training subset only; its labels feed the
# optional outlier filters and the outlier count printed at the end.
clf = IsolationForest(random_state=0, n_jobs=-1).fit(X_train_subset)
clf

In [None]:
# Persist the fitted isolation forest. The target directory is created first so
# a fresh checkout does not fail with FileNotFoundError.
ISOLATIONFOREST_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(ISOLATIONFOREST_PATH, 'wb') as f:
    pickle.dump(clf, f)

In [None]:
# Label every split with the fitted forest: 1 = inlier, -1 = outlier.
# Each Series gets a fresh 0..n-1 index, not the index of the data it labels.
X_train_subset_outlier_labels, X_val_subset_outlier_labels, X_test_outlier_labels = (
    pd.Series(clf.predict(split)) for split in (X_train_subset, X_val_subset, X_test)
)

In [None]:
# Optionally drop the training rows the isolation forest labelled as outliers.
if config.isolation_forest_train:
    # The labels carry a 0..n-1 index, so the data gets the same index before
    # masking. drop=True discards the old index instead of inserting it as a
    # column: the CSV preview already has a feature column named "index", in
    # which case reset_index() names the inserted column "level_0" and the
    # former drop(["index"]) deleted the real feature and kept the row ids.
    train_inliers = X_train_subset_outlier_labels == 1
    X_train_subset = X_train_subset.reset_index(drop=True)[train_inliers]
    # to_frame() keeps the single-column DataFrame shape the previous code produced.
    y_train_subset = y_train_subset.reset_index(drop=True)[train_inliers].to_frame()
    dup_set_means_train_subset = dup_set_means_train_subset.reset_index(drop=True)[train_inliers].to_frame()

In [None]:
# Optionally drop the validation rows the isolation forest labelled as outliers.
if config.isolation_forest_val:
    # drop=True aligns the data with the 0..n-1 label index without inserting
    # the old index as a column. The CSV preview already has a feature column
    # named "index"; with a plain reset_index() the inserted column is called
    # "level_0" and drop(["index"]) removed the real feature instead.
    val_inliers = X_val_subset_outlier_labels == 1
    X_val_subset = X_val_subset.reset_index(drop=True)[val_inliers]
    # to_frame() keeps the single-column DataFrame shape the previous code produced.
    y_val_subset = y_val_subset.reset_index(drop=True)[val_inliers].to_frame()

In [None]:
# Optionally drop the test rows the isolation forest labelled as outliers.
if config.isolation_forest_test:
    # drop=True aligns the data with the 0..n-1 label index without inserting
    # the old index as a column. The CSV preview already has a feature column
    # named "index"; with a plain reset_index() the inserted column is called
    # "level_0" and drop(["index"]) removed the real feature instead.
    test_inliers = X_test_outlier_labels == 1
    X_test = X_test.reset_index(drop=True)[test_inliers]
    # to_frame() keeps the single-column DataFrame shape the previous code produced.
    y_test = y_test.reset_index(drop=True)[test_inliers].to_frame()
    dup_set_means_test = dup_set_means_test.reset_index(drop=True)[test_inliers].to_frame()

In [None]:
# Sampling weight per training row = 1 / size of its duplicate set.
dup_set_means_train_subset_df = pd.DataFrame(dup_set_means_train_subset)
mean_counts = (
    dup_set_means_train_subset_df
    .groupby("mean", dropna=False)["mean"]
    .transform("count")
)
# Rows with a NaN mean are counted as 0; treat each of them as a set of one.
mean_counts = mean_counts.replace(0, 1)
weights = 1 / mean_counts
weights.head()

In [None]:
# Seeded generator so the weighted draw order is repeatable.
generator = torch.Generator()
generator.manual_seed(config.random_seed)
# Draw as many samples per epoch as there are training rows, with replacement.
sampler = WeightedRandomSampler(
    weights=weights.to_numpy(),
    num_samples=len(weights),
    replacement=True,
    generator=generator,
)

In [None]:
# Save the test dataset for later analysis with captum
# test_df = X_test.copy()
# test_df["POSIX_TOTAL_TIME"] = y_test
# test_df = test_df.reset_index()[X_test_outlier_labels == -1].drop(["index"],axis=1)
# test_df.to_csv(Path(MODEL_DIR,r"captum_test_data.csv"))
# test_df.to_csv(Path(MODEL_DIR,r"test_outliers_index_reset.csv"))

In [None]:
# Learn the feature clusters from the training subset only. Fitting on the
# full frame let validation and test rows shape the preprocessing, unlike the
# scaler, which is fitted on the training subset.
if not isinstance(X_train_subset, pd.DataFrame):
    # The transform cell replaces X_train_subset with the agglomerated array;
    # refitting on that would silently learn clusters of clusters.
    raise RuntimeError("X_train_subset was already transformed; re-run the split cells before refitting")
agglo = cluster.FeatureAgglomeration(n_clusters=config.feature_agglomeration_nclusters)
agglo.fit(X_train_subset)
# Create the target directory first so the dump does not fail on a fresh checkout.
FEATUREAGGLO_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(FEATUREAGGLO_PATH, 'wb') as f:
    pickle.dump(agglo, f)

In [None]:
# Optionally replace the raw features of every split by the agglomerated ones.
# The names are rebound, so this cell must not be run twice on the same data.
if config.feature_agglomeration:
    X_train_subset, X_val_subset, X_test = (
        agglo.transform(split) for split in (X_train_subset, X_val_subset, X_test)
    )

In [None]:
# Scale the input features with RobustScaler.
# The scaler is fitted on the training subset only; the validation and test
# splits are transformed with these statistics in later cells.
scaler = RobustScaler().fit(X_train_subset)

In [None]:
# Persist the fitted scaler. The target directory is created first so a fresh
# checkout does not fail with FileNotFoundError.
ROBUSTSCALER_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(ROBUSTSCALER_PATH, 'wb') as f:
    pickle.dump(scaler, f)

In [None]:
# Apply the training-fitted scaler to the training and validation features.
X_train_subset_scaled, X_val_subset_scaled = (
    scaler.transform(split) for split in (X_train_subset, X_val_subset)
)

In [None]:
# Training tensors: float32, created directly on the target device.
tensor_X_train = torch.tensor(X_train_subset_scaled, dtype=torch.float32, device=device)
# Targets become a column vector so they match the (batch, 1) network output.
tensor_y_train = torch.tensor(y_train_subset.values, dtype=torch.float32, device=device).reshape(-1, 1)

In [None]:
# Pair each feature row with its target so a DataLoader can batch them together.
training_dataset = TensorDataset(tensor_X_train, tensor_y_train)

In [None]:
# Plain shuffling when config.shuffle is set; otherwise draw batches with the
# duplicate-set weighted sampler (DataLoader accepts only one of the two).
if config.shuffle:
    batch_order = {"shuffle": config.shuffle}
else:
    batch_order = {"sampler": sampler}
training_dataloader = DataLoader(training_dataset, batch_size=config.batch_size, **batch_order)

In [None]:
# Validation tensors: float32, created directly on the target device.
tensor_X_val = torch.tensor(X_val_subset_scaled, dtype=torch.float32, device=device)
# Targets become a column vector so they match the (batch, 1) network output.
tensor_y_val = torch.tensor(y_val_subset.values, dtype=torch.float32, device=device).reshape(-1, 1)

In [None]:
# Validation batches are served in dataset order (no shuffling).
validation_dataset = TensorDataset(tensor_X_val, tensor_y_val)
validation_dataloader = DataLoader(
    dataset=validation_dataset,
    batch_size=config.batch_size,
    shuffle=False,
)

In [None]:
# Scale the test features with the scaler fitted on the training subset.
X_test_scaled = scaler.transform(X_test)

In [None]:
# Test tensors: float32, created directly on the target device.
tensor_X_test = torch.tensor(X_test_scaled, dtype=torch.float32, device=device)
# Targets become a column vector so they match the (batch, 1) network output.
tensor_y_test = torch.tensor(y_test.values, dtype=torch.float32, device=device).reshape(-1, 1)

In [None]:
# Test batches are served in dataset order, so the concatenated predictions
# line up row by row with the test split.
test_dataset = TensorDataset(tensor_X_test, tensor_y_test)
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=config.batch_size,
    shuffle=False,
)

In [None]:
# Fully connected regressor: input -> 512 -> 256 -> 128 -> 1.
# The input width is read from the prepared training matrix, so the network
# matches the data with or without feature agglomeration. The width for the
# non-agglomerated case used to be hard-coded to 89, which breaks as soon as
# the dataset's column set differs.
n_input_features = tensor_X_train.shape[1]
model = nn.Sequential(
    nn.Linear(n_input_features, 512),
    nn.Dropout(p=config.dropout),
    nn.ReLU(),
    nn.Linear(512, 256),
    nn.Dropout(p=config.dropout),
    nn.ReLU(),
    nn.Linear(256, 128),
    nn.Dropout(p=config.dropout),
    nn.ReLU(),
    nn.Linear(128, 1)
).to(device)

In [None]:
# PyTorch's default reduction averages the loss over the elements of a batch.
# The last batch (in training and in evaluation) is usually smaller than the
# rest, so the loss is summed here instead and the callers divide by the number
# of samples themselves.
smooth_l1_sum = nn.SmoothL1Loss(reduction="sum", beta=config.smooth_l1_loss_beta)
loss_fn = smooth_l1_sum.to(device)

optimizer = optim.Adamax(
    model.parameters(),
    lr=config.learning_rate,
    weight_decay=config.weight_decay,
)

# Lower the learning rate when the monitored loss stops decreasing.
# NOTE(review): `verbose` is reported as deprecated in recent PyTorch releases —
# confirm against the installed version.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', verbose=True)

In [None]:
# First epoch index for the training loop (range(model_epoch, config.epochs)).
model_epoch = 0
# Put the network into training mode; as the last expression of the cell this
# also displays the module.
model.train()

In [None]:
def train():
    """Run one optimisation pass over ``training_dataloader``.

    Uses the notebook-level ``model``, ``loss_fn``, ``optimizer`` and
    ``training_dataloader``. The model is in training mode during the pass and
    is left in training mode afterwards.
    """
    # Enable training mode BEFORE the pass. test() leaves the model in eval
    # mode, so switching only after the loop meant that any pass following a
    # test() call ran with dropout disabled.
    model.train()
    for X, y in training_dataloader:
        y_pred = model(X)

        # loss_fn is expected to return the loss summed over the batch; divide
        # by the batch size to get the mean. Arguments are in the
        # (prediction, target) order that test() uses.
        loss = loss_fn(y_pred, y) / len(X)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

In [None]:
def test():
    """Return the mean per-sample loss over ``validation_dataloader``.

    Uses the notebook-level ``model``, ``loss_fn`` and
    ``validation_dataloader``. Switches the model to eval mode and leaves it
    there; no gradients are tracked.
    """
    model.eval()
    with torch.no_grad():
        # loss_fn is expected to return a per-batch sum; add the batch sums up.
        summed_loss = sum(loss_fn(model(X), y).item() for X, y in validation_dataloader)

    # Normalise by the number of samples in the whole dataset rather than by
    # the number of batches, because the last batch may be smaller.
    return summed_loss / len(validation_dataloader.dataset)

In [None]:
# Train for the configured number of epochs. After every epoch the validation
# loss drives the LR scheduler, is appended to the CSV log and the full
# training state is checkpointed.
test_losses = []

# Make sure the checkpoint directory exists before the first save.
MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)

for epoch in range(model_epoch, config.epochs):
    train()
    test_loss = test()

    scheduler.step(test_loss)

    model_epoch = epoch
    test_losses.append(test_loss)

    # The log file is created with an "epoch,test_loss" header but was never
    # filled; append one row per epoch (same 0-based epoch as the checkpoint).
    with open(CSV_LOG_PATH, mode='a', newline='') as log_file:
        csv.writer(log_file).writerow([epoch, test_loss])

    # Overwrite the checkpoint each epoch so an interrupted run can be inspected.
    torch.save({
        'epoch': model_epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'test_losses': test_losses
    }, MODEL_PATH)

In [None]:
# Loss curve over the epochs, saved as PNG and shown inline.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(test_losses, label='Test Loss', color='royalblue', linewidth=2, marker='o', markersize=4)

ax.set_title("Test Loss Over Epochs", fontsize=16)
ax.set_xlabel("Epoch", fontsize=14)
ax.set_ylabel("Loss", fontsize=14)
ax.grid(True, linestyle='--', alpha=0.6)
ax.legend(fontsize=12)
fig.tight_layout()
fig.savefig("Full_data_Model_test_loss.png", dpi=300)
plt.show()

In [None]:
# Predict on the test set batch by batch, without tracking gradients.
# Switch to eval mode explicitly so dropout is off even when no validation
# pass ran before this cell.
model.eval()
test_batches = []
with torch.no_grad():
    for X, y in test_dataloader:
        # model(X) already is a tensor on `device`; no re-wrapping needed.
        test_batches.append(model(X))
# Concatenate once instead of growing a tensor inside the loop; an empty
# loader yields an empty (0, 1) result.
test_output_tensor = torch.cat(test_batches) if test_batches else torch.empty(0, 1, device=device)
test_output_df = pd.DataFrame(test_output_tensor.cpu().numpy())
test_output_df

In [None]:
# Absolute difference between each prediction and its duplicate-set mean.
# reset_index() gives the means the same 0..n-1 index as the predictions; rows
# without a duplicate set (NaN mean) are compared against 0 here.
dup_means_filled = dup_set_means_test.fillna(0).reset_index()["mean"]
abs_errors = test_output_df[0].sub(dup_means_filled).abs()
abs_errors

In [None]:
# Relative error = absolute error / duplicate-set mean (a fraction, not x100).
# Rows whose mean is NaN (no duplicates) come out as NaN, which the median in
# the next cell ignores.
dup_means_aligned = dup_set_means_test.reset_index()["mean"]
abs_errors_percent = abs_errors.div(dup_means_aligned)
abs_errors_percent

In [None]:
# Median of the relative absolute errors (NaN rows are skipped by median()).
# NOTE(review): the value is printed as "MAE" although it is a median of
# relative errors, not a mean absolute error.
mae = abs_errors_percent.median()
print(f"MAE: {mae}")

In [None]:
# Weighted median of the relative errors: each test row weighs 1 / size of its
# duplicate set.
dup_set_means_test_df = pd.DataFrame(dup_set_means_test)
mean_counts_test = dup_set_means_test_df.groupby("mean", dropna=False)["mean"].transform("count")
# Rows with a NaN mean are counted as 0; treat each of them as a set of one.
mean_counts_test = mean_counts_test.replace(0, 1)
weights_test = 1 / mean_counts_test
# Only rows with a defined relative error take part; the weights are moved onto
# the same 0..n-1 index as the errors before masking.
has_error = abs_errors_percent.notna()
weights_test_nona = weights_test.reset_index(drop=True)[has_error]
weights_test_nona_normalized = weights_test_nona / weights_test_nona.sum()
weighted_mae = quantile_1D(
    abs_errors_percent[has_error].to_numpy(),
    weights_test_nona_normalized.to_numpy(),
    0.5,
)
print(f"Weighted MAE: {weighted_mae}")

In [None]:
# Final report: weighted metric, number of test outliers that entered it, and
# the cluster label assigned to every input feature.
print(f"Weighted MAE: {weighted_mae}")
flagged_as_outlier = X_test_outlier_labels == -1
has_dup_mean = dup_set_means_test.reset_index()["mean"].notnull()
test_outliers = int((flagged_as_outlier & has_dup_mean).sum())
print(f"Outliers in test set that are considered in MAE computation: {test_outliers}")
print(f"Feature Agglomeration clusters: {agglo.labels_}")