In [None]:
# imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from imblearn.over_sampling import SMOTENC
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, fbeta_score, confusion_matrix, make_scorer
from sklearn.inspection import permutation_importance
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.ensemble import VotingClassifier
from sklearn.neural_network import MLPClassifier
import time

# Import data
data_path = 'https://raw.githubusercontent.com/Rapo-zevs/eda/main/predictive_maintenance.csv'
data = pd.read_csv(data_path)
n = data.shape[0]

# First checks
data.info()
print('Number of null values is: {}'.format(data.isnull().sum().sum()))

# Set numeric columns dtype to float
data['Tool wear [min]'] = data['Tool wear [min]'].astype('float64')
data['Rotational speed [rpm]'] = data['Rotational speed [rpm]'].astype('float64')

# Drop ID columns
df = data.copy()
df.drop(columns=['UDI', 'Product ID'], inplace=True)

# The following pie chart shows the percentages of machines by Type:
value = data['Type'].value_counts()
Type_Percentage = 100 * value / data.Type.shape[0]
labels = Type_Percentage.index.array
x = Type_Percentage.array
custom_colors = sns.color_palette('Set3', len(labels))
plt.pie(x, labels=labels, colors=custom_colors, autopct='%1.1f%%', startangle=90)
plt.title('Machine Type percentage')
plt.show()

# Create lists of features and target names
df.dtypes
features = [col for col in df.columns if df[col].dtype == 'float64' or col == 'Type']
target = ['Target', 'Failure Type']

# Portion of data where Random Failure=1
random_fail_index = df.loc[df['Failure Type'] == 'Random Failures'].index
df.loc[random_fail_index]

# Drop RNF
df.drop(index=random_fail_index, inplace=True)

# Portion of data where Machine failure=1 but no failure cause is specified
ambiguous_index = df.loc[(df['Target'] == 1) & (df['Failure Type'] == 'No Failure')].index
second_drop = df.loc[ambiguous_index].shape[0]
display(df.loc[ambiguous_index, target])
df.drop(index=ambiguous_index, inplace=True)

# Global percentage of removed observations
print('Global percentage of removed observations:', (100 * (data.size - df.size) / data.size))
df.reset_index(drop=True, inplace=True)  # Reset index
n = df.shape[0]

