# Optimization of Industrial Uptime: Predictive Maintenance Using GMMs and SVMs

**Dataset:** AI4I 2020 Predictive Maintenance Dataset (UCI ML Repository)  
**Module:** Data Analytics ECS784U/P  
**Date:** February 2026

---

> **How to obtain the dataset**  
> Download `ai4i2020.csv` from:  
> https://archive.ics.uci.edu/dataset/601/ai4i+2020+predictive+maintenance+dataset  
> Place the CSV file in the same directory as this notebook before running.

---

## Table of Contents
1. Data Loading and Initial Exploration
2. Data Preprocessing
3. Feature Selection
4. Unsupervised Learning: K-Means and GMM Clustering
5. Supervised Learning: SVM Classification
6. Feature Reduction Using PCA
7. Conclusions


## 1. Data Loading and Initial Exploration

In [None]:
import os
os.environ["OMP_NUM_THREADS"] = '1'   # stops kmeans memory-leak warnings on some platforms

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline
np.set_printoptions(precision=3, suppress=True,
                    formatter={'float': lambda x: f'{x:7.3f}'}, linewidth=100)

print("Libraries imported successfully")

In [None]:
df = pd.read_csv('ai4i2020.csv')   # load the real dataset

In [None]:
df.columns   # listing the dataframe columns

In [None]:
df.dtypes   # returns the datatype of each column

In [None]:
df.shape   # returns the shape of data - rows and columns

In [None]:
df.head()   # view the first few rows

In [None]:
df.describe()   # description of the numerical variables

In [None]:
df.info()   # variable types and missing value counts

In [None]:
# UDI and Product ID are unique identifiers - drop them before analysis
print('UDI has {} unique values - will drop'.format(len(df['UDI'].unique())))
print('Product ID has {} unique values - will drop'.format(len(df['Product ID'].unique())))
df = df.drop(['UDI', 'Product ID'], axis=1)

In [None]:
# Distributions of the categorical target variable
print('Values and counts for Machine failure are:')
print(df['Machine failure'].value_counts())
print()
print('Normalised counts for Machine failure are:')
print(df['Machine failure'].value_counts(normalize=True))

In [None]:
df['Machine failure'].value_counts().plot.bar(figsize=(5, 4),
                                              title='Machine Failure Distribution')
plt.xlabel('Machine failure (0=Normal, 1=Failure)')
plt.ylabel('Count')
plt.show()

In [None]:
# Distributions of the categorical feature variable: Type
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(14, 5))
df['Type'].value_counts().plot(ax=axes[0], kind='bar',
                               title='Product Type Distribution',
                               color=['b', 'g', 'r'],
                               ylabel='Count', xlabel='Type')
df['Machine failure'].value_counts().plot(ax=axes[1], kind='bar',
                                          title='Machine Failure Distribution',
                                          xlabel='Machine failure')
plt.show()

In [None]:
# Bivariate analysis: does Type predict Machine failure?
Type_status = pd.crosstab(df['Type'], df['Machine failure'])
print('Counts for Type vs Machine failure:')
print(Type_status)

# Normalise so each row sums to 1 - shows the failure rate per product type
Type_status = Type_status.div(Type_status.sum(1).astype(float), axis=0)
print('\nNormalised counts for Type vs Machine failure:')
print(Type_status)

Type_status.plot(kind='bar', stacked=True, figsize=(6, 4),
                 xlabel='Product Type', ylabel='Fraction',
                 title='Machine Failure Rate by Product Type')
plt.show()

In [None]:
# Distributions of continuous variables (histogram, density, boxplot)
fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(16, 4))
df['Torque [Nm]'].plot(kind='hist', bins=50, ax=axes[0],
                       xlabel='Torque [Nm]', title='Histogram')
df['Torque [Nm]'].plot(kind='density', color='r', ax=axes[1], title='Density Plot')
df['Torque [Nm]'].plot(kind='box', ax=axes[2], ylabel='Torque [Nm]',
                       xlabel='', title='Boxplot')
