In [None]:
from datetime import date, datetime, timedelta
import pandas as pd

import numpy as np
import math
from sklearn.preprocessing import StandardScaler
from sklearn.utils import resample
from sklearn import metrics, preprocessing
from sklearn.model_selection import KFold

from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.tree import DecisionTreeClassifier
from rgf.sklearn import RGFClassifier
from sklearn.svm import SVC

import os
from tqdm import tqdm

import matplotlib.pyplot as plt

from scipy import stats

In [None]:
def obtain_natural_chunks(features, labels, terminals):
    '''
    Split samples into consecutive chunks according to the value in column 0.

    A sample falls into chunk k when
        (features[:, 0] >= terminals[k]) and (features[:, 0] < terminals[k + 1])

    Args:
        features: 2-D array; column 0 is compared against the terminals.
        labels: array indexed row-for-row like features.
        terminals: sequence of interval boundaries.

    Returns:
        (feature_list, label_list): one entry per pair of adjacent terminals.
        Each feature chunk has column 0 removed.
    '''
    timestamps = features[:, 0]
    feature_list, label_list = [], []
    # Walk over adjacent (lower, upper) boundary pairs.
    for lower, upper in zip(terminals[:-1], terminals[1:]):
        in_interval = (timestamps >= lower) & (timestamps < upper)
        feature_list.append(features[in_interval][:, 1:])
        label_list.append(labels[in_interval])
    return feature_list, label_list


In [None]:
def obtain_intervals(dataset):
    '''
    Generate interval terminals, so that samples in each interval have:
        interval_i = (timestamp >= terminal_i) and (timestamp < terminal_{i+1})

    Args:
        dataset (str): 'b' for the Backblaze dataset or 'g' for the Google dataset.

    Returns:
        list of int: terminals from start_time to end_time (inclusive), spaced
        one unit period apart. N + 1 terminals delimit N intervals.

    Raises:
        ValueError: if dataset is neither 'b' nor 'g'.
    '''
    if dataset == 'g':
        # unit_period is one day written as 24*60*60*1000*1000, i.e. in
        # microseconds (an earlier comment said milliseconds, which does not
        # match this arithmetic). 28 unit periods -> 28 one-day intervals.
        start_time = 604046279
        unit_period = 24 * 60 * 60 * 1000 * 1000  # unit period: one day
        end_time = start_time + 28*unit_period
    elif dataset == 'b':
        # timestamps are counted in days; 50 unit periods -> 50 one-week intervals
        start_time = 1
        unit_period = 7  # unit period: one week (7 days)
        end_time = start_time + 50*unit_period
    else:
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError("Unknown dataset %r: expected 'b' or 'g'" % (dataset,))

    # Note: an earlier variant chunked Backblaze by month
    # (start_time = 1, unit_period = 1, end_time = start_time + 12*unit_period).

    # add one unit for the open-end of range function
    return list(range(start_time, end_time + unit_period, unit_period))

