In [None]:
import gym
from gym import spaces
import numpy as np
import pandas as pd
import scipy
from scipy.integrate import odeint
import csv

In [None]:
!pip install gym

In [None]:
Ad      = 4.4e16
Ed      = 140.06e3
Ap      = 1.7e11/60
Ep      = 16.9e3/0.239
deltaHp = -82.2e3
UA      = 33.3083 #%18.8445;
Qc      = 650
Qs      = 12.41e-2
V       = 0.5
Tc      = 27
Tamb    = 27
Cpc     = 4.184
R       = 8.3145
alpha   = 1.212827
beta    = 0.000267
epsilon = 0.5
theta   = 1.25
m1      = 450
cp1     = 4.184
mjCpj   = (18*4.184)+(240*0.49)
cp2     = 187
cp3     = 110.58 #%J/molK
cp4     = 84.95
m5      = 220
cp5     = 0.49
m6      = 7900
cp6     = 0.49
M0      = 0.7034
I0      = 4.5e-3

# Define Batch Reactor model
def br(x,t,u,Ad):
    # Inputs:
    # Coolant flow rate
    F = u*16.667

    # States (4):
    # Initiator
    Ii  = x[0]
    # Monomer
    M  = x[1]
    # Reactor temperature
    Tr = x[2]
    # Jacket temperature
    Tj = x[3]

    Ri    = Ad*Ii*(np.exp(-Ed/(R*(Tr+273.15))))
    Rp    = Ap*(Ii**epsilon)*(M**(theta)*(np.exp(-Ep/(R*(Tr+273.15)))))
    mrCpr = m1*cp1+ Ii*cp2*V + M*cp3*V +(M0-M)*cp4*V+ m5*cp5 + m6*cp6
    Qpr   = alpha*(Tr-Tc)**beta

    # Computing the rate of change of I, M, Tr, Tj using Differential Equations
    dy1_dt = -Ri
    dy2_dt = -Rp
    dy3_dt = (Rp*V*(-deltaHp)-UA*(Tr-Tj)+Qc+Qs-Qpr)/mrCpr
    dy4_dt = (UA*(Tr-Tj)-F*Cpc*(Tj-Tc))/mjCpj

    # Return xdot:
    xdot = np.zeros(4)
    xdot[0] = dy1_dt
    xdot[1] = dy2_dt
    xdot[2] = dy3_dt
    xdot[3] = dy4_dt

    return xdot



In [None]:
class BR3 (gym.Env):

    def __init__(self):

        self.action_space = spaces.Box(low=0.25, high=0.75, shape=(1,), dtype=np.float32)

        self.observation_space = spaces.Box(low=0, high=100, shape=(2,), dtype=np.float32)

        self.t = np.linspace(0,7200,7201)
        self.i= 0

        Tr_ref = pd.read_csv('Trajectory2.csv') # Enter the CSV file directory for setpoints
        self.a1 = Tr_ref.values.tolist()
        self.sp = self.a1[self.i][0] # Saving the setpoints in an array

       # Assign Initial conditions to the variables
        self.I = 4.5e-3     # Initiator
        self.M = 0.7034     # Monomer
        self.Tr = 45.0      # Reactor Temperature
        self.Tj = 40.0      # Jacket Temperature

        self.state = self.Tr ,self.sp

        # Creating an NumPy array to store the values of I, M, Tr and Tj for give it as an argument to the br_equation function

        self.y0= np.empty(4)
        self.y0[0] = self.I        # Initiator Concentration
        self.y0[1] = self.M        # Monomer Concentration
        self.y0[2] = self.Tr       # Reactor Temperature
        self.y0[3] = self.Tj       # Jacket Temperature


        self.time_step= 7200 # Number of computation steps

    # Step function to give the coolant flow rate (in LPM) to the model, returning the next state and computed reward
    def step(self, action): # Note that the action is an array

        action = action[0]
        u = action

        ts = [self.t[self.i],self.t[self.i+1]]

        y = scipy.integrate.odeint(br,self.y0, ts , args=(u,4.4e16),)

        x = np.round(y, decimals=4)

        self.I = x[-1][0]
        self.M = x[-1][1]
        self.Tr = x[-1][2]
        self.Tj = x[-1][3]

        self.y0= np.empty(4)
        self.y0[0] = self.I        # Initiator Concentration
        self.y0[1] = self.M        # Monomer Concentration
        self.y0[2] = self.Tr       # Reactor Temperature
        self.y0[3] = self.Tj       # Jacket Temperature


        # Data saving snippet when using the model - Saves the data to a csv file
        data = [self.sp, self.Tr, self.Tj, action]
        with open('data.csv', 'a') as file:
            writer = csv.writer(file)
            writer.writerow(data)

        self.sp=self.a1[self.i][0]
        self.i += 1

        # Calculate the difference between Setpoint and Reactor Temperature (Process Variable)
        difference = self.sp - self.Tr
        self.reward = 0

        # Calculate the modulus of the difference
        error = abs(difference)

        # These rewards can be modified in accordance with the requirements
        if error <= 0.5:
            self.reward = +100
        elif error <= 1:
            self.reward = +50
        elif error <= 3:
            self.reward = +25
        elif error <= 4:
            self.reward = +10
        else:
            self.reward = -100

        if self.i>= self.time_step:
            done = True
        else:
            done = False

        info = {}

        self.state = self.Tr , self.sp

        return self.state, self.reward ,done ,info

    def reset(self):
        self.I  = 4.5e-3
        self.M  = 0.7034
        self.Tr = 45.0
        self.Tj = 40.0
        self.i  = 0

        self.sp = self.a1[self.i][0]
        self.state=self.Tr,self.sp

        self.y0= np.empty(4)
        self.y0[0] = self.I        # Initiator Concentration
        self.y0[1] = self.M        # Monomer Concentration
        self.y0[2] = self.Tr       # Reactor Temperature
        self.y0[3] = self.Tj       # Jacket Temperature

        return self.state

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.integrate
import gym
from gym import spaces
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random
import csv
import os