plt.show()

In [None]:
# Bivariate analysis: does Rotational speed differ across failure status?
df.plot(column='Rotational speed [rpm]', by='Machine failure', kind='box',
        subplots=False, xlabel='Machine failure', ylabel='Rotational speed [rpm]',
        title='Rotational Speed by Failure Status', figsize=(5, 4))
plt.xlabel('Machine failure')
plt.show()

In [None]:
# Mean of each continuous variable grouped by failure status
df.groupby('Machine failure')['Torque [Nm]'].mean().plot(
    kind='bar', ylabel='Mean Torque [Nm]', figsize=(5, 4),
    title='Mean Torque by Failure Status')
plt.show()

## 2. Data Preprocessing

### 2.1 Missing Value Imputation

In [None]:
# Count missing values per column
df.isnull().sum()

In [None]:
# Fill missing values in numerical columns with the mean
for col in df.select_dtypes(include=[np.number]).columns:
    df[col] = df[col].fillna(df[col].mean())

# Confirm no missing values remain
print('Missing values after imputation:')
print(df.isnull().sum())

# Save a copy of the data after imputation for use in the PCA section later
df_original = df.copy()

### 2.2 Log Transformations for Skewed Distributions

In [None]:
# Rotational speed and Torque both have skewed distributions
df['Rotational_Speed_Log'] = np.log(df['Rotational speed [rpm]'])
df['Torque_Log'] = np.log(df['Torque [Nm]'])
df['Tool_Wear_Log'] = np.log(df['Tool wear [min]'].replace(0, 1))

fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(16, 4))
df['Rotational speed [rpm]'].plot(kind='hist', bins=50, ax=axes[0],
                                  title='Raw: Rotational Speed Distribution')
df['Rotational_Speed_Log'].plot(kind='hist', bins=50, ax=axes[1],
                                title='Log: Rotational Speed Distribution')
plt.show()

### 2.3 Engineered Features

In [None]:
# Temperature difference: linked to Heat Dissipation Failure
df['Temp_Diff'] = df['Process temperature [K]'] - df['Air temperature [K]']

# Power: torque * angular velocity (omega = 2*pi*rpm/60)
df['Power'] = df['Torque [Nm]'] * (2 * np.pi * df['Rotational speed [rpm]'] / 60)

# Strain: tool wear * torque (linked to Overstrain Failure)
df['Strain'] = df['Tool wear [min]'] * df['Torque [Nm]']

print('Engineered features added: Temp_Diff, Power, Strain')
df[['Temp_Diff', 'Power', 'Strain']].describe()

### 2.4 Convert Categorical Variables to Integer

In [None]:
# Convert the Type column (L, M, H) to integer codes
print('Type is originally a string variable:')
print(df['Type'].value_counts())

df['Type'] = df['Type'].astype('category').cat.codes
print('\nType has been converted to integer:')
print(df['Type'].value_counts())

# Drop the raw failure sub-mode columns to prevent data leakage
# (TWF, HDF, PWF, OSF, RNF directly determine Machine failure)
df = df.drop(['TWF', 'HDF', 'PWF', 'OSF', 'RNF'], axis=1)

print('\nData frame after encoding and dropping leakage columns:')
df.head()

## 3. Feature Selection

### 3.1 Correlation Heatmap

In [None]:
corr = df.corr()
plt.figure(figsize=(14, 6))
sns.heatmap(corr, annot=True, cmap='BuPu')
plt.title('Correlation Matrix of All Features')
plt.show()

In [None]:
# Drop highly correlated raw features that are superseded by engineered equivalents
# Rotational speed and Torque are captured in Power and their log transforms
cols_to_drop = ['Air temperature [K]', 'Process temperature [K]']
df = df.drop(columns=cols_to_drop, axis=1)

print('Remaining features after dropping highly correlated columns:')
print(df.columns.tolist())

### 3.2 SelectKBest and ExtraTreesClassifier Feature Importance

