In [11]:
import numpy as np
import pandas as pd
from typing import Tuple, Union

def inject_faults(X_test_orig: Union[np.ndarray, pd.DataFrame],
                 fault_percentage: float = 0.2,
                 random_state: Union[int, None] = None
                 ) -> Tuple[Union[np.ndarray, pd.DataFrame], np.ndarray]:
    """
    Inject multiple types of faults into test data.

    A random subset of samples (entries along axis 0) is selected and each
    selected sample receives exactly one randomly chosen fault:

    - ``random``: the whole sample is multiplied by 2.5 or 3.5.
    - ``malfunction``: zero-mean Gaussian noise with three times the column's
      standard deviation is added.
    - ``drift``: a multiple (1-4) of the first sample plus Gaussian noise
      with the column's standard deviation is added.
    - ``bias``: every column is replaced by twice the column mean.

    "Column" means an index along axis 1. For arrays with more than two
    dimensions the column statistics are taken over all remaining axes.
    All statistics (and the drift baseline) come from the *original* data,
    so one injected fault never influences another.

    Args:
        X_test_orig: Original test data (numpy array or pandas DataFrame).
            It is never modified. Data must be at least two-dimensional
            whenever at least one fault is injected.
        fault_percentage: Fraction of samples to inject faults into, in
            the range [0, 1] (default: 0.2).
        random_state: Optional seed for a private random generator. When
            None the global ``np.random`` state is used, so an earlier
            ``np.random.seed`` call still controls the result.

    Returns:
        Tuple containing:
        - Faulted test data, same container type, shape, index and columns
          as the input. Values are returned as floats.
        - Positional indices (along axis 0) where faults were injected.

    Raises:
        ValueError: If ``fault_percentage`` is outside [0, 1].
    """
    if not 0.0 <= fault_percentage <= 1.0:
        raise ValueError(
            f"fault_percentage must be between 0 and 1, got {fault_percentage!r}"
        )

    # The np.random module exposes the same choice/normal API as RandomState.
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    # Work on a plain array so rows are always addressed by position; a
    # DataFrame index (e.g. after a shuffled split) need not be 0..n-1.
    is_frame = isinstance(X_test_orig, pd.DataFrame)
    if is_frame:
        data = X_test_orig.to_numpy(copy=True)
    else:
        data = np.array(X_test_orig, copy=True)
    if not np.issubdtype(data.dtype, np.floating):
        # In-place float arithmetic on integer storage would raise.
        data = data.astype(float)

    num_samples = len(data)
    num_fault_samples = int(num_samples * fault_percentage)

    # Select indices for fault injection
    fault_indices = rng.choice(num_samples, num_fault_samples, replace=False)

    if num_fault_samples > 0:
        # Per-column statistics of the clean data, computed once up front.
        col_shape = (data.shape[1],) + (1,) * (data.ndim - 2)
        reduce_axes = tuple(ax for ax in range(data.ndim) if ax != 1)
        col_std = np.sqrt(np.var(data, axis=reduce_axes))
        col_mean = np.mean(data, axis=reduce_axes)
        first_sample = data[0].copy()  # drift baseline, kept fault-free

        for idx in fault_indices:
            # Randomly select fault type
            fault_type = rng.choice(['random', 'malfunction', 'drift', 'bias'])

            if fault_type == 'random':
                # Random fault with varying intensities
                intensity = rng.choice([1.5, 2.5])
                data[idx] *= (1 + intensity)

            elif fault_type == 'malfunction':
                # One noise draw per column, scaled by that column's spread
                noise = rng.normal(0, col_std) * 3.0
                data[idx] += noise.reshape(col_shape)

            elif fault_type == 'drift':
                # Offset proportional to the first sample plus some noise
                intensity = rng.choice([1, 2, 3, 4])
                noise_intensity = 1.0
                noise = rng.normal(0, col_std) * noise_intensity
                data[idx] += noise.reshape(col_shape) + first_sample * intensity

            else:  # bias fault
                # Bias fault with fixed intensity
                intensity = 2.0
                data[idx] = (col_mean * intensity).reshape(col_shape)

    if is_frame:
        X_test_fault = pd.DataFrame(
            data, index=X_test_orig.index, columns=X_test_orig.columns
        )
    else:
        X_test_fault = data

    return X_test_fault, fault_indices

# TCN


In [None]:
import sys
sys.path.append('/kaggle/input/keras-tcn/keras-tcn-master')

# Import TCN
from tcn import TCN

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from tcn import TCN  # Make sure to pip install keras-tcn
import matplotlib.pyplot as plt

# Load the raw measurements
df = pd.read_csv('/kaggle/input/fruit-surface-temperature/FST_analysis.csv')

# Build one timestamp per row from the separate Date and Time columns and
# put the rows in chronological order
df = df.assign(
    DateTime=pd.to_datetime(df['Date'] + ' ' + df['Time'])
).sort_values('DateTime')

# Calendar features derived from the timestamp
df = df.assign(
    Hour=df['DateTime'].dt.hour,
    DayOfWeek=df['DateTime'].dt.dayofweek,
    Month=df['DateTime'].dt.month,
)

# Model inputs and prediction target
target = 'FST_EB'
features = [
    'Air Temperature', 'Dew Point', 'Solar Radiation', 'Wind Speed',
    'Hour', 'DayOfWeek', 'Month',
]

# Fill gaps in the inputs: carry the last observation forward, then
# back-fill whatever is still missing at the start
df[features] = df[features].ffill().bfill()

# Remove outliers using IQR method
def remove_outliers(df, columns, iqr_factor=1.5):
    """Clip outliers in the given columns to the IQR fences.

    No rows are dropped: for each column, values below
    ``Q1 - iqr_factor * IQR`` or above ``Q3 + iqr_factor * IQR`` are
    replaced by the respective fence.

    Args:
        df: Input DataFrame. It is left unmodified.
        columns: Names of the columns to clip.
        iqr_factor: Fence width as a multiple of the inter-quartile range
            (default: 1.5).

    Returns:
        A copy of ``df`` with the selected columns clipped.
    """
    # Copy first so the caller's frame is not changed as a side effect.
    clipped = df.copy()
    for col in columns:
        q1 = clipped[col].quantile(0.25)
        q3 = clipped[col].quantile(0.75)
        iqr = q3 - q1
        clipped[col] = clipped[col].clip(q1 - iqr_factor * iqr,
                                         q3 + iqr_factor * iqr)
    return clipped

df = remove_outliers(df, features)

# Windowing configuration: each sample is `sequence_length` consecutive rows
# and its target is the row that immediately follows the window.
sequence_length = 48
stride = 1
test_fraction = 0.2

# Split chronologically. The rows are sorted by DateTime and neighbouring
# windows share all but `stride` rows, so a shuffled split would put
# near-copies of every test window into the training set.
window_starts = range(0, len(df) - sequence_length, stride)
num_test_windows = int(np.ceil(test_fraction * len(window_starts)))
num_train_windows = len(window_starts) - num_test_windows
if num_train_windows < 1 or num_test_windows < 1:
    raise ValueError(
        f"Not enough rows ({len(df)}) to build train and test windows "
        f"of length {sequence_length}"
    )

# One past the last row touched by a training window (inputs and target).
train_row_end = window_starts[num_train_windows - 1] + sequence_length + 1

# Scale the data: fit on training rows only so that test-period statistics
# do not leak into the scaling, then transform every row.
# NOTE(review): remove_outliers above still derives its fences from all rows.
scaler_X = RobustScaler()
scaler_y = RobustScaler()
scaler_X.fit(df[features].iloc[:train_row_end])
scaler_y.fit(df[[target]].iloc[:train_row_end])

X = scaler_X.transform(df[features])
y = scaler_y.transform(df[[target]])

# Create sequences
X_sequences = np.array([X[i:(i + sequence_length)] for i in window_starts])
y_sequences = np.array([y[i + sequence_length] for i in window_starts])

# Split the data: earliest windows for training, latest for testing
X_train, X_test, y_train, y_test = train_test_split(
    X_sequences, y_sequences, test_size=num_test_windows, shuffle=False
)

# Assemble the TCN regressor layer by layer: two causal TCN stages followed
# by a small dense head that ends in a single linear unit.
model = Sequential()
model.add(Input(shape=X_train.shape[1:3]))
model.add(TCN(
    64,
    kernel_size=3,
    nb_stacks=2,
    dilations=[1, 2, 4, 8],
    padding='causal',
    use_batch_norm=True,
    dropout_rate=0.2,
    return_sequences=True,   # keep the time axis for the next TCN stage
))
model.add(TCN(
    32,
    kernel_size=3,
    nb_stacks=2,
    dilations=[1, 2, 4, 8],
    padding='causal',
    use_batch_norm=True,
    dropout_rate=0.2,
    return_sequences=False,  # collapse to one vector per window
))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(1))

# Huber loss with the Adam optimizer
optimizer = Adam(learning_rate=0.001)
model.compile(loss='huber', optimizer=optimizer)

# Stop once validation loss stalls for 15 epochs (restoring the best
# weights); cut the learning rate by 5x after 5 stalled epochs
early_stopping = EarlyStopping(monitor='val_loss', patience=15,
                               restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5,
                              min_lr=0.0001)

# Fit, holding out 20% of the training windows for validation
fit_options = dict(epochs=100, batch_size=64, validation_split=0.2, verbose=1)
history = model.fit(
    X_train, y_train,
    callbacks=[early_stopping, reduce_lr],
    **fit_options
)


# Baseline: predict on the untouched test windows, then undo the target
# scaling so every metric is expressed on the target's original scale
y_pred_clean = model.predict(X_test)
y_test_orig = scaler_y.inverse_transform(y_test)
y_pred_clean_orig = scaler_y.inverse_transform(y_pred_clean)

# Robustness: corrupt a fraction of the test windows and predict again
X_test_fault, fault_indices = inject_faults(X_test, fault_percentage=0.2)
y_pred_fault = model.predict(X_test_fault)
y_pred_fault_orig = scaler_y.inverse_transform(y_pred_fault)

# Score both prediction sets against the same ground truth
metrics_clean, metrics_fault = {}, {}
for scores, predicted in ((metrics_clean, y_pred_clean_orig),
                          (metrics_fault, y_pred_fault_orig)):
    scores['MAE'] = mean_absolute_error(y_test_orig, predicted)
    scores['RMSE'] = np.sqrt(mean_squared_error(y_test_orig, predicted))
    scores['R2'] = r2_score(y_test_orig, predicted)

# Report both score tables
for heading, scores in (("\nMetrics on Clean Test Data:", metrics_clean),
                        ("\nMetrics on Faulty Test Data:", metrics_fault)):
    print(heading)
    for metric, value in scores.items():
        print(f"{metric}: {value:.4f}")

# Two stacked panels: loss curves on top, predictions underneath
fig, (loss_ax, pred_ax) = plt.subplots(2, 1, figsize=(15, 10))

# Training history
loss_ax.plot(history.history['loss'], label='Training Loss')
loss_ax.plot(history.history['val_loss'], label='Validation Loss')
loss_ax.set_title('Model Training History')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.grid(True)

# Ground truth against clean and faulty predictions for the first 100 samples
for series, label in ((y_test_orig, 'Actual'),
                      (y_pred_clean_orig, 'Predicted (Clean)'),
                      (y_pred_fault_orig, 'Predicted (Faulty)')):
    pred_ax.plot(series[:100], label=label, alpha=0.8)
pred_ax.set_title('Predictions (First 100 Samples)')
pred_ax.set_xlabel('Time Steps')
pred_ax.set_ylabel('FST')
pred_ax.legend()
pred_ax.grid(True)

plt.tight_layout()
plt.show()

# ResNet

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Dense, Input, Conv1D, BatchNormalization, 
                                   Activation, Add, GlobalAveragePooling1D)
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt

# Define ResNet blocks
def residual_block(x, filters, kernel_size=3, stride=1):
    """Apply one 1-D residual block to ``x`` and return the result.

    The main path is Conv1D -> BatchNorm -> ReLU -> Conv1D -> BatchNorm,
    where only the first convolution uses ``stride``. The skip path is the
    input itself, unless the block changes the stride or the channel count;
    then the input goes through a 1x1 strided Conv1D plus BatchNorm so both
    paths can be added. A ReLU follows the addition.

    Args:
        x: Input tensor.
        filters: Number of output channels of both convolutions.
        kernel_size: Kernel size of the main-path convolutions.
        stride: Stride of the first convolution (and of the projection).

    Returns:
        Output tensor of the block.
    """
    main_path = Conv1D(filters, kernel_size, strides=stride, padding='same')(x)
    main_path = BatchNormalization()(main_path)
    main_path = Activation('relu')(main_path)
    main_path = Conv1D(filters, kernel_size, padding='same')(main_path)
    main_path = BatchNormalization()(main_path)

    # The identity skip is only valid when both paths keep the same shape.
    identity_fits = stride == 1 and x.shape[-1] == filters
    if identity_fits:
        skip_path = x
    else:
        skip_path = Conv1D(filters, 1, strides=stride, padding='same')(x)
        skip_path = BatchNormalization()(skip_path)

    merged = Add()([main_path, skip_path])
    return Activation('relu')(merged)

def create_resnet_model(input_shape):
    """Build an uncompiled 1-D ResNet with a single regression output.

    Layout: a width-7 convolution stem with 64 channels, six residual
    blocks (64, 64, 128, 128, 256, 256 channels; the first 128 and first 256
    block use stride 2), global average pooling, and a dense head of
    256 -> 128 -> 1 units.

    Args:
        input_shape: Shape of one input sample, without the batch axis.

    Returns:
        The Keras ``Model`` mapping the input to one value per sample.
    """
    inputs = Input(shape=input_shape)

    # Stem
    hidden = Conv1D(64, 7, padding='same')(inputs)
    hidden = BatchNormalization()(hidden)
    hidden = Activation('relu')(hidden)

    # Residual stages as (channels, stride) pairs
    stage_plan = ((64, 1), (64, 1), (128, 2), (128, 1), (256, 2), (256, 1))
    for block_filters, block_stride in stage_plan:
        hidden = residual_block(hidden, block_filters, stride=block_stride)

    # Regression head
    hidden = GlobalAveragePooling1D()(hidden)
    for units in (256, 128):
        hidden = Dense(units, activation='relu')(hidden)
    outputs = Dense(1)(hidden)

    return Model(inputs, outputs)

# Load the raw measurements
df = pd.read_csv('/kaggle/input/fruit-surface-temperature/FST_analysis.csv')

# Build one timestamp per row from the separate Date and Time columns and
# put the rows in chronological order
df = df.assign(
    DateTime=pd.to_datetime(df['Date'] + ' ' + df['Time'])
).sort_values('DateTime')

# Calendar features derived from the timestamp
df = df.assign(
    Hour=df['DateTime'].dt.hour,
    DayOfWeek=df['DateTime'].dt.dayofweek,
    Month=df['DateTime'].dt.month,
)

# Model inputs and prediction target
target = 'FST_EB'
features = [
    'Air Temperature', 'Dew Point', 'Solar Radiation', 'Wind Speed',
    'Hour', 'DayOfWeek', 'Month',
]

# Fill gaps in the inputs: carry the last observation forward, then
# back-fill whatever is still missing at the start
df[features] = df[features].ffill().bfill()

# Remove outliers using IQR method
def remove_outliers(df, columns, iqr_factor=1.5):
    """Clip outliers in the given columns to the IQR fences.

    No rows are dropped: for each column, values below
    ``Q1 - iqr_factor * IQR`` or above ``Q3 + iqr_factor * IQR`` are
    replaced by the respective fence.

    Args:
        df: Input DataFrame. It is left unmodified.
        columns: Names of the columns to clip.
        iqr_factor: Fence width as a multiple of the inter-quartile range
            (default: 1.5).

    Returns:
        A copy of ``df`` with the selected columns clipped.
    """
    # Copy first so the caller's frame is not changed as a side effect.
    clipped = df.copy()
    for col in columns:
        q1 = clipped[col].quantile(0.25)
        q3 = clipped[col].quantile(0.75)
        iqr = q3 - q1
        clipped[col] = clipped[col].clip(q1 - iqr_factor * iqr,
                                         q3 + iqr_factor * iqr)
    return clipped

df = remove_outliers(df, features)

# Windowing configuration: each sample is `sequence_length` consecutive rows
# and its target is the row that immediately follows the window.
sequence_length = 48
stride = 1
test_fraction = 0.2

# Split chronologically. The rows are sorted by DateTime and neighbouring
# windows share all but `stride` rows, so a shuffled split would put
# near-copies of every test window into the training set.
window_starts = range(0, len(df) - sequence_length, stride)
num_test_windows = int(np.ceil(test_fraction * len(window_starts)))
num_train_windows = len(window_starts) - num_test_windows
if num_train_windows < 1 or num_test_windows < 1:
    raise ValueError(
        f"Not enough rows ({len(df)}) to build train and test windows "
        f"of length {sequence_length}"
    )

# One past the last row touched by a training window (inputs and target).
train_row_end = window_starts[num_train_windows - 1] + sequence_length + 1

# Scale the data: fit on training rows only so that test-period statistics
# do not leak into the scaling, then transform every row.
# NOTE(review): remove_outliers above still derives its fences from all rows.
scaler_X = RobustScaler()
scaler_y = RobustScaler()
scaler_X.fit(df[features].iloc[:train_row_end])
scaler_y.fit(df[[target]].iloc[:train_row_end])

X = scaler_X.transform(df[features])
y = scaler_y.transform(df[[target]])

# Create sequences
X_sequences = np.array([X[i:(i + sequence_length)] for i in window_starts])
y_sequences = np.array([y[i + sequence_length] for i in window_starts])

# Split the data: earliest windows for training, latest for testing
X_train, X_test, y_train, y_test = train_test_split(
    X_sequences, y_sequences, test_size=num_test_windows, shuffle=False
)

# Build the ResNet for the shape of one training window and compile it
# with the Huber loss and the Adam optimizer
model = create_resnet_model(X_train.shape[1:3])
optimizer = Adam(learning_rate=0.001)
model.compile(loss='huber', optimizer=optimizer)

# Stop once validation loss stalls for 15 epochs (restoring the best
# weights); cut the learning rate by 5x after 5 stalled epochs
early_stopping = EarlyStopping(monitor='val_loss', patience=15,
                               restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5,
                              min_lr=0.0001)

# Fit, holding out 20% of the training windows for validation
fit_options = dict(epochs=100, batch_size=64, validation_split=0.2, verbose=1)
history = model.fit(
    X_train, y_train,
    callbacks=[early_stopping, reduce_lr],
    **fit_options
)

# Baseline: predict on the untouched test windows, then undo the target
# scaling so every metric is expressed on the target's original scale
y_pred_clean = model.predict(X_test)
y_test_orig = scaler_y.inverse_transform(y_test)
y_pred_clean_orig = scaler_y.inverse_transform(y_pred_clean)

# Robustness: corrupt a fraction of the test windows and predict again
X_test_fault, fault_indices = inject_faults(X_test, fault_percentage=0.2)
y_pred_fault = model.predict(X_test_fault)
y_pred_fault_orig = scaler_y.inverse_transform(y_pred_fault)

# Score both prediction sets against the same ground truth
metrics_clean, metrics_fault = {}, {}
for scores, predicted in ((metrics_clean, y_pred_clean_orig),
                          (metrics_fault, y_pred_fault_orig)):
    scores['MAE'] = mean_absolute_error(y_test_orig, predicted)
    scores['RMSE'] = np.sqrt(mean_squared_error(y_test_orig, predicted))
    scores['R2'] = r2_score(y_test_orig, predicted)

# Report both score tables
for heading, scores in (("\nMetrics on Clean Test Data:", metrics_clean),
                        ("\nMetrics on Faulty Test Data:", metrics_fault)):
    print(heading)
    for metric, value in scores.items():
        print(f"{metric}: {value:.4f}")

# Two stacked panels: loss curves on top, predictions underneath
fig, (loss_ax, pred_ax) = plt.subplots(2, 1, figsize=(15, 10))

# Training history
loss_ax.plot(history.history['loss'], label='Training Loss')
loss_ax.plot(history.history['val_loss'], label='Validation Loss')
loss_ax.set_title('Model Training History')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.grid(True)

# Ground truth against clean and faulty predictions for the first 100 samples
for series, label in ((y_test_orig, 'Actual'),
                      (y_pred_clean_orig, 'Predicted (Clean)'),
                      (y_pred_fault_orig, 'Predicted (Faulty)')):
    pred_ax.plot(series[:100], label=label, alpha=0.8)
pred_ax.set_title('Predictions (First 100 Samples)')
pred_ax.set_xlabel('Time Steps')
pred_ax.set_ylabel('FST')
pred_ax.legend()
pred_ax.grid(True)

plt.tight_layout()
plt.show()

# LSTM

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, RobustScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
import matplotlib.pyplot as plt

# Load the raw measurements
df = pd.read_csv('/kaggle/input/fruit-surface-temperature/FST_analysis.csv')

# Show which columns the file provides
print("Available columns:", df.columns.tolist())

# Build one timestamp per row from the separate Date and Time columns and
# put the rows in chronological order
df = df.assign(
    DateTime=pd.to_datetime(df['Date'] + ' ' + df['Time'])
).sort_values('DateTime')

# Calendar features derived from the timestamp
df = df.assign(
    Hour=df['DateTime'].dt.hour,
    DayOfWeek=df['DateTime'].dt.dayofweek,
    Month=df['DateTime'].dt.month,
)

# Model inputs and prediction target
target = 'FST_EB'
features = [
    'Air Temperature', 'Dew Point', 'Solar Radiation', 'Wind Speed',
    'Hour', 'DayOfWeek', 'Month',
]

# Fill gaps in the inputs: carry the last observation forward, then
# back-fill whatever is still missing at the start
df[features] = df[features].ffill().bfill()

# Remove outliers using IQR method
def remove_outliers(df, columns, iqr_factor=1.5):
    """Clip outliers in the given columns to the IQR fences.

    No rows are dropped: for each column, values below
    ``Q1 - iqr_factor * IQR`` or above ``Q3 + iqr_factor * IQR`` are
    replaced by the respective fence.

    Args:
        df: Input DataFrame. It is left unmodified.
        columns: Names of the columns to clip.
        iqr_factor: Fence width as a multiple of the inter-quartile range
            (default: 1.5).

    Returns:
        A copy of ``df`` with the selected columns clipped.
    """
    # Copy first so the caller's frame is not changed as a side effect.
    clipped = df.copy()
    for col in columns:
        q1 = clipped[col].quantile(0.25)
        q3 = clipped[col].quantile(0.75)
        iqr = q3 - q1
        clipped[col] = clipped[col].clip(q1 - iqr_factor * iqr,
                                         q3 + iqr_factor * iqr)
    return clipped

df = remove_outliers(df, features)

# Windowing configuration: each sample is `sequence_length` consecutive rows
# and its target is the row that immediately follows the window.
sequence_length = 48
stride = 1
test_fraction = 0.2

# Split chronologically. The rows are sorted by DateTime and neighbouring
# windows share all but `stride` rows, so a shuffled split would put
# near-copies of every test window into the training set.
window_starts = range(0, len(df) - sequence_length, stride)
num_test_windows = int(np.ceil(test_fraction * len(window_starts)))
num_train_windows = len(window_starts) - num_test_windows
if num_train_windows < 1 or num_test_windows < 1:
    raise ValueError(
        f"Not enough rows ({len(df)}) to build train and test windows "
        f"of length {sequence_length}"
    )

# One past the last row touched by a training window (inputs and target).
train_row_end = window_starts[num_train_windows - 1] + sequence_length + 1

