# Using One-Class SVM

One-Class SVM 是支持向量机（SVM）的一种变体，专门用于异常检测（outlier detection）或单类分类（one-class classification）。

In [1]:
from utility_OneClassSVM import read_all_test_data_from_path, run_cv_one_motor
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.svm import OneClassSVM
import warnings
%matplotlib inline

# Subfunction for data preprocessing.
def remove_outliers(df: pd.DataFrame):
    ''' # Description
    Remove outliers from the dataframe based on defined valid ranges. 
    Define a valid range of temperature and voltage. 
    Use ffil function to replace the invalid measurement with the previous value.
    '''
    df['temperature'] = df['temperature'].where(df['temperature'] <= 100, np.nan)
    df['temperature'] = df['temperature'].where(df['temperature'] >= 0, np.nan)
    df['temperature'] = df['temperature'].ffill()

    df['voltage'] = df['voltage'].where(df['voltage'] >= 6000, np.nan)
    df['voltage'] = df['voltage'].where(df['voltage'] <= 9000, np.nan)
    df['voltage'] = df['voltage'].ffill()

    df['position'] = df['position'].where(df['position'] >= 0, np.nan)
    df['position'] = df['position'].where(df['position'] <= 1000, np.nan)
    df['position'] = df['position'].ffill()


# Ignore warnings.
warnings.filterwarnings('ignore')

# Define the steps of the pipeline
steps = [
    ('standardizer', StandardScaler()),  # Step 1: StandardScaler 归一化
    ('mdl', OneClassSVM(kernel='rbf', gamma='scale'))  # Step 2: One-Class SVM
]

# Create the pipeline
pipeline = Pipeline(steps)

# Define hyperparameters to search
param_grid = {
    'mdl__nu': [0.01, 0.1, 0.5, 0.9],  # Anomaly proportion
    'mdl__gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1]  # Kernel coefficient
}

# Initialize GridSearchCV
grid_search = GridSearchCV(estimator=pipeline, param_grid=param_grid, scoring='f1', cv=5)

# Read all the dataset.
base_dictionary = '../../dataset/training_data/'
df_data = read_all_test_data_from_path(base_dictionary, remove_outliers, is_plot=False)


In [2]:
from utility_OneClassSVM import run_cv_one_motor

# Specify the test conditions you would like to include in the test.
df_data_experiment = df_data[df_data['test_condition'].isin(['20240425_093699', '20240425_094425', '20240426_140055',
                                                       '20240503_164675', '20240503_165189',
                                                       '20240503_163963', '20240325_155003'])]

# We want a seven-fold cross validation.
n_cv = 7

# Define the features.
feature_list_all = ['time', 'data_motor_1_position', 'data_motor_1_temperature', 'data_motor_1_voltage',
                    'data_motor_2_position', 'data_motor_2_temperature', 'data_motor_2_voltage',
                    'data_motor_3_position', 'data_motor_3_temperature', 'data_motor_3_voltage',
                    'data_motor_4_position', 'data_motor_4_temperature', 'data_motor_4_voltage',
                    'data_motor_5_position', 'data_motor_5_temperature', 'data_motor_5_voltage',
                    'data_motor_6_position', 'data_motor_6_temperature', 'data_motor_6_voltage']

# Run cross-validation with One-Class SVM
all_result = run_cv_one_motor(motor_idx=6, df_data=df_data_experiment, mdl=grid_search, feature_list=feature_list_all, n_fold=n_cv)


Model for motor 6:


NameError: name 'run_cross_val' is not defined

In [9]:
from utility import read_all_test_data_from_path, run_cv_one_motor
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.svm import OneClassSVM
import warnings
%matplotlib inline

# Subfunction for data preprocessing.
def remove_outliers(df: pd.DataFrame):
    ''' # Description
    Remove outliers from the dataframe based on defined valid ranges. 
    Define a valid range of temperature and voltage. 
    Use ffil function to replace the invalid measurement with the previous value.
    '''
    df['temperature'] = df['temperature'].where(df['temperature'] <= 100, np.nan)
    df['temperature'] = df['temperature'].where(df['temperature'] >= 0, np.nan)
    df['temperature'] = df['temperature'].ffill()

    df['voltage'] = df['voltage'].where(df['voltage'] >= 6000, np.nan)
    df['voltage'] = df['voltage'].where(df['voltage'] <= 9000, np.nan)
    df['voltage'] = df['voltage'].ffill()

    df['position'] = df['position'].where(df['position'] >= 0, np.nan)
    df['position'] = df['position'].where(df['position'] <= 1000, np.nan)
    df['position'] = df['position'].ffill()

