# Equipament Maintenance Prediction

In [None]:
# %pip install mlflow
# %pip install imblearn
# %pip install xgboost



In [None]:
import pandas as pd
import torch
from sklearn.metrics import accuracy_score, confusion_matrix, recall_score, f1_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from xgboost import XGBClassifier
import mlflow
from mlflow.models import infer_signature
import os
from pathlib import Path
import statsmodels.api as sm
import matplotlib.pyplot as plt
import numpy as np
from scipy import stats
from sklearn.preprocessing import PowerTransformer
from imblearn.over_sampling import SMOTE
import seaborn as sns
from sklearn.feature_selection import mutual_info_regression
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import MinMaxScaler

In [None]:
ROOT_PATH = Path(__name__).resolve().parent.parent
data_folder_path = os.path.join(ROOT_PATH, "data")

data_path = os.path.join(data_folder_path, "equipament-cycles.csv")

In [None]:
data = pd.read_csv(data_path, header=0, sep=",")

data.head()

In [None]:
data.loc[data["Machine failure"] == 1]

In [None]:
data.isnull().sum()

In [None]:
data.dtypes

In [None]:
cols_drop = ["UDI", "Product ID", "Type", "Machine failure", "TWF", "HDF", "PWF", "OSF", "RNF"]
temp = data.drop(cols_drop, axis=1)

fig, axes = plt.subplots(nrows=2, ncols=3, figsize=(18, 10))
axes = axes.flatten()

for i, col in enumerate(temp.columns):
    ax = axes[i]
    temp.boxplot(column=col, ax=ax)
    ax.set_title(f"{col.title()} BoxPlot Analysis")
    ax.set_ylabel("Value")

plt.tight_layout()
plt.show();

In [None]:
fig, axes = plt.subplots(nrows=2, ncols=3, figsize=(18, 10))
axes = axes.flatten()

for i, col in enumerate(temp.columns):
    ax = axes[i]
    temp.hist(column=col, ax=ax, grid=False, bins=50)
    ax.set_title(f"{col.title()} BoxPlot Analysis")
    ax.set_ylabel("Value")

plt.tight_layout()
plt.show();

If the machine goes further than the top quantile, does it generate more failure?

H0 -> Rotation on outlier parameters does not have impact<br>
H1 -> Rotation on outlier parameters does have impact

In [None]:
q1 = data["Rotational speed [rpm]"].quantile(0.25)
q3 = data["Rotational speed [rpm]"].quantile(0.75)

iqr = q3 - q1
superior_limit = q3 + 1.5 * iqr

In [None]:
couting_over_limit = data.loc[data["Rotational speed [rpm]"]>=superior_limit]["Machine failure"].value_counts()
couting_under_limit = data.loc[data["Rotational speed [rpm]"]<superior_limit]["Machine failure"].value_counts()

In [None]:
chance_of_failure_above_limit = (
    couting_over_limit[1] / couting_over_limit.sum()
    ) * 100

In [None]:
chance_of_failure_under_limit = (
    couting_under_limit[1] / couting_under_limit.sum()
    ) * 100

In [None]:
print(f"Above Limit: {chance_of_failure_above_limit.round(2)} %")
print(f"Under Limit: {chance_of_failure_under_limit.round(2)} %")

Although we have a higher chance of failure with the equipament running up to the outliers values, it's not conclusive that it generate a failure.<br>
To really understand the impact of rotation on failure, we would have to take a look at the equipament manual and evaluate its operations limits.<br>

Types of procuts manufactured

In [None]:
print(f"Products: {data["Type"].unique()}\n")
print(f"Counting: {data["Type"].value_counts()}")

In [None]:
resume_failure = data.loc[data["Machine failure"]==1]
resume_type = resume_failure.groupby(["Type"]).agg({"Machine failure": "count"}).reset_index()

In [None]:
fig, ax = plt.subplots(figsize=(13, 6))

bars = ax.barh(width=resume_type["Machine failure"], y=resume_type["Type"])
ax.set_ylabel("Product Type")
ax.set_xlabel("Count")
ax.set_title("Counting failures of each product type manufactured", loc="left")
ax.bar_label(bars, padding=3)
plt.tight_layout()
plt.show()

Checking normalization on the variables

In [None]:
data

In [None]:
cols_to_transform = list()

for col in data.drop(cols_drop, axis=1).columns:

    test = stats.normaltest(data[col])
    pvalue = test.pvalue

    alpha = 0.05

    if pvalue > alpha:
        print(f"column: {col}  /  p-value: {pvalue}")
        print('Não rejeita H0: Os dados parecem ser normalmente distribuídos.\n')
    else:
        cols_to_transform.append(col)
        print(f"column: {col}  /  p-value: {pvalue}")
        print('Rejeita H0: Os dados não seguem uma distribuição normal.\n')


