In [None]:
import numpy as np
import gym
from gym import spaces
import random
from scipy.integrate import solve_ivp

class CSTRCoolingEnv(gym.Env):
    def __init__(self):
        super(CSTRCoolingEnv, self).__init__()
        
        # Parameters
        self.V = 100             # Reactor volume (L)
        self.q = 20             # Flowrate (L/min)
        self.Caf = 1.0           # Feed concentration (mol/L)
        self.Tf = 350            # Feed temperature (K)
        self.k0 = 15.2e10         # 1/min
        self.EoverR = 10000       # K
        self.mdelH = 5e4         # J/mol
        self.rho = 1000          # g/L
        self.Cp = 0.239          # J/g-K
        self.UA = 5e4            # J/min-K
        self.Tj_base = 300       # K
        self.Tj_delta = 30        # delta per action level
        self.dt = 1.0            # time per step (min)
        self.max_steps = 200
        self.possible_setpoints = [290,300,310,320,330,340,350,360,370]

        # Normalization ranges
        self.Ca_min = 0.0
        self.Ca_max = 1.2      # expected max around feed conc.
        
        self.T_min = 300.0
        self.T_max = 400.0      # may vary depending on dynamics


        # Action space: 3 discrete cooling settings
        self.action_space = spaces.Discrete(5)

        # Observation space: [Ca, T]
        low_obs = np.array([0.0, 300.0, 300.0], dtype=np.float32)
        high_obs = np.array([2.0, 500.0, 500.0], dtype=np.float32)
        self.observation_space = spaces.Box(low=low_obs, high=high_obs, dtype=np.float32)

        self.state = None
        self.steps = 0

    def step(self, action):
        Ca, T, sp = self.state

        sp = self.setpoints[self.steps]

        # Determine Tj based on action
        if action == 0:
            Tj = self.Tj_base + 2*self.Tj_delta
        elif action == 1:
            Tj = self.Tj_base + self.Tj_delta
        elif action == 2:
            Tj = self.Tj_base
        elif action == 3:
            Tj = self.Tj_base - self.Tj_delta
        elif action == 4:
            Tj = self.Tj_base - 2*self.Tj_delta

        # Integrate ODEs for 1 time step
        def cstr_odes(t, y):
            Ca, T = y
            rA = self.k0 * np.exp(-self.EoverR / T) * Ca
            dCadt = self.q / self.V * (self.Caf - Ca) - rA
            dTdt = (self.q / self.V * (self.Tf - T)
                    + (-self.mdelH) / (self.rho * self.Cp) * rA
                    + self.UA / (self.rho * self.Cp * self.V) * (Tj - T))
            return [dCadt, dTdt]

        sol = solve_ivp(cstr_odes, [0, self.dt], [Ca, T], method='RK45')
        Ca_next, T_next = sol.y[:, -1]

        self.state = np.array([Ca_next, T_next, sp], dtype=np.float32)

        # Reward: penalize deviation from setpoint
        reward = (-abs(T_next - sp))/100

        self.steps += 1
        done = self.steps >= self.max_steps

        obs = self.normalize_state(*self.state)
        return obs, reward, done, {}

    def generate_sectioned_list(self, C, num_sections=10, section_size=20):
        result = []
        for _ in range(num_sections):
            value = random.choice(C)
            section = [value] * section_size
            result.extend(section)
        return result

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.steps = 0
        Ca0 = 1.0 
        T0 = 350.0 + self.np_random.uniform(-5.0, 5.0)  #5
        sp0 = 300.0
        C = self.possible_setpoints
        self.setpoints = self.generate_sectioned_list(C)
        self.state = np.array([Ca0, T0, sp0], dtype=np.float32)
        obs = self.normalize_state(*self.state)
        return obs, {}

    def render(self, mode='human'):
        Ca, T = self.state
        print(f"Step: {self.steps}, Ca: {Ca:.3f}, T: {T:.2f}")

    def normalize_state(self, Ca, T, sp):
        Ca_norm = (Ca - self.Ca_min) / (self.Ca_max - self.Ca_min)
        T_norm = (T - self.T_min) / (self.T_max - self.T_min)
        sp_norm = (sp - self.T_min) / (self.T_max - self.T_min)
        return np.array([Ca_norm, T_norm, sp_norm], dtype=np.float32)

    def close(self):
        pass


In [None]:
import numpy as np
import torch
import tqdm
import gymnasium as gym
import matplotlib.pyplot as plt
import collections
import warnings
import random
import csv

warnings.filterwarnings("ignore")

#Author: Daniel Rangel-Martinez
#Based on the work from: Pascal Poupart 
# Course: CS885 - Reinforcement Learning

# Hyperparameters
MINIBATCH_SIZE = 64
LEARNING_RATE = 5e-5
EPISODES = 2000
GAMMA = 0.99
TRAIN_AFTER_EPISODES = 50
TRAIN_EPOCHS = 5
BUFSIZE = 20_000
TEST_EPISODES = 7
HIDDEN = 100
TARGET_UPDATE_FREQ = 3
STARTING_EPSILON = 1.0
EPSILON_END = 0.01
STEPS_MAX = 500_000