# Scale with RobustScaler, fitted on training rows only so that test-period
# statistics do not leak into the scaling, then transform every row.
# NOTE(review): remove_outliers above still derives its fences from all rows.
scaler_X = RobustScaler()
scaler_y = RobustScaler()
scaler_X.fit(df[features].iloc[:train_row_end])
scaler_y.fit(df[[target]].iloc[:train_row_end])

X = scaler_X.transform(df[features])
y = scaler_y.transform(df[[target]])

# Create overlapping sequences
X_sequences = np.array([X[i:(i + sequence_length)] for i in window_starts])
y_sequences = np.array([y[i + sequence_length] for i in window_starts])

# Split the data: earliest windows for training, latest for testing
X_train, X_test, y_train, y_test = train_test_split(
    X_sequences, y_sequences, test_size=num_test_windows, shuffle=False
)

# Assemble the stacked LSTM regressor layer by layer
model = Sequential()
model.add(Input(shape=(sequence_length, len(features))))
model.add(BatchNormalization())

# Two sequence-returning LSTM stages with L2-regularised kernels, each
# followed by batch normalisation and dropout
for units in (128, 64):
    model.add(LSTM(units, return_sequences=True, kernel_regularizer=l2(0.01)))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))

# Final LSTM collapses the sequence; a small dense head yields one value
model.add(LSTM(32))
model.add(BatchNormalization())
model.add(Dense(16, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(1))

# Huber loss with the Adam optimizer
optimizer = Adam(learning_rate=0.001)
model.compile(loss='huber', optimizer=optimizer)

# Stop once validation loss stalls for 15 epochs (restoring the best
# weights); cut the learning rate by 5x after 5 stalled epochs
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=15, restore_best_weights=True)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss', factor=0.2, patience=5, min_lr=0.0001)

# Fit, holding out 20% of the training windows for validation
fit_options = dict(epochs=100, batch_size=64, validation_split=0.2, verbose=1)
history = model.fit(
    X_train, y_train,
    callbacks=[early_stopping, reduce_lr],
    **fit_options
)

# Baseline: predict on the untouched test windows, then undo the target
# scaling so every metric is expressed on the target's original scale
y_pred_clean = model.predict(X_test)
y_test_orig = scaler_y.inverse_transform(y_test)
y_pred_clean_orig = scaler_y.inverse_transform(y_pred_clean)

# Robustness: corrupt a fraction of the test windows and predict again
X_test_fault, fault_indices = inject_faults(X_test, fault_percentage=0.2)
y_pred_fault = model.predict(X_test_fault)
y_pred_fault_orig = scaler_y.inverse_transform(y_pred_fault)

# Score both prediction sets against the same ground truth
metrics_clean, metrics_fault = {}, {}
for scores, predicted in ((metrics_clean, y_pred_clean_orig),
                          (metrics_fault, y_pred_fault_orig)):
    scores['MAE'] = mean_absolute_error(y_test_orig, predicted)
    scores['RMSE'] = np.sqrt(mean_squared_error(y_test_orig, predicted))
    scores['R2'] = r2_score(y_test_orig, predicted)

# Report both score tables
for heading, scores in (("\nMetrics on Clean Test Data:", metrics_clean),
                        ("\nMetrics on Faulty Test Data:", metrics_fault)):
    print(heading)
    for metric, value in scores.items():
        print(f"{metric}: {value:.4f}")

# Two stacked panels: loss curves on top, predictions underneath
fig, (loss_ax, pred_ax) = plt.subplots(2, 1, figsize=(15, 10))

# Training history
loss_ax.plot(history.history['loss'], label='Training Loss')
loss_ax.plot(history.history['val_loss'], label='Validation Loss')
loss_ax.set_title('Model Training History')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.grid(True)

# Ground truth against clean and faulty predictions for the first 100 samples
for series, label in ((y_test_orig, 'Actual'),
                      (y_pred_clean_orig, 'Predicted (Clean)'),
                      (y_pred_fault_orig, 'Predicted (Faulty)')):
    pred_ax.plot(series[:100], label=label, alpha=0.8)
pred_ax.set_title('Predictions (First 100 Samples)')
pred_ax.set_xlabel('Time Steps')
pred_ax.set_ylabel('FST')
pred_ax.legend()
pred_ax.grid(True)

plt.tight_layout()
plt.show()

# Bi-LSTM

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input, Dropout, LSTM, Bidirectional
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt

def create_bilstm_model(input_shape):
    """Build an uncompiled stacked bidirectional-LSTM regressor.

    Layout: BiLSTM(64, returning sequences) -> Dropout(0.2) -> BiLSTM(32)
    -> Dropout(0.2) -> Dense(64, relu) -> Dense(32, relu) -> Dense(1).

    Args:
        input_shape: Shape of one input sample, without the batch axis.

    Returns:
        The Keras ``Sequential`` model with a single output unit.
    """
    network = Sequential()
    network.add(Input(shape=input_shape))

    # Recurrent encoder; only the first stage keeps the time axis
    network.add(Bidirectional(LSTM(64, return_sequences=True)))
    network.add(Dropout(0.2))
    network.add(Bidirectional(LSTM(32)))
    network.add(Dropout(0.2))

    # Dense head ending in one linear unit
    for units in (64, 32):
        network.add(Dense(units, activation='relu'))
    network.add(Dense(1))
    return network

# Load the raw measurements
df = pd.read_csv('/kaggle/input/fruit-surface-temperature/FST_analysis.csv')

# Build one timestamp per row from the separate Date and Time columns and
# put the rows in chronological order
df = df.assign(
    DateTime=pd.to_datetime(df['Date'] + ' ' + df['Time'])
).sort_values('DateTime')

# Calendar features derived from the timestamp
df = df.assign(
    Hour=df['DateTime'].dt.hour,
    DayOfWeek=df['DateTime'].dt.dayofweek,
    Month=df['DateTime'].dt.month,
)

# Model inputs and prediction target
target = 'FST_EB'
features = [
    'Air Temperature', 'Dew Point', 'Solar Radiation', 'Wind Speed',
    'Hour', 'DayOfWeek', 'Month',
]

# Fill gaps in the inputs: carry the last observation forward, then
# back-fill whatever is still missing at the start
df[features] = df[features].ffill().bfill()

# Remove outliers using IQR method
def remove_outliers(df, columns, iqr_factor=1.5):
    """Clip outliers in the given columns to the IQR fences.

    No rows are dropped: for each column, values below
    ``Q1 - iqr_factor * IQR`` or above ``Q3 + iqr_factor * IQR`` are
    replaced by the respective fence.

    Args:
        df: Input DataFrame. It is left unmodified.
        columns: Names of the columns to clip.
        iqr_factor: Fence width as a multiple of the inter-quartile range
            (default: 1.5).

    Returns:
        A copy of ``df`` with the selected columns clipped.
    """
    # Copy first so the caller's frame is not changed as a side effect.
    clipped = df.copy()
    for col in columns:
        q1 = clipped[col].quantile(0.25)
        q3 = clipped[col].quantile(0.75)
        iqr = q3 - q1
        clipped[col] = clipped[col].clip(q1 - iqr_factor * iqr,
                                         q3 + iqr_factor * iqr)
    return clipped

df = remove_outliers(df, features)

# Windowing configuration: each sample is `sequence_length` consecutive rows
# and its target is the row that immediately follows the window.
sequence_length = 48
stride = 1
test_fraction = 0.2

# Split chronologically. The rows are sorted by DateTime and neighbouring
# windows share all but `stride` rows, so a shuffled split would put
# near-copies of every test window into the training set.
window_starts = range(0, len(df) - sequence_length, stride)
num_test_windows = int(np.ceil(test_fraction * len(window_starts)))
num_train_windows = len(window_starts) - num_test_windows
if num_train_windows < 1 or num_test_windows < 1:
    raise ValueError(
        f"Not enough rows ({len(df)}) to build train and test windows "
        f"of length {sequence_length}"
    )

# One past the last row touched by a training window (inputs and target).
train_row_end = window_starts[num_train_windows - 1] + sequence_length + 1

# Scale the data: fit on training rows only so that test-period statistics
# do not leak into the scaling, then transform every row.
# NOTE(review): remove_outliers above still derives its fences from all rows.
scaler_X = RobustScaler()
scaler_y = RobustScaler()
scaler_X.fit(df[features].iloc[:train_row_end])
scaler_y.fit(df[[target]].iloc[:train_row_end])

X = scaler_X.transform(df[features])
y = scaler_y.transform(df[[target]])

# Create sequences
X_sequences = np.array([X[i:(i + sequence_length)] for i in window_starts])
y_sequences = np.array([y[i + sequence_length] for i in window_starts])

# Split the data: earliest windows for training, latest for testing
X_train, X_test, y_train, y_test = train_test_split(
    X_sequences, y_sequences, test_size=num_test_windows, shuffle=False
)

# Build the Bi-LSTM for the shape of one training window and compile it
# with the Huber loss and the Adam optimizer
model = create_bilstm_model(X_train.shape[1:3])
optimizer = Adam(learning_rate=0.001)
model.compile(loss='huber', optimizer=optimizer)

# Stop once validation loss stalls for 15 epochs (restoring the best
# weights); cut the learning rate by 5x after 5 stalled epochs
early_stopping = EarlyStopping(monitor='val_loss', patience=15,
                               restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5,
                              min_lr=0.0001)

# Fit, holding out 20% of the training windows for validation
fit_options = dict(epochs=100, batch_size=64, validation_split=0.2, verbose=1)
history = model.fit(
    X_train, y_train,
    callbacks=[early_stopping, reduce_lr],
    **fit_options
)
# Baseline: predict on the untouched test windows, then undo the target
# scaling so every metric is expressed on the target's original scale
y_pred_clean = model.predict(X_test)
y_test_orig = scaler_y.inverse_transform(y_test)
y_pred_clean_orig = scaler_y.inverse_transform(y_pred_clean)

# Robustness: corrupt a fraction of the test windows and predict again
X_test_fault, fault_indices = inject_faults(X_test, fault_percentage=0.2)
y_pred_fault = model.predict(X_test_fault)
y_pred_fault_orig = scaler_y.inverse_transform(y_pred_fault)

# Score both prediction sets against the same ground truth
metrics_clean, metrics_fault = {}, {}
for scores, predicted in ((metrics_clean, y_pred_clean_orig),
                          (metrics_fault, y_pred_fault_orig)):
    scores['MAE'] = mean_absolute_error(y_test_orig, predicted)
    scores['RMSE'] = np.sqrt(mean_squared_error(y_test_orig, predicted))
    scores['R2'] = r2_score(y_test_orig, predicted)

# Report both score tables
for heading, scores in (("\nMetrics on Clean Test Data:", metrics_clean),
                        ("\nMetrics on Faulty Test Data:", metrics_fault)):
    print(heading)
    for metric, value in scores.items():
        print(f"{metric}: {value:.4f}")

# Two stacked panels: loss curves on top, predictions underneath
fig, (loss_ax, pred_ax) = plt.subplots(2, 1, figsize=(15, 10))

# Training history
loss_ax.plot(history.history['loss'], label='Training Loss')
loss_ax.plot(history.history['val_loss'], label='Validation Loss')
loss_ax.set_title('Model Training History')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.grid(True)

# Ground truth against clean and faulty predictions for the first 100 samples
for series, label in ((y_test_orig, 'Actual'),
                      (y_pred_clean_orig, 'Predicted (Clean)'),
                      (y_pred_fault_orig, 'Predicted (Faulty)')):
    pred_ax.plot(series[:100], label=label, alpha=0.8)
pred_ax.set_title('Predictions (First 100 Samples)')
pred_ax.set_xlabel('Time Steps')
pred_ax.set_ylabel('FST')
pred_ax.legend()
pred_ax.grid(True)

plt.tight_layout()
plt.show()

# GRU

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input, Dropout, GRU
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt

