# Feature Engineering for Predictive Maintenance 

This notebook focuses on the **Feature Engineering (FE)** stage for the predictive maintenance dataset.  
The goal is to merge the different sources, clean inconsistencies, and generate enriched features to train a model that predicts machine failures in the near future.  


---

## Action Plan  
1. Merge datasets (`telemetry`, `errors`, `maintenance`, `failures`, `machines`).  
2. Validate missing values and apply imputation if necessary.  
3. Feature engineering:  
   - Time-based features (hour, weekday, month, etc).  
   - Rolling features (means, std, lags on sensors and error flags).  
4. Encode categorical variables (`model`, components).  
5. Re-label the target to anticipate **near future failures**.  
6. Validate feature importance (mutual information + correlation with target).  

---

## Import Libraries and Set Preferences

In [1]:
import pandas as pd
import numpy as np
import matplotlib as plt

In [2]:
# Numeric data format
pd.set_option('display.float_format', '{:,.2f}'.format)
pd.options.display.float_format = '{:,.2f}'.format

---

## Load Files

In [3]:
errors = pd.read_csv("../data/PdM_errors.csv", parse_dates=['datetime'])
failures = pd.read_csv("../data/PdM_failures.csv", parse_dates=['datetime'])
machines = pd.read_csv("../data/PdM_machines.csv")
maintenance = pd.read_csv("../data/PdM_maint.csv", parse_dates=['datetime'])
telemetry = pd.read_csv("../data/PdM_telemetry.csv", parse_dates=['datetime'])

---

## Pre-Process Datasets

During the EDA phase, we identified the presence of simultaneous events in the **errors**, **maintenance**, and **failures** tables.  
Since our plan involves creating lag features and rolling averages—which require a consistent frequency and format in the main dataframe—we will pivot these tables.  
This approach ensures a single record for each `(datetime, machineID)` combination while preserving all available information.

In [4]:
# Dictionary with the tables to check
tables = {
    "errors": errors,
    "maintenance": maintenance,
    "failures": failures
}

# Loop to count simultaneous events per table
for name, df in tables.items():
    check = df.groupby(["datetime", "machineID"]).size()
    simultaneous = (check > 1).sum()
    print(f"{name}: {simultaneous} simultaneous events")
    df = None

errors: 274 simultaneous events
maintenance: 756 simultaneous events
failures: 42 simultaneous events


We isolate **proactive maintenance events** to avoid data leakage, since some maintenance records in the dataset actually correspond to reactive replacements triggered by a failure.

In [5]:
# Marcar proactivo vs reactivo
maint_with_flag = maintenance.merge(
    failures[["datetime", "machineID", "failure"]],
    left_on = ["datetime", "machineID", "comp"],
    right_on= ["datetime", "machineID", "failure"],
    how="left",
    indicator=True
)

In [9]:
# Solo mantenimientos proactivos (no aparecen en failures)
maintenance_proactive = (
    maint_with_flag[maint_with_flag["_merge"] == "left_only"]
      .drop(columns=["_merge", "failure"])
      .drop_duplicates()
)

In [11]:
# Pivot errors
errors_pivot = (
    errors.assign(flag=1)
          .pivot_table(index=["datetime", "machineID"],
                       columns="errorID",
                       values="flag",
                       fill_value=0)
          .reset_index()
)

# Pivot maintenance
maint_pivot = (
    maintenance_proactive.assign(flag=1)
               .pivot_table(index=["datetime", "machineID"],
                            columns="comp",
                            values="flag",
                            fill_value=0)
               .reset_index()
)

# Pivot failures
failures_pivot = (
    failures.assign(flag=1)
            .pivot_table(index=["datetime", "machineID"],
                         columns="failure",
                         values="flag",
                         fill_value=0)
            .reset_index()
)


Validate with errors dataframe

In [12]:
# before
errors.head()

Unnamed: 0,datetime,machineID,errorID
0,2015-01-03 07:00:00,1,error1
1,2015-01-03 20:00:00,1,error3
2,2015-01-04 06:00:00,1,error5
3,2015-01-10 15:00:00,1,error4
4,2015-01-22 10:00:00,1,error4


In [13]:
# after
errors_pivot.head()

