<a href="https://colab.research.google.com/github/DGuilherme/PMTese/blob/main/notebooks%5CTransformer%5CSimple%5CTransformerAllDatasetsComparison.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install tensorflow pandas scikit-learn

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Mount Google Drive at /content/drive; the dataset files read later in this
# notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
def run_predictive_maintenance(dataset_name, scaler_type='minmax', seq_length=50,
                               data_dir='/content/drive/MyDrive/Python/predictive-maintenance-main/datasets/cmapss',
                               epochs=20, batch_size=32):
    """Train a Transformer regressor for remaining useful life (RUL) on one dataset.

    Pipeline: load the train/test/RUL text files for ``dataset_name``, derive a
    per-row RUL target, scale the selected settings/sensors, cut every unit's
    history into sliding windows of ``seq_length`` rows, train the model and
    score it on all test windows.

    Args:
        dataset_name: Suffix used in the file names, e.g. ``'FD001'``.
        scaler_type: ``'standard'`` (StandardScaler) or ``'minmax'``
            (MinMaxScaler). The scaler is fit on the training rows only.
        seq_length: Number of consecutive rows per input window. Units with
            fewer rows than this contribute no windows (this also applies to
            test units, which are then silently absent from the metrics).
        data_dir: Directory that contains
            ``<split>_<dataset_name>.txt/<split>_<dataset_name>.txt`` for the
            splits ``train``, ``test`` and ``RUL``.
        epochs: Number of training epochs passed to ``model.fit``.
        batch_size: Batch size passed to ``model.fit``.

    Returns:
        Tuple ``(model, predictions, mse, mae)``: the trained Keras model, its
        predictions for every test window (shape ``(n_windows, 1)``), and the
        mean squared / mean absolute error of those predictions.

    Raises:
        ValueError: If ``scaler_type`` is unknown, ``seq_length`` is not
            positive, the RUL file does not have one row per test unit, or no
            unit is long enough to yield a window.
    """
    # Fail fast on bad arguments, before any file I/O or training.
    if scaler_type == 'standard':
        scaler = StandardScaler()
    elif scaler_type == 'minmax':
        scaler = MinMaxScaler()
    else:
        raise ValueError("Invalid scaler_type. Choose 'standard' or 'minmax'.")
    if seq_length < 1:
        raise ValueError("seq_length must be a positive integer.")

    # Load data
    column_names = ['id', 'cycle', 'setting1', 'setting2', 'setting3'] + [f'sensor{i}' for i in range(1, 22)]

    def read_split(split):
        # Each file sits in a folder that carries the same name as the file.
        path = f'{data_dir}/{split}_{dataset_name}.txt/{split}_{dataset_name}.txt'
        # sep=r'\s+' is the documented replacement for the deprecated delim_whitespace=True.
        return pd.read_csv(path, sep=r'\s+', header=None)

    train_data = read_split('train')
    test_data = read_split('test')
    rul_data = read_split('RUL')

    train_data.columns = column_names
    test_data.columns = column_names
    rul_data.columns = ['RUL']

    # Training target: cycles left until the unit's last recorded cycle.
    train_data['RUL'] = train_data.groupby('id')['cycle'].transform('max') - train_data['cycle']

    # Test target: the RUL file gives the RUL at each unit's last recorded
    # cycle, one row per unit in ascending id order; earlier rows are that many
    # cycles further away from it.
    test_unit_ids = np.sort(test_data['id'].unique())
    if len(rul_data) != len(test_unit_ids):
        raise ValueError(
            f"RUL file has {len(rul_data)} rows but the test file has {len(test_unit_ids)} units."
        )
    final_rul_by_unit = pd.Series(rul_data['RUL'].to_numpy(), index=test_unit_ids)
    test_last_cycle = test_data.groupby('id')['cycle'].transform('max')
    test_data['RUL'] = test_data['id'].map(final_rul_by_unit) + test_last_cycle - test_data['cycle']

    useful_sensor_cols = ['setting1', 'setting2', 'setting3'] + \
        [f'sensor{i}' for i in [2, 3, 4, 7, 8, 11, 12, 13, 14, 15, 17, 20, 21]]

    # .copy() so the scaled values below are written into independent frames
    # (avoids pandas' SettingWithCopyWarning on a column subset).
    kept_cols = ['id', 'cycle'] + useful_sensor_cols + ['RUL']
    train_data = train_data[kept_cols].copy()
    test_data = test_data[kept_cols].copy()

    # Normalize sensor values with the scaler selected by scaler_type;
    # statistics come from the training rows only.
    train_data[useful_sensor_cols] = scaler.fit_transform(train_data[useful_sensor_cols])
    test_data[useful_sensor_cols] = scaler.transform(test_data[useful_sensor_cols])

    # Create sequences
    def create_sequences(data, seq_length):
        """Return (X, y): every length-`seq_length` window per unit and the RUL at its last row."""
        windows, targets = [], []
        # sort=False keeps units in order of first appearance.
        for _, unit_data in data.groupby('id', sort=False):
            features = unit_data[useful_sensor_cols].to_numpy(dtype=float)
            rul = unit_data['RUL'].to_numpy(dtype=float)
            for end in range(seq_length, len(unit_data) + 1):
                windows.append(features[end - seq_length:end])
                targets.append(rul[end - 1])
        if not windows:
            return np.empty((0, seq_length, len(useful_sensor_cols))), np.empty((0,))
        return np.stack(windows), np.array(targets)

    all_train_X, all_train_y = create_sequences(train_data, seq_length)
    test_X, test_y = create_sequences(test_data, seq_length)
    if len(all_train_X) == 0 or len(test_X) == 0:
        raise ValueError(
            f"seq_length={seq_length} is longer than every unit in the "
            f"{'training' if len(all_train_X) == 0 else 'test'} data; no windows were produced."
        )

    # NOTE(review): the split is over individual windows, so overlapping
    # windows of the same unit can land on both sides; validation metrics are
    # therefore optimistic. Split by unit id if an unbiased estimate is needed.
    train_X, val_X, train_y, val_y = train_test_split(
        all_train_X,
        all_train_y,
        test_size=0.2,
        random_state=42
    )

    # Define Transformer
    class TimeSeriesTransformer(tf.keras.Model):
        """Encoder-only Transformer that maps a window of features to `output_dim` values."""

        def __init__(self, input_dim, model_dim, num_heads, num_layers, output_dim):
            super().__init__()
            # input_dim is accepted for interface clarity; Dense infers it from the data.
            self.input_proj = tf.keras.layers.Dense(model_dim)

            self.encoder_layers = []
            for _ in range(num_layers):
                self.encoder_layers.append([
                    tf.keras.layers.LayerNormalization(),
                    tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=model_dim),
                    tf.keras.Sequential([
                        tf.keras.layers.Dense(model_dim * 4, activation='relu'),
                        tf.keras.layers.Dense(model_dim)
                    ]),
                ])

            self.global_pool = tf.keras.layers.GlobalAveragePooling1D()
            self.output_layer = tf.keras.layers.Dense(output_dim)

        def call(self, inputs, training=False):
            x = self.input_proj(inputs)

            for norm, mha, ffn in self.encoder_layers:
                # NOTE(review): one LayerNormalization instance is applied after
                # both the attention and the feed-forward sub-layer, so they
                # share scale/offset weights. Kept as-is to preserve results.
                attn_output = mha(x, x)
                x = norm(x + attn_output)
                ff_output = ffn(x)
                x = norm(x + ff_output)

            # Average over the time axis, then regress.
            x = self.global_pool(x)
            return self.output_layer(x)

    # Instantiate model
    input_dim = train_X.shape[2]
    model_dim = 64
    num_heads = 8
    num_layers = 4
    output_dim = 1

    model = TimeSeriesTransformer(input_dim, model_dim, num_heads, num_layers, output_dim)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                  loss='mse',
                  metrics=['mae'])

    # Train
    model.fit(
        train_X, train_y,
        validation_data=(val_X, val_y),
        epochs=epochs,
        batch_size=batch_size
    )

    # Predict
    predictions = model.predict(test_X)

    # Evaluate
    return model, predictions, mean_squared_error(test_y, predictions), mean_absolute_error(test_y, predictions)


