# Machine Failure Classification using sensor data

By: Nathália Alvim

E-mail: natalvimdesouza@hotmail.com

Dataset is avaible on kaggle.

https://www.kaggle.com/datasets/mujtabamatin/dataset-for-machine-failure-detection

The dataset has 1000 samples and 6 variables.


The dataset consists of the following features:

- Temperature (°C): Continuous data representing the temperature at the machine's
critical points. Higher temperatures may indicate potential issues due to overheating.

- Vibration (Hz): Frequency of machine vibrations. Abnormal vibrations can signal mechanical misalignment, imbalance, or wear.

- Power Usage (kW): Power consumption levels of the machine. Spikes in power usage may indicate increased load or potential mechanical issues.

- Humidity (%): Environmental humidity around the machine. High humidity levels could affect machine performance and lead to failure over time.

- Machine Type: Categorical data indicating the type of machine (e.g., "Drill", "Lathe", "Mill"). Different machine types may have unique failure patterns.

Target Variable:

- Failure Risk: A binary label where 0 indicates normal operation, and 1 indicates that the machine is at risk of failure.

## Install Packages

In [None]:
#Installing Lazy Predict
!pip install lazypredict
!pip install tensorflow
!pip install imblearn

##Importing dataset

In [None]:
#Import packages for pre processing
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import shapiro, normaltest, anderson
import seaborn as sns
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import GridSearchCV, cross_val_score, StratifiedKFold, cross_validate
from lazypredict.Supervised import LazyClassifier
from sklearn.metrics import auc, RocCurveDisplay, roc_curve
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import SGDClassifier, LogisticRegression
from sklearn.metrics import make_scorer, accuracy_score, f1_score, recall_score, precision_score
from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE
#Import packages tensor flow
import tensorflow as tf
from tensorflow.keras import models, layers
from tensorflow.keras import backend as K
from xgboost import XGBClassifier
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from imblearn.ensemble import BalancedRandomForestClassifier

In [None]:
#Loading the dataset
file_path = '/workspaces/Machine-Learning/machine_failure_dataset.csv'
df = pd.read_csv(file_path)
df.head()

In [None]:
# General information
print(df.info())

## Class Balacing

In [None]:
# Class Balancing
# First i'll apply for the categorical variables and after i'll do with the binary variable

categorical = 'Machine_Type'

#Available categories

categories = df[categorical].unique()

#Counting
counting = df[categorical].value_counts()

print(categories)
print(counting)

In [None]:
#Binary variable

binary = 'Failure_Risk'

#Available categories

categories = df[binary].unique() # 0 = Normal operation \\ 1 = Risk of failure

#Counting
counting = df[binary].value_counts()

print(categories)
print(counting)

## Descriptive statistics

In [None]:
# Calculate mean, median, and mode for each numerical column
# Select only numerical columns (nc) and exclude the binary variable
# Select only numerical columns and exclude a specific one
nc = df.select_dtypes(include=['number']).drop(columns=['Failure_Risk'])

mean = nc.mean()
median = nc.median()
mode = nc.mode().iloc[0]  # Takes the first mode found for each column
std = nc.std()  # Standard deviation
cv = std / mean  # Coefficient of Variation


print("\nMean of each column:")
print(mean)

print("\nMedian of each column:")
print(median)

print("\nMode of each column:")
print(mode)

print("\nStandard Deviation of each column:")
print(std)

print("\nCoefficient of Variation (CV) of each column:")
print(cv)

In [None]:
# Boxpot
# Generate boxplot to visualize distribution and possible outliers
plt.figure(figsize=(10, 6))
nc.boxplot()
plt.title("Dataset Boxplot")
plt.xlabel("Columns")
plt.ylabel("Values")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
# Generate paired boxplots to save space and for better visualization
columns = nc.columns
num_columns = len(columns)

