In [8]:
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
from gymnasium import spaces

class SDEEnv_train(gym.Env):
    """ will remove the stoichastic part of the system """
    def __init__(self):
        super(SDEEnv_train, self).__init__()
        # State is [y1, y2]
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(2,), dtype=np.float32)
        
        # Actions are [u1, u2], both in some control range
        self.action_space = spaces.Box(low=0, high=10, shape=(2,), dtype=np.float32)
        
        # Time step for numerical integration
        self.dt = 0.001
        
        # Initial values for state variables y1 and y2
        self.state = np.array([1, 1])  # You can set this based on the problem
        
    def reset(self,seed = None,options = None):
        # Reset the state to initial values
        self.state = np.array([0.1, 0.1]) #initial value can be changed, the bigger the value helps the model, but 0 is optimal
        
        return self.state

    def step(self, action):
        u1, u2 = action
        y1, y2 = self.state
        
        dt = self.dt
        
        # Deterministic part of the system (first equation)
        #dy1 = -(u1 + 0.5 * u1**2 * y1 + 0.5 * u2 * y2 / (y1 + y2)) * dt
        dy1 = ( -1*(u1 + 0.5 * u1**2 )* y1 + 0.5 * u2 * y2 / (y1 + y2)) * dt

        # Stochastic part of the second equation
        dW = np.random.normal(0, np.sqrt(dt))  # Wiener process for stochastic term
        dy2 = (u1 * y1 - 0.7 * u2 * y1) * dt + (0.1 * np.sqrt(y1)) 
        
        
        # Update states
        y1 += dy1
        y2 += dy2
        
        # Ensure non-negative concentrations
        #y1 = max(0, y1)
        #y2 = max(0, y2)
        
        self.state = np.array([y1, y2])
        
        # Reward is based on maximizing y2
        reward = y2*10
        
        # Done if the system has run too long or if values go out of bounds
        done = False
        if y1 < .01 or y2 < .01:
            reward = -1000
            done = True
        
        return self.state, reward, done, False, {}

    def render(self):
        # Optional rendering for visualization, not essential
        print(f"State: y1={self.state[0]}, y2={self.state[1]}")


In [4]:

class SDEEnv_train_2(gym.Env):
    """ nothing will change ,expcep that its stoichastic """
    def __init__(self):
        super(SDEEnv_train_2, self).__init__()
        # State is [y1, y2]
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(2,), dtype=np.float32)
        
        # Actions are [u1, u2], both in some control range
        self.action_space = spaces.Box(low=0, high=10, shape=(2,), dtype=np.float32)
        
        # Time step for numerical integration
        self.dt = 0.001
        
        # Initial values for state variables y1 and y2
        self.state = np.array([1, 1])  
        
    def reset(self,seed = None,options = None):
        # Reset the state to initial values
        self.state = np.array([0.1, 0.1])
        return self.state

    def step(self, action):
        u1, u2 = action
        y1, y2 = self.state
        
        dt = self.dt
        
        # Deterministic part of the system (first equation)
        dy1 = ( -1*(u1 + 0.5 * u1**2 )* y1 + 0.5 * u2 * y2 / (y1 + y2) ) * dt
        
        # Stochastic part of the second equation
        dW = np.random.normal(0, np.sqrt(dt))  # Wiener process for stochastic term
        dy2 = (u1 * y1 - 0.7 * u2 * y1) * dt + (0.1 * np.sqrt(y1) ) * dW
        
        
        # Update states
        y1 += dy1
        y2 += dy2
        
        # Ensure non-negative concentrations
        #y1 = max(0, y1)
        #y2 = max(0, y2)
        
        self.state = np.array([y1, y2])
        
        # Reward is based on maximizing y2
        reward = y2*100
        
        # Done if the system has run too long or if values go out of bounds
        done = False
        if y1 < 0 or y2 < 0:
            reward = -1000
            done = True
        
        return self.state, reward, done, False, {}

    def render(self):
        # Optional rendering for visualization, not essential
        print(f"State: y1={self.state[0]}, y2={self.state[1]}")


In [10]:
env=SDEEnv_train()
env.dt

0.001

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal

# Environment representing the simplified model
class PhotoProductionEnv(gym.Env):
    def __init__(self):
        super(PhotoProductionEnv, self).__init__()
        
        # Time step (10 intervals over a dimensionless time of 1)
        self.dt = 0.1
        self.n_steps = 10
        
        # Define state and action spaces
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(2,), dtype=np.float32)
        self.action_space = spaces.Box(low=0, high=5, shape=(2,), dtype=np.float32)
        
        # Initial state values
        self.state = np.array([1.0, 1.0])
        self.t = 0
        
    def reset(self):
        # Reset states (random initial state)
        self.state = np.array([0.1, 0.1])
        self.t = 0
        return self.state
    
    def step(self, action):
        u1, u2 = action
        y1, y2 = self.state
        
        dt = self.dt
        
        # Equations (19) and (20)
        dy1 = (-1 * (u1 + 0.5 * u1**2) * y1 + u2) * dt
        dy2 = (u1 * y1 - u2 * y1) * dt
        
        # Add Gaussian noise with mean 0 and std deviation 0.02
        noise = np.random.normal(0, 0.02, size=2)
        y1 = max(0, y1 + dy1 + noise[0])
        y2 = max(0, y2 + dy2 + noise[1])
        
        self.state = np.array([y1, y2])
        self.t += 1
        
        # Reward is based on maximizing y2 at final time step
        reward = 0
        done = False
        if self.t >= self.n_steps:
            reward = y2
            done = True
        
        return self.state, reward, done, False, {}

# RNN Policy Network for each control action (u1 and u2)
class RNNPolicyNetwork(nn.Module):
    def __init__(self, input_dim=2, hidden_dim=20, output_dim=1):
        super(RNNPolicyNetwork, self).__init__()
        self.rnn = nn.RNN(input_dim, hidden_dim, num_layers=2, nonlinearity='tanh', batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x, hidden):
        out, hidden = self.rnn(x, hidden)
        out = self.fc(out)
        return out, hidden
    
    def init_hidden(self, batch_size=1):
        return torch.zeros(2, batch_size, 20)

# Train the RNN policies using REINFORCE algorithm
def train_policy(env, n_epochs=100, n_episodes=800, lr=1e-2):
    # Initialize policy networks for u1 and u2
    policy_u1 = RNNPolicyNetwork()
    policy_u2 = RNNPolicyNetwork()
    
    # Optimizers for both policies
    optimizer_u1 = optim.Adam(policy_u1.parameters(), lr=lr)
    optimizer_u2 = optim.Adam(policy_u2.parameters(), lr=lr)
    
    # Training loop
    for epoch in range(n_epochs):
        total_rewards = []
        for episode in range(n_episodes):
            state = env.reset()
            hidden_u1 = policy_u1.init_hidden()
            hidden_u2 = policy_u2.init_hidden()
            
            rewards = []
            log_probs_u1 = []
            log_probs_u2 = []
            
            for t in range(env.n_steps):
                state_tensor = torch.FloatTensor(state).unsqueeze(0).unsqueeze(0)  # Batch and time dimension
                
                # Policy for u1
                action_u1, hidden_u1 = policy_u1(state_tensor, hidden_u1)
                action_u1 = torch.clamp(action_u1, 0, 5)  # Constraint action to [0, 5]
                dist_u1 = Normal(action_u1, 0.1)  # Action distribution with small std
                u1 = dist_u1.sample()
                log_prob_u1 = dist_u1.log_prob(u1)
                
                # Policy for u2
                action_u2, hidden_u2 = policy_u2(state_tensor, hidden_u2)
                action_u2 = torch.clamp(action_u2, 0, 5)
                dist_u2 = Normal(action_u2, 0.1)
                u2 = dist_u2.sample()
                log_prob_u2 = dist_u2.log_prob(u2)
                
                # Step environment
                action = [u1.item(), u2.item()]
                next_state, reward, done, _, _ = env.step(action)
                
                rewards.append(reward)
                log_probs_u1.append(log_prob_u1)
                log_probs_u2.append(log_prob_u2)
                
                state = next_state
                if done:
                    break
            
            # Compute returns (reward to go)
            returns = []
            R = 0
            for r in reversed(rewards):
                R = r + 0.99 * R  # Discount factor
                returns.insert(0, R)
            returns = torch.tensor(returns)
            
            # Update policy networks using REINFORCE
            optimizer_u1.zero_grad()
            optimizer_u2.zero_grad()
            
            loss_u1 = -torch.stack(log_probs_u1) * returns
            loss_u2 = -torch.stack(log_probs_u2) * returns
            
            loss_u1.sum().backward()
            loss_u2.sum().backward()
            
            optimizer_u1.step()
            optimizer_u2.step()
            
            total_rewards.append(sum(rewards))
        
        # Average reward per epoch
        print(f"Epoch {epoch + 1}, Average Reward: {np.mean(total_rewards)}")

# Main Execution
if _name_ == "_main_":
    env = PhotoProductionEnv()
    train_policy()