<a href="https://colab.research.google.com/github/KIRTAN28/industry4.0-predictive-maintenance-deep-rl/blob/main/RL_single_agent_code1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import math
from typing import Optional, Union, Tuple, Dict
from collections import deque
import random
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, InputLayer
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError

# --- ENVIRONMENT: PredictiveMaintenanceEnv ---

class PredictiveMaintenanceEnv(gym.Env):
    """
    Observation space (10 features per component):
    [tᵢ, Δt_last, λᵢ, Rᵢ, RULᵢ, MTTFᵢ, Lᵢ, MTTRᵢ, a_prev, Cᵢ]

    Action space (MultiDiscrete[3] * N): 0=NOOP, 1=PM, 2=CM
    """
    metadata = {"render_modes": ["human"], "render_fps": 1}

    def __init__(self, time_step: float = 1.0, render_mode: Optional[str] = None):
        super(PredictiveMaintenanceEnv, self).__init__()

        # Environment Configuration
        self.TIME_STEP = time_step
        self.render_mode = render_mode

        # Termination parameters
        self.MAX_AGE_TERMINATION = 10000.0
        self.RELIABILITY_THRESHOLD = 0.15 # Episode terminates if R < 15%

        # Weibull parameters for components (β = shape, η = scale)
        self.components = {
            #"Acc Piston": {"beta": 2.69, "eta": 3744},
            "Electrodes" : {"beta" : 1.61,"eta" : 2388}
        }

        self.num_components = len(self.components)
        self.component_names = list(self.components.keys())

        # Cost Parameters (ADJUSTED TO USER REQUIREMENTS)
        self.COST_OP_HOUR_BASE = 10.0
        # Increased significantly to incentivize early PM (Requirement 2)
        self.OP_HAZARD_SCALE = 40000.0          # Factor to make operating cost increase aggressively with low hazard rate

        self.COST_PM_FIXED_BASE = 3000.0        # FIXED PM cost
        self.COST_CM_FIXED_BASE = 15000.0       # CM cost remains 15000 Rs

        self.COST_DOWN_HOUR = 500.0
        self.PENALTY_UNPLANNED_FAIL = 100000.0
        self.DOWNTIME_PM_HRS = 8.0
        self.MTTR_R_PENALTY = 5.0               # Factor to make MTTR increase aggressively with low reliability

        # Imperfect Maintenance Parameter (NEW Requirement)
        self.PM_RESTORATION_FACTOR = 0.85 # Max R after PM is 85% of R at time of PM

        # State Initialization
        self.ages = np.zeros(self.num_components)        # Weibull Age (resets/reduces on PM/CM)
        self.time_since_maintenance = np.zeros(self.num_components)
        self.last_action = np.zeros(self.num_components)
        self.load_fraction = np.ones(self.num_components) * 0.8

        # Spaces
        self.action_space = spaces.MultiDiscrete([3] * self.num_components)
        self.observation_space = spaces.Box(
            low=0.0, high=np.inf, shape=(self.num_components, 10), dtype=np.float32,
        )

    # --- Weibull helper functions ---

    def _calculate_age_for_reliability(self, target_R: float, component_index: int) -> float:
        """
        Calculates the age (t) at which reliability R(t) equals target_R
        for a specific component's Weibull parameters.
        """
        params = self.components[self.component_names[component_index]]
        beta, eta = params["beta"], params["eta"]

        # R(t) = exp(-(t/eta)^beta) = target_R
        # (t/eta)^beta = -ln(target_R)
        # t = eta * (-ln(target_R))^(1/beta)
        try:
            # Added small epsilon check: if target_R is 1.0 (or very close), age must be 0.0
            if target_R >= 1.0 - 1e-9 or target_R <= 0.0: return 0.0

            # Ensure target_R is slightly below 1.0 for the logarithm
            target_R = max(1e-9, min(target_R, 1.0 - 1e-9))

            t_prime = eta * ((-math.log(target_R)) ** (1 / beta))
            return t_prime
        except ValueError:
            return 0.0

    def hazard_rate(self, t: float, beta: float, eta: float) -> float:
        """Instantaneous failure rate (Weibull hazard function)."""
        if t <= 0.0: return 0.0
        return (beta / eta) * ((t / eta) ** (beta - 1))

    def reliability(self, t: float, beta: float, eta: float) -> float:
        """Probability of survival until time t."""
        return np.exp(-((t / eta) ** beta))

    def mean_time_to_failure(self, beta: float, eta: float) -> float:
        """Expected lifespan of the component."""
        return eta * math.gamma(1 + 1 / beta)

    def remaining_useful_life(self, t: float, beta: float, eta: float, threshold: float = 0.1) -> float:
        """Time remaining until reliability drops below the threshold."""
        try:
            R_current = self.reliability(t, beta, eta)
            if R_current <= threshold: return 0.0
            t_threshold = eta * ((-math.log(threshold)) ** (1 / beta))
            RUL = t_threshold - t
            return max(0.0, RUL)
        except ValueError:
            return 0.0

    def mean_time_to_repair(self, reliability: float) -> float:
        """
        Calculates MTTR, where repair time increases significantly as reliability drops.
        (Requirement 3: Downtime cost increases with low R)
        """
        # Dynamic Scaling: Factor increases as (1 - R) increases.
        scaling_factor = 1.0 + (self.MTTR_R_PENALTY * (1.0 - reliability))

        # Base mean rates (1/hours)
        CMT_rate = 0.05
        TTAS_rate = 0.1
        TTAP_rate = 0.1

        # Mean times increase with scaling_factor
        CMT = (1 / CMT_rate) * scaling_factor
        TTAS = (1 / TTAS_rate) * scaling_factor
        TTAP = (1 / TTAP_rate) * scaling_factor

        MDT = max(TTAS, TTAP)
        MTTR = CMT + MDT
        return MTTR

    # --- Environment core methods ---
    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None) -> Tuple[np.ndarray, Dict]:
        super().reset(seed=seed)
        self.ages.fill(0.0)
        self.time_since_maintenance.fill(0.0)
        self.last_action.fill(0.0)
        return self._get_state(), {}

    def step(self, actions: Union[np.ndarray, list]) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        actions = np.array(actions)
        total_reward = 0.0
        info = {}

        terminate_max_age = False
        terminate_low_reliability = False
        running_cost_total = 0.0

        for i, (name, params) in enumerate(self.components.items()):
            beta, eta = params["beta"], params["eta"]

            # Calculate state BEFORE action to determine costs/reliability
            hazard = self.hazard_rate(self.ages[i], beta, eta)
            reliability = self.reliability(self.ages[i], beta, eta)

            # 1. DYNAMIC OPERATION COST
            cost_op = self.COST_OP_HOUR_BASE * (1.0 + self.OP_HAZARD_SCALE * hazard)
            running_cost_total += cost_op

            component_mttr = self.mean_time_to_repair(reliability)

            # Check for failure probability
            prob_failure = 1.0 - np.exp(-hazard * self.TIME_STEP)
            failed_spontaneously = np.random.rand() < prob_failure


            # --- Maintenance Handling ---
            maintenance_occurred = False

            if actions[i] == 1:  # Preventive Maintenance (PM)
                maintenance_occurred = True

                cost_pm_fixed = self.COST_PM_FIXED_BASE
                cost_downtime = self.COST_DOWN_HOUR
                total_reward -= (cost_pm_fixed + cost_downtime)

                # IMPERFECT RESTORATION FIX: R_target = R_current * Factor
                # We remove the max(R_current, ...) which prevented age from progressing when R was 1.0
                R_current = reliability
                R_target = R_current * self.PM_RESTORATION_FACTOR

                # If component is brand new (R=1.0) and PM is done, the best we can do is reset to 0.0.
                # However, R_target will be 0.85, giving a small effective age, which seems to be the intent
                # of "imperfect" maintenance.

                t_prime = self._calculate_age_for_reliability(R_target, i)

                self.ages[i] = t_prime
                self.time_since_maintenance[i] = 0.0

            elif actions[i] == 2:  # Corrective Maintenance (CM) (Replacement)
                maintenance_occurred = True

                dynamic_cm_fixed_cost = self.COST_CM_FIXED_BASE
                cost_downtime = self.COST_DOWN_HOUR * component_mttr
                cost_cm = dynamic_cm_fixed_cost

                total_reward -= (cost_cm + cost_downtime)
                self.ages[i] = 0.0 # Full replacement (as-good-as-new)
                self.time_since_maintenance[i] = 0.0

            elif actions[i] == 0: # Do Nothing (NOOP)
                 if failed_spontaneously:
                    maintenance_occurred = True

                    # Forced CM due to unplanned failure (most costly outcome)
                    dynamic_cm_fixed_cost = self.COST_CM_FIXED_BASE
                    cost_downtime = self.COST_DOWN_HOUR * component_mttr
                    cost_cm = dynamic_cm_fixed_cost + self.PENALTY_UNPLANNED_FAIL

                    total_reward -= (cost_cm + cost_downtime)
                    self.ages[i] = 0.0
                    self.time_since_maintenance[i] = 0.0

            # Age update
            # Age and TSM progress only if NOOP and no failure, OR if maintenance did not occur this step.
            if not maintenance_occurred:
                self.ages[i] += self.TIME_STEP
                self.time_since_maintenance[i] += self.TIME_STEP

            # Check for termination conditions
            if self.ages[i] >= self.MAX_AGE_TERMINATION:
                terminate_max_age = True

            # Re-calculate reliability with the (possibly) new age
            current_reliability = self.reliability(self.ages[i], beta, eta)
            if current_reliability <= self.RELIABILITY_THRESHOLD:
                terminate_low_reliability = True

            self.last_action[i] = actions[i]

        # --- System Running Cost ---
        total_reward -= running_cost_total * self.TIME_STEP

        # --- Termination Condition ---
        done = terminate_max_age or terminate_low_reliability

        next_state = self._get_state()

        return next_state, total_reward, done, False, info

    # Construct observation matrix
    def _get_state(self) -> np.ndarray:
        state = []
        for i, (name, params) in enumerate(self.components.items()):
            beta, eta = params["beta"], params["eta"]
            age = self.ages[i]

            hazard = self.hazard_rate(age, beta, eta)
            reliability = self.reliability(age, beta, eta)
            RUL = self.remaining_useful_life(age, beta, eta)
            MTTF = self.mean_time_to_failure(beta, eta)
            MTTR = self.mean_time_to_repair(reliability) # Calculate based on R
            load_fraction = self.load_fraction[i]
            confidence = reliability

            s_i = [
                age,                                   # 1. tᵢ (Weibull Age)
                self.time_since_maintenance[i],        # 2. Δt_last,i
                hazard,                                # 3. λᵢ
                reliability,                           # 4. Rᵢ
                RUL,                                   # 5. RULᵢ
                MTTF,                                  # 6. MTTFᵢ
                load_fraction,                         # 7. Lᵢ
                MTTR,                                  # 8. MTTRᵢ
                self.last_action[i],                   # 9. a_{prev},i
                confidence,                            # 10. Cᵢ
            ]
            state.append(s_i)
        return np.array(state, dtype=np.float32)

    def render(self):
        pass

    def close(self):
        pass

