<a href="https://colab.research.google.com/github/abrahama976/CNN_LSTM_Production_LightWeight/blob/Latest_Testing/Enhanced_Production_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#1. Environment Setup & Google Drive Integration
This section mounts Google Drive to access the baseline datasets and provides a secure location to save our training logs and generated synthetic files.

In [2]:
from google.colab import drive
import os

# Mount Google Drive
drive.mount('/content/drive')

# Ensure directories exist
BASE_DIR = "/content/drive/MyDrive/Colab_Data/Production_Prediction/"
SYNTH_DIR = os.path.join(BASE_DIR, "Enhanced_Synthetic")
LOG_DIR = os.path.join(BASE_DIR, "Training_Logs")

os.makedirs(SYNTH_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)
print("Directories verified and ready.")

üìÇ Mounting Google Drive...
Mounted at /content/drive
‚úÖ Google Drive mounted.


#2. Synthetic Data Generation Engine
This cell creates an independent Python module (enhanced_data_generator.py) in the background. It simulates machine failures, cascading delays, and tool-wear degradation to stress-test the CNN-LSTM network.

In [None]:
%%writefile enhanced_data_generator.py
import numpy as np
import os

def generate_enhanced_data(X_base, y_base, n_samples=20000, save_path=None):
    print(f"[*] Initializing Synthetic Data Generation Engine...")
    N, T, F = X_base.shape

    # Extract statistics and preserve cross-feature correlations
    X_flat = X_base.reshape(-1, F)
    means = np.mean(X_flat, axis=0)
    stds = np.std(X_flat, axis=0)
    cov_matrix = np.cov(X_flat, rowvar=False) + (np.eye(F) * 1e-5)
    L = np.linalg.cholesky(cov_matrix)

    # Base Correlated Synthesis
    Z = np.random.normal(0, 1, size=(n_samples * T, F))
    X_synth_flat = means + Z @ L.T
    X_synth = X_synth_flat.reshape(n_samples, T, F)
    y_synth = np.random.normal(np.mean(y_base), np.std(y_base), size=(n_samples, 1))

    # Pattern A: Machine Failures (15%)
    cascade_indices = np.random.choice(n_samples, int(0.15 * n_samples), replace=False)
    fault_features = np.random.choice(F, 50, replace=False)
    for idx in cascade_indices:
        t_fail = np.random.randint(2, 8)
        cascade_multiplier = np.exp(np.linspace(0, 2, T - t_fail))
        for i, t in enumerate(range(t_fail, T)):
            X_synth[idx, t, fault_features] *= (1.0 + (0.5 * cascade_multiplier[i]))
        y_synth[idx] += np.std(y_base) * np.random.uniform(3.0, 6.0)

    # Pattern B: Tool-Wear (25%)
    wear_indices = np.random.choice(n_samples, int(0.25 * n_samples), replace=False)
    wear_indices.sort()
    force_features = np.random.choice(F, 30, replace=False)
    degradation_curve = np.exp(np.linspace(0, 1.5, len(wear_indices)))
    for i, idx in enumerate(wear_indices):
        X_synth[idx, :, force_features] *= degradation_curve[i]
        if degradation_curve[i] > 2.5:
            y_synth[idx] += np.std(y_base) * np.random.uniform(1.5, 3.5)

    # Pattern C: Shift Distribution Shifts
    shift_size = n_samples // 4
    for shift_id in range(4):
        start_idx, end_idx = shift_id * shift_size, (shift_id + 1) * shift_size
        X_synth[start_idx:end_idx, :, :] += (stds * np.random.normal(0, 0.2))

    if save_path and "baseline" not in save_path.lower():
        np.save(os.path.join(save_path, "X_hist_train_enhanced.npy"), X_synth)
        np.save(os.path.join(save_path, "y_hist_train_enhanced.npy"), y_synth)

    return X_synth, y_synth

#3. Global Configuration & Imports
Defines standard libraries, sets reproducibility seeds, and configures the DATA_MODE routing switch.

In [None]:
import sys
import random
import numpy as np
import pandas as pd
import tensorflow as tf

from tensorflow.keras.layers import (Input, Reshape, Conv1D, MaxPooling1D, LSTM, Dense, Dropout, BatchNormalization, TimeDistributed, MultiHeadAttention)
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import CSVLogger, EarlyStopping # Added CSVLogger for test recording

# Import our new standalone module
import enhanced_data_generator

# Reproducibility settings
SEED = 42
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)

# ‚öôÔ∏è GLOBAL CONFIG & ROUTING
# Switch this variable to test different data streams
DATA_MODE = "enhanced_synth"    # Options: "baseline", "enhanced_synth", "enhanced_file"