errorID,datetime,machineID,error1,error2,error3,error4,error5
0,2015-01-01 06:00:00,24,1.0,0.0,0.0,0.0,0.0
1,2015-01-01 06:00:00,73,0.0,0.0,0.0,1.0,0.0
2,2015-01-01 06:00:00,81,1.0,0.0,0.0,0.0,0.0
3,2015-01-01 07:00:00,43,0.0,0.0,1.0,0.0,0.0
4,2015-01-01 08:00:00,14,0.0,0.0,0.0,1.0,0.0


We'll pivot the `model` column into binary flags (e.g., `model_M1`, `model_M2`, …) to keep the dataset fully numeric and consistent with the other pivoted tables.  
This ensures compatibility with any ML algorithm, simplifies feature management, and avoids the need for separate categorical encoders.

In [14]:
# Pivot models
machines_pivot = (
    machines.assign(flag=1)
            .pivot_table(index="machineID",
                         columns="model",
                         values="flag",
                         fill_value=0)
            .reset_index()
)

# Drop the column name ("model") left by pivot_table
machines_pivot.columns.name = None

In [15]:
machines_pivot

Unnamed: 0,machineID,model1,model2,model3,model4
0,1,0.00,0.00,1.00,0.00
1,2,0.00,0.00,0.00,1.00
2,3,0.00,0.00,1.00,0.00
3,4,0.00,0.00,1.00,0.00
4,5,0.00,0.00,1.00,0.00
...,...,...,...,...,...
95,96,0.00,1.00,0.00,0.00
96,97,0.00,1.00,0.00,0.00
97,98,0.00,1.00,0.00,0.00
98,99,1.00,0.00,0.00,0.00


---

## Merge Dataframes

In [16]:
# Base: telemetry + machines
full_df = telemetry.merge(machines_pivot, on="machineID", how="left")

In [17]:
# Merge with errors
full_df = full_df.merge(errors_pivot, on=["datetime", "machineID"], how="left")

In [18]:
# Rename maintenance components columns, to avoid confusion with columns on failures dataframe
maint_pivot = maint_pivot.rename(
    columns={c: f"{c}_maint" for c in maint_pivot.columns if c.startswith("comp")}
)

In [19]:
# Merge with maintenance
full_df = full_df.merge(
    maint_pivot,
    on=["datetime", "machineID"],
    how="left"
)

In [20]:
# Rename failures components columns, to avoid confusion with columns on maintenance dataframe
failures_pivot = failures_pivot.rename(
    columns={c: f"{c}_fail" for c in failures_pivot.columns if c.startswith("comp")}
)

In [21]:
# Merge with failures
full_df = full_df.merge(
    failures_pivot,
    on=["datetime", "machineID"],
    how="left"
)

In [22]:
full_df.head(3).T

Unnamed: 0,0,1,2
datetime,2015-01-01 06:00:00,2015-01-01 07:00:00,2015-01-01 08:00:00
machineID,1,1,1
volt,176.22,162.88,170.99
rotate,418.50,402.75,527.35
pressure,113.08,95.46,75.24
vibration,45.09,43.41,34.18
model1,0.00,0.00,0.00
model2,0.00,0.00,0.00
model3,1.00,1.00,1.00
model4,0.00,0.00,0.00


---

## Full DataFrame Quality Check

- Missing values  
- Imputation (if needed)  
- Deduplication (if needed)

At this stage, we expect multiple NaN values since failures, maintenance, and errors are not present in every record.  
Because these columns are already encoded as binary indicators, we can safely impute all missing values with zero.

In [23]:
full_df.isnull().sum()[full_df.isnull().any()]

error1         872484
error2         872484
error3         872484
error4         872484
error5         872484
comp1_maint    874360
comp2_maint    874360
comp3_maint    874360
comp4_maint    874360
comp1_fail     875381
comp2_fail     875381
comp3_fail     875381
comp4_fail     875381
dtype: int64

In [24]:
full_df = full_df.fillna(0)

In [25]:
# Identify duplicates
duplicates = full_df.duplicated(subset=["datetime", "machineID"]).sum()
print(f"Duplicated records on final merge: {duplicates}")


Duplicated records on final merge: 0


---

## Feature Engineering

