In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
from keras.models import Sequential
from keras.layers import Dense, Dropout, LSTM
from keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt

In [None]:
# Step 1: Load the training data
# Column labels applied to the parsed file: id, cycle, setting1..setting3,
# followed by s1..s21 (26 labels in total).
col_names = (
    ['id', 'cycle']
    + [f'setting{k}' for k in range(1, 4)]
    + [f's{k}' for k in range(1, 22)]
)

# NOTE(review): absolute, machine-specific path -- the notebook cannot run on
# another machine without editing it.
# The file is parsed as space-separated text without a header row; columns 26
# and 27 are discarded before the 26 labels above are assigned.
dataset_train = pd.read_csv(
    r'C:\Users\njhar\Downloads\train_FD001.txt',
    sep=" ",
    header=None,
)
dataset_train = dataset_train.drop(columns=[26, 27])
dataset_train.columns = col_names

In [None]:
# Load the test data with the same parsing rules as the training file
# (space-separated, no header, columns 26 and 27 discarded) and reuse col_names.
# NOTE(review): absolute, machine-specific path.
dataset_test = pd.read_csv(
    r'C:\Users\njhar\Downloads\test_FD001.txt',
    sep=" ",
    header=None,
)
dataset_test = dataset_test.drop(columns=[26, 27])
dataset_test.columns = col_names

In [None]:
# Load the truth file: once column 1 is dropped a single value column remains,
# which is named 'more'.
# NOTE(review): absolute, machine-specific path.
pm_truth = pd.read_csv(
    r'C:\Users\njhar\Downloads\RUL_FD001.txt',
    sep=" ",
    header=None,
)
pm_truth = pm_truth.drop(columns=[1])
pm_truth.columns = ['more']
# The row at 0-based position i is given id i + 1.
pm_truth = pm_truth.assign(id=pm_truth.index + 1)

In [None]:
# Step 2: Prepare the dataset for training
# For every id in the test set, record the largest 'cycle' value seen.
# The result is a two-column frame: 'id' and 'max'.
rul = (
    dataset_test
    .groupby('id')['cycle']
    .max()
    .rename('max')
    .reset_index()
)

In [None]:
# Run to failure
# rtf = 'more' (value from the truth file) + the largest cycle observed for the
# same id in the test set.
# The lookup is keyed on 'id' explicitly: adding the two columns directly would
# align them on the frames' default row index, which only pairs the right rows
# when the test ids are exactly 1..N in order.
pm_truth['rtf'] = pm_truth['more'] + pm_truth['id'].map(rul.set_index('id')['max'])
# 'more' is no longer needed once rtf exists; dropping it keeps it out of the
# merge with dataset_test.
pm_truth = pm_truth.drop(columns='more')

In [None]:
# Attach each id's 'rtf' value to every test row, derive the per-row
# time-to-failure as rtf - cycle, then discard the helper column.
dataset_test = pd.merge(dataset_test, pm_truth, how='left', on='id')
dataset_test['ttf'] = dataset_test['rtf'].sub(dataset_test['cycle'])
dataset_test = dataset_test.drop(columns=['rtf'])

In [None]:
# Calculate Time to Failure (TTF) for training data:
# ttf = (largest cycle recorded for the row's id) - (the row's own cycle),
# so the last recorded row of every id has ttf == 0.
# The aggregation is named by the string 'max' rather than by passing the
# Python builtin, which recent pandas versions deprecate for groupby transforms.
dataset_train['ttf'] = (
    dataset_train.groupby('id')['cycle'].transform('max') - dataset_train['cycle']
)

In [None]:
# Create binary labels: 1 when ttf is at most `period`, otherwise 0.
period = 30
dataset_train['label_bc'] = (dataset_train['ttf'] <= period).astype('int64')
dataset_test['label_bc'] = (dataset_test['ttf'] <= period).astype('int64')

