In [1]:
# Import Libraries
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, roc_curve, classification_report, precision_recall_curve, auc, accuracy_score, precision_score, recall_score, f1_score
import ipywidgets as widgets
from ipywidgets import interact
import pandas as pd
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.linear_model import LogisticRegression
from xgboost import XGBClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import StratifiedShuffleSplit, GridSearchCV
from sklearn.preprocessing import LabelEncoder
import warnings
import shap
import numpy as np
from art.attacks.poisoning import PoisoningAttackBackdoor, FeatureCollisionAttack
from art.estimators.classification import SklearnClassifier
from art.utils import to_categorical

sns.set(style="whitegrid")
warnings.filterwarnings('ignore')

In [2]:
# Load the preprocessed data
data = pd.read_csv('Preprocessed_Data.csv')

# Rename columns to remove special characters
data.rename(columns={
    'Air temperature [K]': 'Air_temperature_K',
    'Process temperature [K]': 'Process_temperature_K',
    'Rotational speed [rpm]': 'Rotational_speed_rpm',
    'Torque [Nm]': 'Torque_Nm',
    'Tool wear [min]': 'Tool_wear_min'
}, inplace=True)

# Create the 'No failure' column
data['No failure'] = 1 - data['Machine failure']

# Define features and target
X = data[['Type', 'Air_temperature_K', 'Process_temperature_K', 'Rotational_speed_rpm', 'Torque_Nm', 'Tool_wear_min']]
y = data[['No failure', 'TWF', 'HDF', 'PWF', 'OSF', 'RNF']].idxmax(axis=1)

# Encode the target variable
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
set(y_encoded)

{0, 1, 2, 3, 4}

In [3]:
# Initialize stratified split
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.3, random_state=42)

for train_index, test_index in sss.split(X, y_encoded):
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y_encoded[train_index], y_encoded[test_index]

# Apply SMOTE to oversample the training data
smote = SMOTE(random_state=42)
X_train_res, y_train_res = smote.fit_resample(X_train, y_train)

# Define all possible target names
all_classes = label_encoder.classes_
set(all_classes)

{'HDF', 'No failure', 'OSF', 'PWF', 'TWF'}

In [4]:
# Define models with the best parameters
models = {
    'Random Forest': RandomForestClassifier(
        max_depth=35, 
        min_samples_split=3, 
        n_estimators=150, 
        random_state=42
    ),
    'XGBoost': XGBClassifier(
        learning_rate=0.4, 
        max_depth=5, 
        n_estimators=400, 
        subsample=1.0, 
        random_state=42
    ),
    'Neural Network': MLPClassifier(
        activation='relu', 
        hidden_layer_sizes=(50, 50), 
        solver='adam', 
        max_iter=350, 
        random_state=42
    )
}

In [5]:
# Define the poisoning function for multiclass classification
def label_flip_poisoning(X_train, y_train, poison_percentage, target_class):
    # Target Class 1: Flipping "no failure" (y_train == 1) to the 6th failure (RNF, y_train == 6)
    if target_class == 1:
        # Find indices where y_train indicates no failure (y_train == 1)
        no_failure_indices = np.where(y_train == 1)[0]
        
        # Determine the number of labels to flip based on the poison percentage
        num_to_flip = int(poison_percentage * len(no_failure_indices))
        
        # Randomly select indices to flip
        flip_indices = np.random.choice(no_failure_indices, size=num_to_flip, replace=False)
        
        # Create poisoned labels by copying y_train
        y_train_poisoned = y_train.copy()
        
        # Flip the selected "no failure" labels (1) to the 6th failure (RNF, y_train == 6)
        y_train_poisoned[flip_indices] = 0
    
    # Target Class 2: Flipping failure types (2 to 6) to "no failure" (y_train == 1)
    else:
        # Find indices where y_train indicates failure (y_train in [2, 3, 4, 5, 6])
        failure_indices = np.where(y_train > 1)[0]
        
        # Determine the number of labels to flip based on the poison percentage
        num_to_flip = int(poison_percentage * len(failure_indices))
        
        # Randomly select indices to flip
        flip_indices = np.random.choice(failure_indices, size=num_to_flip, replace=False)
        
        # Create poisoned labels by copying y_train
        y_train_poisoned = y_train.copy()
        
        # Flip the selected failure labels (2 to 6) to "no failure" (1)
        y_train_poisoned[flip_indices] = 1
    
    return X_train, y_train_poisoned


In [6]:
# Detect suspicious labels using KNN and return the most common neighbors for correction
def identify_suspicious_labels(X_train, y_train, threshold=0.7, n_neighbors=3):
    # Ensure X_train is a NumPy array for KNN compatibility
    if isinstance(X_train, pd.DataFrame):
        X_train = X_train.values
    
    knn = KNeighborsClassifier(n_neighbors=n_neighbors)
    knn.fit(X_train, y_train)
    y_pred_knn = knn.predict(X_train)
    
    # Identify mismatches between the predicted labels and the actual labels
    mismatches = (y_train != y_pred_knn)
    suspicious_idx = np.where(mismatches)[0]
    
    # Initialize a list to store the most common neighbor class for each suspicious point
    most_common_neighbors = []
    
    # Loop through suspicious indices and find the most common neighbor class
    for idx in suspicious_idx:
        neighbor_indices = knn.kneighbors([X_train[idx]], return_distance=False)[0]
        
        # Find the most common class among the neighbors
        neighbor_classes = y_train[neighbor_indices]  # Since y_train is a NumPy array, we use direct indexing
        most_common_class = np.bincount(neighbor_classes).argmax()  # Get the most frequent class
        most_common_neighbors.append(most_common_class)
    
    print(f"Number of suspicious indices: {len(suspicious_idx)}")
    return suspicious_idx, most_common_neighbors