In [None]:
def features_labels_extraction(dataset, data_path):
    '''
    Read a dataset CSV and split it into per-period feature and label chunks.

    Args:
        dataset (str): 'b' for Backblaze (disk) data or 'g' for Google (job) data.
        data_path: location of the CSV file, passed to pd.read_csv with header=None.

    Returns:
        (feature_list, label_list): the chunks produced by obtain_natural_chunks
        using the terminals from obtain_intervals(dataset).

    Raises:
        ValueError: if dataset is neither 'b' nor 'g'.
    '''
    # Validate first: previously an unknown value only printed a message and
    # then crashed on the return statement with an UnboundLocalError.
    if dataset not in ('b', 'g'):
        raise ValueError("Undefined dataset value %r: expected 'b' or 'g'" % (dataset,))

    if dataset == 'b':
        # define features
        features_disk_failure = ['smart_1_raw', 'smart_4_raw', 'smart_5_raw', 'smart_7_raw', 'smart_9_raw', 'smart_12_raw', 'smart_187_raw', 'smart_193_raw', 'smart_194_raw', 'smart_197_raw', 'smart_199_raw',
                         'smart_4_raw_diff', 'smart_5_raw_diff', 'smart_9_raw_diff', 'smart_12_raw_diff', 'smart_187_raw_diff', 'smart_193_raw_diff', 'smart_197_raw_diff', 'smart_199_raw_diff']

        # read data and name the columns
        df = pd.read_csv(data_path, header=None)
        df.columns = ['serial_number', 'date'] + features_disk_failure + ['label']
        # ignore serial number
        df = df.drop(['serial_number'], axis=1)

        # parse the date, then replace it by its day of the year; the weekly
        # split itself happens in obtain_natural_chunks via the terminals
        df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d')
        df['date'] = pd.Series(pd.DatetimeIndex(df['date']).day_of_year)

        # extract features (timestamp in column 0) and labels
        features = df[['date'] + features_disk_failure].to_numpy()
        labels = df['label'].to_numpy()

    else:  # dataset == 'g'
        # define features
        features_job_failure = ['User ID', 'Job Name', 'Scheduling Class',
                   'Num Tasks', 'Priority', 'Diff Machine', 'CPU Requested', 'Mem Requested', 'Disk Requested',
                   'Avg CPU', 'Avg Mem', 'Avg Disk', 'Std CPU', 'Std Mem', 'Std Disk']
        feature_columns = ['Start Time'] + features_job_failure

        # read data and name the columns
        df = pd.read_csv(data_path, header=None)
        df.columns = ['Job ID', 'Status', 'Start Time', 'End Time'] + features_job_failure
        # drop the first row (presumably the file's own header line, since the
        # file is read with header=None -- TODO confirm)
        df = df.tail(-1)

        # drop Job ID
        df = df.drop(['Job ID'], axis=1)

        # extract features (timestamp in column 0) and labels
        features = df[feature_columns].to_numpy()
        # Compare numerically: if the column was parsed as strings, '3' == 3 is
        # False for every row. Non-numeric values become NaN and stay False.
        labels = (pd.to_numeric(df['Status'], errors='coerce') == 3).to_numpy()

        # encode the two identifier columns as integer codes, one encoder each
        for name in ('User ID', 'Job Name'):
            col = feature_columns.index(name)
            features[:, col] = preprocessing.LabelEncoder().fit_transform(features[:, col])

        features = features.astype(float)

    feature_list, label_list = obtain_natural_chunks(features, labels, obtain_intervals(dataset))
    return feature_list, label_list

We assume that only two datasets exist: Backblaze (disk failure), selected with `'b'`, and Google (job failure), selected with `'g'`.

In [None]:
# Dataset selector: set to 'g' for experiments with Google data (job) and to 'b'
# for experiments with Backblaze data (disk). Later cells branch on this value.
dataset = 'g'

# Reading and Preprocessing Data

In [None]:
# Pick the input CSV for the selected dataset, then load it and split it into periods.
if dataset == 'b':
    DATASET_PATH = './datasets/disk_failure_2015.csv'
elif dataset == 'g':
    DATASET_PATH = './datasets/google_job_failure.csv'
else:
    # Fail fast: otherwise DATASET_PATH would be undefined, or silently keep
    # the value left over from an earlier run of this cell.
    raise ValueError("Unknown dataset %r: expected 'b' or 'g'" % (dataset,))

feature_list, label_list = features_labels_extraction(dataset, DATASET_PATH)

In [None]:
# Show which CSV path was selected for this run.
DATASET_PATH

In [None]:
# Number of period chunks returned by features_labels_extraction.
len(feature_list)

# Loading the Concept Drift Ground Truth and Setting Up the Statistical Test

In [None]:
# Concept-drift localization results for each dataset.
# NOTE(review): these paths point outside the project tree, so the notebook
# only runs on a machine with this directory layout.
CONCEPT_DRIFT_PATHS = {
    'b': '../../Documents/phd_related/AIOps_disk_failure_prediction/feature_importance/rf_concept_drift_localization_disk_week_2015_r_1.csv',
    'g': '../../Documents/phd_related/AIOps_disk_failure_prediction/feature_importance/rf_concept_drift_localization_job_r_1.csv',
}
if dataset not in CONCEPT_DRIFT_PATHS:
    # Fail fast: otherwise df_concept_drift would be undefined, or stale from
    # an earlier run of this cell.
    raise ValueError("Unknown dataset %r: expected 'b' or 'g'" % (dataset,))