# Iterate over the columns in pairs
for i in range(0, num_columns, 2):
    # Set up the subplot for 2 side-by-side plots
    fig, axes = plt.subplots(1, 2, figsize=(12, 6))

    # First boxplot
    axes[0].boxplot(nc[columns[i]].dropna(), vert=True, patch_artist=True)
    axes[0].set_title(f"Boxplot - {columns[i]}")
    axes[0].set_xlabel(columns[i])
    axes[0].set_ylabel("Values")

    # Second boxplot (if it exists)
    if i + 1 < num_columns:
        axes[1].boxplot(nc[columns[i + 1]].dropna(), vert=True, patch_artist=True)
        axes[1].set_title(f"Boxplot - {columns[i + 1]}")
        axes[1].set_xlabel(columns[i + 1])
        axes[1].set_ylabel("Values")
    else:
        # Remove the second subplot if there's no second column
        fig.delaxes(axes[1])

    # Adjust layout
    plt.tight_layout()
    plt.show()

## Normality of the dataset

- Shapiro-Wilk

- D’Agostino-Pearson

- Anderson-Darling

In [None]:
def normality_tests_summary(dataframe):
    numeric_cols = dataframe.select_dtypes(include=['number'])
    results = []

    for col in numeric_cols.columns:
        col_data = numeric_cols[col].dropna()

        # Shapiro-Wilk Test
        shapiro_stat, shapiro_p = shapiro(col_data)
        shapiro_result = "Normal" if shapiro_p > 0.05 else "Not normal"

        # D’Agostino and Pearson Test
        dagostino_stat, dagostino_p = normaltest(col_data)
        dagostino_result = "Normal" if dagostino_p > 0.05 else "Not normal"

        # Anderson-Darling Test
        anderson_result_obj = anderson(col_data)
        ad_stat = anderson_result_obj.statistic
        # Compare with 5% significance level
        ad_result = "Normal" if ad_stat < anderson_result_obj.critical_values[2] else "Not normal"

        results.append({
            "Column": col,
            "Shapiro-Wilk (p)": round(shapiro_p, 4),
            "Shapiro Result": shapiro_result,
            "D’Agostino (p)": round(dagostino_p, 4),
            "D’Agostino Result": dagostino_result,
            "Anderson-Darling (stat)": round(ad_stat, 4),
            "Anderson Result (5%)": ad_result
        })

    return pd.DataFrame(results)


In [None]:
summary_df = normality_tests_summary(nc)
print(summary_df)

In [None]:
#Convert categorical variable to 1, 2, 3 variable
label_encoder = LabelEncoder()
df['Machine_Type_encoded'] = label_encoder.fit_transform(df['Machine_Type'])

In [None]:
for i, category in enumerate(label_encoder.classes_):
    print(f"{category} -> {i + 1}")

print(df.head())

In [None]:
#Remove Machine_Type and renomed Machine_Type_encoded to Machine_Type
df.drop('Machine_Type', axis=1, inplace=True)
df.rename(columns={'Machine_Type_encoded': 'Machine_Type'}, inplace=True)
print(df.head())

In [None]:
# Correlation with the Failure_Risk variable
correlations = df.select_dtypes(include=['number']).corr()["Failure_Risk"].drop("Failure_Risk")
print(correlations)

## Pearson correlation map




In [None]:
# Pearson Correlation Map
# Calculate the correlation matrix (Pearson coefficient)
pearson_correlation = nc.corr(method='pearson')

# Generate the heatmap
plt.figure(figsize=(10, 8))
sns.heatmap(pearson_correlation, annot=True, cmap="coolwarm", fmt=".2f", cbar=True, square=True)
plt.title("Correlation Map (Pearson)")
plt.show()

### Conclusion:

The data set follows a normal distribution by tests performed such as Shapiro Wilk.
A graph was generated in order to define the linearity of the system, thus concluding that the system is non-linear.
Finally, the Pearson correlation map was created to see the relationship between the variables.

In summary:

-> Non-linear

-> Normal Distribution

## Train and Test

In [None]:
# Separate features (X) and target (y)
X = df.drop("Failure_Risk", axis=1)
y = df["Failure_Risk"]

In [None]:
#80 training and 20 for test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
#Scale numerical features
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
print(X_test_scaled)
print(X_train_scaled)

I normalized everything, so if the Machine Type was 1, now is 0, if it was 2 now is 0.5 and if it was 3 now is 1.

Drill -> 0

Lathe -> 0.5

Mill -> 1