BASELINE_DATA_PATH = "/content/drive/MyDrive/Colab_Data/Production_Prediction/"
ENHANCED_DATA_PATH = "/content/drive/MyDrive/Colab_Data/Production_Prediction/Enhanced_Synthetic/"
ENHANCED_N_SAMPLES = 20000

#4. Data Loading and Routing Logic
Loads the raw historical files and seamlessly routes them through the synthetic augmentation pipeline if requested, guaranteeing the correct $N \times 10 \times 948$ tensor shape.#

In [None]:
def load_baseline_raw(data_path):
    """
    Load the Liu2023-style baseline dataset from.npy files.

    Note:
    - This dataset is **synthetic**, generated by a manufacturing simulator
      designed to approximate the multi-stage machining line described in Liu (2023).
    - Each sample is a sequence of 10 timesteps with 948 features, including:
      machine states, process variables, quality indicators, environmental
      variables, temporal and control parameters.
    """

    print("\nüì• Loading baseline raw data")
    print("=" * 80)

    X_hist_train = np.load(os.path.join(data_path, "X_hist_train.npy"))
    y_hist_train = np.load(os.path.join(data_path, "y_hist_train.npy"))
    X_hist_val = np.load(os.path.join(data_path, "X_hist_val.npy"))
    y_hist_val = np.load(os.path.join(data_path, "y_hist_val.npy"))

    X_curr_train = np.load(os.path.join(data_path, "X_curr_train.npy"))
    y_curr_train = np.load(os.path.join(data_path, "y_curr_train.npy"))
    X_curr_val = np.load(os.path.join(data_path, "X_curr_val.npy"))
    y_curr_val = np.load(os.path.join(data_path, "y_curr_val.npy"))

    print("‚úÖ Loaded arrays:")
    print(f"   X_hist_train: {X_hist_train.shape}, y_hist_train: {y_hist_train.shape}")
    print(f"   X_hist_val  : {X_hist_val.shape}, y_hist_val  : {y_hist_val.shape}")
    print(f"   X_curr_train: {X_curr_train.shape}, y_curr_train: {y_curr_train.shape}")
    print(f"   X_curr_val  : {X_curr_val.shape}, y_curr_val  : {y_curr_val.shape}")

    # Sanity: each batch is (N, 10, 948)
    assert X_hist_train.shape[1:] == (10, 948)
    assert X_hist_val.shape[1:] == (10, 948)
    assert X_curr_train.shape[1:] == (10, 948)
    assert X_curr_val.shape[1:] == (10, 948)

    # Combine historical + current, train + val, into one pool
    X_raw = np.vstack([X_hist_train, X_hist_val, X_curr_train, X_curr_val])
    y_raw = np.vstack([y_hist_train, y_hist_val, y_curr_train, y_curr_val])

    print(f"\nüìä Combined raw dataset:")
    print(f"   X_raw: {X_raw.shape} (N, timesteps, features)")
    print(f"   y_raw: {y_raw.shape}")
    print(f"   Progress range: [{y_raw.min():.3f}, {y_raw.max():.3f}]")
    print("=" * 80)

    return X_raw.astype("float32"), y_raw.astype("float32")

print("‚úÖ Function 'load_baseline_raw()' defined successfully.")

pass

# 1. Always load the baseline data first
print(f"Loading raw baseline data...")
X_raw_base, y_raw_base = load_baseline_raw(BASELINE_DATA_PATH)

# 2. Route the data based on DATA_MODE
if DATA_MODE == "baseline":
    X_train_eval = X_raw_base
    y_train_eval = y_raw_base
    print("-> Mode: 'baseline'. Continuing with unmodified baseline tensors.")

elif DATA_MODE == "enhanced_synth":
    print(f"-> Mode: 'enhanced_synth'. Generating {ENHANCED_N_SAMPLES} samples in-memory...")
    X_train_eval, y_train_eval = enhanced_data_generator.generate_enhanced_data(
        X_base=X_raw_base, y_base=y_raw_base,
        n_samples=ENHANCED_N_SAMPLES, save_path=ENHANCED_DATA_PATH
    )

elif DATA_MODE == "enhanced_file":
    print("-> Mode: 'enhanced_file'. Loading serialized arrays...")
    X_train_eval = np.load(os.path.join(ENHANCED_DATA_PATH, "X_hist_train_enhanced.npy"))
    y_train_eval = np.load(os.path.join(ENHANCED_DATA_PATH, "y_hist_train_enhanced.npy"))