# Ignore warnings.
warnings.filterwarnings('ignore')

# Read all the dataset.
base_dictionary = '../../dataset/training_data/'
df_data = read_all_test_data_from_path(base_dictionary, remove_outliers, is_plot=False)

# Specify the test conditions you would like to include in the test.
df_data_experiment = df_data[df_data['test_condition'].isin(['20240425_093699', '20240425_094425', '20240426_140055',
                                                       '20240503_164675', '20240503_165189',
                                                       '20240503_163963', '20240325_155003'])]

# Define the features.
feature_list_all = ['time', 'data_motor_1_position', 'data_motor_1_temperature', 'data_motor_1_voltage',
                    'data_motor_2_position', 'data_motor_2_temperature', 'data_motor_2_voltage',
                    'data_motor_3_position', 'data_motor_3_temperature', 'data_motor_3_voltage',
                    'data_motor_4_position', 'data_motor_4_temperature', 'data_motor_4_voltage',
                    'data_motor_5_position', 'data_motor_5_temperature', 'data_motor_5_voltage',
                    'data_motor_6_position', 'data_motor_6_temperature', 'data_motor_6_voltage']

# Remove 'test_condition' column for training and prediction
X = df_data_experiment[feature_list_all]
y_true = df_data_experiment[f'data_motor_6_label']

# Define the steps of the pipeline
steps = [
    ('standardizer', StandardScaler()),  # Step 1: StandardScaler 归一化
    ('mdl', OneClassSVM(kernel='rbf', gamma='scale'))  # Step 2: One-Class SVM
]

# Create the pipeline
pipeline = Pipeline(steps)

# Define hyperparameters to search
param_grid = {
    'mdl__nu': [0.01, 0.1, 0.5, 0.9],  # Anomaly proportion
    'mdl__gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1]  # Kernel coefficient
}

# Initialize GridSearchCV
grid_search = GridSearchCV(estimator=pipeline, param_grid=param_grid, scoring='f1', cv=5)

# We want a seven-fold cross validation.
n_cv = 7

# Run cross-validation with One-Class SVM
all_result = run_cv_one_motor(motor_idx=6, df_data=df_data_experiment, mdl=grid_search, feature_list=feature_list_all, n_fold=n_cv)


Model for motor 6:
   Accuracy  Precision    Recall  F1 score
0  0.093055   0.093055  1.000000  0.170265
1  0.060606   0.080745  0.158537  0.106996
2  0.769643   0.000000  0.000000  0.000000
3  0.060345   0.060345  1.000000  0.113821
4  0.302002   0.302002  1.000000  0.463904
5  0.539232   0.752475  0.633333  0.687783
6  0.678250   0.025000  0.004717  0.007937


Mean performance metric and standard error:
Accuracy: 0.3576 +- 0.3042
Precision: 0.1877 +- 0.2678
Recall: 0.5424 +- 0.4773
F1 score: 0.2215 +- 0.2576


In [5]:
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM
from utility_OneClassSVM import *
import warnings
# Ignore warnings.
warnings.filterwarnings('ignore')

def remove_outliers(df: pd.DataFrame):
    ''' # Description
    Remove outliers from the dataframe based on defined valid ranges. 
    Define a valid range of temperature and voltage. 
    Use ffil function to replace the invalid measurement with the previous value.
    '''
    df['temperature'] = df['temperature'].where(df['temperature'] <= 100, np.nan)
    df['temperature'] = df['temperature'].where(df['temperature'] >= 0, np.nan)
    df['temperature'] = df['temperature'].ffill()
    df['temperature'] = df['temperature'] - df['temperature'].iloc[0]

    df['voltage'] = df['voltage'].where(df['voltage'] >= 6000, np.nan)
    df['voltage'] = df['voltage'].where(df['voltage'] <= 9000, np.nan)
    df['voltage'] = df['voltage'].ffill()
    df['voltage'] = df['voltage'] - df['voltage'].iloc[0]

    df['position'] = df['position'].where(df['position'] >= 0, np.nan)
    df['position'] = df['position'].where(df['position'] <= 1000, np.nan)
    df['position'] = df['position'].ffill()
    df['position'] = df['position'] - df['position'].iloc[0]