In [None]:
#Visually see how classes are distributed across variable combinations:
data_plot = pd.DataFrame(X, columns=['Humidity', 'Temperature', 'Vibration', 'Power_Usage'])
data_plot['Target'] = y
sns.pairplot(data_plot, hue='Target')

Visually, the classes are overlapping, making it difficult to separate them. Therefore, linear methods would not work as expected. And they all follow a normal distribution, as previously presented.

## Models

In [None]:
# LazyClassifier
clf = LazyClassifier(verbose=0, ignore_warnings=True, custom_metric=None)

# Fit and compare models
models, predictions = clf.fit(X_train_scaled, X_test_scaled, y_train, y_test)

# Display the results
print(models)

In [None]:
# Top 10 models
print(models.head(10))

## Rebalancing classes


Before we train the model, we must rebalance the classes, as metrics like ROC AUC and Balanced Accuracy are "guessing" values ​​randomly.

In [None]:
#Rebalancing
smote = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

print(X_train_resampled)
print(y_train_resampled)

In [None]:
#Scaler again
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train_resampled)
X_test_scaled = scaler.transform(X_test)

In [None]:
# LazyClassifier
clf = LazyClassifier(verbose=0, ignore_warnings=True, custom_metric=None)

# Fit and compare models with resampled data
models_2, predictions_2 = clf.fit(X_train_scaled, X_test_scaled, y_train_resampled, y_test)

# Display the results
print(models_2)

In [None]:
# Top 10 models_2
print(models_2.head(10))

Separability between classes is still weak, even with balanced data.

→ May indicate overlap between classes or features with little predictive power.

## Test with XGBoost

In [None]:
# Train XGBoost classifier
xgb_model = XGBClassifier(random_state=42, use_label_encoder=False, eval_metric='logloss')
xgb_model.fit(X_train_scaled, y_train_resampled)

# Predict on test set
y_pred = xgb_model.predict(X_test_scaled)
y_prob = xgb_model.predict_proba(X_test_scaled)[:, 1]

# Evaluation
print("Classification Report:\n")
print(classification_report(y_test, y_pred))

print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred))

roc_auc = roc_auc_score(y_test, y_prob)
print(f"\nROC AUC Score: {roc_auc:.2f}")

## Test with RadomForest

In [None]:
brf = BalancedRandomForestClassifier(random_state=42)
brf.fit(X_train_scaled, y_train_resampled)

# Prediction on test data
y_pred = brf.predict(X_test_scaled)
y_prob = brf.predict_proba(X_test_scaled)[:, 1]

# Evaluation
print("Classification Report:\n")
print(classification_report(y_test, y_pred))

print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred))

roc_auc = roc_auc_score(y_test, y_prob)
print(f"\nROC AUC Score: {roc_auc:.2f}")

## Adding new data to improve class balance

I'll use Autoencode to add some data

In [None]:
# Select only data with failures
df_failures = df[df["Failure_Risk"] == 1]

# Only numerical columns
features = ["Humidity", "Temperature", "Vibration", "Power_Usage","Machine_Type"]
X_failure = df_failures[features].values

# Normalize the data
scaler = MinMaxScaler()
X_failure_scaled = scaler.fit_transform(X_failure)

# Latent space dimension
latent_dim = 2
input_dim = X_failure_scaled.shape[1]

In [None]:
# --- Encoder ---
inputs = layers.Input(shape=(input_dim,))
h = layers.Dense(16, activation='relu')(inputs)
z_mean = layers.Dense(latent_dim, name='z_mean')(h)
z_log_var = layers.Dense(latent_dim, name='z_log_var')(h)

In [None]:
# Sampling layer
def sampling(args):
    z_mean, z_log_var = args
    epsilon = tf.random.normal(shape=tf.shape(z_mean))
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon

z = layers.Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

In [None]:
# --- Decoder ---
decoder_h = layers.Dense(16, activation='relu')
decoder_output = layers.Dense(input_dim, activation='sigmoid')
h_decoded = decoder_h(z)
outputs = decoder_output(h_decoded)

In [None]:
# Define encoder and decoder separately for generation later
encoder = models.Model(inputs, [z_mean, z_log_var, z], name='encoder')
decoder_input = layers.Input(shape=(latent_dim,))
decoder_model = models.Model(decoder_input, decoder_output(decoder_h(decoder_input)), name='decoder')