In [None]:
data_transformed = data.copy()

for col in cols_to_transform:
    data_array = np.array(data[col]).reshape(-1, 1)
    transformer = PowerTransformer(method="yeo-johnson")
    data_transformed[f"{col}_t"] = transformer.fit_transform(data_array).flatten()

    data_transformed.drop(col, axis=1, inplace=True)

Later, I'll try both ways and understand which fits better. Normalized data or original data

The structure of the modeling may include a failure prediction before the the failure itself.<br>

To do that, we can get the value of failure and offset it to the previous cycles, creating the possibility of early prediction before failure X cycles before real failure.<br>
<br>
We'll try to make the prediction in a range of 3 cycles before failure

In [None]:
data_new = data.drop([col for col in cols_drop if col != "Machine failure" and col != "Type"], axis=1).copy()

In [None]:
for i in data_new.index:
    status = data_new.loc[i, "Machine failure"]
    if status == 1 and i > 3:
        data_new.loc[i-1, "Machine failure"] = 1
        data_new.loc[i-2, "Machine failure"] = 1
        data_new.loc[i-3, "Machine failure"] = 1

In [None]:
data_new["Machine failure"].value_counts()

In [None]:
non_failure = data_new["Machine failure"].value_counts()[0]
failure = data_new["Machine failure"].value_counts()[1]

diff = (failure / non_failure) -1

cutting = 0.2

if abs(diff) > cutting:
    print(f"Difference {diff.round(3) * 100}%\n")
    print("Data is unbalanced. Treatment of balancing is required.")
else:
    print(f"Difference {diff.round(3) * 100}\n")
    print("Data is balanced. No treatment needed")

In [None]:
fig, ax = plt.subplots(figsize=(6, 5))

counting = data_new.groupby(["Machine failure"]).agg({"Type": "count"}).reset_index()

bars = ax.bar(height=counting["Type"], x=counting["Machine failure"])
ax.bar_label(bars)
ax.set_xlabel("Labels")
ax.set_ylabel("Counting")
ax.set_title("Counting values of each label", loc="left")
plt.tight_layout()
plt.show();

### Balancing data

In [None]:
X = data_new.drop(["Machine failure"], axis=1)
y = data_new.loc[:, "Machine failure"]

We've got important variable called Type. Although, this variable is a categorical value.<br>
<br>
As soons as I may use this variable, I'll convert it to binary values with the One-Hot-Encoding method

In [None]:
X = pd.get_dummies(X, columns=["Type"], dtype=int)

# I'll not drop de first dummy. I'll keep all of them

In [None]:
smote = SMOTE()
X_b, y_b = smote.fit_resample(X, y)

data_balanced = pd.concat([X_b, y_b], axis=1)

In [None]:
data_balanced

In [None]:
fig, ax = plt.subplots(figsize=(6, 5))

counting = data_balanced.groupby(["Machine failure"]).agg({"Type_H": "count"}).reset_index()

bars = ax.bar(height=counting["Type_H"], x=counting["Machine failure"])
ax.bar_label(bars)
ax.set_xlabel("Labels")
ax.set_ylabel("Counting")
ax.set_title("Counting values of each label", loc="left")
plt.tight_layout()
plt.show();

In [None]:
data_balanced.corr(method="pearson")

In [None]:
data_balanced.corr(method="kendall")

In [None]:
non_linear_relation = dict()
for col in data_balanced.columns:
    x1 = np.array(data_balanced["Machine failure"]).reshape(-1, 1)
    x2 = np.array(data_balanced[col]).reshape(-1, 1)

    coef = mutual_info_regression(x1, x2)

    non_linear_relation[col] = coef

In [None]:
non_linear_relation

There is a non linear relation between the variables with the Failures.

Does the data have multicolinearity? Let's evaluate

In [None]:
vif_data = pd.DataFrame()
X_vif = data_balanced.drop(["Machine failure"], axis=1)
vif_data["features"] = X_vif.columns

vif_data["VIF"] = [variance_inflation_factor(X_vif.values, i) for i in range(len(X_vif.columns))]

vif_data

- 'Air Temperature [K]' and 'Process Temperature [K]' contains a huge multicolinearity. Depending on how the model behives, We can drop it.<br>
- 'Rotational speed [rpm]' and 'Torque [Nm]' also contains multicolinearity. Which make sense because rotary machinery have a Torque and Rotational Speed.<br>
- All other variabels satisfy the Variation inflation Factor Limits