def create_gru_model(input_shape):
    """Build an uncompiled two-layer GRU regressor.

    Layout: GRU(64, returning sequences) -> Dropout(0.2) -> GRU(32)
    -> Dropout(0.2) -> Dense(64, relu) -> Dense(32, relu) -> Dense(1).

    Args:
        input_shape: Shape of one input sample, without the batch axis.

    Returns:
        The Keras ``Sequential`` model with a single output unit.
    """
    network = Sequential()
    network.add(Input(shape=input_shape))

    # Recurrent encoder; only the first stage keeps the time axis
    network.add(GRU(64, return_sequences=True))
    network.add(Dropout(0.2))
    network.add(GRU(32))
    network.add(Dropout(0.2))

    # Dense head ending in one linear unit
    for units in (64, 32):
        network.add(Dense(units, activation='relu'))
    network.add(Dense(1))
    return network

# Load the raw measurements
df = pd.read_csv('/kaggle/input/fruit-surface-temperature/FST_analysis.csv')

# Build one timestamp per row from the separate Date and Time columns and
# put the rows in chronological order
df = df.assign(
    DateTime=pd.to_datetime(df['Date'] + ' ' + df['Time'])
).sort_values('DateTime')

# Calendar features derived from the timestamp
df = df.assign(
    Hour=df['DateTime'].dt.hour,
    DayOfWeek=df['DateTime'].dt.dayofweek,
    Month=df['DateTime'].dt.month,
)

# Model inputs and prediction target
target = 'FST_EB'
features = [
    'Air Temperature', 'Dew Point', 'Solar Radiation', 'Wind Speed',
    'Hour', 'DayOfWeek', 'Month',
]

# Fill gaps in the inputs: carry the last observation forward, then
# back-fill whatever is still missing at the start
df[features] = df[features].ffill().bfill()

# Remove outliers using IQR method
def remove_outliers(df, columns, iqr_factor=1.5):
    """Clip outliers in the given columns to the IQR fences.

    No rows are dropped: for each column, values below
    ``Q1 - iqr_factor * IQR`` or above ``Q3 + iqr_factor * IQR`` are
    replaced by the respective fence.

    Args:
        df: Input DataFrame. It is left unmodified.
        columns: Names of the columns to clip.
        iqr_factor: Fence width as a multiple of the inter-quartile range
            (default: 1.5).

    Returns:
        A copy of ``df`` with the selected columns clipped.
    """
    # Copy first so the caller's frame is not changed as a side effect.
    clipped = df.copy()
    for col in columns:
        q1 = clipped[col].quantile(0.25)
        q3 = clipped[col].quantile(0.75)
        iqr = q3 - q1
        clipped[col] = clipped[col].clip(q1 - iqr_factor * iqr,
                                         q3 + iqr_factor * iqr)
    return clipped

df = remove_outliers(df, features)

# Windowing configuration: each sample is `sequence_length` consecutive rows
# and its target is the row that immediately follows the window.
sequence_length = 48
stride = 1
test_fraction = 0.2

# Split chronologically. The rows are sorted by DateTime and neighbouring
# windows share all but `stride` rows, so a shuffled split would put
# near-copies of every test window into the training set.
window_starts = range(0, len(df) - sequence_length, stride)
num_test_windows = int(np.ceil(test_fraction * len(window_starts)))
num_train_windows = len(window_starts) - num_test_windows
if num_train_windows < 1 or num_test_windows < 1:
    raise ValueError(
        f"Not enough rows ({len(df)}) to build train and test windows "
        f"of length {sequence_length}"
    )

# One past the last row touched by a training window (inputs and target).
train_row_end = window_starts[num_train_windows - 1] + sequence_length + 1

# Scale the data: fit on training rows only so that test-period statistics
# do not leak into the scaling, then transform every row.
# NOTE(review): remove_outliers above still derives its fences from all rows.
scaler_X = RobustScaler()
scaler_y = RobustScaler()
scaler_X.fit(df[features].iloc[:train_row_end])
scaler_y.fit(df[[target]].iloc[:train_row_end])

X = scaler_X.transform(df[features])
y = scaler_y.transform(df[[target]])

# Create sequences
X_sequences = np.array([X[i:(i + sequence_length)] for i in window_starts])
y_sequences = np.array([y[i + sequence_length] for i in window_starts])

# Split the data: earliest windows for training, latest for testing
X_train, X_test, y_train, y_test = train_test_split(
    X_sequences, y_sequences, test_size=num_test_windows, shuffle=False
)

# Build the GRU for the shape of one training window and compile it
# with the Huber loss and the Adam optimizer
model = create_gru_model(X_train.shape[1:3])
optimizer = Adam(learning_rate=0.001)
model.compile(loss='huber', optimizer=optimizer)

# Stop once validation loss stalls for 15 epochs (restoring the best
# weights); cut the learning rate by 5x after 5 stalled epochs
early_stopping = EarlyStopping(monitor='val_loss', patience=15,
                               restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5,
                              min_lr=0.0001)

# Fit, holding out 20% of the training windows for validation
fit_options = dict(epochs=100, batch_size=64, validation_split=0.2, verbose=1)
history = model.fit(
    X_train, y_train,
    callbacks=[early_stopping, reduce_lr],
    **fit_options
)

# Baseline: predict on the untouched test windows, then undo the target
# scaling so every metric is expressed on the target's original scale
y_pred_clean = model.predict(X_test)
y_test_orig = scaler_y.inverse_transform(y_test)
y_pred_clean_orig = scaler_y.inverse_transform(y_pred_clean)

# Robustness: corrupt a fraction of the test windows and predict again
X_test_fault, fault_indices = inject_faults(X_test, fault_percentage=0.2)
y_pred_fault = model.predict(X_test_fault)
y_pred_fault_orig = scaler_y.inverse_transform(y_pred_fault)

# Score both prediction sets against the same ground truth
metrics_clean, metrics_fault = {}, {}
for scores, predicted in ((metrics_clean, y_pred_clean_orig),
                          (metrics_fault, y_pred_fault_orig)):
    scores['MAE'] = mean_absolute_error(y_test_orig, predicted)
    scores['RMSE'] = np.sqrt(mean_squared_error(y_test_orig, predicted))
    scores['R2'] = r2_score(y_test_orig, predicted)

# Report both score tables
for heading, scores in (("\nMetrics on Clean Test Data:", metrics_clean),
                        ("\nMetrics on Faulty Test Data:", metrics_fault)):
    print(heading)
    for metric, value in scores.items():
        print(f"{metric}: {value:.4f}")

# Two stacked panels: loss curves on top, predictions underneath
fig, (loss_ax, pred_ax) = plt.subplots(2, 1, figsize=(15, 10))

# Training history
loss_ax.plot(history.history['loss'], label='Training Loss')
loss_ax.plot(history.history['val_loss'], label='Validation Loss')
loss_ax.set_title('Model Training History')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.grid(True)

# Ground truth against clean and faulty predictions for the first 100 samples
for series, label in ((y_test_orig, 'Actual'),
                      (y_pred_clean_orig, 'Predicted (Clean)'),
                      (y_pred_fault_orig, 'Predicted (Faulty)')):
    pred_ax.plot(series[:100], label=label, alpha=0.8)
pred_ax.set_title('Predictions (First 100 Samples)')
pred_ax.set_xlabel('Time Steps')
pred_ax.set_ylabel('FST')
pred_ax.legend()
pred_ax.grid(True)

plt.tight_layout()
plt.show()

# TST

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras import Model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt

# Custom Layer definitions (copying from your provided code)
class MultiHeadSelfAttention(Layer):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are all dense projections of the same input.
    The ``d_model`` projected channels are divided evenly between
    ``num_heads`` heads, attention is computed per head, and the heads are
    concatenated and passed through a final dense layer.

    Args:
        d_model: Width of the projections and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.
        **kwargs: Forwarded to the base ``Layer`` constructor.

    Raises:
        ValueError: If ``num_heads`` is not positive or does not divide
            ``d_model``.
    """

    def __init__(self, d_model, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Without an even split the reshape in split_heads either fails or
        # silently regroups values across positions.
        if num_heads <= 0 or d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive "
                f"num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads  # channels per head

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape ``x`` to (batch_size, num_heads, -1, depth)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Return the self-attention output for ``inputs``.

        NOTE(review): assumes ``inputs`` is (batch, sequence, features);
        confirm against the caller.
        """
        batch_size = tf.shape(inputs)[0]

        q = self.wq(inputs)
        k = self.wk(inputs)
        v = self.wv(inputs)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        # Dot-product scores scaled by sqrt(depth). The scale is cast to the
        # scores' own dtype so non-float32 compute dtypes do not clash.
        scaled_attention = tf.matmul(q, k, transpose_b=True)
        scaled_attention = scaled_attention / tf.math.sqrt(
            tf.cast(self.depth, scaled_attention.dtype)
        )

        attention_weights = tf.nn.softmax(scaled_attention, axis=-1)
        output = tf.matmul(attention_weights, v)

        # Merge the heads back into a single d_model-wide channel axis.
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (batch_size, -1, self.d_model))

        return self.dense(output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            'd_model': self.d_model,
            'num_heads': self.num_heads,
        })
        return config