In [None]:
# --- VAE as subclassed model ---
class VAE(models.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def compile(self, optimizer):
        super(VAE, self).compile()
        self.optimizer = optimizer
        self.total_loss_tracker = tf.keras.metrics.Mean(name="loss")
        self.reconstruction_loss_tracker = tf.keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [self.total_loss_tracker, self.reconstruction_loss_tracker, self.kl_loss_tracker]

    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]

        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)

            # Calculate losses
            reconstruction_loss = tf.reduce_mean(tf.reduce_sum(tf.square(data - reconstruction), axis=1))
            kl_loss = -0.5 * tf.reduce_mean(tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=1))
            total_loss = reconstruction_loss + kl_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

In [None]:
# --- Compile and Train ---
vae = VAE(encoder, decoder_model)
vae.compile(optimizer=tf.keras.optimizers.Adam())
vae.fit(X_failure_scaled, epochs=100, batch_size=32)

## Create a new dataset

Generate synthetic data + the old dataset

In [None]:
# Generate synthetic failure data
z_samples = np.random.normal(size=(300, latent_dim))
generated_data = decoder_model.predict(z_samples)
generated_data = scaler.inverse_transform(generated_data)

# Create new DataFrame for synthetic failures
df_synthetic = pd.DataFrame(generated_data, columns=df_failures.drop("Failure_Risk", axis=1).columns)
df_synthetic["Failure_Risk"] = 1

# Combine with original dataset
df_combined = pd.concat([df, df_synthetic], ignore_index=True)

## Train and Test Split

In [None]:
# Separate features and target
X = df_combined.drop("Failure_Risk", axis=1)
y = df_combined["Failure_Risk"]

In [None]:
# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

In [None]:
# Scale features again
scaler_final = MinMaxScaler()
X_train_scaled = scaler_final.fit_transform(X_train)
X_test_scaled = scaler_final.transform(X_test)

## Lazy Classifier again

In [None]:
# LazyClassifier to compare models
clf = LazyClassifier(verbose=0, ignore_warnings=True, custom_metric=None)
models_3, predictions = clf.fit(X_train_scaled, X_test_scaled, y_train, y_test)

In [None]:
# Display model comparison
print("Top models after VAE-based balancing:")
print(models_3)

In [None]:
# Top 10 models
print(models_3.head(10))

## Testing SGDClassifier, Random Forest and Gradient Boosting

Using scores like:

- accuracy
- f1
- recall
- precision
- roc_AUC
- Cross Validation

In [None]:
# Scorers for evaluation
scoring = {
    'accuracy': make_scorer(accuracy_score),
    'f1': make_scorer(f1_score),
    'recall': make_scorer(recall_score),
    'precision': make_scorer(precision_score),
    'roc_auc': make_scorer(roc_auc_score)
}

In [None]:
# Initializing the models
models = {
    'SGDClassifier': SGDClassifier(random_state=42, loss='log_loss', penalty='l2', max_iter=1000),
    'RandomForest': RandomForestClassifier(random_state=42, n_estimators=100),
    'GradientBoosting': GradientBoostingClassifier(random_state=42, learning_rate=0.1, n_estimators=100),
    'LogisticRegression': LogisticRegression(random_state=42, max_iter=1000, multi_class='multinomial', solver='lbfgs')
}



In [None]:
# Stratified cross-validation
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

In [None]:
# Running the models
for name, model in models.items():
    print(f"\n{name}:")

    pipeline = ImbPipeline(steps=[
        ('scaler', StandardScaler()),
        ('smote', SMOTE(random_state=42)),
        ('classifier', model)
    ])

    results = cross_validate(
        pipeline, X, y,
        scoring=scoring,
        cv=cv,
        n_jobs=-1,
        return_train_score=False
    )

    for metric in scoring.keys():
        print(f"{metric.capitalize()}: {np.mean(results['test_' + metric]):.4f}")

## Optimizing hyperparameters

I'll choose Random Forest for my model