# --- DDQN AGENT IMPLEMENTATION (Standard) ---

class ReplayBuffer:
    """Stores experiences for DDQN training."""
    def __init__(self, capacity: int):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size: int):
        if len(self.buffer) < batch_size: return []
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)

class DDQNAgent:
    """Double Deep Q-Network Agent."""
    def __init__(self, state_shape, action_dim, gamma=0.99, lr=0.001, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.9999, buffer_size=50000, batch_size=64, update_target_freq=100):

        self.state_shape = state_shape
        self.action_dim = action_dim
        self.num_components = state_shape[0]
        self.output_dim = self.action_dim ** self.num_components

        self.gamma = gamma
        self.lr = lr
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.update_target_freq = update_target_freq
        self.memory = ReplayBuffer(buffer_size)
        self.train_step_count = 0

        self.q_local = self._build_model()
        self.q_target = self._build_model()
        self.q_target.set_weights(self.q_local.get_weights())

    def _build_model(self):
        """Creates a Neural Network for the Q-function."""
        model = Sequential()
        model.add(InputLayer(input_shape=self.state_shape))
        model.add(tf.keras.layers.Flatten())

        model.add(Dense(256, activation='relu'))
        model.add(Dense(128, activation='relu'))

        model.add(Dense(self.output_dim, activation='linear'))

        model.compile(loss=MeanSquaredError(), optimizer=Adam(learning_rate=self.lr))
        return model

    def get_action(self, state: np.ndarray) -> np.ndarray:
        """Epsilon-greedy action selection."""
        if np.random.rand() <= self.epsilon:
            actions = np.random.randint(0, self.action_dim, size=self.num_components)
        else:
            state = np.expand_dims(state, axis=0)
            q_values = self.q_local.predict(state, verbose=0)[0]
            flat_action = np.argmax(q_values)
            actions = self._flat_to_multi_discrete(flat_action)

        return actions

    def _flat_to_multi_discrete(self, flat_action: int) -> np.ndarray:
        """Converts flat action index back to MultiDiscrete action array."""
        actions = []
        base = self.action_dim
        temp_action = flat_action
        for _ in range(self.num_components):
            action_i = temp_action % base
            actions.append(action_i)
            temp_action //= base
        return np.array(actions[::-1])

    def _multi_discrete_to_flat(self, actions: np.ndarray) -> int:
        """Converts MultiDiscrete action array to flat action index."""
        flat_action = 0
        base = self.action_dim
        for i, action_i in enumerate(reversed(actions)):
            flat_action += action_i * (base ** i)
        return flat_action

    def train(self):
        """Trains the Q-network using DDQN logic."""
        if len(self.memory) < self.batch_size:
            return

        batch = self.memory.sample(self.batch_size)

        states = np.array([exp[0] for exp in batch])
        actions_md = np.array([exp[1] for exp in batch])
        rewards = np.array([exp[2] for exp in batch])
        next_states = np.array([exp[3] for exp in batch])
        dones = np.array([exp[4] for exp in batch])

        q_values_local = self.q_local.predict(states, verbose=0)
        q_next_local = self.q_local.predict(next_states, verbose=0)
        max_action_indices_flat = np.argmax(q_next_local, axis=1)

        q_next_target = self.q_target.predict(next_states, verbose=0)
        q_target_evaluated = q_next_target[np.arange(self.batch_size), max_action_indices_flat]

        target_q_values = rewards + self.gamma * q_target_evaluated * (1 - dones)

        q_targets = q_values_local.copy()
        for i in range(self.batch_size):
            flat_action = self._multi_discrete_to_flat(actions_md[i])
            q_targets[i, flat_action] = target_q_values[i]

        self.q_local.fit(states, q_targets, epochs=1, verbose=0)

        self.train_step_count += 1
        if self.train_step_count % self.update_target_freq == 0:
            self.q_target.set_weights(self.q_local.get_weights())

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# --- TRAINING EXECUTION ---