In this case, we can Drop one of the columns between: 'Air Temperature [K]' and 'Process Temperature [K]', and also drop one between 'Rotational speed [rpm]' and 'Torque [Nm]'.<br>
It'll all depend on how does the model behive to the modeling, and if the column is statistically significant.

### Separate Original data into Train and Test

In [None]:
X_b.columns = [col.replace("[K]", "").replace("[rpm]", "").replace("[Nm]", "").replace("[min]", "") for col in X_b.columns]

In [None]:
X_train, X_test, y_train, y_test = train_test_split(
    X_b, y_b, test_size=0.2, shuffle=True
)

### Model experiment - Original Data

In [None]:
models = {
    "RandomForestClassifier": RandomForestClassifier(),
    "XGBoostClassifier": XGBClassifier(),
    "SVC": SVC(),
    "GaussianNB": GaussianNB(),
    "DecisionTreeClassifier": DecisionTreeClassifier(),
    "HistGradientBoostingClassifier": HistGradientBoostingClassifier(),
}

In [None]:
parameters = {
    "RandomForestClassifier": {
        "n_estimators": [100, 150, 180, 200],
        # "criterion": ["gini", "entropy", "log_loss"],
        # "max_depth": [None, 1, 3, 5, 7, 9],
        "min_samples_split": [2, 5, 8, 11],
        # "max_features": ["sqrt", "log2", None],
        "min_samples_leaf": [2, 5, 9]
    },
    "XGBoostClassifier": {
        "n_estimators": [100, 150, 180, 200],
        # "learning_rate": [0.3, 0.1, 0.01, 0.005],
        "max_depth": [6, 12, 24, 44],
    },
    "SVC": {
        "C": [1.0, 1.5, 2.0],
        # "kernel": ["linear", "poly", "rbf", "sigmoid", "precomputed"],
        # "degree": [3, 5, 7, 9],
        "gamma": ["scale", "auto"],
    },
    "GaussianNB": {
        "var_smoothing": [1e-9, 1e-6, 1e-3, 1]
    },
    "DecisionTreeClassifier": {
        # "criterion": ["gini", "entropy", "log_loss"],
        # "splitter": ["best", "random"],
        # "max_depth": [None, 5, 10, 15, 20, 25],
        "min_samples_split": [2, 5, 8, 11, 14],
        "min_samples_leaf": [1, 3, 5, 7, 9, 11],
    },
    "HistGradientBoostingClassifier": {
        # "learning_rate": [0.1, 0.01, 0.001],
        "max_iter": [100, 150, 200, 250],
        "max_depth": [None, 5, 10, 15, 20, 25],
    },
}

In [None]:
def model_evaluation(y_true, y_pred):
    accuracy_score_ = accuracy_score(y_true, y_pred)
    f1_score_ = f1_score(y_true, y_pred)
    recall_score_ = recall_score(y_true, y_pred)

    return accuracy_score_, f1_score_, recall_score_

In [None]:
def run_training_experiment(
        X_train,
        y_train,
        models: dict,
        params: dict,
        experiment: str,
        type: str
) -> dict:
    print(">>>>>>>>>>> Running Training Experiment <<<<<<<<<<<<")
    models_uri = dict()
    for m, estimator in models.items():
        print(f"\nModel: {m}")

        params = parameters[m]

        grid_cv = GridSearchCV(
            estimator=estimator,
            param_grid=params,
            cv=3,
            verbose=1
        )

        grid_cv.fit(X_train, y_train)
        y_result = grid_cv.predict(X_train)

        accuracy_score_, f1_score_, recall_score_ = model_evaluation(y_train, y_result)

        if accuracy_score != 1:

            mlflow.set_experiment(experiment)
            with mlflow.start_run(run_name=m):
                model_ = mlflow.sklearn.log_model(sk_model=grid_cv.best_estimator_, artifact_path=m)
                models_uri[m] = model_.model_uri
                mlflow.log_params(grid_cv.best_params_)

                mlflow.log_metric("accuracy_score", accuracy_score_)
                mlflow.log_metric("f1_score", f1_score_)
                mlflow.log_metric("recall_score", recall_score_)

                mlflow.set_tag("Experiment Type", "Training")
                mlflow.set_tag("Data Type", type)

    print("\n>>>>>>>>>>> Finishing Training Experiment <<<<<<<<<<<<")
    return models_uri