### Time-based features

In [26]:
# --- Calendar features ---
full_df["hour"] = full_df["datetime"].dt.hour
full_df["dayofweek"] = full_df["datetime"].dt.dayofweek  # Monday=0, Sunday=6
full_df["month"] = full_df["datetime"].dt.month

# --- Cyclical encoding for hour and dayofweek ---
full_df["hour_sin"] = np.sin(2 * np.pi * full_df["hour"] / 24)
full_df["hour_cos"] = np.cos(2 * np.pi * full_df["hour"] / 24)

full_df["dayofweek_sin"] = np.sin(2 * np.pi * full_df["dayofweek"] / 7)
full_df["dayofweek_cos"] = np.cos(2 * np.pi * full_df["dayofweek"] / 7)


### Rolling Features

#### Telemetry

In [27]:
# Telemetry features: lags + rolling stats
telemetry_cols = ["volt", "rotate", "pressure", "vibration"]

In [28]:
# Ensure chronological order per machine
full_df = full_df.sort_values(["machineID", "datetime"]).reset_index(drop=True)

# Now lags and rolling features are safe
for col in telemetry_cols:
    full_df[f"{col}_lag1"] = full_df.groupby("machineID")[col].shift(1)

    full_df[f"{col}_mean24h"] = (
        full_df.groupby("machineID")[col]
          .transform(lambda x: x.rolling(window=24, min_periods=1).mean())
    )
    full_df[f"{col}_std24h"] = (
        full_df.groupby("machineID")[col]
          .transform(lambda x: x.rolling(window=24, min_periods=1).std())
    )

After creating lag and rolling features, we expect additional NaN values, mainly in the first record of each machine (100).
Since these cases are very few, we can simply discard them.

In [29]:
full_df.isnull().sum()[full_df.isnull().any()]

volt_lag1           100
volt_std24h         100
rotate_lag1         100
rotate_std24h       100
pressure_lag1       100
pressure_std24h     100
vibration_lag1      100
vibration_std24h    100
dtype: int64

In [30]:
full_df = full_df.dropna().reset_index(drop=True)

#### Recent events

We will add two binary features to indicate whether each machine has experienced 
at least one **error** or one **maintenance event** in the previous 24 hours.  

These features serve as short-term history indicators, helping the model capture 
recency effects that may increase the likelihood of a failure.


In [31]:
# Ensure chronological order
full_df = full_df.sort_values(["machineID", "datetime"]).reset_index(drop=True)

# Define error and maintenance groups
error_cols = [c for c in full_df.columns if c.startswith("error")]
maint_cols = [c for c in full_df.columns if c.endswith("_maint")]

# --- Aggregate into "any" flags ---
full_df["any_error"] = full_df[error_cols].any(axis=1).astype(int)
full_df["any_maint"] = full_df[maint_cols].any(axis=1).astype(int)


# --- Rolling 24h windows per machine ---
full_df["any_error_last24h"] = (
    full_df.groupby("machineID")["any_error"]
      .transform(lambda x: x.rolling(window=24, min_periods=1).max())
)

full_df["any_maint_last24h"] = (
    full_df.groupby("machineID")["any_maint"]
      .transform(lambda x: x.rolling(window=24, min_periods=1).max())
)

We will also add a binary feature indicating whether any failure is present.  
This feature will later be shifted using our near-future horizon to generate the target variable.

In [32]:
fail_cols = [c for c in full_df.columns if c.endswith("_fail")]
full_df["any_fail"] = full_df[fail_cols].any(axis=1).astype(int)

Our final dataset looks like this

In [33]:
full_df.head(2).T

Unnamed: 0,0,1
datetime,2015-01-01 07:00:00,2015-01-01 08:00:00
machineID,1,1
volt,162.88,170.99
rotate,402.75,527.35
pressure,95.46,75.24
vibration,43.41,34.18
model1,0.00,0.00
model2,0.00,0.00
model3,1.00,1.00
model4,0.00,0.00


## Feature Relevance Analysis (Reference)

As a final step in this Feature Engineering notebook, we explore the relationship 
between the engineered features and the current `any_fail` indicator.  

**Important note:**  
This analysis is **only for reference**.  
Our true target will be `any_fail_future`, which will be created later in the training 
pipeline using a gap + horizon strategy.  