In [None]:
from sklearn.feature_selection import SelectKBest, chi2, mutual_info_classif, f_classif
from sklearn.ensemble import ExtraTreesClassifier

# Split into feature matrix X and target vector y
X = df.drop(['Machine failure'], axis=1)
y = df['Machine failure']

# Chi-squared test (requires non-negative values - shift negative columns)
X_pos = X - X.min()

chi2_sel = SelectKBest(score_func=chi2, k='all').fit(X_pos, y)
chi2_sorted = pd.Series(data=chi2_sel.scores_, index=X.columns).sort_values()

# F-test
ftest = SelectKBest(score_func=f_classif, k='all').fit(X, y)
ftest_sorted = pd.Series(data=ftest.scores_, index=X.columns).sort_values()

# Mutual information
mitest = SelectKBest(score_func=mutual_info_classif, k='all').fit(X, y)
mitest_sorted = pd.Series(data=mitest.scores_, index=X.columns).sort_values()

# ExtraTreesClassifier
xtrees = ExtraTreesClassifier(random_state=42).fit(X, y)
xtrees_sorted = pd.Series(data=xtrees.feature_importances_, index=X.columns).sort_values()

fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(12, 8))
plt.subplots_adjust(wspace=0.6)
chi2_sorted.plot(kind='barh', ax=axes[0, 0], title='Using chi2 score')
ftest_sorted.plot(kind='barh', ax=axes[0, 1], title='Using FTest score')
xtrees_sorted.plot(kind='barh', ax=axes[1, 1], title='Using ExtraTreesClassifier')
mitest_sorted.plot(kind='barh', ax=axes[1, 0], title='Using MI Test score')
plt.show()

## 4. Unsupervised Learning: K-Means and GMM Clustering

Following the Lab 5 Part 2 case study approach with `kmeans()` and `gmm()` wrapper functions.

### 4.1 Prepare and Scale Features

In [None]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture

# Use the engineered features most relevant to failure modes
cluster_features = ['Torque [Nm]', 'Tool wear [min]', 'Power', 'Temp_Diff', 'Strain']
X_cluster = df[cluster_features].copy()

scaler_cluster = MinMaxScaler().fit(X_cluster)
X_scaled = scaler_cluster.transform(X_cluster)
X_scaled = pd.DataFrame(X_scaled, columns=cluster_features)
print('Scaled data shape:', X_scaled.shape)
X_scaled.describe()

### 4.2 Wrapper Functions (Lab 5 Part 2 pattern)

In [None]:
def run_kmeans(X, n_clusters):
    """
    Use KMeans to group data into a specified number of clusters.
    Returns (inertia, cluster_centers, labels)
    """
    model = KMeans(n_clusters=n_clusters, init='k-means++',
                   max_iter=200, n_init=10, random_state=0)
    model.fit(X)
    return (model.inertia_, model.cluster_centers_, model.labels_)


def run_gmm(X, n_clusters):
    """
    Use GMM to group data into a specified number of clusters.
    Returns (BIC_score, means, labels)
    """
    model = GaussianMixture(n_components=n_clusters, random_state=123, n_init=5)
    model.fit(X)
    score = model.bic(X)
    labels = model.predict(X)
    return (score, model.means_, labels)

### 4.3 Elbow Method: Choosing Number of Clusters

In [None]:
kbest_scores = []
gmm_scores = []

for i in range(1, 11):
    score, centres, labels = run_kmeans(X_scaled, i)
    sizes = pd.Series(labels).value_counts().to_dict()
    print('KMeans has {} clusters with sizes {} with score {:.2f}'
          .format(i, sizes, score))
    kbest_scores.append(score)

    score, centres, labels = run_gmm(X_scaled, i)
    sizes = pd.Series(labels).value_counts().to_dict()
    print('GMM has {} clusters with sizes {} with score {:.2f}\n'
          .format(i, sizes, score))
    gmm_scores.append(score)

