Here, we code the analytical solution we have derived earlier and apply it to the one molecule environment. Our solution is that our input is the integer $A_k = B_k - N(1 - p)$, where $B_k$ is the target, $N$ is the current number of molecules, and $p$ is the probability a molecule dies by the next timestep. This is summarized as follows 

In [2]:
import numpy as np

def optimal_action_th(molecules, target, molecule_lifetime, dt):
    prob_death = np.exp(-dt/molecule_lifetime)
    optimal_action = target - molecules*(1 - prob_death)
    return optimal_action

We attach this to our environment and simulate.

In [6]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn.functional as F
from torch import nn
from torch.optim import Adam
from collections import deque
import time
import random

from gymnasium import spaces

class OneMoleculeEnv(gym.Env):
    def __init__(self, initial_value=10, molecule_lifetime = 1, dt = 0.1, max_steps=100, history_length=5, target_value=10, obs_cap=100, render_mode=None):
        super(OneMoleculeEnv, self).__init__()

        self.initial_value = initial_value
        self.molecule_lifetime = molecule_lifetime
        self.dt = dt
        self.max_steps = max_steps
        self.history_length = history_length
        self.target_value = target_value
        self.obs_cap = obs_cap  # Cap for the observation space
        self.render_mode = render_mode

        self.current_value = initial_value
        self.current_step = 0

        # Define action and observation space
        self.action_space = spaces.Discrete(obs_cap)  # the number of molecules to send in ranges from 0 to the cap
        self.observation_space = spaces.Box(
            low=0, high=obs_cap, shape=(history_length,), dtype=np.float32
        )

        # Initialize the history of values
        self.history = np.full(history_length, initial_value, dtype=np.float32)

        # Pre-generate random numbers to avoid generating them at each step
        self.random_numbers = np.random.rand(max_steps)

    def reset(self, seed=None, options=None):
        if seed is not None:
            np.random.seed(seed)
            self.random_numbers = np.random.rand(self.max_steps)  # Re-generate random numbers if seeded
        self.current_value = self.initial_value
        self.current_step = 0
        self.history = np.full(self.history_length, self.initial_value, dtype=np.float32)
        return self.history, {}

    def _ensure_random_numbers(self):
        # Reset the random numbers if current_step exceeds max_steps
        if self.current_step >= self.max_steps:
            self.random_numbers = np.random.rand(self.max_steps)
            self.current_step = 0  # Reset current step to start fresh

    def step(self, action):
        # Apply action
        #if action == 1:
        # Ensure enough random numbers are available
        self._ensure_random_numbers()

        # Decrease the value with a number drawn from a binomial distribution
        # This reflects each molecule having a finite probability of decaying in the timestep
        # The probability a molecule has decayed in the time interval given is
        self.prob_death = np.exp(-self.dt/self.molecule_lifetime)

        #print ("death number ", -np.random.binomial(self.current_value, self.prob_death, 1))
        
        self.current_value += -np.random.binomial((int)(self.current_value), self.prob_death, 1)
        
        # We then add in molecules
        self.current_value += action

        # Cap the current value to obs_cap
        self.current_value = min(self.current_value, self.obs_cap)

        # Update the history using a simple list rotation
        self.history[:-1] = self.history[1:]
        self.history[-1] = np.float32(self.current_value)  # Ensure it's a scalar

        # Increment step count
        self.current_step += 1

        # Check if done
        done = self.current_step >= self.max_steps

        # Calculate reward
        reward = -float((self.current_value - self.target_value) ** 2)  # Ensure the reward is a float

        return self.history, reward, done, done, {}

    def render(self):
        if self.render_mode == 'human':
            print(f"Step: {self.current_step}, Value: {self.current_value}, History: {self.history}")

    def close(self):
        pass

# Register the environment
gym.envs.registration.register(
    id='OneMoleculeEnv-v0',
    entry_point=OneMoleculeEnv,
    max_episode_steps=1000000,
)