We apply three complementary approaches:  

1. **Mutual Information (MI)**  
   - Captures both linear and non-linear dependencies.  
   - Works for continuous and categorical/binary features.  

2. **Correlation**  
   - Point-biserial correlation for continuous features (measures how well they separate 
     the two classes of `any_fail`).  
   - Spearman correlation for binary features (monotonic association).  

3. **Chi² test**  
   - Applied only to binary features (e.g., errors, maintenance, model dummies).  
   - Tests the statistical association between feature presence and `any_fail`.  

These analyses help us understand which features already show strong association with 
failures, but the actual target engineering (`any_fail_future`) will be performed 
later, after the temporal split.


In [34]:
exclude_cols = ["datetime", "machineID", "any_fail"] + \
               [c for c in full_df.columns if c.endswith("_fail")]

In [35]:
from sklearn.feature_selection import mutual_info_classif

# Features vs any_fail
X = full_df.drop(columns=exclude_cols)
y = full_df["any_fail"]

mi_scores = mutual_info_classif(X, y, discrete_features="auto", random_state=42)
mi_series = pd.Series(mi_scores, index=X.columns).sort_values(ascending=False)

print("Top 15 features by Mutual Information:\n")
display(mi_series.head(15))


Top 15 features by Mutual Information:



model3              0.07
model4              0.06
dayofweek_cos       0.05
dayofweek           0.03
dayofweek_sin       0.03
hour_sin            0.02
month               0.02
model2              0.02
hour_cos            0.02
model1              0.02
hour                0.01
any_error_last24h   0.01
any_maint_last24h   0.00
any_maint           0.00
rotate_mean24h      0.00
dtype: float64

In [36]:
from scipy.stats import spearmanr, pointbiserialr

corrs = {}
for col in full_df.drop(columns=exclude_cols).columns:
    if full_df[col].nunique() > 2:  # continuous
        try:
            corr, _ = pointbiserialr(full_df[col], y)
            corrs[col] = corr
        except Exception:
            pass
    else:  # binary
        corr, _ = spearmanr(full_df[col], y)
        corrs[col] = corr

corr_series = pd.Series(corrs).sort_values(key=lambda x: abs(x), ascending=False)

print("Top 15 features by correlation with target:\n")
display(corr_series.head(15))


Top 15 features by correlation with target:



any_maint            0.25
comp4_maint          0.12
comp3_maint          0.12
comp1_maint          0.11
comp2_maint          0.10
any_maint_last24h    0.05
rotate_mean24h      -0.04
hour_sin             0.04
vibration_mean24h    0.03
volt_mean24h         0.03
pressure_mean24h     0.03
hour                -0.02
rotate_lag1         -0.01
vibration            0.01
pressure_lag1        0.01
dtype: float64

In [37]:
from sklearn.feature_selection import chi2
from sklearn.preprocessing import MinMaxScaler

binary_cols = [c for c in full_df.columns 
               if full_df[c].nunique() <= 2 and c not in exclude_cols]

X_bin = full_df[binary_cols]
y = full_df["any_fail"]

# Chi² necesita datos no negativos
scaler = MinMaxScaler()
X_bin_scaled = scaler.fit_transform(X_bin)

chi_scores, pvals = chi2(X_bin_scaled, y)
chi_series = pd.Series(chi_scores, index=binary_cols).sort_values(ascending=False)

print("Top 15 binary features by Chi² test:\n")
display(chi_series.head(15))


Top 15 binary features by Chi² test:



any_maint           54,212.83
comp4_maint         13,410.26
comp3_maint         12,332.68
comp1_maint         10,142.07
comp2_maint          8,287.45
any_maint_last24h    1,766.52
model1                  31.28
model4                  14.67
model2                  11.68
model3                   6.25
error5                   1.72
error2                   0.81
any_error_last24h        0.67
error4                   0.60
error3                   0.14
dtype: float64

In [38]:
# Save as Parquet (preferred: smaller + preserves datatypes)
full_df.to_parquet("../data/processed/pdm_features.parquet", index=False)

print("✅ Processed dataset saved successfully!")

✅ Processed dataset saved successfully!