In [3]:
def compare_datasets(datasets, scaler_type='standard', seq_length=50):
    """Run the predictive-maintenance pipeline on each dataset and tabulate the errors.

    Args:
        datasets: Iterable of dataset names forwarded to
            ``run_predictive_maintenance``.
        scaler_type: Scaler selector forwarded unchanged.
        seq_length: Window length forwarded unchanged.

    Returns:
        DataFrame with one row per dataset and the columns
        ``Dataset``, ``RMSE`` and ``MAE``.
    """
    def _score(name):
        # Only the error metrics are kept; the model and predictions are discarded.
        _, _, mse, mae = run_predictive_maintenance(name, scaler_type, seq_length)
        return name, np.sqrt(mse), mae

    rows = [_score(name) for name in datasets]
    return pd.DataFrame(rows, columns=['Dataset', 'RMSE', 'MAE'])

In [None]:
# Dataset identifiers to evaluate; compare_datasets forwards each one to
# run_predictive_maintenance.
datasets = ['FD001', 'FD002', 'FD003', 'FD004']  # Add your dataset names here

# Train and score one model per dataset. This call requests min-max scaling
# and 30-step windows instead of compare_datasets' defaults ('standard', 50).
results_df = compare_datasets(datasets, scaler_type='minmax', seq_length=30)

# Render the per-dataset RMSE/MAE table (one row per entry in `datasets`).
display(results_df)