def Optimal_Simulation(steps=10000, target = 20, molecule_lifetime = 1, dt = 0.5, RUN_SEED = 0):
    ##Here we choose our environment
    env = gym.make('OneMoleculeEnv-v0', initial_value=target, molecule_lifetime = molecule_lifetime, dt = dt, max_steps=steps, history_length=1, target_value=target, obs_cap=100, render_mode=None)
    
    rewards = []
    molecule_numbers = []
    deaths = []
    actions = []
    RUN_SEED = 0
    
    observation, info = env.reset(seed=RUN_SEED)
    
    for step in range(steps):

        ##at step k, we have the current number of molecules, the optimal action, and we can calculate how many died this step using the next step
        ##the next step is the previous number of molecules, previous inputs, and deaths between the two steps
        action = optimal_action_th(observation, target, molecule_lifetime, dt)
        rounded_action = (int)(np.round(action))
        prev_molecule = observation[0]
        observation, reward, done, truncated, info = env.step(rounded_action)
        #print ("Reward ", reward, " Observation ", observation, " Action ", action)

        death_molecule = observation[0] - (prev_molecule + rounded_action)
        
        #prev_molecule - observation[0] - rounded_action
        rewards.append(reward)
        molecule_numbers.append(prev_molecule)
        deaths.append(death_molecule)
        actions.append((int)(np.round(action)))
    return rewards, molecule_numbers, deaths, actions

##We have a theory for the result. Given these parameters
steps = 1000
target = 40
molecule_lifetime = 1
dt = 0.5

prob_death = np.exp(-dt/molecule_lifetime)

##The analytical answer is approximately
def analytical_var(target, molecule_lifetime, dt):
    prob_death = np.exp(-dt/molecule_lifetime)
    print ("The probability of a molecular decay is ", prob_death)
    return target*prob_death/(1 - prob_death)

rewards, molecule_numbers, deaths, actions = Optimal_Simulation(steps, target, molecule_lifetime, dt, RUN_SEED = 0)

#print (deaths)

##Lets check all our formulas from the start
avg_death = np.sum(deaths)/steps
avg_molecule = np.sum(molecule_numbers)/steps
avg_input = np.sum(actions)/steps

print ("The averages are ", avg_molecule, ' ', avg_death, ' ',avg_input)

total = molecule_numbers + deaths + actions

vardeath = 0
varmolecule = 0
varinput = 0
covdeathmol = 0
covdeathin = 0
covmolein = 0

var_total = 0 

for cyc in range(steps):
    vardeath += (deaths[cyc] - avg_death)**2
    varmolecule += (molecule_numbers[cyc] - avg_molecule)**2
    varinput += (actions[cyc] - avg_input)**2
    covdeathmol += (deaths[cyc] - avg_death)*(molecule_numbers[cyc] - avg_molecule)
    covdeathin += (deaths[cyc] - avg_death)*(actions[cyc] - avg_input)
    covmolein += (molecule_numbers[cyc] - avg_molecule)*(actions[cyc] - avg_input)
    var_total+= (total[cyc] - (avg_molecule + avg_death + avg_input))**2

vardeath = vardeath/steps
varmolecule = varmolecule/steps
varinput = varinput/steps
covdeathmol = covdeathmol/steps
covdeathin = covdeathin/steps
covmolein = covmolein/steps
var_total = var_total/steps

print ("The variances are (mol, death, input) ", varmolecule, ' ', vardeath, ' ', varinput)
print ("The covariances are (ND, DA, NA) ", covdeathmol, ' ', covdeathin, ' ', covmolein)
print ("The total variance is ", var_total)

##Time to test some theoretical assumptions
print ("Var A = Var N (1 - p)^2 ", varinput , ' = ', varmolecule*(1 - prob_death)**2)
print ("Cov(N, A) = -Var N (1 - p) ", covmolein , ' = ', - varmolecule*(1 - prob_death))
print ("Cov(D, A) = -Cov(D, N)(1 - p) ", covdeathin , ' = ', - covdeathmol*(1 - prob_death))

