In [1]:
import os
import numpy as np
import torch as T 
import torch.nn as nn
import torch.optim as optim
from torch.distributions.categorical import Categorical

In [8]:
import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn

class PIDEnv(gym.Env):
    def __init__(self, predictive_model_path):
        super(PIDEnv, self).__init__()
        
        # Define the observation space
        self.observation_space = spaces.Dict({
            'glucose': spaces.Box(low=0, high=300, shape=(1,), dtype=np.float32),
            'insulin': spaces.Box(low=0, high=10, shape=(1,), dtype=np.float32),
            'meal_size': spaces.Box(low=0, high=200, shape=(1,), dtype=np.float32),
        })
        
        # Define the action space
        self.action_space = spaces.Box(low=0, high=10, shape=(1,), dtype=np.float32)
        
        # Define the initial state
        self.state = {
            'glucose': None,
            'insulin': 0,
            'meal_size': 0,
        }
        
        # Define target glucose range
        self.target_range = (80, 110)
        
        # Load the predictive model
        self.predictive_model = self.load_predictive_model(predictive_model_path)
        self.predictive_model.eval()
    
    def step(self, action):
        # Simulate environment dynamics based on the action
        
        # Use the predictive model to get the predicted glucose
        with torch.no_grad():
            predicted_glucose = self.predictive_model(torch.tensor([[self.state['glucose'], self.state['insulin'], self.state['meal_size']]]).float()).item()
        
        # Update glucose level based on insulin action
        self.state['glucose'] -= 0.1 * action
        
        # Update glucose level based on meal intake and predicted glucose
        self.state['glucose'] += 0.5 * self.state['meal_size'] + (predicted_glucose - self.state['glucose'])
        
        # Calculate reward based on deviation from target glucose range
        reward = self.calculate_reward(self.state['glucose'])
        
        # Update state
        self.state['insulin'] = action
        
        # Terminal condition: episode ends when glucose level is out of [60, 300]
        done = False
        if self.state['glucose'] < 60 or self.state['glucose'] > 300:
            done = True

        # Clip glucose level to ensure it remains within a reasonable range
        self.state['glucose'] = np.clip(self.state['glucose'], 60, 300)

        # Ensure all state values are single numeric values
        self.state['glucose'] = np.asscalar(self.state['glucose'])
        self.state['insulin'] = np.asscalar(self.state['insulin'])
        self.state['meal_size'] = np.asscalar(self.state['meal_size'])

        # Convert state dictionary to a NumPy array
        state_array = np.array([self.state['glucose'], self.state['insulin'], self.state['meal_size']])

        return state_array, reward, done, {}

    def reset(self):
        # Reset the environment to initial state
        
        # Initialize glucose level using a default value or some other logic
        initial_glucose = np.random.uniform(80, 120)
        self.state['glucose'] = initial_glucose
        
        # Update target glucose range based on initial glucose level
        self.target_range = (initial_glucose - 10, initial_glucose + 10)
        
        return self.state
    
    def calculate_reward(self, glucose):
        # Calculate reward based on deviation from target glucose range
        if glucose >= self.target_range[0] and glucose <= self.target_range[1]:
            return 1
        else:
            return -1
    
    def load_predictive_model(self, predictive_model_path):
        # Define the same model architecture
        model = nn.Sequential(
            nn.Linear(3, 20),
            nn.ReLU(),
            nn.Linear(20, 10),
            nn.ReLU(),
            nn.Linear(10, 1)
        )
        
        # Load model state dictionary
        x = torch.load(predictive_model_path)
        model.load_state_dict(x['model_state_dict'])
        
        return model


# Example usage
predictive_model_path = "model.pth"  # Path to the trained PyTorch predictive model
env = PIDEnv(predictive_model_path)


In [9]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize
from stable_baselines3.common.evaluation import evaluate_policy


predictive_model_path = 'model.pth'

# Step 1: Define and instantiate your custom environment
env = PIDEnv(predictive_model_path)

# Step 3: Choose a reinforcement learning algorithm and instantiate it with your custom environment
model = PPO("MultiInputPolicy", env, verbose=1)

# Step 4: Train the agent
model.learn(total_timesteps=10000)

# Step 5: Evaluate the trained agent
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
print(f"Mean reward: {mean_reward}, Std reward: {std_reward}")


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


AttributeError: module 'numpy' has no attribute 'asscalar'