In [None]:
def run_testing_experiment(
        X_test,
        y_test,
        trained_models: dict,
        experiment: str,
        type: str
) -> None:
    print("\n>>>>>>>>>>> Running Testing Experiment <<<<<<<<<<<<")
    for m, run_id in trained_models.items():
        print(m, run_id)

        loaded_model = mlflow.pyfunc.load_model(run_id)

        y_pred_test = loaded_model.predict(X_test)

        accuracy_score_, f1_score_, recall_score_ = model_evaluation(y_test, y_pred_test)
        print(accuracy_score_, f1_score_, recall_score_)

        mlflow.set_experiment(experiment)
        with mlflow.start_run(run_name=m):
            mlflow.sklearn.log_model(sk_model=loaded_model, artifact_path=m)
            mlflow.log_metric("accuracy_score", accuracy_score_)
            mlflow.log_metric("f1_score", f1_score_)
            mlflow.log_metric("recall_score", recall_score_)

            mlflow.set_tag("Experiment Type", "Testing")
            mlflow.set_tag("Data Type", type)

    print("\n>>>>>>>>>>> Finishing Testing Experiment <<<<<<<<<<<<")

In [None]:
trained_models = run_training_experiment(
    X_train,
    y_train,
    models,
    parameters,
    "training_experiment",
    "Original data",
)

run_testing_experiment(
        X_test,
        y_test,
        trained_models,
        "testing_experiment",
        "Original data",
)

### Scale Data

In [None]:
scaler = MinMaxScaler(feature_range=(0, 1))
scaler = scaler.fit(X_b.copy())
X_b_scaled = scaler.transform(X_b.copy())

### Separate Scaled data into Train and Test

In [None]:
X_train_scaled, X_test_scaled, y_train, y_test = train_test_split(
    X_b_scaled, y_b, test_size=0.2, shuffle=True
)

### Model experiment - Scaled Data

In [None]:
trained_models_scaled = run_training_experiment(
    X_train_scaled,
    y_train,
    models,
    parameters,
    "training_experiment",
    "Scaled data",
)

run_testing_experiment(
        X_test_scaled,
        y_test,
        trained_models_scaled,
        "testing_experiment",
        "Scaled data",
)

## Prediction Set Up

### Get best model from MLflow

In [None]:
from mlflow.tracking import MlflowClient
import mlflow

In [None]:
def get_all_models(experiment_name: str):
    client = MlflowClient()
    experiment = client.get_experiment_by_name(experiment_name)

    all_models = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        order_by=["metrics.accuracy_score DESC"],
    )

    return all_models

In [None]:
def get_ideal_models(all_models) -> dict:
    models_to_go = dict()

    for m in all_models:

        model_name = f"{m.info.run_name} | {m.data.tags["Data Type"]}"
        score = m.data.metrics["accuracy_score"]
        if score != 1 and score >= 0.7:

            models_to_go[model_name] = score

    return models_to_go

In [None]:
def get_best_model_uri(models: dict) -> dict:
    best_model = max(models, key=models.get)
    model, data_type = best_model.split(" | ")

    all_trained_models = get_all_models("training_experiment")

    output_model = dict()
    for m in all_trained_models:
        model_comp = m.info.run_name
        data_type_comp = m.data.tags["Data Type"]

        if model == model_comp and data_type == data_type_comp:
            run_id = m.info.run_id

            model_uri = f"runs:/{run_id}/{model}"
            output_model["model_name"] = model
            output_model["model_uri"] = model_uri
            output_model["data_type"] = data_type

    return output_model

In [None]:
all_tested_models = get_all_models("testing_experiment")
models = get_ideal_models(all_tested_models)
model = get_best_model_uri(models)

print(f"Final model:\n{model}")

### Making predictions

In [None]:
def make_prediction(model: dict, y_true):
    loaded_model = mlflow.pyfunc.load_model(model["model_uri"])

    if model["data_type"] == "Original data":
        y_pred = loaded_model.predict(X_test)
    else:
        y_pred = loaded_model.predict(X_test_scaled)

    acc_score = accuracy_score(y_true, y_pred)
    conf_matrix = confusion_matrix(y_true, y_pred)

    return acc_score, conf_matrix

In [None]:
acc_score, conf_matrix, = make_prediction(model, y_test)

In [None]:
plt.figure(figsize=(7, 5))
sns.heatmap(conf_matrix, annot=True, cbar=False, fmt="d", cmap="Purples")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.title(f"Confusion Matrix of {model["model_name"]} | Score: {np.round(acc_score, 3)} | {model["data_type"]}", loc="left")
plt.tight_layout()
plt.show();

## Neural Network Model Development