REWARD_SCALE_FACTOR = 1000.0 # Divide all rewards/penalties by 1000

def train_ddqn():
    # Set seed for reproducibility
    tf.random.set_seed(42)
    np.random.seed(42)
    random.seed(42)

    # Environment Setup
    # Time step of 1.0 means each step is 1 hour
    env = PredictiveMaintenanceEnv(time_step=1.0)
    state_shape = env.observation_space.shape
    action_dim = env.action_space.nvec[0]
    num_components = env.num_components

    # Agent Setup
    agent = DDQNAgent(
        state_shape=state_shape,
        action_dim=action_dim,
        lr=0.0005,
        epsilon_decay=0.9995,
        buffer_size=50000,
        batch_size=32,
        update_target_freq=50
    )

    EPISODES = 100
    # Log at every step (1 step = 1 hour)
    LOG_STEP_INTERVAL = 1
    ACTION_MAP = {0: "NOOP", 1: "PM (Planned)", 2: "CM (Corrective)"}

    print("--- DDQN Training Start ---")
    print("Environment: {num_components} Component. Max Age: {env.MAX_AGE_TERMINATION} hrs. R_min: {env.RELIABILITY_THRESHOLD}")
    print("PM Fixed Cost: {env.COST_PM_FIXED_BASE:,.0f} Rs")
    print("PM Imperfect Restoration: {env.PM_RESTORATION_FACTOR * 100:.0f}% of R_prev")
    print("MTTR Reliability Penalty (x): {env.MTTR_R_PENALTY}")
    print("Operating Cost Hazard Scale (x): {env.OP_HAZARD_SCALE:,.0f} (Highly Aggressive)")
    print("Reward Scaling: 1 / {REWARD_SCALE_FACTOR:.0f}")

    for episode in range(1, EPISODES + 1):
        state, _ = env.reset()
        done = False
        episode_reward_raw = 0
        episode_reward_scaled = 0
        step_count = 0

        while not done:
            actions = agent.get_action(state)
            next_state, reward_raw, terminated, truncated, info = env.step(actions)
            done = terminated or truncated

            reward_scaled = reward_raw / REWARD_SCALE_FACTOR

            agent.memory.push(state, actions, reward_scaled, next_state, done)
            agent.train()

            # --- DETAILED LOGGING (for Component 1) ---
            #if step_count % LOG_STEP_INTERVAL == 0:
            comp_index = 0

            # State features
            age, tsm, hazard, R, RUL, MTTF, L, MTTR, a_prev, _ = state[comp_index]

            # Calculate current dynamic Operating cost for display
            dynamic_op_cost = env.COST_OP_HOUR_BASE * (1.0 + env.OP_HAZARD_SCALE * hazard)

            print("\n[Ep: {episode} | Step: {step_count} | Time: {age:.0f} hrs | TSM: {tsm:.0f} hrs | Epsilon: {agent.epsilon:.4f}]")
            print("  Component 1 ({env.component_names[comp_index]}):")
            print("    Weibull Age (t): {age:.0f} hrs")
            print("    Reliability (R): {R:.4f}")
            print("    Hazard Rate (λ): {hazard:.6f}")
            print("    RUL Estimate: {RUL:.0f} hrs")
            print("    Prev Action (a_prev): {ACTION_MAP[int(a_prev)]}")
            print("    Dynamic Op Cost: {dynamic_op_cost:.2f} Rs/hr | Current MTTR: {MTTR:.2f} hrs")
            print("    Action Taken: {ACTION_MAP[actions[comp_index]]} (All Actions: {actions})")
            print("    Cost/Reward (Raw): {reward_raw:,.0f} | (Scaled): {reward_scaled:.2f}")

            state = next_state
            episode_reward_raw += reward_raw
            episode_reward_scaled += reward_scaled
            step_count += 1

        # --- END OF EPISODE LOGGING ---
        if episode % 10 == 0 or episode == EPISODES:
            print("\n--- Episode {episode}/{EPISODES} Summary ---")
            print("TOTAL COST (Raw): {episode_reward_raw:,.0f} (Note: Reward is negative cost)")
            print("Steps: {step_count} | Total Time: {step_count * env.TIME_STEP} hrs")
            print("Epsilon: {agent.epsilon:.4f} | Final R (Comp 1): {state[0, 3]:.4f}")
            print("-" * 40)

    env.close()
    print("Training complete.")