In [None]:
# Model inputs: setting1..setting3 followed by s1..s21 (24 columns).
# 'id' and 'cycle' are deliberately not part of the feature list.
features_col_name = (
    [f'setting{k}' for k in range(1, 4)]
    + [f's{k}' for k in range(1, 22)]
)
# Name of the binary target column.
target_col_name = 'label_bc'

In [None]:
# Step 3: Normalize features
# The scaler's parameters are learned from the training rows only; the same
# fitted scaler is then applied to both the training and the test features.
sc = MinMaxScaler()
sc.fit(dataset_train[features_col_name])
dataset_train[features_col_name] = sc.transform(dataset_train[features_col_name])
dataset_test[features_col_name] = sc.transform(dataset_test[features_col_name])

In [None]:
# Step 4: Prepare Data for LSTM
def gen_sequence(id_df, seq_length, seq_cols):
    """Cut one frame into zero-padded sliding windows of ``seq_length`` rows.

    The values of ``seq_cols`` are prefixed with ``seq_length - 1`` rows of
    zeros and then sliced into consecutive windows. Window ``k`` ends at row
    ``k`` of ``id_df``. Windows are produced for rows ``0 .. len(id_df) - 2``
    only: no window ending at the final row is emitted, which keeps the number
    of windows equal to the number of labels returned by ``gen_label``.

    Parameters
    ----------
    id_df : pandas.DataFrame
        Rows to window, in the order they should be read.
    seq_length : int
        Number of rows per window (must be at least 1).
    seq_cols : list of str
        Columns of ``id_df`` to include in each window.

    Returns
    -------
    numpy.ndarray
        Float array of shape ``(max(len(id_df) - 1, 0), seq_length,
        len(seq_cols))``. The 3-D shape is kept even when no window is
        produced, so results can always be concatenated along axis 0.
    """
    # Only the requested columns are padded; the rest of the frame is not needed.
    values = id_df[seq_cols].to_numpy(dtype=float)
    n_features = values.shape[1]
    padded = np.vstack([np.zeros((seq_length - 1, n_features)), values])

    n_windows = padded.shape[0] - seq_length
    if n_windows <= 0:
        # Zero or one input row: return an empty batch with the right rank
        # instead of a shape-(0,) array.
        return np.empty((0, seq_length, n_features))

    return np.stack([padded[start:start + seq_length] for start in range(n_windows)])

In [None]:
# Function to generate labels
def gen_label(id_df, seq_length, label):
    """Return the labels that pair with the windows from ``gen_sequence``.

    Label ``k`` is the value of ``label`` at row ``k + 1`` of ``id_df``, so the
    result covers rows ``1 .. len(id_df) - 1`` and has ``len(id_df) - 1``
    entries (none for an empty or single-row frame). Because window ``k`` of
    ``gen_sequence`` ends at row ``k``, each window is paired with the label of
    the row that follows it.

    Parameters
    ----------
    id_df : pandas.DataFrame
        Rows of one id, in order.
    seq_length : int
        Window length used by ``gen_sequence``. It does not change which labels
        are returned; it is validated and kept for a parallel signature.
    label : str
        Name of the label column.

    Returns
    -------
    numpy.ndarray
        1-D float64 array of labels.
    """
    if seq_length < 1:
        raise ValueError("seq_length must be at least 1")
    # Padding the frame with seq_length - 1 zero rows and reading positions
    # seq_length .. end is the same as skipping the first original row, so the
    # slice is taken directly instead of one .iloc lookup per element.
    return id_df[label].to_numpy(dtype=float)[1:].copy()

In [None]:
# Window size (number of rows per LSTM input sequence) and the columns that
# make up each row of a window.
seq_length, seq_cols = 50, features_col_name

In [None]:
# Build the training arrays one engine at a time so that no window spans two
# ids. sort=False keeps the engines in order of first appearance in the frame.

# Generate X_train
X_train = np.concatenate([
    gen_sequence(engine_df, seq_length, seq_cols)
    for _, engine_df in dataset_train.groupby('id', sort=False)
])
print("X_train shape:", X_train.shape)