In [7]:
# Correct the suspicious labels by assigning them the most common class among neighbors
def correct_labels_failure_type(X_train, y_train, suspicious_idx, most_common_neighbors):
    corrected_labels = y_train.copy()
    
    # Replace the suspicious labels with the most common class from neighbors
    for i, idx in enumerate(suspicious_idx):
        corrected_labels.iloc[idx] = most_common_neighbors[i]
    
    return corrected_labels
# Define label deletion
def delete_labels(X_train, y_train, suspicious_idx):
    X_train_cleaned = np.delete(X_train, suspicious_idx, axis=0)
    y_train_cleaned = np.delete(y_train, suspicious_idx, axis=0)
    return X_train_cleaned, y_train_cleaned

In [None]:
# Define the poisoning percentages to test
poison_percentages = [0, 0.1, 0.2, 0.3, 0.4, 0.5]
target_classes = [1, 2]  # 0 for no failure, 1 for failure
# Initialize dictionary to store FDR results
fdr_results = {'Model': [], 'Target': [], 'Intervention': [], '0%': [], '10%': [], '20%': [], '30%': [], '40%': [], '50%': [], 'Mean': []}

# Define the interventions
interventions = ['No Intervention', 'Correction', 'Deletion']

# Loop through each model
for name, model in models.items():
    print(f'Model: {name}')
    
    # Loop through each target class (only focusing on target class 1)
    for target_class in [2]:  # Focus only on failure class
        print(f'Target Class: {target_class}')
        
        # Loop through each intervention type (No Intervention, Correction, Deletion)
        for intervention in interventions:
            print(f'Intervention: {intervention}')
            
            # Store the model name, target class, and intervention type in the results dictionary
            fdr_results['Model'].append(name)
            fdr_results['Target'].append(target_class)
            fdr_results['Intervention'].append(intervention)
            
            # Store a list to calculate the mean FDR values later
            fdr_values = []
            
            # Loop through each poisoning percentage (including 0% for clean data)
            for poison_percentage in poison_percentages:
                print(f'Poison percentage: {poison_percentage}')

                # Poison the training data
                X_train_poisoned, y_train_poisoned = label_flip_poisoning(X_train_res, y_train_res, poison_percentage, target_class)
                
                # Apply label correction or deletion if needed
                if intervention == 'Correction':
                    suspicious_indices, most_common_neighbors = identify_suspicious_labels(X_train_poisoned, y_train_poisoned)
                    y_train_poisoned[suspicious_indices] = most_common_neighbors  # Correct labels based on neighbors
                elif intervention == 'Deletion':
                    suspicious_indices, _ = identify_suspicious_labels(X_train_poisoned, y_train_poisoned)
                    X_train_poisoned, y_train_poisoned = delete_labels(X_train_poisoned, y_train_poisoned, suspicious_indices)

                # Train the model on the (possibly corrected or cleaned) poisoned data
                model.fit(X_train_poisoned, y_train_poisoned)
                
                # Make predictions on the clean test set
                y_pred = model.predict(X_test)
                
                # Calculate Failure Detection Rate (FDR) for class 1 (failure)
                true_failure = (y_test == 2)
                true_positive_failures = ((y_pred == 2) & (y_test == 2)).sum()
                fdr = true_positive_failures / true_failure.sum() if true_failure.sum() > 0 else 0
                
                # Store the FDR based on the poison percentage for the current intervention and target class
                fdr_results[f'{int(poison_percentage * 100)}%'].append(fdr)
                fdr_values.append(fdr)

            # Calculate the mean FDR across all poisoning percentages
            fdr_results['Mean'].append(sum(fdr_values) / len(fdr_values))

# Convert the FDR results dictionary to a DataFrame
fdr_df = pd.DataFrame(fdr_results)

# Display the FDR results
print("Failure Detection Rate (FDR) Results:")
display(fdr_df)

Model: Random Forest
Target Class: 2
Intervention: No Intervention
Poison percentage: 0
Poison percentage: 0.1
Poison percentage: 0.2
Poison percentage: 0.3
Poison percentage: 0.4
Poison percentage: 0.5
Intervention: Correction
Poison percentage: 0
Number of suspicious indices: 213
Poison percentage: 0.1
Number of suspicious indices: 2014
Poison percentage: 0.2
Number of suspicious indices: 3437
Poison percentage: 0.3
Number of suspicious indices: 4458
Poison percentage: 0.4
Number of suspicious indices: 5029
Poison percentage: 0.5
Number of suspicious indices: 5076
Intervention: Deletion
Poison percentage: 0
Number of suspicious indices: 213
Poison percentage: 0.1
Number of suspicious indices: 2006
Poison percentage: 0.2
Number of suspicious indices: 3392
Poison percentage: 0.3
Number of suspicious indices: 4411
Poison percentage: 0.4
Number of suspicious indices: 4977
Poison percentage: 0.5
Number of suspicious indices: 5187
Model: XGBoost
Target Class: 2
Intervention: No Interventio