else:
    raise ValueError("Invalid DATA_MODE selected.")

# 3. Shape Verification
print(f"Final Input Tensor Shape  : {X_train_eval.shape}")
print(f"Final Target Tensor Shape : {y_train_eval.shape}")
assert X_train_eval.shape[1:] == (10, 948), "Input must be (N, 10, 948)"

#5. Model Compilation and Tracked Training
Builds the CNN-LSTM with Attention model. Uses Keras CSVLogger to automatically save epoch metrics (loss, accuracy, validation scores) to a CSV file on Google Drive for easy comparison of different experiments.

In [None]:
def build_model(input_shape):
    """Build the CNN‚ÄìAttention‚ÄìLSTM model."""
    inputs = Input(shape=input_shape)

    # Add channel dimension for Conv1D
    x = Reshape((input_shape[0], input_shape[1], 1))(inputs)

    # TimeDistributed CNN blocks
    for filters in [48, 96, 160]:
        x = TimeDistributed(Conv1D(filters, 3, padding='same', activation='relu'))(x)
        x = TimeDistributed(BatchNormalization())(x)
        x = TimeDistributed(MaxPooling1D(2))(x)

    # Global average pooling across spatial dimension
    x = TimeDistributed(GlobalAveragePooling1D())(x)  # (None, timesteps, 160)

    # Multi-head self-attention over timesteps
    attn = MultiHeadAttention(num_heads=4, key_dim=32)(x, x, x)
    attn = Dropout(0.1)(attn)  # small dropout for stability
    x = Add()([x, attn])
    x = LayerNormalization()(x)

    # LSTM stack
    x = LSTM(320, return_sequences=True)(x)
    x = BatchNormalization()(x)
    x = Dropout(0.3)(x)

    x = LSTM(128, return_sequences=False)(x)
    x = BatchNormalization()(x)
    x = Dropout(0.3)(x)

    # Dense head
    x = Dense(96, activation='relu')(x)
    x = Dropout(0.3)(x)
    x = Dense(48, activation='relu')(x)
    outputs = Dense(1, activation='linear')(x)

    model = Model(inputs, outputs)
    return model

print("\nüèóÔ∏è STEP 6: MODEL ARCHITECTURE")
print("=" * 80)

model = build_model(input_shape)
model.compile(
    optimizer=Adam(2e-3),
    loss='mse',
    metrics=['mae', R2Score(name='r2')]
)
model.summary()

n_params = model.count_params()
print(f"\n‚úÖ Model built with {n_params:,} trainable parameters.")
print("=" * 80)

# Set up the CSV Logger to track experiments
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_file = os.path.join("/content/drive/MyDrive/Colab_Data/Production_Prediction/Training_Logs", f"model_run_{DATA_MODE}_{timestamp}.csv")

csv_logger = CSVLogger(log_file, separator=',', append=False)
early_stop = EarlyStopping(monitor='val_loss', patience=5)

print(f"[*] Training logs will be saved securely to: {log_file}")

# COPY YOUR MODEL.FIT CODE HERE, AND ADD THE CALLBACK
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    shuffle=True,
    callbacks=[early_stop, reduce_lr],
    verbose=1
    callbacks=[csv_logger, early_stop],
)

#6. AI Diagnostic Assistant (Gemini)
This cell utilizes the built-in Gemini models via the google.colab.ai library. It passes the final evaluation metrics of the CNN-LSTM network to the LLM, which automatically generates a human-readable diagnostic report for factory floor operators.

In [None]:
# Google Colab now provides free access to Gemini models directly via this library
from google.colab import ai

# Assuming 'history' is the output of your model.fit() process
# We extract the final training and validation loss
final_train_loss = history.history['loss'][-1]
final_val_loss = history.history['val_loss'][-1]

# Create a prompt combining our model's data with instructions for the AI
prompt = f"""
You are an expert Industrial IoT Factory Manager.
I have just trained a CNN-LSTM predictive maintenance model on a dataset containing 948 sensor features to predict production progress delays.

Here are the results of the latest training run using data mode: {DATA_MODE}.
- Final Training Loss: {final_train_loss:.4f}
- Final Validation Loss: {final_val_loss:.4f}

Based on these metrics and the fact that we are trying to predict cascading machine failures and tool wear, write a brief, 3-sentence executive summary. Tell me if the model seems to be overfitting, and suggest one practical step the engineering team should take next on the factory floor.
"""

print("Requesting AI Diagnostic Report from Gemini...\n")
response = ai.generate_text(prompt)
print("=== AI FACTORY MANAGER REPORT ===")
print(response.text)