class TransformerBlock(Layer):
    """Post-norm Transformer encoder block: self-attention then feed-forward.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input (residual connection) and is then layer-normalised.

    Args:
        d_model: Width of the block's input and output.
        num_heads: Number of attention heads.
        dff: Hidden width of the position-wise feed-forward network.
        dropout: Dropout rate applied after each sub-layer.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, d_model, num_heads, dff, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.dropout_rate = dropout

        self.mha = MultiHeadSelfAttention(d_model, num_heads)
        self.ffn = tf.keras.Sequential([
            Dense(dff, activation='relu'),
            Dense(d_model)
        ])

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(dropout)
        self.dropout2 = Dropout(dropout)

    def call(self, inputs, training=False):
        """Run the block; ``training`` switches the dropout layers on."""
        attn_output = self.mha(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "dropout": self.dropout_rate
        })
        return config

# Modified TimeSeriesTransformer for regression
class TimeSeriesTransformer(Model):
    """Transformer encoder that maps a feature window to one regression value.

    The input is projected to ``d_model``, passed through ``num_layers``
    ``TransformerBlock`` layers, averaged over the sequence axis, regularised
    with dropout and reduced to a single output by a linear ``Dense(1)``.

    NOTE(review): no positional encoding is applied before the blocks —
    confirm that this is intended.

    Args:
        num_layers: Number of stacked ``TransformerBlock`` layers.
        d_model: Width of the internal representation.
        num_heads: Attention heads per block.
        dff: Hidden width of each block's feed-forward network.
        max_seq_len: Stored on the instance; not otherwise used here.
        num_features: Accepted for interface parity; not used here.
        dropout_rate: Dropout rate for the blocks and the pooled features.
    """

    def __init__(self,
                 num_layers,
                 d_model,
                 num_heads,
                 dff,
                 max_seq_len,
                 num_features,
                 dropout_rate=0.1):
        super().__init__()

        # Hyper-parameters kept on the instance for later inspection
        self.num_layers = num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.max_seq_len = max_seq_len

        # Lifts the raw features to the model width
        self.input_projection = Dense(d_model)

        # Encoder stack
        encoder_stack = []
        for _ in range(num_layers):
            encoder_stack.append(
                TransformerBlock(d_model, num_heads, dff, dropout_rate)
            )
        self.transformer_blocks = encoder_stack

        # Regression head
        self.dropout = Dropout(dropout_rate)
        self.global_pooling = GlobalAveragePooling1D()
        self.final_layer = Dense(1)  # linear, one value per window

    def call(self, inputs, training=False):
        """Return one prediction per input window."""
        hidden = self.input_projection(inputs)

        for block in self.transformer_blocks:
            hidden = block(hidden, training=training)

        # Collapse the sequence axis, then regularise the pooled vector
        pooled = self.global_pooling(hidden)
        pooled = self.dropout(pooled, training=training)

        return self.final_layer(pooled)

# Load the raw measurements
df = pd.read_csv('/kaggle/input/fruit-surface-temperature/FST_analysis.csv')

# Build one timestamp per row from the separate Date/Time columns and put
# the rows in chronological order
timestamps = pd.to_datetime(df['Date'] + ' ' + df['Time'])
df = df.assign(DateTime=timestamps).sort_values('DateTime')

# Calendar features derived from the timestamp
dt_parts = df['DateTime'].dt
df['Hour'] = dt_parts.hour
df['DayOfWeek'] = dt_parts.dayofweek
df['Month'] = dt_parts.month

# Regression target and model inputs
target = 'FST_EB'
features = [
    'Air Temperature',
    'Dew Point',
    'Solar Radiation',
    'Wind Speed',
    'Hour',
    'DayOfWeek',
    'Month',
]

# Fill gaps in the inputs: carry the last value forward, then back-fill
# whatever is still missing at the start. The target column is not filled.
forward_filled = df[features].ffill()
df[features] = forward_filled.bfill()

# Remove outliers using IQR method
def remove_outliers(df, columns, factor=1.5):
    """Clip outliers in ``columns`` to the Tukey (IQR) fences.

    Despite the name, no rows are dropped: every value below
    ``Q1 - factor * IQR`` or above ``Q3 + factor * IQR`` is replaced by the
    corresponding fence, so the row count and row order are preserved.

    Args:
        df: Input DataFrame. It is not modified.
        columns: Iterable of numeric column names to clip.
        factor: Fence multiplier applied to the IQR (1.5 is Tukey's rule).

    Returns:
        A copy of ``df`` with the listed columns clipped.
    """
    # Work on a copy so the caller's frame is never mutated behind its back.
    clipped = df.copy()
    for col in columns:
        q1 = clipped[col].quantile(0.25)
        q3 = clipped[col].quantile(0.75)
        iqr = q3 - q1
        clipped[col] = clipped[col].clip(q1 - factor * iqr, q3 + factor * iqr)
    return clipped

# Clip extreme feature values to the IQR fences
df = remove_outliers(df, features)

# Separate scalers for inputs and target so predictions can be mapped back
# to original units with scaler_y alone.
# NOTE(review): both scalers are fitted on the full dataset, i.e. before the
# train/test split below — confirm this is acceptable for the evaluation.
scaler_X = RobustScaler().fit(df[features])
scaler_y = RobustScaler().fit(df[[target]])
X = scaler_X.transform(df[features])
y = scaler_y.transform(df[[target]])

# Sliding windows: each sample holds `sequence_length` consecutive rows and
# its label is the target of the row right after the window.
sequence_length = 48
stride = 1
window_starts = range(0, len(df) - sequence_length, stride)
X_sequences = np.array(
    [X[start:start + sequence_length] for start in window_starts]
)
y_sequences = np.array(
    [y[start + sequence_length] for start in window_starts]
)

# Hold out 20% of the windows for testing.
# NOTE(review): train_test_split shuffles by default and consecutive windows
# overlap, so near-identical windows can land on both sides of the split.
X_train, X_test, y_train, y_test = train_test_split(
    X_sequences,
    y_sequences,
    random_state=42,
    test_size=0.2,
)

# Hyper-parameters of the TST model
num_layers, d_model, num_heads, dff = 4, 128, 8, 256
dropout_rate = 0.1
max_seq_len = sequence_length
num_features = X_train.shape[2]  # last axis of the windows = feature count

# Build the model
tst_config = dict(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    max_seq_len=max_seq_len,
    num_features=num_features,
    dropout_rate=dropout_rate,
)
model = TimeSeriesTransformer(**tst_config)

# Huber loss with Adam
optimizer = Adam(learning_rate=0.001)
model.compile(loss='huber', optimizer=optimizer)

# Both callbacks watch the validation loss: one stops training and restores
# the best weights, the other lowers the learning rate on plateaus.
early_stopping = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.0001)

# Fit, holding out 20% of the training windows for validation
fit_options = dict(
    epochs=100,
    batch_size=64,
    validation_split=0.2,
    callbacks=[early_stopping, reduce_lr],
    verbose=1,
)
history = model.fit(X_train, y_train, **fit_options)


# Clean-data predictions, mapped back to the target's original units
y_test_orig = scaler_y.inverse_transform(y_test)
y_pred_clean = model.predict(X_test)
y_pred_clean_orig = scaler_y.inverse_transform(y_pred_clean)

# Same test windows after synthetic fault injection
X_test_fault, fault_indices = inject_faults(X_test, fault_percentage=0.2)
y_pred_fault = model.predict(X_test_fault)
y_pred_fault_orig = scaler_y.inverse_transform(y_pred_fault)

# MAE / RMSE / R2 against the same ground truth for both prediction sets
metrics_clean, metrics_fault = (
    {
        'MAE': mean_absolute_error(y_test_orig, predicted),
        'RMSE': np.sqrt(mean_squared_error(y_test_orig, predicted)),
        'R2': r2_score(y_test_orig, predicted),
    }
    for predicted in (y_pred_clean_orig, y_pred_fault_orig)
)

# Report
report_sections = (
    ("\nMetrics on Clean Test Data:", metrics_clean),
    ("\nMetrics on Faulty Test Data:", metrics_fault),
)
for heading, scores in report_sections:
    print(heading)
    for metric, value in scores.items():
        print(f"{metric}: {value:.4f}")

# Two stacked panels: loss curves on top, predictions below
fig, (ax_loss, ax_pred) = plt.subplots(2, 1, figsize=(15, 10))

# Training vs. validation loss per epoch
ax_loss.plot(history.history['loss'], label='Training Loss')
ax_loss.plot(history.history['val_loss'], label='Validation Loss')
ax_loss.set_title('Model Training History')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.legend()
ax_loss.grid(True)

# First 100 test samples: ground truth against both prediction sets
prediction_series = (
    (y_test_orig, 'Actual'),
    (y_pred_clean_orig, 'Predicted (Clean)'),
    (y_pred_fault_orig, 'Predicted (Faulty)'),
)
for series, series_label in prediction_series:
    ax_pred.plot(series[:100], label=series_label, alpha=0.8)
ax_pred.set_title('Predictions (First 100 Samples)')
ax_pred.set_xlabel('Time Steps')
ax_pred.set_ylabel('FST')
ax_pred.legend()
ax_pred.grid(True)

fig.tight_layout()
plt.show()

# Informer

In [None]:
import tensorflow as tf
from tensorflow.keras import Model, layers
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import pandas as pd
import matplotlib.pyplot as plt

# Load the raw measurements (path is relative to the working directory)
df = pd.read_csv('FST_analysis.csv')

# One timestamp per row, then chronological order
df['DateTime'] = pd.to_datetime(df['Date'] + ' ' + df['Time'])
df = df.sort_values(by='DateTime')

# Calendar features taken from the timestamp
calendar_fields = (('Hour', 'hour'), ('DayOfWeek', 'dayofweek'), ('Month', 'month'))
for column, attribute in calendar_fields:
    df[column] = getattr(df['DateTime'].dt, attribute)

# Model inputs and regression target
features = [
    'Air Temperature', 'Dew Point', 'Solar Radiation', 'Wind Speed',
    'Hour', 'DayOfWeek', 'Month',
]
target = 'FST_EB'

# Fill gaps in the inputs (forward first, then backward for leading gaps)
df[features] = df[features].ffill().bfill()

# Clip each input to its Tukey fences (Q1 - 1.5*IQR, Q3 + 1.5*IQR);
# values are clipped, rows are not dropped
for name in features:
    q1, q3 = (df[name].quantile(q) for q in (0.25, 0.75))
    iqr = q3 - q1
    df[name] = df[name].clip(q1 - 1.5 * iqr, q3 + 1.5 * iqr)

# Scale inputs and target with separate scalers.
# NOTE(review): the scalers see the full dataset, before the split below.
scaler_X, scaler_y = RobustScaler(), RobustScaler()
X = scaler_X.fit_transform(df[features])
y = scaler_y.fit_transform(df[[target]])

# Sliding windows: `sequence_length` consecutive rows per sample, labelled
# with the target of the row that follows the window
sequence_length = 48
stride = 1
last_start = len(df) - sequence_length
X_sequences = np.array(
    [X[s:s + sequence_length] for s in range(0, last_start, stride)]
)
y_sequences = np.array(
    [y[s + sequence_length] for s in range(0, last_start, stride)]
)

# Hold out 20% of the windows for testing.
# NOTE(review): the split shuffles by default, so overlapping windows can
# end up on both sides of it.
split = train_test_split(X_sequences, y_sequences, test_size=0.2, random_state=42)
X_train, X_test, y_train, y_test = split

class PositionalEncoding(layers.Layer):
    """Add a fixed sinusoidal positional encoding to a sequence.

    The table is computed once at construction time: even feature indices
    hold ``sin(pos * w_i)`` and odd indices hold ``cos(pos * w_i)`` with
    ``w_i = exp(-ln(10000) * 2i / d_model)``. Both even and odd ``d_model``
    are supported; for odd widths the last cosine column is dropped.

    Args:
        max_steps: Number of positions to pre-compute. Inputs longer than
            this are not supported.
        d_model: Feature width of the inputs the encoding is added to.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, max_steps, d_model, **kwargs):
        super().__init__(**kwargs)
        self.max_steps = max_steps
        self.d_model = d_model

        position = tf.range(max_steps, dtype=tf.float32)[:, tf.newaxis]
        div_term = tf.exp(
            tf.range(0, d_model, 2, dtype=tf.float32) * -(tf.math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_steps, ceil(d_model / 2))

        # Interleave sin and cos along the feature axis:
        # [sin_0, cos_0, sin_1, cos_1, ...]. Stacking on a new last axis and
        # flattening it gives that order; slicing to d_model trims the
        # surplus cosine column that appears when d_model is odd.
        num_pairs = (d_model + 1) // 2
        interleaved = tf.stack([tf.sin(angles), tf.cos(angles)], axis=-1)
        pe = tf.reshape(interleaved, (max_steps, 2 * num_pairs))[:, :d_model]

        self.pe = pe[tf.newaxis, :, :]  # leading axis broadcasts over the batch

    def call(self, inputs):
        """Return ``inputs`` plus the encoding for its first ``steps`` positions."""
        return inputs + self.pe[:, :tf.shape(inputs)[1], :]

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "max_steps": self.max_steps,
            "d_model": self.d_model
        })
        return config