df_concept_drift = pd.read_csv(CONCEPT_DRIFT_PATHS[dataset])
# Drop columns whose name starts with 'Unnamed' (presumably an index column
# that was written out when the CSV was saved).
df_concept_drift = df_concept_drift.loc[:, ~df_concept_drift.columns.str.contains('^Unnamed')]
df_concept_drift

In [None]:
# One scaler object reused by the monitoring loop, which refits it on every training period.
scaler = StandardScaler()
# Statistical test used to compare a feature between two periods. The loop
# passes two samples; per the SciPy docs, kstest runs the two-sample
# Kolmogorov-Smirnov test when its second argument is array-like.
stat_test = stats.kstest

# Monitoring the Number of Features that Change in Each Period

In [None]:
# Significance level below which a feature's distribution counts as changed.
ALPHA = 0.05

perc_changed_features_per_period = []
no_changed_features_per_period = []

# Compare every period with the one that follows it. The loop variable is not
# called `period`, so it cannot clobber the list of period labels built later.
for period_idx in tqdm(range(len(feature_list) - 1)):

    # scale both periods with statistics fitted on the earlier (training) period
    training_features = scaler.fit_transform(feature_list[period_idx])
    testing_features = scaler.transform(feature_list[period_idx + 1])

    # Iterate over columns by position rather than by feature name: the name
    # lists are local to features_labels_extraction, and the number of features
    # differs between the datasets.
    n_features = training_features.shape[1]
    no_changed_features = 0
    for col in range(n_features):
        _, p_value = stat_test(training_features[:, col], testing_features[:, col])
        if p_value < ALPHA:
            no_changed_features += 1

    no_changed_features_per_period.append(no_changed_features)
    perc_changed_features_per_period.append(no_changed_features / n_features)

In [None]:
# Sanity check: one entry per pair of consecutive periods.
len(perc_changed_features_per_period)

In [None]:
# Period labels, one per pair of consecutive chunks: 'W1_2', 'W2_3', ... for
# Backblaze (weeks) and 'D1_2', 'D2_3', ... for Google (days).
PERIOD_PREFIX = {'b': 'W', 'g': 'D'}
if dataset not in PERIOD_PREFIX:
    raise ValueError("Unknown dataset %r: expected 'b' or 'g'" % (dataset,))

# Always build a fresh list: previously it was only initialised in the 'b'
# branch, so the 'g' branch appended to whatever `period` already held.
period = [
    PERIOD_PREFIX[dataset] + str(i + 1) + '_' + str(i + 2)
    for i in range(len(feature_list) - 1)
]

In [None]:
# Per-period drift ground truth, taken from the 'Sig' column of df_concept_drift.
# The "grount" spelling is kept because the results cell refers to this name.
drift_grount_truth = list(df_concept_drift.Sig)

In [None]:
# Final results table: one row per pair of consecutive periods.
df_results_monitoring_all_individual_features = pd.DataFrame({
    'Period': period,
    'Drift Ground Truth': drift_grount_truth,
    'No Changed Features Per Period': no_changed_features_per_period,
    'Percentage Changed Features Per Period': perc_changed_features_per_period,
})
df_results_monitoring_all_individual_features

In [None]:
# Output file for each dataset.
RESULT_PATHS = {
    'b': './results/df_percentage_of_changed_features_disk.csv',
    'g': './results/df_percentage_of_changed_features_job.csv',
}
if dataset not in RESULT_PATHS:
    # Previously an unknown dataset silently wrote nothing.
    raise ValueError("Unknown dataset %r: expected 'b' or 'g'" % (dataset,))

# to_csv does not create missing directories, so make sure the target exists.
os.makedirs(os.path.dirname(RESULT_PATHS[dataset]), exist_ok=True)
df_results_monitoring_all_individual_features.to_csv(RESULT_PATHS[dataset])