<a href="https://colab.research.google.com/github/Swastik200/30DaysOfDSA/blob/main/GA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install tensorflow scikit-learn deap


Collecting deap
  Downloading deap-1.4.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (135 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.4/135.4 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: deap
Successfully installed deap-1.4.1


In [2]:
import os
import random

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import tensorflow as tf
from tensorflow.keras.layers import Convolution1D, MaxPooling1D, Dense, Dropout, LSTM, BatchNormalization
from tensorflow.keras.models import Sequential
from deap import base, creator, tools, algorithms

In [3]:
from google.colab import drive
drive.mount('/content/drive')
!cp "/content/drive/MyDrive/CMAPSSData.zip" "/content"
!unzip CMAPSSData.zip

Mounted at /content/drive
Archive:  CMAPSSData.zip
  inflating: CMAPSS/RUL_FD001.txt    
  inflating: CMAPSS/RUL_FD002.txt    
  inflating: CMAPSS/RUL_FD003.txt    
  inflating: CMAPSS/RUL_FD004.txt    
  inflating: CMAPSS/test_FD001.txt   
  inflating: CMAPSS/test_FD002.txt   
  inflating: CMAPSS/test_FD003.txt   
  inflating: CMAPSS/test_FD004.txt   
  inflating: CMAPSS/train_FD001.txt  
  inflating: CMAPSS/train_FD002.txt  
  inflating: CMAPSS/train_FD003.txt  
  inflating: CMAPSS/train_FD004.txt  
  inflating: CMAPSS/x.txt            


In [4]:
# Directory holding the unzipped C-MAPSS text files (train/test/RUL).
path='/content/CMAPSS/'

In [5]:
import pandas as pd
from sklearn.preprocessing import StandardScaler

In [6]:
def load_and_preprocess_data(path):
    """Read the FD001 training, test and RUL text files and name their columns.

    Parameters
    ----------
    path : str
        Directory containing ``train_FD001.txt``, ``test_FD001.txt`` and
        ``RUL_FD001.txt``. A trailing separator is optional.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(train_df, test_df, rul_df)``. The first two carry 26 named columns;
        ``rul_df`` has the single column ``RUL``.
    """
    def read_table(file_name):
        # A regex separator replaces ``delim_whitespace=True``, which recent
        # pandas releases deprecate.
        return pd.read_csv(os.path.join(path, file_name), sep=r'\s+', header=None)

    train_df = read_table('train_FD001.txt')
    test_df = read_table('test_FD001.txt')
    rul_df = read_table('RUL_FD001.txt')

    # Two identifier columns, three operating settings, then 21 sensors.
    col_names = (
        ['unit_number', 'time_in_cycles']
        + [f'op_setting_{i}' for i in range(1, 4)]
        + [f'sensor_{i}' for i in range(1, 22)]
    )
    train_df.columns = col_names
    # The test file is laid out like the training file, so it takes the full
    # list; a shortened list fails with a length-mismatch ValueError.
    test_df.columns = col_names
    rul_df.columns = ['RUL']

    return train_df, test_df, rul_df

In [7]:
def preprocess_data(train_df, test_df, sequence_length=50):
    """Scale the features, derive training RUL labels and build sliding windows.

    Parameters
    ----------
    train_df, test_df : pandas.DataFrame
        Frames whose first two columns are ``unit_number`` and
        ``time_in_cycles``. Every column from the third onwards is
        standardised with statistics fitted on ``train_df`` only. Neither
        input frame is modified; the function works on copies.
    sequence_length : int, default 50
        Number of consecutive rows per window.

    Returns
    -------
    train_seq_array : numpy.ndarray
        ``float32`` array of shape ``(n_windows, sequence_length, n_columns)``
        where ``n_columns`` counts every column of ``train_df``. The derived
        ``RUL`` label is excluded, so the network never sees its own target
        and the width matches windows cut from ``test_df``.
    train_label_array : numpy.ndarray
        ``float32`` vector with one RUL value per window, taken from the row
        that immediately follows the window.
    test_df : pandas.DataFrame
        Scaled copy of the test frame.

    Raises
    ------
    ValueError
        If no unit has more than ``sequence_length`` rows.
    """
    # Copy first so re-running the calling cell starts from unscaled frames.
    train_df = train_df.copy()
    test_df = test_df.copy()

    # Normalise everything except the two identifier columns. Assigning by
    # column label replaces whole columns, so integer-typed sensors become
    # float without an incompatible-dtype warning.
    train_feature_cols = list(train_df.columns[2:])
    test_feature_cols = list(test_df.columns[2:])
    scaler = StandardScaler()
    train_df[train_feature_cols] = scaler.fit_transform(train_df[train_feature_cols])
    test_df[test_feature_cols] = scaler.transform(test_df[test_feature_cols])

    # RUL of a row = last recorded cycle of its unit minus the current cycle.
    last_cycle = train_df.groupby('unit_number')['time_in_cycles'].transform('max')
    train_df['RUL'] = last_cycle - train_df['time_in_cycles']

    # Model inputs are all columns but the label.
    # NOTE(review): unit_number and time_in_cycles remain in the windows
    # (unscaled), as before; confirm that feeding identifiers is intended.
    input_cols = [col for col in train_df.columns if col != 'RUL']

    windows = []
    labels = []
    # sort=False keeps units in order of first appearance.
    for _, unit_df in train_df.groupby('unit_number', sort=False):
        n_windows = len(unit_df) - sequence_length
        if n_windows <= 0:
            # Too short to provide a window plus a following label row.
            continue
        unit_inputs = unit_df[input_cols].to_numpy()
        windows.extend(
            unit_inputs[start:start + sequence_length] for start in range(n_windows)
        )
        labels.append(unit_df['RUL'].to_numpy()[sequence_length:])

    if not windows:
        raise ValueError(
            f'no unit has more than {sequence_length} rows; cannot build training windows'
        )

    train_seq_array = np.stack(windows).astype(np.float32)
    train_label_array = np.concatenate(labels).astype(np.float32)

    return train_seq_array, train_label_array, test_df

In [8]:
def createCNNLSTMModel(inputShape):
    """Build and compile the CNN + bidirectional-LSTM regression network.

    The network stacks three Conv1D / BatchNormalization / MaxPooling1D
    stages (18, 36 and 72 filters), two bidirectional 64-unit LSTM layers
    separated by dropout, a 50-unit ReLU dense layer with dropout and a
    single linear output unit.

    Parameters
    ----------
    inputShape : tuple
        Shape of one sample, passed as ``input_shape`` to the first layer.

    Returns
    -------
    A ``Sequential`` model compiled with the ``rmsprop`` optimizer, ``mse``
    loss and a root-mean-squared-error metric.
    """
    # (conv layer name, pooling layer name, number of filters) per stage.
    conv_stages = (
        ('cv1', 'mp1', 18),
        ('cv2', 'mp2', 36),
        ('cv3', 'mp3', 72),
    )

    model = Sequential()
    for stage_index, (conv_name, pool_name, n_filters) in enumerate(conv_stages):
        conv_kwargs = dict(filters=n_filters, kernel_size=2, strides=1,
                           padding='same', activation='relu', name=conv_name)
        if stage_index == 0:
            # Only the first layer declares the input shape.
            conv_kwargs['input_shape'] = inputShape
        model.add(Convolution1D(**conv_kwargs))
        model.add(BatchNormalization(axis=-1))
        model.add(MaxPooling1D(pool_size=2, strides=2, padding='same', name=pool_name))

    # Recurrent part: the first layer returns the full sequence for the second.
    model.add(tf.keras.layers.Bidirectional(LSTM(64, return_sequences=True)))
    model.add(Dropout(0.2))
    model.add(tf.keras.layers.Bidirectional(LSTM(64)))

    # Regression head.
    model.add(Dense(50, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(1))

    rmse_metric = tf.keras.metrics.RootMeanSquaredError()
    model.compile(optimizer='rmsprop', loss='mse', metrics=[rmse_metric])
    return model

In [9]:
def evaluate_model(model, X_test, y_test):
    """Score ``model`` on held-out data.

    Parameters
    ----------
    model : object exposing ``predict``
    X_test : inputs handed to ``model.predict``
    y_test : ground-truth targets compared against the predictions

    Returns
    -------
    tuple
        ``(rmse, r2, mae)``.
    """
    predictions = model.predict(X_test)
    # RMSE is the square root of scikit-learn's mean squared error.
    mse = mean_squared_error(y_test, predictions)
    return (
        np.sqrt(mse),
        r2_score(y_test, predictions),
        mean_absolute_error(y_test, predictions),
    )

In [10]:
def load_and_preprocess_data(path):
    """Load the FD001 train, test and RUL files as labelled DataFrames.

    Parameters
    ----------
    path : str
        Directory that holds ``train_FD001.txt``, ``test_FD001.txt`` and
        ``RUL_FD001.txt``; it may be given with or without a trailing slash.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(train_df, test_df, rul_df)``. ``train_df`` and ``test_df`` share
        the same 26 column names; ``rul_df`` has one column named ``RUL``.
    """
    # ``sep=r'\s+'`` splits on runs of whitespace and replaces the deprecated
    # ``delim_whitespace=True`` flag.
    read_options = dict(sep=r'\s+', header=None)

    # os.path.join tolerates a missing trailing separator on ``path``.
    train_df = pd.read_csv(os.path.join(path, 'train_FD001.txt'), **read_options)
    test_df = pd.read_csv(os.path.join(path, 'test_FD001.txt'), **read_options)
    rul_df = pd.read_csv(os.path.join(path, 'RUL_FD001.txt'), **read_options)

    # Identifier columns, then three operating settings, then 21 sensors.
    col_names = (
        ['unit_number', 'time_in_cycles']
        + [f'op_setting_{i}' for i in range(1, 4)]
        + [f'sensor_{i}' for i in range(1, 22)]
    )

    train_df.columns = col_names
    test_df.columns = col_names

    # The RUL file has a single column of values.
    rul_df.columns = ['RUL']

    return train_df, test_df, rul_df


In [11]:
# Directory holding the C-MAPSS text files.
path='/content/CMAPSS/'

# Read the FD001 frames, then scale them and build the training windows and
# labels. test_df is rebound to the frame returned by preprocess_data.
train_df, test_df, rul_df = load_and_preprocess_data(path)
train_seq_array, train_label_array, test_df = preprocess_data(train_df, test_df)


In [12]:
# Unshuffled 80/20 hold-out: the leading windows are used for training and
# the trailing ones for validation.
split = int(0.8 * len(train_seq_array))
X_train, X_val = np.split(train_seq_array, [split])
y_train, y_val = np.split(train_label_array, [split])

# Shape of a single sample, i.e. everything after the leading sample axis.
input_shape = X_train.shape[1:]

In [13]:
# creator.create registers classes as attributes of the ``creator`` module
# and warns when a name already exists. Dropping stale definitions first lets
# this cell be re-run cleanly and always reflect the settings written here.
for stale_name in ("FitnessMin", "Individual"):
    if hasattr(creator, stale_name):
        delattr(creator, stale_name)

# Single-objective fitness; the negative weight makes DEAP minimise it.
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)

toolbox = base.Toolbox()
# Each gene is an integer drawn uniformly from 1..100 (both ends included).
toolbox.register("attr_int", random.randint, 1, 100)
# An individual is a list of three such genes.
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_int, n=3)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)