##These formulas are pretty good so far
print ("VarD + VarA + 2 CovND + 2 Cov NA + 2 Cov DA = 0 ", vardeath + varinput + 2*covdeathmol + 2*covmolein + 2*covdeathin, ' close to 0 as expected')

##This formula also checked out well.
##next we check some more...

print ("VarN = VarD + 2Cov(ND)p/(1 - p) ", varmolecule , ' = ', (vardeath + 2*covdeathmol*prob_death)/(1 - prob_death**2), ' this is also fine')
#This has a large problem. Let's go through the earlier steps
print ("0 = VarD + Var(N)(p^2 - 1) + 2Cov(ND)p ", vardeath + varmolecule*(prob_death**2 - 1) + 2*covdeathmol*prob_death)

##So the theory matches well. The problem is our predictions for the variance of the death and the covariance aren't correct. We'll check that here
var_death_alt = target*prob_death*(1 - prob_death)
print ("The two expressions for the death variance are ", vardeath, '(exact) = ', var_death_alt , ' (theory)')
print ('We see we underestimate the variance fo the death rate, and are not properly estimating the covariance between the death and molecule count')

##This expression has a problem because it's the covariance of N with the previous D, not the same D, so there is an odd correlation and such...
print ("covad is vard/(1 - p) ", covdeathmol, ' = ', vardeath/(1 - prob_death))
print ("An expression for covnd is vard/(1 - p) ", covdeathmol, ' = ', vardeath/(1 - prob_death))

#print ("The molecular numbers are ", molecule_numbers, ' with len ', len(molecule_numbers))
#print ("The molecular decays are ", deaths, ' with len ', len(deaths))
#print ("The molecular inputs are ", actions, ' with len ', len(actions))
#print ("The total is ", total, ' with len ', len(total))

##The theory suggests
#var_alt = (death_var - 2*cov_d_n*prob_death)/(1 - prob_death)

#other_var = np.sum((np.array(molecule_numbers) - 20)**2)/steps

other_var2 = np.var(molecule_numbers)

print ("The average variance was ", average_var)
#print ("The average variance was ", other_var)
print ("The average variance was ", other_var2)
#print ("Theoretical variance gives ", var_alt)


print ("The theoretical value was ", analytical_var(target, molecule_lifetime, dt))



The averages are  40.077   -24.179   24.179
The variances are (mol, death, input)  10.551071000000084   13.77695899999998   1.7329590000000006
The covariances are (ND, DA, NA)  -5.8882169999999965   2.3100409999999982   -4.176783000000033
The total variance is  10.551071000000084
Var A = Var N (1 - p)^2  1.7329590000000006  =  1.6334969946305544
Cov(N, A) = -Var N (1 - p)  -4.176783000000033  =  -4.1515229456951985
Cov(D, A) = -Cov(D, N)(1 - p)  2.3100409999999982  =  2.3168328584588553
VarD + VarA + 2 CovND + 2 Cov NA + 2 Cov DA = 0  -8.171241461241152e-14  close to 0 as expected
VarN = VarD + 2Cov(ND)p/(1 - p)  10.551071000000084  =  10.495135183092517  this is also fine
0 = VarD + Var(N)(p^2 - 1) + 2Cov(ND)p  -0.0353581798421434
The two expressions for the death variance are  13.77695899999998 (exact) =  9.546048741647644  (theory)
We see we underestimate the variance fo the death rate, and are not properly estimating the covariance between the death and molecule count
covad is vard

  rounded_action = (int)(np.round(action))
  self.history[-1] = np.float32(self.current_value)  # Ensure it's a scalar
  reward = -float((self.current_value - self.target_value) ** 2)  # Ensure the reward is a float
  actions.append((int)(np.round(action)))
  self.current_value += -np.random.binomial((int)(self.current_value), self.prob_death, 1)


NameError: name 'average_var' is not defined