class ProbSparseAttention(layers.Layer):
    """Multi-head self-attention restricted to a sampled subset of keys.

    For every head, keys are ranked by their mean exponentiated (row-max
    shifted) attention score across all queries. Only the top ``sample_k``
    keys and their values take part in the final softmax attention, where
    ``sample_k = int(factor * ln(L_K))`` clamped to ``[1, L_K]`` and ``L_K``
    is the sequence length.

    Args:
        d_model: Width of the query/key/value/output projections.
        num_heads: Number of attention heads. Must divide ``d_model`` evenly.
        factor: Multiplier on ``ln(L_K)`` that sets how many keys are kept.
        **kwargs: Forwarded to the base ``Layer`` constructor.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads, factor=5, **kwargs):
        # An uneven split would truncate ``depth`` and break the head reshape.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        super().__init__(**kwargs)
        self.d_model = d_model
        self.num_heads = num_heads
        self.factor = factor
        self.depth = d_model // num_heads  # features per head

        self.wq = layers.Dense(d_model)
        self.wk = layers.Dense(d_model)
        self.wv = layers.Dense(d_model)
        self.dense = layers.Dense(d_model)

    def _prob_QK(self, Q, K, sample_k):
        """Return the indices of the highest-scoring keys for every head.

        Args:
            Q: Queries, shape (batch, heads, L_Q, depth).
            K: Keys, shape (batch, heads, L_K, depth).
            sample_k: Number of keys to keep; capped at ``L_K``.

        Returns:
            Integer tensor of shape (batch, heads, min(L_K, sample_k)).
        """
        L_K = tf.shape(K)[2]

        Q_K = tf.matmul(Q, K, transpose_b=True)
        Q_K = Q_K / tf.math.sqrt(tf.cast(self.depth, tf.float32))

        # Shift by the row maximum before exponentiating, for numerical stability
        Q_K = tf.exp(Q_K - tf.math.reduce_max(Q_K, axis=-1, keepdims=True))

        # Average over the query axis -> one importance score per key
        sample_size = tf.minimum(L_K, sample_k)
        mean_attention = tf.reduce_mean(Q_K, axis=2)
        _, indices = tf.nn.top_k(mean_attention, k=sample_size)

        return indices

    def call(self, inputs, training=None):
        """Apply sparse self-attention; ``training`` is accepted but unused."""
        batch_size = tf.shape(inputs)[0]

        Q = self.wq(inputs)
        K = self.wk(inputs)
        V = self.wv(inputs)

        # (batch, seq, d_model) -> (batch, heads, seq, depth)
        Q = tf.reshape(Q, (batch_size, -1, self.num_heads, self.depth))
        Q = tf.transpose(Q, perm=[0, 2, 1, 3])
        K = tf.reshape(K, (batch_size, -1, self.num_heads, self.depth))
        K = tf.transpose(K, perm=[0, 2, 1, 3])
        V = tf.reshape(V, (batch_size, -1, self.num_heads, self.depth))
        V = tf.transpose(V, perm=[0, 2, 1, 3])

        L_K = tf.shape(K)[2]
        sample_k = tf.cast(tf.math.log(tf.cast(L_K, tf.float32)) * self.factor, tf.int32)
        # ln(1) == 0 would select zero keys for a one-step sequence, so keep
        # at least one key and never more than there are.
        sample_k = tf.minimum(tf.maximum(sample_k, 1), L_K)

        indices = self._prob_QK(Q, K, sample_k)

        # Build (batch, head, key) index triples for gather_nd
        batch_indices = tf.range(batch_size)[:, tf.newaxis, tf.newaxis]
        batch_indices = tf.tile(batch_indices, [1, self.num_heads, sample_k])
        head_indices = tf.range(self.num_heads)[tf.newaxis, :, tf.newaxis]
        head_indices = tf.tile(head_indices, [batch_size, 1, sample_k])

        gather_indices = tf.stack([batch_indices, head_indices, indices], axis=-1)

        K_sampled = tf.gather_nd(K, gather_indices)
        V_sampled = tf.gather_nd(V, gather_indices)

        # Every query attends over the sampled keys only
        attention_scores = tf.matmul(Q, K_sampled, transpose_b=True)
        attention_scores = attention_scores / tf.math.sqrt(tf.cast(self.depth, tf.float32))

        attention_weights = tf.nn.softmax(attention_scores, axis=-1)
        output = tf.matmul(attention_weights, V_sampled)

        # Merge the heads back: (batch, heads, seq, depth) -> (batch, seq, d_model)
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (batch_size, -1, self.d_model))

        return self.dense(output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "factor": self.factor
        })
        return config

class InformerBlock(layers.Layer):
    """Encoder block built around ``ProbSparseAttention``.

    Sparse self-attention and a position-wise feed-forward network, each
    followed by dropout, a residual connection and layer normalisation.

    Args:
        d_model: Width of the block's input and output.
        num_heads: Number of attention heads.
        dff: Hidden width of the feed-forward network.
        dropout: Dropout rate applied after each sub-layer.
        factor: Sampling factor forwarded to ``ProbSparseAttention``.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, d_model, num_heads, dff, dropout=0.1, factor=5, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.dropout_rate = dropout
        self.factor = factor

        self.prob_attention = ProbSparseAttention(d_model, num_heads, factor)
        self.ffn = tf.keras.Sequential([
            layers.Dense(dff, activation='relu'),
            layers.Dense(d_model)
        ])

        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = layers.Dropout(dropout)
        self.dropout2 = layers.Dropout(dropout)

    def call(self, inputs, training=None):
        """Run the block; ``training`` is passed on to the dropout layers."""
        attn_output = self.prob_attention(inputs, training=training)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "dropout": self.dropout_rate,
            "factor": self.factor
        })
        return config

class TimeSeriesInformer(Model):
    """Informer-style encoder with a pooled dense head.

    The input is projected to ``d_model``, given a positional encoding,
    passed through ``num_layers`` ``InformerBlock`` layers, averaged over the
    sequence axis, regularised with dropout and mapped to ``num_classes``
    outputs.

    Args:
        num_layers: Number of stacked ``InformerBlock`` layers.
        d_model: Width of the internal representation.
        num_heads: Attention heads per block.
        dff: Hidden width of each block's feed-forward network.
        max_seq_len: Number of positions pre-computed by the positional encoding.
        num_features: Accepted for interface parity; not used here.
        num_classes: Number of output units (use 1 for scalar regression).
        dropout_rate: Dropout rate for the blocks and the pooled features.
        factor: Sampling factor forwarded to every ``InformerBlock``.
        output_activation: Activation of the final layer. Defaults to
            ``'softmax'`` (classification); pass ``None`` for a linear
            regression output.
    """

    def __init__(self,
                 num_layers,
                 d_model,
                 num_heads,
                 dff,
                 max_seq_len,
                 num_features,
                 num_classes,
                 dropout_rate=0.1,
                 factor=5,
                 output_activation='softmax'):
        super().__init__()

        self.num_layers = num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.max_seq_len = max_seq_len

        self.input_projection = layers.Dense(d_model)
        self.pos_encoding = PositionalEncoding(max_seq_len, d_model)

        self.informer_blocks = [
            InformerBlock(d_model, num_heads, dff, dropout_rate, factor)
            for _ in range(num_layers)
        ]

        self.dropout = layers.Dropout(dropout_rate)
        self.global_pooling = layers.GlobalAveragePooling1D()
        self.final_layer = layers.Dense(num_classes, activation=output_activation)

    def call(self, inputs, training=None):
        """Return the head output for every input window."""
        x = self.input_projection(inputs)
        x = self.pos_encoding(x)

        for informer_block in self.informer_blocks:
            x = informer_block(x, training=training)

        x = self.global_pooling(x)
        x = self.dropout(x, training=training)

        return self.final_layer(x)


# Model parameters
num_layers = 4
d_model = 128
num_heads = 8
dff = 256
max_seq_len = sequence_length
num_features = X_train.shape[2]
# The target is a single continuous value, so the head has one unit. The
# previous `len(class_encoding)` referred to a name this cell never defines.
num_classes = 1
dropout_rate = 0.1
factor = 5

# Create the Informer model with a linear output: a softmax over one unit
# would always emit 1.0 and cannot be fitted with the Huber loss below.
informer = TimeSeriesInformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    max_seq_len=max_seq_len,
    num_features=num_features,
    num_classes=num_classes,
    dropout_rate=dropout_rate,
    factor=factor,
    output_activation=None
)


# Compile model
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
informer.compile(optimizer=optimizer, loss='huber')

# Both callbacks watch the validation loss: one stops training and restores
# the best weights, the other lowers the learning rate on plateaus.
keras_callbacks = tf.keras.callbacks
early_stopping = keras_callbacks.EarlyStopping(
    monitor='val_loss', patience=15, restore_best_weights=True
)
reduce_lr = keras_callbacks.ReduceLROnPlateau(
    monitor='val_loss', factor=0.2, patience=5, min_lr=0.0001
)

# Fit the Informer, holding out 20% of the training windows for validation
training_options = {
    'epochs': 100,
    'batch_size': 64,
    'validation_split': 0.2,
    'callbacks': [early_stopping, reduce_lr],
    'verbose': 1,
}
history = informer.fit(X_train, y_train, **training_options)

# Evaluate on clean test data.
# The predictions must come from `informer`, the model trained in this cell;
# `model` is whatever an earlier cell left behind.
y_pred_clean = informer.predict(X_test)
y_test_orig = scaler_y.inverse_transform(y_test)
y_pred_clean_orig = scaler_y.inverse_transform(y_pred_clean)

# Inject faults into the test windows and evaluate again
X_test_fault, fault_indices = inject_faults(X_test, fault_percentage=0.2)
y_pred_fault = informer.predict(X_test_fault)
y_pred_fault_orig = scaler_y.inverse_transform(y_pred_fault)

# Calculate metrics in the target's original units
metrics_clean = {
    'MAE': mean_absolute_error(y_test_orig, y_pred_clean_orig),
    'RMSE': np.sqrt(mean_squared_error(y_test_orig, y_pred_clean_orig)),
    'R2': r2_score(y_test_orig, y_pred_clean_orig)
}

metrics_fault = {
    'MAE': mean_absolute_error(y_test_orig, y_pred_fault_orig),
    'RMSE': np.sqrt(mean_squared_error(y_test_orig, y_pred_fault_orig)),
    'R2': r2_score(y_test_orig, y_pred_fault_orig)
}

# Print results
print("\nMetrics on Clean Test Data:")
for metric, value in metrics_clean.items():
    print(f"{metric}: {value:.4f}")

print("\nMetrics on Faulty Test Data:")
for metric, value in metrics_fault.items():
    print(f"{metric}: {value:.4f}")

# One figure, two rows: loss curves above, predictions below
figure, axes = plt.subplots(2, 1, figsize=(15, 10))
loss_axis, prediction_axis = axes

# Loss per epoch
for history_key, curve_label in (('loss', 'Training Loss'), ('val_loss', 'Validation Loss')):
    loss_axis.plot(history.history[history_key], label=curve_label)
loss_axis.set_title('Model Training History')
loss_axis.set_xlabel('Epoch')
loss_axis.set_ylabel('Loss')
loss_axis.legend()
loss_axis.grid(True)

# Ground truth against clean and faulty predictions, first 100 test samples
prediction_axis.plot(y_test_orig[:100], label='Actual', alpha=0.8)
prediction_axis.plot(y_pred_clean_orig[:100], label='Predicted (Clean)', alpha=0.8)
prediction_axis.plot(y_pred_fault_orig[:100], label='Predicted (Faulty)', alpha=0.8)
prediction_axis.set_title('Predictions (First 100 Samples)')
prediction_axis.set_xlabel('Time Steps')
prediction_axis.set_ylabel('FST')
prediction_axis.legend()
prediction_axis.grid(True)

figure.tight_layout()
plt.show()

# TST-AE

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import tensorflow as tf
from tensorflow.keras import Model, layers
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt

# [Previous class implementations remain the same]
class MultiHeadSelfAttention(layers.Layer):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are separate ``Dense`` projections of the same
    input. The per-head attention results are concatenated and passed through
    a final ``Dense`` projection of width ``d_model``.

    Args:
        d_model: Width of the query/key/value/output projections.
        num_heads: Number of attention heads. Must divide ``d_model`` evenly.
        **kwargs: Forwarded to the base ``Layer`` constructor.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads, **kwargs):
        # An uneven split would truncate ``depth``; the head reshape would
        # then either fail or silently fold features into the sequence axis.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        super().__init__(**kwargs)
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads  # features per head

        self.wq = layers.Dense(d_model)
        self.wk = layers.Dense(d_model)
        self.wv = layers.Dense(d_model)
        self.dense = layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape (batch, seq, d_model) into (batch, heads, seq, depth)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply self-attention and return a tensor of width ``d_model``."""
        batch_size = tf.shape(inputs)[0]

        q = self.split_heads(self.wq(inputs), batch_size)
        k = self.split_heads(self.wk(inputs), batch_size)
        v = self.split_heads(self.wv(inputs), batch_size)

        # Scaled dot-product scores, one (seq x seq) matrix per head
        scaled_attention = tf.matmul(q, k, transpose_b=True)
        scaled_attention = scaled_attention / tf.math.sqrt(tf.cast(self.depth, tf.float32))

        attention_weights = tf.nn.softmax(scaled_attention, axis=-1)
        output = tf.matmul(attention_weights, v)

        # Merge the heads back: (batch, heads, seq, depth) -> (batch, seq, d_model)
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (batch_size, -1, self.d_model))

        return self.dense(output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads
        })
        return config

class TransformerBlock(layers.Layer):
    """Post-norm Transformer encoder block.

    Self-attention followed by a two-layer feed-forward network. Each
    sub-layer output goes through dropout, is added to the sub-layer input
    and is then layer-normalised.

    Args:
        d_model: Width of the block's input and output.
        num_heads: Number of attention heads.
        dff: Hidden width of the feed-forward network.
        dropout: Dropout rate applied after each sub-layer.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, d_model, num_heads, dff, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        # Constructor arguments, kept for get_config()
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.dropout_rate = dropout

        # Attention sub-layer
        self.mha = MultiHeadSelfAttention(d_model, num_heads)

        # Feed-forward sub-layer: widen to dff with ReLU, project back to d_model
        widen = layers.Dense(dff, activation='relu')
        project_back = layers.Dense(d_model)
        self.ffn = tf.keras.Sequential([widen, project_back])

        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = layers.Dropout(dropout)
        self.dropout2 = layers.Dropout(dropout)

    def call(self, inputs, training=False):
        """Run the block; ``training`` switches the dropout layers on."""
        attended = self.dropout1(self.mha(inputs), training=training)
        normed = self.layernorm1(inputs + attended)

        transformed = self.dropout2(self.ffn(normed), training=training)
        return self.layernorm2(normed + transformed)

    def get_config(self):
        """Return the constructor arguments on top of the base layer config."""
        return {
            **super().get_config(),
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "dff": self.dff,
            "dropout": self.dropout_rate,
        }