# If you're running on Google Colab, uncomment these lines to install stable-baselines3
# !pip install stable-baselines3
# !pip install tensorboard

# Create a mock Trajectory2.csv file with setpoints for training
# (Replace this with loading your actual file if available)
def create_mock_trajectory():
    # Create a simple temperature profile that varies from 45 to 70 degrees
    time_steps = 7201
    setpoints = np.zeros((time_steps, 1))
    
    # Initial phase - maintain at 45°C
    setpoints[:1000] = 45.0
    
    # Ramp up to 70°C
    ramp_indices = np.arange(1000, 3000)
    setpoints[1000:3000] = 45.0 + (70.0 - 45.0) * (ramp_indices - 1000) / 2000
    
    # Hold at 70°C
    setpoints[3000:5000] = 70.0
    
    # Ramp down to 50°C
    ramp_indices = np.arange(5000, 6000)
    setpoints[5000:6000] = 70.0 - (70.0 - 50.0) * (ramp_indices - 5000) / 1000
    
    # Hold at 50°C until the end
    setpoints[6000:] = 50.0
    
    # Save to CSV
    df = pd.DataFrame(setpoints)
    df.to_csv('Trajectory2.csv', index=False, header=False)
    
    return setpoints

# Check if Trajectory2.csv exists, if not create it
if not os.path.exists('Trajectory2.csv'):
    create_mock_trajectory()

# Reactor parameters
Ad      = 4.4e16
Ed      = 140.06e3
Ap      = 1.7e11/60
Ep      = 16.9e3/0.239
deltaHp = -82.2e3
UA      = 33.3083 #%18.8445;
Qc      = 650
Qs      = 12.41e-2
V       = 0.5
Tc      = 27
Tamb    = 27
Cpc     = 4.184
R       = 8.3145
alpha   = 1.212827
beta    = 0.000267
epsilon = 0.5
theta   = 1.25
m1      = 450
cp1     = 4.184
mjCpj   = (18*4.184)+(240*0.49)
cp2     = 187
cp3     = 110.58 #%J/molK
cp4     = 84.95
m5      = 220
cp5     = 0.49
m6      = 7900
cp6     = 0.49
M0      = 0.7034
I0      = 4.5e-3