if __name__ == "__main__":
    tf.config.experimental.set_visible_devices([], 'GPU')
    print("TensorFlow Version: {tf.__version__}")

    train_ddqn()

TensorFlow Version: 2.19.0
--- DDQN Training Start ---
Environment: 1 Component. Max Age: 10000.0 hrs. R_min: 0.15
PM Fixed Cost: 3,000 Rs
PM Imperfect Restoration: 85% of R_prev
MTTR Reliability Penalty (x): 5.0
Operating Cost Hazard Scale (x): 40,000 (Highly Aggressive)
Reward Scaling: 1 / 1000

[Ep: 1 | Step: 0 | Time: 0 hrs | TSM: 0 hrs | Epsilon: 1.0000]
  Component 1 (Electrodes):
    Weibull Age (t): 0 hrs
    Reliability (R): 1.0000
    Hazard Rate (λ): 0.000000
    RUL Estimate: 4009 hrs
    Prev Action (a_prev): NOOP
    Dynamic Op Cost: 10.00 Rs/hr | Current MTTR: 30.00 hrs
    Action Taken: NOOP (All Actions: [0])
    Cost/Reward (Raw): -10 | (Scaled): -0.01

[Ep: 1 | Step: 1 | Time: 1 hrs | TSM: 1 hrs | Epsilon: 1.0000]
  Component 1 (Electrodes):
    Weibull Age (t): 1 hrs
    Reliability (R): 1.0000
    Hazard Rate (λ): 0.000006
    RUL Estimate: 4008 hrs
    Prev Action (a_prev): NOOP
    Dynamic Op Cost: 12.35 Rs/hr | Current MTTR: 30.00 hrs
    Action Taken: NOOP (All



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
    Hazard Rate (λ): 0.000034
    RUL Estimate: 3991 hrs
    Prev Action (a_prev): NOOP
    Dynamic Op Cost: 23.68 Rs/hr | Current MTTR: 30.06 hrs
    Action Taken: NOOP (All Actions: [0])
    Cost/Reward (Raw): -24 | (Scaled): -0.02

[Ep: 19 | Step: 1756 | Time: 19 hrs | TSM: 19 hrs | Epsilon: 0.0387]
  Component 1 (Electrodes):
    Weibull Age (t): 19 hrs
    Reliability (R): 0.9996
    Hazard Rate (λ): 0.000035
    RUL Estimate: 3990 hrs
    Prev Action (a_prev): NOOP
    Dynamic Op Cost: 24.13 Rs/hr | Current MTTR: 30.06 hrs
    Action Taken: NOOP (All Actions: [0])
    Cost/Reward (Raw): -24 | (Scaled): -0.02

[Ep: 19 | Step: 1757 | Time: 20 hrs | TSM: 20 hrs | Epsilon: 0.0386]
  Component 1 (Electrodes):
    Weibull Age (t): 20 hrs
    Reliability (R): 0.9995
    Hazard Rate (λ): 0.000036
    RUL Estimate: 3989 hrs
    Prev Action (a_prev): NOOP
    Dynamic Op Cost: 24.58 Rs/hr | Current MTTR: 30.07 hrs
    Action T