class PositionalEncoding(layers.Layer):
    """Add a fixed sinusoidal positional encoding to a sequence.

    The table is computed once at construction time: even feature indices
    hold ``sin(pos * w_i)`` and odd indices hold ``cos(pos * w_i)`` with
    ``w_i = exp(-ln(10000) * 2i / d_model)``. Both even and odd ``d_model``
    are supported; for odd widths the last cosine column is dropped.

    Args:
        max_steps: Number of positions to pre-compute. Inputs longer than
            this are not supported.
        d_model: Feature width of the inputs the encoding is added to.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, max_steps, d_model, **kwargs):
        super().__init__(**kwargs)
        self.max_steps = max_steps
        self.d_model = d_model

        position = tf.range(max_steps, dtype=tf.float32)[:, tf.newaxis]
        div_term = tf.exp(
            tf.range(0, d_model, 2, dtype=tf.float32) * -(tf.math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_steps, ceil(d_model / 2))

        # Interleave sin and cos along the feature axis:
        # [sin_0, cos_0, sin_1, cos_1, ...]. Stacking on a new last axis and
        # flattening it gives that order; slicing to d_model trims the
        # surplus cosine column that appears when d_model is odd.
        num_pairs = (d_model + 1) // 2
        interleaved = tf.stack([tf.sin(angles), tf.cos(angles)], axis=-1)
        pe = tf.reshape(interleaved, (max_steps, 2 * num_pairs))[:, :d_model]

        self.pe = pe[tf.newaxis, :, :]  # leading axis broadcasts over the batch

    def call(self, inputs):
        """Return ``inputs`` plus the encoding for its first ``steps`` positions."""
        return inputs + self.pe[:, :tf.shape(inputs)[1], :]

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "max_steps": self.max_steps,
            "d_model": self.d_model
        })
        return config

# Modified Time Series Transformer Autoencoder for Regression
class TimeSeriesTransformerAutoencoder(Model):
    """Transformer encoder with a reconstruction decoder and a regression head.

    A shared encoder (input projection, positional encoding and
    ``num_layers`` ``TransformerBlock`` layers) feeds two branches:

    * a regression head: sequence average, Dense(128), dropout, Dense(64),
      then a linear ``Dense(1)``;
    * a decoder: a ``Dense(d_model)`` layer, ``num_layers`` more
      ``TransformerBlock`` layers and a ``Dense(num_features)`` projection.

    ``call`` returns a dict with the keys ``'reconstruction_output'`` and
    ``'regression_output'``.

    Args:
        num_layers: Blocks in the encoder, and again in the decoder.
        d_model: Width of the internal representation.
        num_heads: Attention heads per block.
        dff: Hidden width of each block's feed-forward network.
        max_seq_len: Number of positions pre-computed by the positional encoding.
        num_features: Width of the reconstructed sequence.
        dropout_rate: Dropout rate for the blocks and the regression head.
    """

    def __init__(self,
                 num_layers,
                 d_model,
                 num_heads,
                 dff,
                 max_seq_len,
                 num_features,
                 dropout_rate=0.1):
        super().__init__()

        # Hyper-parameters kept on the instance for later inspection
        self.num_layers = num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.max_seq_len = max_seq_len
        self.num_features = num_features

        # Shared front end
        self.input_projection = layers.Dense(d_model)
        self.pos_encoding = PositionalEncoding(max_seq_len, d_model)

        # Encoder stack
        encoder_stack = []
        for _ in range(num_layers):
            encoder_stack.append(TransformerBlock(d_model, num_heads, dff, dropout_rate))
        self.encoder_blocks = encoder_stack

        # Dense layer between encoder and decoder (same width as d_model)
        self.bottleneck = layers.Dense(d_model)

        # Decoder stack
        decoder_stack = []
        for _ in range(num_layers):
            decoder_stack.append(TransformerBlock(d_model, num_heads, dff, dropout_rate))
        self.decoder_blocks = decoder_stack

        # Maps decoder features back to the input feature space
        self.reconstruction_layer = layers.Dense(num_features)

        # Regression head
        self.global_pooling = layers.GlobalAveragePooling1D()
        self.regression_dense1 = layers.Dense(128, activation='relu')
        self.dropout = layers.Dropout(dropout_rate)
        self.regression_dense2 = layers.Dense(64, activation='relu')
        self.regression_layer = layers.Dense(1)  # linear, one value per window

    def call(self, inputs, training=False):
        """Return the reconstruction and the regression prediction as a dict."""
        # Shared encoder
        encoded = self.pos_encoding(self.input_projection(inputs))
        for block in self.encoder_blocks:
            encoded = block(encoded, training=training)

        # Regression branch on the sequence-averaged encoding
        head = self.regression_dense1(self.global_pooling(encoded))
        head = self.dropout(head, training=training)
        predicted = self.regression_layer(self.regression_dense2(head))

        # Decoder branch on the full encoded sequence
        decoded = self.bottleneck(encoded)
        for block in self.decoder_blocks:
            decoded = block(decoded, training=training)

        return {
            'reconstruction_output': self.reconstruction_layer(decoded),
            'regression_output': predicted,
        }

# Preprocessing is not repeated in this cell: sequence_length, X_train,
# y_train, X_test, y_test and scaler_y are taken from an earlier cell.

# Hyper-parameters of the autoencoder
num_layers, d_model, num_heads, dff = 4, 128, 8, 256
dropout_rate = 0.1
max_seq_len = sequence_length
num_features = X_train.shape[2]  # last axis of the windows = feature count

# Build the model
autoencoder_config = dict(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    max_seq_len=max_seq_len,
    num_features=num_features,
    dropout_rate=dropout_rate,
)
model = TimeSeriesTransformerAutoencoder(**autoencoder_config)

# Two objectives keyed by output name: MSE on the reconstruction (weight
# 0.3) and Huber on the regression output (weight 0.7)
output_losses = {'reconstruction_output': 'mse', 'regression_output': 'huber'}
output_loss_weights = {'reconstruction_output': 0.3, 'regression_output': 0.7}
optimizer = Adam(learning_rate=0.001)
model.compile(
    optimizer=optimizer,
    loss=output_losses,
    loss_weights=output_loss_weights,
)

# Both callbacks monitor the combined validation loss
early_stopping = EarlyStopping(
    monitor='val_loss', mode='min', patience=15, restore_best_weights=True
)
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss', mode='min', factor=0.2, patience=5, min_lr=0.0001
)

# The reconstruction target is the input itself; the regression target is y
training_targets = {
    'reconstruction_output': X_train,
    'regression_output': y_train,
}
history = model.fit(
    X_train,
    training_targets,
    epochs=100,
    batch_size=64,
    validation_split=0.2,
    callbacks=[early_stopping, reduce_lr],
    verbose=1,
)

def _regression_scores(y_true, y_pred):
    """Return MAE, RMSE and R2 of ``y_pred`` against ``y_true``."""
    return {
        'MAE': mean_absolute_error(y_true, y_pred),
        'RMSE': np.sqrt(mean_squared_error(y_true, y_pred)),
        'R2': r2_score(y_true, y_pred),
    }


# Clean test windows: keep only the regression branch of the model output
predictions_clean = model.predict(X_test)
y_pred_clean = predictions_clean['regression_output']
y_test_orig = scaler_y.inverse_transform(y_test)
y_pred_clean_orig = scaler_y.inverse_transform(y_pred_clean)

# Same windows after synthetic fault injection
X_test_fault, fault_indices = inject_faults(X_test, fault_percentage=0.2)
predictions_fault = model.predict(X_test_fault)
y_pred_fault = predictions_fault['regression_output']
y_pred_fault_orig = scaler_y.inverse_transform(y_pred_fault)

# Scores in the target's original units
metrics_clean = _regression_scores(y_test_orig, y_pred_clean_orig)
metrics_fault = _regression_scores(y_test_orig, y_pred_fault_orig)

# Report
print("\nMetrics on Clean Test Data:")
for metric in metrics_clean:
    value = metrics_clean[metric]
    print(f"{metric}: {value:.4f}")

print("\nMetrics on Faulty Test Data:")
for metric in metrics_fault:
    value = metrics_fault[metric]
    print(f"{metric}: {value:.4f}")

# Plotting the training history
plt.figure(figsize=(15, 12))

# Plot losses
plt.subplot(3, 1, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Training History - Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

# Plot learning rate.
# The history key written by ReduceLROnPlateau depends on the Keras version
# ('learning_rate' in newer releases, 'lr' in older ones), so accept either
# and leave the panel empty rather than fail with a KeyError.
lr_history = history.history.get('learning_rate', history.history.get('lr'))
plt.subplot(3, 1, 2)
if lr_history is not None:
    plt.plot(lr_history, label='Learning Rate')
    plt.legend()
plt.title('Learning Rate Schedule')
plt.xlabel('Epoch')
plt.ylabel('Learning Rate')
plt.grid(True)

# Predictions plot: ground truth against clean and faulty predictions
plt.subplot(3, 1, 3)
plt.plot(y_test_orig[:100], label='Actual', alpha=0.8)
plt.plot(y_pred_clean_orig[:100], label='Predicted (Clean)', alpha=0.8)
plt.plot(y_pred_fault_orig[:100], label='Predicted (Faulty)', alpha=0.8)
plt.title('Predictions (First 100 Samples)')
plt.xlabel('Time Steps')
plt.ylabel('FST')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

# LSTM-AE

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import tensorflow as tf
from tensorflow.keras import Model, layers
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt

class LSTMAutoencoder(Model):
    """LSTM autoencoder with an auxiliary regression head.

    The encoder stacks two LSTM layers, each followed by dropout and batch
    normalisation, and collapses the input window into a single vector.
    That vector feeds two branches:

    * reconstruction: dense bottleneck -> ``RepeatVector`` -> two LSTM
      layers (each with dropout and batch normalisation) -> dense
      projection to ``num_features`` values per time step;
    * regression: two dense layers (with dropout and batch normalisation
      in between) ending in a single linear unit.

    The regression branch reads the encoder output directly, not the
    bottleneck projection, so ``latent_dim`` only affects reconstruction.

    Args:
        sequence_length: Number of time steps the decoder emits (the
            ``RepeatVector`` repeat count).
        num_features: Number of values reconstructed per time step.
        lstm_units: Sequence with at least two layer widths. The first
            entry sizes the outer LSTMs (first encoder layer, last decoder
            layer) and the second the inner ones. Further entries are
            ignored.
        latent_dim: Width of the dense bottleneck.
        dropout_rate: Rate shared by every dropout layer.

    Raises:
        ValueError: If ``lstm_units`` has fewer than two entries.
    """

    def __init__(self,
                 sequence_length,
                 num_features,
                 lstm_units=(128, 64),
                 latent_dim=32,
                 dropout_rate=0.2):
        # Work on a private list: the default is immutable and a caller's
        # list is copied, so the stored attribute never aliases an object
        # shared with other instances or with the caller.
        lstm_units = list(lstm_units)
        if len(lstm_units) < 2:
            raise ValueError(
                "lstm_units must contain at least two layer sizes, "
                f"got {len(lstm_units)}"
            )

        super().__init__()
        self.sequence_length = sequence_length
        self.num_features = num_features
        self.lstm_units = lstm_units
        self.latent_dim = latent_dim
        self.dropout_rate = dropout_rate

        # Encoder: the first LSTM keeps the time axis, the second collapses
        # it to one vector per sample.
        self.encoder_lstm1 = layers.LSTM(lstm_units[0], return_sequences=True)
        self.encoder_dropout1 = layers.Dropout(dropout_rate)
        self.encoder_bn1 = layers.BatchNormalization()

        self.encoder_lstm2 = layers.LSTM(lstm_units[1], return_sequences=False)
        self.encoder_dropout2 = layers.Dropout(dropout_rate)
        self.encoder_bn2 = layers.BatchNormalization()

        # Bottleneck (linear projection of the encoder output)
        self.bottleneck = layers.Dense(latent_dim)

        # Decoder: repeat the bottleneck vector once per time step, then
        # mirror the encoder widths in reverse order.
        self.decoder_repeat = layers.RepeatVector(sequence_length)

        self.decoder_lstm1 = layers.LSTM(lstm_units[1], return_sequences=True)
        self.decoder_dropout1 = layers.Dropout(dropout_rate)
        self.decoder_bn1 = layers.BatchNormalization()

        self.decoder_lstm2 = layers.LSTM(lstm_units[0], return_sequences=True)
        self.decoder_dropout2 = layers.Dropout(dropout_rate)
        self.decoder_bn2 = layers.BatchNormalization()

        # Reconstruction output: num_features values per time step
        self.reconstruction_layer = layers.Dense(num_features)

        # Regression head
        self.regression_dense1 = layers.Dense(64, activation='relu')
        self.regression_dropout = layers.Dropout(dropout_rate)
        self.regression_bn = layers.BatchNormalization()
        self.regression_dense2 = layers.Dense(32, activation='relu')
        self.regression_output = layers.Dense(1)  # Single output for regression

    def call(self, inputs, training=False):
        """Run the encoder once and evaluate both output branches.

        Args:
            inputs: Batch of input windows. Presumably shaped
                (batch, sequence_length, num_features), which is the shape
                of the reconstruction this method returns -- confirm
                against the caller.
            training: Forwarded to every dropout and batch-normalisation
                layer.

        Returns:
            Dict with two entries: ``'reconstruction_output'`` (the decoded
            sequence) and ``'regression_output'`` (one value per sample).
        """
        # Encoder
        x = self.encoder_lstm1(inputs)
        x = self.encoder_dropout1(x, training=training)
        x = self.encoder_bn1(x, training=training)

        x = self.encoder_lstm2(x)
        x = self.encoder_dropout2(x, training=training)
        encoded = self.encoder_bn2(x, training=training)

        # Bottleneck
        bottleneck = self.bottleneck(encoded)

        # Decoder path
        x_decoded = self.decoder_repeat(bottleneck)

        x_decoded = self.decoder_lstm1(x_decoded)
        x_decoded = self.decoder_dropout1(x_decoded, training=training)
        x_decoded = self.decoder_bn1(x_decoded, training=training)

        x_decoded = self.decoder_lstm2(x_decoded)
        x_decoded = self.decoder_dropout2(x_decoded, training=training)
        x_decoded = self.decoder_bn2(x_decoded, training=training)

        reconstructed = self.reconstruction_layer(x_decoded)

        # Regression path: branches off the encoder output, not the
        # bottleneck projection.
        x_reg = self.regression_dense1(encoded)
        x_reg = self.regression_dropout(x_reg, training=training)
        x_reg = self.regression_bn(x_reg, training=training)
        x_reg = self.regression_dense2(x_reg)
        predicted = self.regression_output(x_reg)

        return {
            'reconstruction_output': reconstructed,
            'regression_output': predicted
        }

# Load the raw measurements.
df = pd.read_csv('/kaggle/input/fruit-surface-temperature/FST_analysis.csv')

# Merge the 'Date' and 'Time' text columns into one timestamp column and
# put the rows in chronological order.
timestamps = pd.to_datetime(df['Date'] + ' ' + df['Time'])
df = df.assign(DateTime=timestamps).sort_values('DateTime')

# Calendar features derived from the timestamp.
clock = df['DateTime'].dt
df = df.assign(Hour=clock.hour, DayOfWeek=clock.dayofweek, Month=clock.month)

# Model inputs and the column to predict.
target = 'FST_EB'
features = ['Air Temperature', 'Dew Point', 'Solar Radiation', 'Wind Speed',
            'Hour', 'DayOfWeek', 'Month']

# Fill gaps in the feature columns: carry the previous value forward, then
# back-fill whatever is still missing at the start.
# NOTE(review): only the feature columns are filled; missing values in the
# target column, if any, are left as they are -- confirm that is intended.
filled_features = df[features].ffill().bfill()
df[features] = filled_features

# Limit extreme values using 1.5 * IQR bounds
def remove_outliers(df, columns):
    """Clip each listed column to its 1.5 * IQR bounds.

    Despite the name, no rows are dropped: values below ``Q1 - 1.5 * IQR``
    or above ``Q3 + 1.5 * IQR`` are replaced by the respective bound, where
    the quartiles are computed per column.

    Args:
        df: DataFrame to process; the listed columns are overwritten on
            this object.
        columns: Iterable of column labels to clip.

    Returns:
        The same ``df`` object that was passed in.
    """
    for name in columns:
        series = df[name]
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        margin = 1.5 * (q3 - q1)
        df[name] = series.clip(lower=q1 - margin, upper=q3 + margin)
    return df

# Clip extreme feature values (the returned frame is rebound to df).
df = remove_outliers(df, features)

# Scale features and target with two independent robust scalers; scaler_y
# is kept so predictions can be mapped back to original units later.
# NOTE(review): both scalers are fitted on every row, before the
# train/test split below -- confirm that using test-period statistics
# here is acceptable.
scaler_X, scaler_y = RobustScaler(), RobustScaler()
X = scaler_X.fit_transform(df[features])
y = scaler_y.fit_transform(df[[target]])

# Sliding windows: each sample is `sequence_length` consecutive feature
# rows, and its label is the target of the row right after the window.
sequence_length = 48
stride = 1
window_starts = range(0, len(df) - sequence_length, stride)

X_sequences = np.array(
    [X[start:start + sequence_length] for start in window_starts]
)
y_sequences = np.array(
    [y[start + sequence_length] for start in window_starts]
)

# Hold out 20% of the windows for testing (fixed seed for repeatability).
# NOTE(review): train_test_split shuffles by default and consecutive
# windows overlap, so test windows share rows with training windows --
# confirm a random (rather than chronological) split is intended.
X_train, X_test, y_train, y_test = train_test_split(
    X_sequences,
    y_sequences,
    test_size=0.2,
    random_state=42,
)

# Instantiate the two-output network for the window shape built above.
model = LSTMAutoencoder(
    sequence_length,
    len(features),
    lstm_units=[128, 64],
    latent_dim=32,
    dropout_rate=0.2,
)

# One loss per named output; the total loss weights regression (0.7)
# above reconstruction (0.3).
output_losses = {
    'reconstruction_output': 'mse',
    'regression_output': 'huber',
}
output_loss_weights = {
    'reconstruction_output': 0.3,
    'regression_output': 0.7,
}
optimizer = Adam(learning_rate=0.001)
model.compile(
    optimizer=optimizer,
    loss=output_losses,
    loss_weights=output_loss_weights,
)

# Both callbacks watch the combined validation loss and expect it to fall.
watch_val_loss = dict(monitor='val_loss', mode='min')

# Stop after 15 epochs without improvement and roll back to the best weights.
early_stopping = EarlyStopping(
    patience=15,
    restore_best_weights=True,
    **watch_val_loss,
)

# Multiply the learning rate by 0.2 after 5 stagnant epochs, never going
# below 1e-4.
reduce_lr = ReduceLROnPlateau(
    factor=0.2,
    patience=5,
    min_lr=0.0001,
    **watch_val_loss,
)

# Train: the reconstruction output is fitted to the input windows
# themselves, the regression output to the scaled target.
training_targets = {
    'reconstruction_output': X_train,
    'regression_output': y_train,
}
history = model.fit(
    X_train,
    training_targets,
    epochs=100,
    batch_size=64,
    validation_split=0.2,
    callbacks=[early_stopping, reduce_lr],
    verbose=1,
)


def _regression_scores(y_true, y_pred):
    """Return MAE, RMSE and R2 of ``y_pred`` against ``y_true`` as a dict."""
    return {
        'MAE': mean_absolute_error(y_true, y_pred),
        'RMSE': np.sqrt(mean_squared_error(y_true, y_pred)),
        'R2': r2_score(y_true, y_pred),
    }


# Clean test windows: predict, then map target and prediction back to the
# original scale with the target scaler.
predictions_clean = model.predict(X_test)
y_pred_clean = predictions_clean['regression_output']
y_test_orig = scaler_y.inverse_transform(y_test)
y_pred_clean_orig = scaler_y.inverse_transform(y_pred_clean)

# Faulty test windows: corrupt 20% of the samples, then predict again.
# NOTE(review): X_test is a 3-D array of windows, while the ndarray branch
# of inject_faults iterates over shape[1] -- here the time axis, not the
# feature axis. Confirm that is the intended fault model.
X_test_fault, fault_indices = inject_faults(X_test, fault_percentage=0.2)
predictions_fault = model.predict(X_test_fault)
y_pred_fault = predictions_fault['regression_output']
y_pred_fault_orig = scaler_y.inverse_transform(y_pred_fault)

# Score both prediction sets against the same ground truth.
metrics_clean = _regression_scores(y_test_orig, y_pred_clean_orig)
metrics_fault = _regression_scores(y_test_orig, y_pred_fault_orig)

# Report both metric sets, one "name: value" line each, four decimals.
for heading, scores in (
    ("\nMetrics on Clean Test Data:", metrics_clean),
    ("\nMetrics on Faulty Test Data:", metrics_fault),
):
    print(heading)
    for metric, value in scores.items():
        print(f"{metric}: {value:.4f}")

# Plot the training history and a sample of predictions as three stacked
# panels in a single figure.
plt.figure(figsize=(15, 12))
training_log = history.history

# Panel 1: training vs. validation loss per epoch.
plt.subplot(3, 1, 1)
plt.plot(training_log['loss'], label='Training Loss')
plt.plot(training_log['val_loss'], label='Validation Loss')
plt.title('Model Training History - Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

# Panel 2: learning rate per epoch.
# The log key depends on the Keras version ('learning_rate' in Keras 3,
# 'lr' in earlier tf.keras) and is missing entirely when no callback
# recorded the rate, so look it up defensively instead of indexing one
# fixed key and failing with KeyError after training has finished.
lr_history = training_log.get('learning_rate', training_log.get('lr'))
plt.subplot(3, 1, 2)
if lr_history is not None:
    plt.plot(lr_history, label='Learning Rate')
    plt.legend()
else:
    print("No learning-rate history was logged; leaving that panel empty.")
plt.title('Learning Rate Schedule')
plt.xlabel('Epoch')
plt.ylabel('Learning Rate')
plt.grid(True)

# Panel 3: actual target vs. both prediction sets for the first 100
# test samples.
plt.subplot(3, 1, 3)
plt.plot(y_test_orig[:100], label='Actual', alpha=0.8)
plt.plot(y_pred_clean_orig[:100], label='Predicted (Clean)', alpha=0.8)
plt.plot(y_pred_fault_orig[:100], label='Predicted (Faulty)', alpha=0.8)
plt.title('Predictions (First 100 Samples)')
plt.xlabel('Time Steps')
plt.ylabel('FST')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()