# Define Batch Reactor model
def br(x, t, u, Ad):
    # Inputs:
    # Coolant flow rate
    F = u*16.667
    # States (4):
    # Initiator
    Ii  = x[0]
    # Monomer
    M  = x[1]
    # Reactor temperature
    Tr = x[2]
    # Jacket temperature
    Tj = x[3]
    Ri    = Ad*Ii*(np.exp(-Ed/(R*(Tr+273.15))))
    Rp    = Ap*(Ii**epsilon)*(M**(theta)*(np.exp(-Ep/(R*(Tr+273.15)))))
    mrCpr = m1*cp1+ Ii*cp2*V + M*cp3*V +(M0-M)*cp4*V+ m5*cp5 + m6*cp6
    Qpr   = alpha*(Tr-Tc)**beta
    # Computing the rate of change of I, M, Tr, Tj using Differential Equations
    dy1_dt = -Ri
    dy2_dt = -Rp
    dy3_dt = (Rp*V*(-deltaHp)-UA*(Tr-Tj)+Qc+Qs-Qpr)/mrCpr
    dy4_dt = (UA*(Tr-Tj)-F*Cpc*(Tj-Tc))/mjCpj
    # Return xdot:
    xdot = np.zeros(4)
    xdot[0] = dy1_dt
    xdot[1] = dy2_dt
    xdot[2] = dy3_dt
    xdot[3] = dy4_dt
    return xdot

class BR3(gym.Env):
    def __init__(self):
        self.action_space = spaces.Box(low=0.25, high=0.75, shape=(1,), dtype=np.float32)
        self.observation_space = spaces.Box(low=0, high=100, shape=(2,), dtype=np.float32)
        self.t = np.linspace(0, 7200, 7201)
        self.i = 0
        Tr_ref = pd.read_csv('Trajectory2.csv') # Enter the CSV file directory for setpoints
        self.a1 = Tr_ref.values.tolist()
        self.sp = self.a1[self.i][0] # Saving the setpoints in an array
        # Assign Initial conditions to the variables
        self.I = 4.5e-3     # Initiator
        self.M = 0.7034     # Monomer
        self.Tr = 45.0      # Reactor Temperature
        self.Tj = 40.0      # Jacket Temperature
        self.state = self.Tr, self.sp
        # Creating an NumPy array to store the values of I, M, Tr and Tj for give it as an argument to the br_equation function
        self.y0 = np.empty(4)
        self.y0[0] = self.I        # Initiator Concentration
        self.y0[1] = self.M        # Monomer Concentration
        self.y0[2] = self.Tr       # Reactor Temperature
        self.y0[3] = self.Tj       # Jacket Temperature
        self.time_step = 7200 # Number of computation steps
        
        # For logging data
        if os.path.exists('data.csv'):
            os.remove('data.csv')
        with open('data.csv', 'w') as file:
            writer = csv.writer(file)
            writer.writerow(['Setpoint', 'Tr', 'Tj', 'Action'])
    
    # Step function to give the coolant flow rate (in LPM) to the model, returning the next state and computed reward
    def step(self, action): # Note that the action is an array
        action = action[0]
        u = action
        ts = [self.t[self.i], self.t[self.i+1]]
        y = scipy.integrate.odeint(br, self.y0, ts, args=(u, 4.4e16))
        x = np.round(y, decimals=4)
        self.I = x[-1][0]
        self.M = x[-1][1]
        self.Tr = x[-1][2]
        self.Tj = x[-1][3]
        self.y0 = np.empty(4)
        self.y0[0] = self.I        # Initiator Concentration
        self.y0[1] = self.M        # Monomer Concentration
        self.y0[2] = self.Tr       # Reactor Temperature
        self.y0[3] = self.Tj       # Jacket Temperature
        # Data saving snippet when using the model - Saves the data to a csv file
        data = [self.sp, self.Tr, self.Tj, action]
        with open('data.csv', 'a') as file:
            writer = csv.writer(file)
            writer.writerow(data)
        
        self.i += 1
        if self.i < len(self.a1):
            self.sp = self.a1[self.i][0]
        
        # Calculate the difference between Setpoint and Reactor Temperature (Process Variable)
        difference = self.sp - self.Tr
        self.reward = 0
        # Calculate the modulus of the difference
        error = abs(difference)
        # These rewards can be modified in accordance with the requirements
        if error <= 0.5:
            self.reward = +100
        elif error <= 1:
            self.reward = +50
        elif error <= 3:
            self.reward = +25
        elif error <= 4:
            self.reward = +10
        else:
            self.reward = -100
        
        if self.i >= self.time_step:
            done = True
        else:
            done = False
        
        info = {}
        self.state = self.Tr, self.sp
        return np.array(self.state, dtype=np.float32), self.reward, done, info
    
    def reset(self):
        self.I  = 4.5e-3
        self.M  = 0.7034
        self.Tr = 45.0
        self.Tj = 40.0
        self.i  = 0
        self.sp = self.a1[self.i][0]
        self.state = self.Tr, self.sp
        self.y0 = np.empty(4)
        self.y0[0] = self.I        # Initiator Concentration
        self.y0[1] = self.M        # Monomer Concentration
        self.y0[2] = self.Tr       # Reactor Temperature
        self.y0[3] = self.Tj       # Jacket Temperature
        
        # For logging data - reset the data file
        if os.path.exists('data.csv'):
            os.remove('data.csv')
        with open('data.csv', 'w') as file:
            writer = csv.writer(file)
            writer.writerow(['Setpoint', 'Tr', 'Tj', 'Action'])
            
        return np.array(self.state, dtype=np.float32)