In [None]:
# Pipeline including scaling and SMOTE
pipeline = ImbPipeline(steps=[
    ('scaler', StandardScaler()),
    ('smote', SMOTE(random_state=42)),
    ('classifier', LogisticRegression(
        multi_class='multinomial',
        solver='lbfgs',
        random_state=42
    ))
])

In [None]:
# Hyperparameter grid
param_grid = {
    'classifier__C': [0.01, 0.1, 1, 10],
    'classifier__penalty': ['l2'],
    'classifier__solver': ['lbfgs', 'liblinear']
}

In [None]:
# Grid Search with F1 as scoring metric
grid_search = GridSearchCV(pipeline, param_grid, cv=5, scoring='f1', n_jobs=-1, verbose=1)
grid_search.fit(X_train, y_train)

In [None]:
# Best hyperparameters
print("Best Parameters:")
print(grid_search.best_params_)

In [None]:
# Evaluation on test set
print("\nClassification Report on Test Set:")
y_pred = grid_search.predict(X_test)
print(classification_report(y_test, y_pred))

In [None]:
# Confusion matrix
conf_matrix = confusion_matrix(y_test, y_pred)
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=["No Failure", "Failure"], yticklabels=["No Failure", "Failure"])
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix")
plt.show()

In [None]:
# ROC Curve and AUC
y_prob = grid_search.predict_proba(X_test)[:, 1]
fpr, tpr, _ = roc_curve(y_test, y_prob)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f"ROC Curve (AUC = {roc_auc:.2f})")
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend(loc="lower right")
plt.grid()
plt.show()

## Testing for more recalls

In [None]:
# Probability predictions for class 1 (failure)
y_prob = grid_search.predict_proba(X_test)[:, 1]

# Testing different thresholds
thresholds = np.arange(0.1, 0.9, 0.05)
recalls = []
precisions = []

for threshold in thresholds:
    y_pred_thresh = (y_prob >= threshold).astype(int)
    report = classification_report(y_test, y_pred_thresh, output_dict=True)
    recalls.append(report['1']['recall'])
    precisions.append(report['1']['precision'])


In [None]:
# Plotting the precision vs recall curve by threshold
plt.figure(figsize=(10, 6))
plt.plot(thresholds, recalls, label="Recall (Failure)", marker='o')
plt.plot(thresholds, precisions, label="Precision (Failure)", marker='x')
plt.axvline(x=0.5, color='gray', linestyle='--', label="Default Threshold")
plt.xlabel("Threshold")
plt.ylabel("Score")
plt.title("Precision and Recall for Failure Class at Different Thresholds")
plt.legend()
plt.grid()
plt.show()

Low threshold (~0.1–0.3):

Recall is close to 1 → The model identifies almost all failures.

However, precision drops significantly → There are many false positives (raising alarms when it wasn't necessary).

High threshold (>0.5):

Precision reaches 1 → The model is almost certain when predicting a failure.

However, recall drops drastically → It misses many real failures.

In [None]:
# Applying new threshold (e.g., 0.3)
new_threshold = 0.5
y_pred_adjusted = (y_prob >= new_threshold).astype(int)

# New confusion matrix and classification report
print("Classification Report with Adjusted Threshold:")
print(classification_report(y_test, y_pred_adjusted))

print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred_adjusted))

In [None]:
# New ROC Curve
fpr, tpr, _ = roc_curve(y_test, y_prob)

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f"ROC Curve (AUC = {roc_auc_score(y_test, y_prob):.2f})")
plt.plot([0, 1], [0, 1], linestyle='--', color='gray')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()
plt.grid()
plt.show()

Class 1 (Failure):

Recall: 0.77 → The model correctly identifies 77% of actual failure cases, indicating high sensitivity.

Precision: 0.57 → As expected with a lower threshold, the precision decreased due to an increase in false positives.

Class 0 (No Failure):

Recall: 0.51 → Only 51% of the non-failure cases are correctly classified, suggesting a significant drop in specificity.

Precision: 0.72 → Despite the lower recall, when the model predicts no failure, there is still a reasonably good chance it is correct.

F1-score:

Class 1: 0.65

Class 0: 0.59
This indicates a better balance between precision and recall for the failure class, which is often the priority in risk-sensitive applications