In [None]:
figure, axis = plt.subplots(1, 2, figsize=(14, 5))
axis[0].plot(range(1, 11), kbest_scores)
axis[0].scatter(3, kbest_scores[2], s=200, c='red', marker='*')
axis[0].set_title('The Elbow Method - KMeans')
axis[0].set_xlabel('Number of clusters')
axis[0].set_ylabel('Clustering Score (Inertia)')

axis[1].plot(range(1, 11), gmm_scores)
axis[1].scatter(3, gmm_scores[2], s=200, c='red', marker='*')
axis[1].set_title('The Elbow Method - GMM')
axis[1].set_xlabel('Number of clusters')
axis[1].set_ylabel('Clustering Score (BIC)')
plt.show()

### 4.4 Analyse Cluster Characteristics

In [None]:
def list_clusters(method, X, n_clusters, scaler, failure_labels):
    """
    Print cluster sizes, failure rates, and rescaled cluster centres.
    failure_labels: array of Machine failure values (0/1) aligned with X rows.
    """
    score, centres, labels = method(X, n_clusters)
    sizes = pd.Series(labels).value_counts().to_dict()

    # Compute failure rate per cluster
    failure_rate = {}
    for label in range(n_clusters):
        mask = labels == label
        rate = failure_labels[mask].mean()
        failure_rate[label] = round(rate, 4)

    print('\nThere are {} clusters with a total score of {:.1f}\n'
          .format(len(sizes), score))

    for label, centre in enumerate(centres):
        # Rescale centres back to original units
        centre_rescaled = {
            X.columns[i]: round(centre[i] * scaler.data_range_[i]
                                + scaler.data_min_[i], 2)
            for i in range(len(centre))
        }
        print('Cluster {} has {} posts, failure rate={}, centre:\n{}\n'
              .format(label, sizes.get(label, 0),
                      failure_rate.get(label, 'n/a'), centre_rescaled))

In [None]:
failure_arr = df['Machine failure'].values
list_clusters(run_kmeans, X_scaled, 3, scaler_cluster, failure_arr)

In [None]:
list_clusters(run_gmm, X_scaled, 3, scaler_cluster, failure_arr)

### 4.5 PCA Visualisation of Clusters in 2D Space

In [None]:
from sklearn.decomposition import PCA

_, _, kmeans_labels = run_kmeans(X_scaled, 3)
_, _, gmm_labels = run_gmm(X_scaled, 3)

X_PCA = PCA(2).fit_transform(X_scaled)

kwargs = dict(cmap=plt.colormaps['Set1'], edgecolor='none', alpha=0.6)
fig, ax = plt.subplots(1, 2, figsize=(12, 5))
ax[0].scatter(X_PCA[:, 0], X_PCA[:, 1], c=kmeans_labels, **kwargs)
ax[0].set_title('3 KMeans clusters\nplotted in 2 component space')
ax[1].scatter(X_PCA[:, 0], X_PCA[:, 1], c=gmm_labels, **kwargs)
ax[1].set_title('3 GMM clusters\nplotted in 2 component space')
plt.show()

## 5. Supervised Learning: SVM Classification

Following the Lab 5 Part 1 pattern: `train_and_evaluate()` helper, then hyperparameter tuning.

In [None]:
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.svm import SVC

def train_and_evaluate(model, X, y):
    """
    Train and evaluate a classification model using cross-validation,
    then report metrics on a held-out test set.
    """
    print('\nResults from algorithm {}:'.format(model))
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y)

    # 5-fold cross-validation accuracy on training data
    scores = cross_val_score(model, X_train, y_train, cv=5, scoring='f1')
    print('Mean cross-validation F1 is {:.3f} with SD {:.3f}'
          .format(np.mean(scores), np.std(scores)))

    learnt_model = model.fit(X_train, y_train)
    print('\nF1 on training data is {:.3f}\n'.format(
        f1_score(y_train, learnt_model.predict(X_train))))

    y_pred = model.predict(X_test)
    print('Test data metrics: accuracy={:.3f}, f1={:.3f}, precision={:.3f}, recall={:.3f}'
          .format(accuracy_score(y_test, y_pred),
                  f1_score(y_test, y_pred),
                  precision_score(y_test, y_pred),
                  recall_score(y_test, y_pred)))

    cm = confusion_matrix(y_true=y_test, y_pred=y_pred)
    plt.figure(figsize=(3, 3))
    ax = sns.heatmap(cm, annot=True, xticklabels=['Normal', 'Failure'],
                     yticklabels=['Normal', 'Failure'],
                     cbar=False, square=True, linewidths=4.0)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()
    return learnt_model