# Torch helper class
class TorchHelper:
    def __init__(self):
        if torch.cuda.is_available():
            self.device = torch.device("gpu")
            print("Working on CUDA")
        else:
            self.device = torch.device("cpu")
            print("Working on CUDA")
    
    def f(self, x):
        return torch.tensor(x, dtype=torch.float32).to(self.device)
    
    def i(self, x):
        return torch.tensor(x, dtype=torch.int32).to(self.device)
    
    def l(self, x):
        return torch.tensor(x, dtype=torch.long).to(self.device)

t = TorchHelper()
DEVICE = t.device

# Replay buffer
class ReplayBuffer:
    def __init__(self, N):
        self.buf = collections.deque(maxlen=N)

    def add(self, s, a, r, s2, d):
        self.buf.append((s, a, r, s2, d))

    def sample(self, n):
        minibatch = random.sample(self.buf, n)
        S, A, R, S2, D = zip(*minibatch)
        return t.f(S), t.l(A), t.f(R), t.f(S2), t.i(D)

# Epsilon-greedy policy
def policy(obs, Q, epsilon):
    obs = t.f(obs).view(-1, OBS_N)
    if np.random.rand() < epsilon:
        act = np.random.randint(ACT_N)
        return act
    else:
        act = torch.argmax(Q(obs)).item()
        return act

# Target network update
def update(target, source):
    for tp, p in zip(target.parameters(), source.parameters()):
        tp.data.copy_(p.data)

# Training update
def update_networks(epi, buf, Q, Qt, OPT):
    S, A, R, S2, D = buf.sample(MINIBATCH_SIZE)
    qvalues = Q(S).gather(1, A.view(-1, 1)).squeeze()
    q2values = Qt(S2).max(dim=1).values
    targets = R + GAMMA * q2values * (1 - D)
    loss = torch.nn.MSELoss()(targets.detach(), qvalues)
    OPT.zero_grad()
    loss.backward()
    OPT.step()

    if epi % TARGET_UPDATE_FREQ == 0:
        update(Qt, Q)

    return loss.item()

# Play one episode
def play_episode(env, Q, epsilon, buf=None, max_steps=100):
    obs, _ = env.reset(seed=123)
    done = False
    states, actions, rewards = [], [], []

    for _ in range(max_steps):
        act = policy(obs, Q, epsilon)
        next_obs, reward, done, _ = env.step(act)

        if buf is not None:
            buf.add(obs, act, reward, next_obs, done)

        states.append(obs)
        actions.append(act)
        rewards.append(reward)

        obs = next_obs
        if done:
            break

    return states, actions, rewards

# Training loop
def train():
    global OBS_N, ACT_N

    env = CSTRCoolingEnv()
    test_env = CSTRCoolingEnv()

    OBS_N = env.observation_space.shape[0]
    ACT_N = env.action_space.n

    Q = torch.nn.Sequential(
        torch.nn.Linear(OBS_N, HIDDEN),
        torch.nn.ReLU(),
        torch.nn.Linear(HIDDEN, HIDDEN),
        torch.nn.ReLU(),
        torch.nn.Linear(HIDDEN, HIDDEN),
        torch.nn.ReLU(),
        torch.nn.Linear(HIDDEN, ACT_N)
    ).to(DEVICE)

    Qt = torch.nn.Sequential(
        torch.nn.Linear(OBS_N, HIDDEN),
        torch.nn.ReLU(),
        torch.nn.Linear(HIDDEN, HIDDEN),
        torch.nn.ReLU(),
        torch.nn.Linear(HIDDEN, HIDDEN),
        torch.nn.ReLU(),
        torch.nn.Linear(HIDDEN, ACT_N)
    ).to(DEVICE)

    OPT = torch.optim.Adam(Q.parameters(), lr=LEARNING_RATE)
    buf = ReplayBuffer(BUFSIZE)
    epsilon = STARTING_EPSILON

    testRs, last25testRs = [], []

    print("Training:")
    pbar = tqdm.trange(EPISODES)
    for epi in pbar:
        _,_,R = play_episode(env, Q, epsilon, buf=buf)

        if epi >= TRAIN_AFTER_EPISODES:
            for _ in range(TRAIN_EPOCHS):
                update_networks(epi, buf, Q, Qt, OPT)

        Rews = []
        for _ in range(TEST_EPISODES):
            _, _, R = play_episode(test_env, Q, epsilon)
            Rews += [sum(R)]
        testRs += [sum(Rews)/TEST_EPISODES]

        if epi  == EPISODES:
            torch.save(Q.state_dict(), f"dqn_actor_3{epi}.pth")

        last25testRs += [sum(testRs[-25:])/len(testRs[-25:])]
        pbar.set_description("R(%g), EPSILON(%g)" % (last25testRs[-1], epsilon)) 

        f = open('rewards', 'a')
        writer = csv.writer(f, lineterminator = '\n')
        writer.writerow([last25testRs[-1]])
        f.close()

        epsilon = STARTING_EPSILON * (EPSILON_END/STARTING_EPSILON)**(epi/EPISODES) #max(EPSILON_END, epsilon - (2.0 / EPISODES))

    pbar.close()
    env.close()
    test_env.close()

    return last25testRs