# Outliers inspection
num_features = [feature for feature in features if df[feature].dtype == 'float64']
# boxplot of numeric features
fig, axs = plt.subplots(nrows=2, ncols=3, figsize=(18, 7))
fig.suptitle('Numeric features boxplot')
for j, feature in enumerate(num_features):
    sns.boxplot(ax=axs[j // 3, j - 3 * (j // 3)], data=df, x=feature)
plt.show()

def drop_outliers_custom(dataframe, column_name, lower_threshold, upper_threshold):
    # Filter the DataFrame to keep only the data points within the specified range
    dataframe_filtered = dataframe[(dataframe[column_name] >= lower_threshold) & (dataframe[column_name] <= upper_threshold)]
    return dataframe_filtered

df = drop_outliers_custom(df, 'Rotational speed [rpm]', 1300, 2250)
sns.boxplot(x='Rotational speed [rpm]', data=df)
plt.show()
df = drop_outliers_custom(df, 'Torque [Nm]', 12, 73)
sns.boxplot(x='Torque [Nm]', data=df)
plt.show()

# Resampling with SMOTE
# Portion of df where there is a failure and causes percentage
fail_index = df.loc[df['Failure Type'] != 'No Failure'].index
df_fail = df.loc[fail_index]
df_fail_percentage = 100 * df_fail['Failure Type'].value_counts() / df_fail['Failure Type'].shape[0]

# Pie plot
plt.title('Causes involved in Machine failures')
plt.pie(x=df_fail_percentage.array, labels=df_fail_percentage.index.array,
        colors=sns.color_palette('tab10')[0:4], autopct='%.0f%%')
plt.show()

# n_working must represent 80% of the desired length of resampled dataframe
n_working = df['Failure Type'].value_counts()['No Failure']
desired_length = round(n_working / 0.8)
spc = round((desired_length - n_working) / 4)  # samples per class

# Resampling
balance_cause = {'No Failure': n_working,
                 'Overstrain Failure': spc,
                 'Heat Dissipation Failure': spc,
                 'Power Failure': spc,
                 'Tool Wear Failure': spc}

sm = SMOTENC(categorical_features=[0, 7], sampling_strategy=balance_cause, random_state=0)
df_res, y_res = sm.fit_resample(df, df['Failure Type'])

# Comparison after resampling
# Portion of df_res where there is a failure and causes percentage
idx_fail_res = df_res.loc[df_res['Failure Type'] != 'No Failure'].index
df_res_fail = df_res.loc[idx_fail_res]
fail_res_percentage = 100 * df_res_fail['Failure Type'].value_counts() / df_res_fail.shape[0]

fig, axs = plt.subplots(ncols=2, figsize=(12, 4))
fig.suptitle('Causes involved in Machine failures')

# Create bar charts on the first subplot
axs[0].bar(df_fail_percentage.index.array, df_fail_percentage.array, color=sns.color_palette('tab10')[0:4])
axs[0].set_ylabel('Percentage')
axs[0].set_title('Original')

# Create bar charts on the second subplot
axs[1].bar(fail_res_percentage.index.array, fail_res_percentage.array, color=sns.color_palette('tab10')[0:4])
axs[1].set_ylabel('Percentage')
axs[1].set_title('After Resampling')

# Rotate x-axis labels for better readability
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

# Features scaling and Encoding
# Encoding
sc = StandardScaler()
types = {'L': 0, 'M': 1, 'H': 2}
causes = {'No Failure': 0, 'Power Failure': 1, 'Overstrain Failure': 2, 'Heat Dissipation Failure': 3, 'Tool Wear Failure': 4}
df_pre = df_res.copy()
df_pre['Type'].replace(to_replace=types, inplace=True)
df_pre['Failure Type'].replace(to_replace=causes, inplace=True)
df_pre[num_features] = sc.fit_transform(df_pre[num_features])

# PCA and Correlation Heatmap
pca = PCA(n_components=len(num_features))
X_pca = pd.DataFrame(data=pca.fit_transform(df_pre[num_features]), columns=['PC' + str(i + 1) for i in range(len(num_features))])
var_exp = pd.Series(data=100 * pca.explained_variance_ratio_, index=['PC' + str(i + 1) for i in range(len(num_features))])
print('Explained variance ratio per component:', round(var_exp, 2), sep='\n=')

# Loadings Analysis
fig, axs = plt.subplots(ncols=3, figsize=(18, 4))
fig.suptitle('Loadings magnitude')
pca_loadings = pd.DataFrame(data=pca.components_, columns=num_features)

for j in range(3):
    ax = axs[j]
    loadings_data = pca_loadings.values[j]
    positive_loadings = np.maximum(0, loadings_data)
    negative_loadings = np.minimum(0, loadings_data)

    x = np.arange(len(num_features))
    width = 0.35

    ax.bar(x, positive_loadings, width, label='Positive', color='b')
    ax.bar(x, negative_loadings, width, label='Negative', color='r')

    ax.set_xlabel('Features')
    ax.set_ylabel('Loadings')
    ax.set_title('PC' + str(j + 1))
    ax.set_xticks(x)
    ax.set_xticklabels(num_features, rotation=90)
    ax.legend()

plt.show()

# Correlation Heatmap
plt.figure(figsize=(7, 4))
sns.heatmap(data=df_pre.corr(), mask=np.triu(df_pre.corr()), annot=True, cmap='BrBG')
plt.title('Correlation Heatmap')
plt.show()

# Binary task
# train-validation-test split
X, y = df_pre[features], df_pre[['Target', 'Failure Type']]
X_trainval, X_test, y_trainval, y_test = train_test_split(X, y, test_size=0.2, shuffle=True, random_state=0)
X_train, X_val, y_train, y_val = train_test_split(X_trainval, y_trainval, test_size=0.25, shuffle=True, random_state=0)

def classification(model, X_train, y_train, X_val, y_val):
    t = time.time()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_val)
    elapsed = time.time() - t
    print(f'Elapsed time: {elapsed:.4f} s')

    accuracy = accuracy_score(y_val, y_pred)
    auc = roc_auc_score(y_val, y_pred)
    f1 = f1_score(y_val, y_pred)
    f2 = fbeta_score(y_val, y_pred, beta=2)

    print('Validation Accuracy:', accuracy)
    print('Validation ROC AUC Score:', auc)
    print('Validation F1 Score:', f1)
    print('Validation F2 Score:', f2)

    return model, accuracy, auc, f1, f2

# Models
models = [
    LogisticRegression(),
    KNeighborsClassifier(),
    RandomForestClassifier(),
    SVC(probability=True),
    MLPClassifier(max_iter=500)
]

best_model = None
best_f2_score = 0

# Compare models
for model in models:
    print(f"\nModel: {model.__class__.__name__}")
    trained_model, accuracy, auc, f1, f2 = classification(model, X_train, y_train['Target'], X_val, y_val['Target'])

    if f2 > best_f2_score:
        best_model = trained_model
        best_f2_score = f2

print(f'\nBest model: {best_model.__class__.__name__} with F2 Score: {best_f2_score}')

# Evaluate the best model on the test set
y_test_pred = best_model.predict(X_test)
test_accuracy = accuracy_score(y_test['Target'], y_test_pred)
test_auc = roc_auc_score(y_test['Target'], y_test_pred)
test_f1 = f1_score(y_test['Target'], y_test_pred)
test_f2 = fbeta_score(y_test['Target'], y_test_pred, beta=2)

print(f'\nTest Accuracy: {test_accuracy}')
print(f'Test ROC AUC Score: {test_auc}')
print(f'Test F1 Score: {test_f1}')
print(f'Test F2 Score: {test_f2}')

# Permutation feature importance
perm_importance = permutation_importance(best_model, X_val, y_val['Target'])
feature_importance = pd.Series(perm_importance.importances_mean, index=features)
feature_importance.sort_values(ascending=False, inplace=True)

# Visualize feature importance
plt.figure(figsize=(12, 6))
sns.barplot(x=feature_importance, y=feature_importance.index, palette='viridis')
plt.title('Feature Importance')
plt.show()

# Plotting function
def plot_predictions(y_true, y_pred, title='Predictions vs Actual'):
    plt.figure(figsize=(10, 5))
    plt.plot(y_true.reset_index(drop=True), label='Actual')
    plt.plot(y_pred, label='Predicted', linestyle='--')
    plt.xlabel('Samples')
    plt.ylabel('Machine Failure')
    plt.title(title)
    plt.legend()
    plt.show()

# Plot results
plot_predictions(y_test['Target'], y_test_pred)

# Save results
results = {
    'test_accuracy': test_accuracy,
    'test_auc': test_auc,
    'test_f1': test_f1,
    'test_f2': test_f2
}

results_df = pd.DataFrame([results])
results_df.to_csv('results.csv', index=False)