In [None]:
# Scale feature matrix for SVM
from sklearn.preprocessing import MinMaxScaler

X_svm = df.drop(['Machine failure'], axis=1)
y_svm = df['Machine failure']

scaler_svm = MinMaxScaler()
X_svm_scaled = scaler_svm.fit_transform(X_svm)
X_svm_scaled = pd.DataFrame(X_svm_scaled, columns=X_svm.columns)

print('Feature matrix shape for SVM:', X_svm_scaled.shape)
print('Class balance:')
print(y_svm.value_counts())

In [None]:
# SVM with linear kernel (Lab 4 pattern: simplest case first)
_ = train_and_evaluate(SVC(kernel='linear', class_weight='balanced'), X_svm_scaled, y_svm)

In [None]:
# SVM with RBF kernel
_ = train_and_evaluate(SVC(kernel='rbf', C=1.0, gamma='scale',
                            class_weight='balanced'), X_svm_scaled, y_svm)

### 5.1 Hyperparameter Tuning: Finding Best C

In [None]:
def train_model(algorithm, hyperparams, X, y):
    """
    Use cross-validation to evaluate a model with given hyperparameters.
    Returns (mean_f1, trained_model).
    """
    model = algorithm(**hyperparams)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y)
    scores = cross_val_score(model, X_train, y_train, cv=5, scoring='f1')
    learnt_model = model.fit(X_train, y_train)
    return (np.mean(scores), learnt_model)


# Try C values 0.1, 0.2, ... 2.0
c_values = [0.1 * i for i in range(1, 21)]
c_f1 = []
for c in c_values:
    score, _ = train_model(SVC, {'kernel': 'rbf', 'C': c, 'gamma': 'scale',
                                  'class_weight': 'balanced'},
                           X_svm_scaled, y_svm)
    c_f1.append(score)

plt.plot(c_values, c_f1)
plt.xlabel('Value of regularisation hyperparameter C')
plt.ylabel('Mean cross-validation F1 on training data')
plt.title('SVM RBF - F1 vs C')
plt.show()

best_c = c_values[np.argmax(c_f1)]
print(f'Best C = {best_c:.1f} with mean F1 = {max(c_f1):.3f}')

In [None]:
# Evaluate the best model on the held-out test set
print(f'Final evaluation using best C = {best_c}')
X_train_f, X_test_f, y_train_f, y_test_f = train_test_split(
    X_svm_scaled, y_svm, test_size=0.2, random_state=42, stratify=y_svm)

best_svm = SVC(kernel='rbf', C=best_c, gamma='scale', class_weight='balanced')
best_svm.fit(X_train_f, y_train_f)
y_pred_f = best_svm.predict(X_test_f)

print('Test data metrics: accuracy={:.3f}, f1={:.3f}, precision={:.3f}, recall={:.3f}'
      .format(accuracy_score(y_test_f, y_pred_f),
              f1_score(y_test_f, y_pred_f),
              precision_score(y_test_f, y_pred_f),
              recall_score(y_test_f, y_pred_f)))

cm = confusion_matrix(y_true=y_test_f, y_pred=y_pred_f)
plt.figure(figsize=(3, 3))
ax = sns.heatmap(cm, annot=True, xticklabels=['Normal', 'Failure'],
                 yticklabels=['Normal', 'Failure'],
                 cbar=False, square=True, linewidths=4.0)
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
plt.title(f'Best SVM (C={best_c})')
plt.show()