# Plot results
def plot_arrays(vars, color, label):
    mean = np.mean(vars, axis=0)
    std = np.std(vars, axis=0)
    plt.plot(range(len(mean)), mean, color=color, label=label)
    plt.fill_between(range(len(mean)), mean - std, mean + std, color=color, alpha=0.3)

if __name__ == "__main__":

    open("rewards", "w").close()
    
    curves = []
    for _ in range(1):
        curves.append(train())

    plt.figure(figsize=(12, 6))
    plot_arrays(curves, 'b', 'Rewards')
    plt.xlabel("Episode")
    plt.ylabel("Average Reward")
    plt.title("DQN Learning Curve")
    plt.grid(True)
    plt.legend()
    plt.show()


In [None]:
import numpy as np
import csv
import torch

# Torch helper class
class TorchHelper:
    def __init__(self):
        if torch.cuda.is_available():
            self.device = torch.device("cpu")
            print("Working on CPU")
        else:
            self.device = torch.device("cpu")
            print("Working on CUDA")
    
    def f(self, x):
        return torch.tensor(x, dtype=torch.float32).to(self.device)
    
    def i(self, x):
        return torch.tensor(x, dtype=torch.int32).to(self.device)
    
    def l(self, x):
        return torch.tensor(x, dtype=torch.long).to(self.device)

t = TorchHelper()
DEVICE = t.device

def tester(policy):
    Q = torch.nn.Sequential(
        torch.nn.Linear(OBS_N, HIDDEN),
        torch.nn.ReLU(),
        torch.nn.Linear(HIDDEN, HIDDEN),
        torch.nn.ReLU(),
        torch.nn.Linear(HIDDEN, HIDDEN),
        torch.nn.ReLU(),
        torch.nn.Linear(HIDDEN, ACT_N)
    ).to(DEVICE)

    Q.load_state_dict(policy)

    state, _ = env.reset()
    reward = 0.0
    done = False
    action = 1
    while not done:
        f = open('episode_test', 'a')
        writer = csv.writer(f, lineterminator = '\n')
        writer.writerow([state, reward, done, action])
        f.close()

        state_ = torch.from_numpy(state)
        action = Q(state_)  
        action = torch.argmax(action).item()
        next_state, reward, done, _ = env.step(action)
        state = next_state

if __name__ == "__main__":
    open('episode_test', 'w').close()             
    HIDDEN = 100       
    policy = torch.load("dqn_actor_32000.pth")
    env = CSTRCoolingEnv()
    OBS_N = env.observation_space.shape[0]
    ACT_N = env.action_space.n
    tester(policy)


In [None]:
import matplotlib.pyplot as plt

Ca_min, Ca_max = 0.0, 1.2
T_min, T_max = 300.0, 400.0

def unnormalize_state(ca_norm, t_norm, sp_norm):
    ca = ca_norm * (Ca_max - Ca_min) + Ca_min
    t = t_norm * (T_max - T_min) + T_min
    sp = sp_norm * (T_max - T_min) + T_min
    #Tj = tj_norm * (T_max - T_min) + T_min
    return ca, t, sp


states_Ca = []
states_T = []
rewards = []
actions = []
setpoints = []

with open("episode_test", "r") as file:
    for line in file:
        parts = line.strip().split("],")
        state_str = parts[0].replace("[", "")
        rest = parts[1].split(",", 3)

        reward = float(rest[0])
        action = int(rest[2])


        ca_norm, t_norm, sp = map(float, state_str.strip().split())
        ca, t, sp = unnormalize_state(ca_norm, t_norm, sp)

        states_Ca.append(ca)
        states_T.append(t)
        rewards.append(reward)
        actions.append(action)
        setpoints.append(sp)

#plt.figure(figsize=(15, 12))

# 1. Ca
plt.plot(states_Ca, label="Ca (mol/L)")
plt.title("Concentration")
plt.ylabel("Ca (mol/L)")
plt.xlabel("Time Step (min)")
plt.grid(True)
plt.legend()
plt.show()

# 3. Reward
plt.plot(rewards, label="Reward", color='green')
plt.title("Rewards obtained at each step")
plt.ylabel("Reward")
plt.xlabel("Time Step (min)")
plt.grid(True)
plt.legend()
plt.show()
# 4. Tj (Jacket Temp)

# 5. Tj (Jacket Temp)
plt.plot(states_T, label="Temperature (K)", color='orange')
plt.plot(setpoints, label="Setpoint", color='green')
plt.title("Temperature of the Reactor vs Setpoint")
plt.ylabel("Temperature (K)")
plt.xlabel("Time Step (min)")
plt.grid(True)
plt.legend()
plt.show()


        