# Define a CNN+LSTM neural network architecture for the RL agent
class CNNLSTMNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, sequence_length=10):
        super(CNNLSTMNetwork, self).__init__()
        self.hidden_dim = hidden_dim
        self.sequence_length = sequence_length
        
        # CNN layers for feature extraction
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=1)
        
        # LSTM layer for sequential decision making
        self.lstm = nn.LSTM(input_size=32, hidden_size=hidden_dim, batch_first=True)
        
        # Fully connected layers for action prediction
        self.fc1 = nn.Linear(hidden_dim, 64)
        self.fc2 = nn.Linear(64, output_dim)
        
    def forward(self, x):
        # x shape: (batch_size, sequence_length, input_dim)
        batch_size = x.size(0)
        
        # Reshape for CNN (batch_size, input_dim, sequence_length)
        x = x.permute(0, 2, 1)
        
        # Apply CNN layers
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        
        # Reshape for LSTM (batch_size, sequence_length, features)
        x = x.permute(0, 2, 1)
        
        # Apply LSTM layer
        lstm_out, _ = self.lstm(x)
        
        # Take the output from the last time step
        lstm_out = lstm_out[:, -1, :]
        
        # Apply fully connected layers
        x = F.relu(self.fc1(lstm_out))
        x = self.fc2(x)
        
        return x

# Define Deep Q-Network Agent with Experience Replay
class DQNAgent:
    def __init__(self, state_size, action_size, hidden_size=64, sequence_length=10):
        self.state_size = state_size
        self.action_size = action_size
        self.sequence_length = sequence_length
        self.memory = deque(maxlen=10000)
        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0   # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        
        # Create Q networks
        self.model = CNNLSTMNetwork(state_size, hidden_size, action_size, sequence_length)
        self.target_model = CNNLSTMNetwork(state_size, hidden_size, action_size, sequence_length)
        self.update_target_model()
        
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()
        
        # Store past states for sequence input
        self.state_memory = deque(maxlen=sequence_length)
        
    def update_target_model(self):
        self.target_model.load_state_dict(self.model.state_dict())
    
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    def get_sequence(self, state):
        # Add current state to memory
        self.state_memory.append(state)
        
        # If we don't have enough states, pad with zeros
        if len(self.state_memory) < self.sequence_length:
            padding = [np.zeros_like(state) for _ in range(self.sequence_length - len(self.state_memory))]
            sequence = padding + list(self.state_memory)
        else:
            sequence = list(self.state_memory)
            
        return np.array(sequence)
    
    def act(self, state):
        # Get sequence of states
        state_sequence = self.get_sequence(state)
        
        if np.random.rand() <= self.epsilon:
            # Explore: choose random action
            return np.random.uniform(0.25, 0.75, (1,))
        
        # Exploit: choose best action from Q values
        state_sequence = torch.FloatTensor(state_sequence).unsqueeze(0)  # Add batch dimension
        with torch.no_grad():
            q_values = self.model(state_sequence)
            
        # Convert q_values to continuous action (between 0.25 and 0.75)
        action_value = q_values.numpy()[0][0]
        # Scale the output to the action range
        scaled_action = 0.25 + (0.75 - 0.25) * (action_value + 1) / 2
        scaled_action = np.clip(scaled_action, 0.25, 0.75)
        
        return np.array([scaled_action])
    
    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return
            
        minibatch = random.sample(self.memory, batch_size)
        
        for state, action, reward, next_state, done in minibatch:
            # Get sequence for current and next state
            state_sequence = self.get_sequence(state)
            next_state_sequence = self.get_sequence(next_state)
            
            state_sequence = torch.FloatTensor(state_sequence).unsqueeze(0)
            next_state_sequence = torch.FloatTensor(next_state_sequence).unsqueeze(0)
            
            # Calculate target Q value
            target = reward
            if not done:
                with torch.no_grad():
                    target = reward + self.gamma * torch.max(self.target_model(next_state_sequence))
            
            # Get current Q values
            current_q = self.model(state_sequence)
            
            # Calculate loss
            loss = self.criterion(current_q, torch.FloatTensor([[target]]))
            
            # Perform optimization
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
        
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# Function to run a test episode with trained agent
def run_test_episode(env, agent):
    state = env.reset()
    done = False
    total_reward = 0
    
    # Initialize arrays to store data for plotting
    setpoints = []
    temperatures = []
    jacket_temps = []
    actions = []
    
    while not done:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        total_reward += reward
        state = next_state
        
        # Store data for plotting
        setpoints.append(state[1])
        temperatures.append(state[0])
        jacket_temps.append(env.Tj)
        actions.append(action[0])
    
    return setpoints, temperatures, jacket_temps, actions, total_reward