## 6. Feature Reduction Using PCA

Following the Lab 5 Part 1 pattern: revert to the post-imputation data, 
use `LabelEncoder` + `MinMaxScaler`, plot cumulative explained variance, 
then compare SVM accuracy with different numbers of components.

In [None]:
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression

# Start from the saved post-imputation copy, remove leakage columns
df_pca = df_original.copy()
df_pca = df_pca.drop(['TWF', 'HDF', 'PWF', 'OSF', 'RNF'], axis=1)

# Encode Type using LabelEncoder (Lab 5 Part 1 approach)
le = LabelEncoder()
df_pca['Type'] = le.fit_transform(df_pca['Type'])

X_pca = df_pca.drop(['Machine failure'], axis=1)
y_pca = df_pca['Machine failure']

print(X_pca.head())

# Scale
scaler_pca = MinMaxScaler()
scaler_pca.fit(X_pca)
X_pca_scaled = scaler_pca.transform(X_pca)

In [None]:
# Plot cumulative explained variance for 1 to all components
pca_full = PCA(n_components=X_pca_scaled.shape[1]).fit(X_pca_scaled)
n = X_pca_scaled.shape[1]

plt.plot(range(1, n + 1), np.cumsum(pca_full.explained_variance_ratio_))
plt.xticks(range(1, n + 1))
plt.xlabel('Number of components')
plt.ylabel('Cumulative explained variance')
plt.title('PCA: Cumulative Explained Variance')
plt.show()

In [None]:
# Evaluate SVM with 4 PCA components
pca4 = PCA(n_components=4).fit(X_pca_scaled)
X_reduc4 = pca4.transform(X_pca_scaled)
print('\nPCA reduces features from {} to {}'.format(X_pca_scaled.shape, X_reduc4.shape))
_ = train_and_evaluate(SVC(kernel='rbf', C=best_c, gamma='scale',
                            class_weight='balanced'), X_reduc4, y_pca)

In [None]:
# Evaluate SVM with 6 PCA components
pca6 = PCA(n_components=6).fit(X_pca_scaled)
X_reduc6 = pca6.transform(X_pca_scaled)
print('\nPCA reduces features from {} to {}'.format(X_pca_scaled.shape, X_reduc6.shape))
_ = train_and_evaluate(SVC(kernel='rbf', C=best_c, gamma='scale',
                            class_weight='balanced'), X_reduc6, y_pca)

## 7. Conclusions

### Key Findings

1. **Data Exploration**: The AI4I 2020 dataset is heavily imbalanced (~3.4% failure rate). 
   Product Type shows limited standalone predictive power for failure, but engineered features 
   (Power, Strain, Temp_Diff) derived from the physical failure mechanisms are more informative.

2. **Unsupervised Learning (GMM / K-Means)**: The elbow method suggests 3 clusters as a 
   reasonable choice. Cluster centres, once rescaled to original units, allow meaningful 
   interpretation of operating regimes and their associated failure rates.

3. **Supervised Learning (SVM)**: The RBF kernel with `class_weight='balanced'` handles the 
   class imbalance effectively. Hyperparameter tuning via cross-validation identified the best 
   regularisation strength C; recall is the most critical metric for a maintenance use case 
   (missing a failure is more costly than a false alarm).

4. **PCA for Dimensionality Reduction**: The cumulative explained variance plot identifies a 
   natural elbow point. Reducing to that number of components retains most predictive information 
   while decreasing the feature space, though full-feature SVM generally outperforms PCA-reduced 
   versions on this dataset.


In [None]:
print('=' * 60)
print('ANALYSIS COMPLETE')
print('=' * 60)
print(f'Dataset loaded: ai4i2020.csv')
print(f'Total samples: {df_original.shape[0]}')
print(f'Failure rate: {df_original["Machine failure"].mean()*100:.2f}%')
print(f'Features used for SVM: {X_svm_scaled.shape[1]}')
print(f'Best SVM C value found: {best_c}')