# Generate y_train
y_train = np.concatenate([
    gen_label(engine_df, seq_length, 'label_bc')
    for _, engine_df in dataset_train.groupby('id', sort=False)
])
print("y_train shape:", y_train.shape)

In [None]:
# Build the test arrays the same way as the training arrays: per engine, in
# order of first appearance, then stacked along axis 0.

# Generate X_test
X_test = np.concatenate([
    gen_sequence(engine_df, seq_length, seq_cols)
    for _, engine_df in dataset_test.groupby('id', sort=False)
])
print("X_test shape:", X_test.shape)

# Generate y_test
y_test = np.concatenate([
    gen_label(engine_df, seq_length, 'label_bc')
    for _, engine_df in dataset_test.groupby('id', sort=False)
])
print("y_test shape:", y_test.shape)

In [None]:
# Step 5: Build the LSTM model
# Two stacked LSTM layers (100 then 50 units), each followed by 20% dropout,
# ending in a single sigmoid unit for the binary label.
nb_features = X_train.shape[2]
model = Sequential([
    # return_sequences=True so the second LSTM receives the full sequence.
    LSTM(units=100, input_shape=(seq_length, nb_features), return_sequences=True),
    Dropout(0.2),
    LSTM(units=50, return_sequences=False),
    Dropout(0.2),
    Dense(units=1, activation='sigmoid'),
])
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# Train for at most 10 epochs, holding out 5% of the samples for validation.
# Training stops early once val_loss has not improved for 3 epochs.
model.fit(
    x=X_train,
    y=y_train,
    batch_size=200,
    epochs=10,
    verbose=1,
    validation_split=0.05,
    callbacks=[EarlyStopping(monitor='val_loss', patience=3)],
)

In [None]:
# Step 6: Model evaluation
# Sigmoid outputs above 0.5 are counted as the positive class.
y_pred = (model.predict(X_test) > 0.5).astype(int)

separator = '-------------------------------------------------------------------'
test_accuracy = accuracy_score(y_test, y_pred)
test_confusion = confusion_matrix(y_test, y_pred)
test_report = classification_report(y_test, y_pred)

print('Accuracy of model on test data: ', test_accuracy)
print(separator)
print('Confusion Matrix: \n', test_confusion)
print(separator)
print('Classification Report: \n', test_report)

In [None]:
# Step 7: Probability of Failure Function
def prob_failure(machine_id):
    """Return the model's failure probability, in percent, for one test engine.

    The rows of ``dataset_test`` with the given id are windowed with
    ``gen_sequence`` and scored by ``model``; the prediction for the last
    window is returned. Note that ``gen_sequence`` does not emit a window
    ending at the engine's final row, so the last window ends one row earlier.

    Parameters
    ----------
    machine_id : int
        Value of the ``id`` column identifying the engine.

    Returns
    -------
    numpy scalar
        Predicted probability multiplied by 100.

    Raises
    ------
    ValueError
        If the id is not present in ``dataset_test`` or the engine has too few
        rows to form a window.
    """
    machine_df = dataset_test[dataset_test['id'] == machine_id]
    if machine_df.empty:
        raise ValueError(f"Engine id {machine_id!r} not found in dataset_test")

    machine_test = gen_sequence(machine_df, seq_length, seq_cols)
    if len(machine_test) == 0:
        # Without this guard the empty batch fails later with an obscure
        # error inside predict / indexing.
        raise ValueError(
            f"Engine id {machine_id!r} has too few cycles to build an input sequence"
        )

    m_pred = model.predict(machine_test)
    # Last window's single sigmoid output, scaled to a percentage.
    return (m_pred[-1] * 100)[0]

In [None]:
# Example: probability of failure for an engine id entered by the user.
# The horizon in the message comes from `period` (the threshold used to build
# label_bc) and is expressed in cycles, the unit of the 'cycle'/'ttf' columns.
machine_id = int(input("Enter the Engine ID: "))
print(f'Probability that machine will fail within {period} cycles: ', prob_failure(machine_id))