# Train the DQN agent
def train_dqn_agent(episodes=100, batch_size=64):
    env = BR3()
    state_size = 2
    action_size = 1
    agent = DQNAgent(state_size, action_size)
    
    scores = []
    
    for e in range(episodes):
        state = env.reset()
        total_reward = 0
        done = False
        
        while not done:
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
            
            if len(agent.memory) > batch_size:
                agent.replay(batch_size)
                
        # Update target model periodically
        if e % 10 == 0:
            agent.update_target_model()
            
        scores.append(total_reward)
        print(f"Episode: {e+1}/{episodes}, Score: {total_reward}, Epsilon: {agent.epsilon:.2f}")
    
    # Plot training performance
    plt.figure(figsize=(10, 5))
    plt.plot(scores)
    plt.title('Training Performance')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.savefig('training_performance.png')
    
    return env, agent

# Main execution
if __name__ == "__main__":
    # Set random seeds for reproducibility
    np.random.seed(0)
    torch.manual_seed(0)
    random.seed(0)
    
    # Train the agent
    print("Training DQN agent with CNN+LSTM architecture...")
    env, agent = train_dqn_agent(episodes=100)
    
    # Run a test episode and plot results
    print("Running test episode...")
    setpoints, temperatures, jacket_temps, actions, total_reward = run_test_episode(env, agent)
    
    # Plot the results
    plt.figure(figsize=(15, 10))
    
    # Plot temperature vs setpoint
    plt.subplot(3, 1, 1)
    plt.plot(setpoints, 'r-', label='Setpoint')
    plt.plot(temperatures, 'b-', label='Reactor Temperature')
    plt.title('Reactor Temperature Control with RL')
    plt.xlabel('Time (s)')
    plt.ylabel('Temperature (°C)')
    plt.legend()
    plt.grid(True)
    
    # Plot jacket temperature
    plt.subplot(3, 1, 2)
    plt.plot(jacket_temps, 'g-', label='Jacket Temperature')
    plt.title('Jacket Temperature')
    plt.xlabel('Time (s)')
    plt.ylabel('Temperature (°C)')
    plt.legend()
    plt.grid(True)
    
    # Plot control actions
    plt.subplot(3, 1, 3)
    plt.plot(actions, 'k-', label='Coolant Flow Rate')
    plt.title('Control Actions (Normalized Coolant Flow Rate)')
    plt.xlabel('Time (s)')
    plt.ylabel('Flow Rate (normalized)')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.savefig('reinforcement_learning_results.png')
    plt.show()
    
    print(f"Test episode completed with total reward: {total_reward}")
    print("Results saved as 'reinforcement_learning_results.png'")