# Read data.
base_dictionary = '../../dataset/training_data/'
df_data = read_all_test_data_from_path(base_dictionary, remove_outliers, is_plot=False)

# Pre-train the model.
# Get all the normal data.
normal_test_id = ['20240105_164214',
                  '20240105_165300',
                  '20240105_165972',
                  '20240320_152031',
                  '20240320_153841',
                  '20240320_155664',
                  '20240321_122650',
                  '20240325_135213',
                  '20240426_141190',
                  '20240426_141532',
                  '20240426_141602',
                  '20240426_141726',
                  '20240426_141938',
                  '20240426_141980',
                  '20240503_164435']

df_tr = df_data[df_data['test_condition'].isin(normal_test_id)]

feature_list_all = ['time', 'data_motor_1_position', 'data_motor_1_temperature', 'data_motor_1_voltage',
                    'data_motor_2_position', 'data_motor_2_temperature', 'data_motor_2_voltage',
                    'data_motor_3_position', 'data_motor_3_temperature', 'data_motor_3_voltage',
                    'data_motor_4_position', 'data_motor_4_temperature', 'data_motor_4_voltage',
                    'data_motor_5_position', 'data_motor_5_temperature', 'data_motor_5_voltage',
                    'data_motor_6_position', 'data_motor_6_temperature', 'data_motor_6_voltage']

# Prepare feature and response of the training dataset.
x_tr_org, _ = extract_selected_feature(df_data=df_tr, feature_list=feature_list_all, motor_idx=6, mdl_type='clf')

# Enrich the features based on the sliding window.
window_size = 10
sample_step = 1

x_tr, _ = prepare_sliding_window(df_x=x_tr_org, y=pd.Series(np.zeros(len(x_tr_org))), window_size=window_size,
                                 sample_step=sample_step, prediction_lead_time=1, mdl_type='clf')

# Define the One Class SVM model
steps = [
    ('standardizer', StandardScaler()),  # Step 1: StandardScaler
    ('ocsvm', OneClassSVM(kernel='rbf', gamma='scale'))  # Step 2: One Class SVM
]

# Create the pipeline
mdl_ocsvm = Pipeline(steps)
# Fit the model
mdl = mdl_ocsvm.fit(x_tr)

# Test data.
test_id = [
    '20240325_155003',
    '20240425_093699',
    # '20240425_094425',
    # '20240426_140055',
    # '20240503_163963',
    # '20240503_164675',
    # '20240503_165189'
]
df_test = df_data[df_data['test_condition'].isin(test_id)]

# Define the fault detector.
detector_ocsvm = FaultDetect_OCSVM(ocsvm_mdl=mdl, threshold=3, window_size=window_size, sample_step=sample_step)

# Test
_, y_label_test_org = extract_selected_feature(df_data=df_test, feature_list=feature_list_all, motor_idx=6,
                                               mdl_type='clf')
x_test_org, _ = extract_selected_feature(df_data=df_test, feature_list=feature_list_all, motor_idx=6,
                                         mdl_type='clf')

# 生成与训练阶段相同的滑动窗口特征
x_test, _ = prepare_sliding_window(df_x=x_test_org, y=pd.Series(np.zeros(len(x_test_org)), index=x_test_org.index), window_size=window_size,
                                   sample_step=sample_step, prediction_lead_time=1, mdl_type='clf')

# Predict the labels
y_label_pred_tmp = detector_ocsvm.predict(x_test)

# Get the true values.
_, y_label_test = prepare_sliding_window(df_x=x_test_org, y=y_label_test_org, sequence_name_list=test_id,
                                         window_size=window_size, sample_step=sample_step, prediction_lead_time=1,
                                         mdl_type='clf')

# Show the results
show_clf_result(y_tr=y_label_test, y_test=y_label_test, y_pred_tr=y_label_pred_tmp, y_pred=y_label_pred_tmp)

# Run cross validation
n_fold = 7
all_result = run_cv_one_motor(motor_idx=6, df_data=df_data, mdl=detector_ocsvm, feature_list=feature_list_all,
                              n_fold=n_fold)

KeyError: 'None of [RangeIndex(start=0, stop=6652, step=1)] are in the [index]'