In [21]:
def evaluate(individual):
    """Fitness function: train one candidate and return its validation RMSE.

    Parameters
    ----------
    individual : sequence of three numbers
        ``[batch_size, epochs, patience]``.

    Returns
    -------
    tuple
        One-element tuple ``(rmse,)``, the form DEAP expects for a
        single-objective fitness.
    """
    # Coerce genes to usable values: variation operators may push them out of
    # range, and Keras rejects a non-positive batch size.
    batch_size = max(1, int(individual[0]))
    epochs = max(1, int(individual[1]))
    patience = max(0, int(individual[2]))

    # Each call builds a fresh network; release the global Keras state of the
    # previous candidates so memory does not grow over the search.
    tf.keras.backend.clear_session()

    model = createCNNLSTMModel(input_shape)
    early_stopping = tf.keras.callbacks.EarlyStopping(patience=patience)
    model.fit(X_train, y_train, validation_data=(X_val, y_val), batch_size=batch_size, epochs=epochs,
              callbacks=[early_stopping], verbose=0)

    rmse, _r2, _mae = evaluate_model(model, X_val, y_val)
    return (rmse,)

toolbox.register("mate", tools.cxTwoPoint)
# mutFlipBit targets boolean genomes: it replaces a gene x with
# type(x)(not x), which collapses integer hyperparameters to 0 or 1.
# mutUniformInt resamples a mutated gene from 1..100, the same range that
# attr_int uses to initialise it.
toolbox.register("mutate", tools.mutUniformInt, low=1, up=100, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("evaluate", evaluate)

population = toolbox.population(n=5)
# eaSimple has no elitism, so the best candidate can disappear from the
# population; the hall of fame keeps the best individual seen in any generation.
hall_of_fame = tools.HallOfFame(1)
population, logbook = algorithms.eaSimple(population, toolbox, cxpb=0.5, mutpb=0.2, ngen=5,
                                          halloffame=hall_of_fame, verbose=True)

# Extract best individual
best_individual = hall_of_fame[0]
batch_size, epochs, patience = best_individual
print(f'Best hyperparameters: batch_size={batch_size}, epochs={epochs}, patience={patience}')

# Train final model with best hyperparameters
model = createCNNLSTMModel(input_shape)
model.fit(X_train, y_train, validation_data=(X_val, y_val), batch_size=batch_size, epochs=epochs,
          callbacks=[tf.keras.callbacks.EarlyStopping(patience=patience)], verbose=1)

# Evaluate on test data
sequence_length = 50
def gen_sequence(id_df, seq_length):
    """Yield consecutive ``seq_length``-row slices of ``id_df.values``.

    Slices start at rows ``0 .. len(id_df) - seq_length - 1``; a frame with
    ``seq_length`` rows or fewer yields nothing.
    """
    data_matrix = id_df.values
    num_elements = data_matrix.shape[0]
    for start, stop in zip(range(0, num_elements-seq_length), range(seq_length, num_elements)):
        yield data_matrix[start:stop, :]

# rul_df holds one RUL value per test unit, so each unit contributes exactly
# one window: its last ``sequence_length`` rows. Units with a shorter history
# cannot fill a window and are dropped together with their label.
# sort=False keeps the units in order of first appearance.
# NOTE(review): assumes the rows of rul_df follow that same unit order.
unit_frames = [unit_df for _, unit_df in test_df.groupby('unit_number', sort=False)]
if len(unit_frames) != len(rul_df):
    raise ValueError(
        f'test data has {len(unit_frames)} units but rul_df has {len(rul_df)} rows'
    )
has_full_window = np.array([len(unit_df) >= sequence_length for unit_df in unit_frames], dtype=bool)

test_seq_array = np.stack([
    unit_df.values[-sequence_length:]
    for unit_df, keep in zip(unit_frames, has_full_window)
    if keep
]).astype(np.float32)
y_test = rul_df['RUL'].to_numpy()[has_full_window].astype(np.float32)

TypeError: createCNNLSTMModel() missing 9 required positional arguments: 'f1', 'f2', 'f3', 'k', 'a1', 'a2', 'd1', 'd2', and 'lr'

In [None]:
# Score the final model on the test windows and print RMSE, R2 and MAE.
rmse, r2, mae = evaluate_model(model, test_seq_array, y_test)
print(f'RMSE: {rmse}, R2: {r2}, MAE: {mae}')