In [None]:
# Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved This software is distributed under the MIT License, Contact: Alessandro Fanfarillo

In [1]:
import gym
from gym import spaces
import numpy as np

class DynamicSnowDensityEnv(gym.Env):
    def __init__(self):
        super(DynamicSnowDensityEnv, self).__init__()
        
        # Action space: Adjust alpha_T, alpha_u, and alpha_h (scaling factors)
        self.action_space = spaces.Box(low=np.array([0, 0, 0]), high=np.array([5, 20, 5]), dtype=np.float32)
        
        # Observation space: Temperature, Wind Speed, Humidity
        self.observation_space = spaces.Box(low=np.array([-40, 0, 0]), high=np.array([0, 20, 100]), dtype=np.float32)
        
        # Initialize environment variables (temperature, wind speed, and humidity)
        self.state = np.array([0, 5, 80])  # Initial temperature, wind speed, and humidity
        self.rho_min = 100  # Minimum snow density (kg/m³)
        self.T_ref = 0  # Reference temperature
    
    def reset(self):
        """
        Resets the environment to a random initial state (random weather conditions).
        """
        self.state = np.array([np.random.uniform(-20, 0),  # Random temperature
                               np.random.uniform(0, 20),   # Random wind speed
                               np.random.uniform(0, 100)]) # Random humidity
        return self.state

    def step(self, action):
        """
        Takes an action (adjusts alpha_T, alpha_u, alpha_h) and returns the next state, reward, and done flag.
        """
        # Unpack action into the scaling factors
        alpha_T, alpha_u, alpha_h = action
        
        # Unpack the current state (environmental variables)
        T, u, h = self.state
        
        # Predicted snow density based on the chosen parameters
        predicted_density = self.rho_min + alpha_T * (self.T_ref - T) + alpha_u * u + alpha_h * h
        
        # Simulated observed snow density (real-world value) based on a true model
        real_density = self.get_real_snow_density(T, u, h)
        
        # Reward is the negative absolute error (agent tries to minimize this)
        reward = -abs(predicted_density - real_density)
        
        # Change the weather conditions dynamically for the next step
        self.state = np.array([np.random.uniform(-20, 0),  # New temperature
                               np.random.uniform(0, 20),   # New wind speed
                               np.random.uniform(0, 100)]) # New humidity
        
        done = False  # Environment doesn't end (continuous)
        return self.state, reward, done, {}
    
    def get_real_snow_density(self, T, u, h):
        """
        Simulate the 'true' snow density based on real-world data or a known relationship.
        You can replace this with a real-world dataset or a more sophisticated model.
        """
        # Example fixed parameters for the true model
        alpha_T_true = 1.5
        alpha_u_true = 10
        alpha_h_true = 0.5
        
        # True snow density based on environmental conditions
        true_density = self.rho_min + alpha_T_true * (self.T_ref - T) + alpha_u_true * u + alpha_h_true * h
        
        # Clip the density to physical limits (100 to 600 kg/m³)
        return np.clip(true_density, 100, 600)

In [2]:
from stable_baselines3 import PPO

# Create the dynamic snow density environment
env = DynamicSnowDensityEnv()

# Create the PPO model
model = PPO("MlpPolicy", env, gamma=0, verbose=1)

# Train the model
model.learn(total_timesteps=300000)

# Save the trained model
model.save("dynamic_snow_density_model")

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




-----------------------------
| time/              |      |
|    fps             | 1636 |
|    iterations      | 1    |
|    time_elapsed    | 1    |
|    total_timesteps | 2048 |
-----------------------------
-----------------------------------------
| time/                   |             |
|    fps                  | 1216        |
|    iterations           | 2           |
|    time_elapsed         | 3           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.039030924 |
|    clip_fraction        | 0.342       |
|    clip_range           | 0.2         |
|    entropy_loss         | -4.3        |
|    explained_variance   | 0.00196     |
|    learning_rate        | 0.0003      |
|    loss                 | 7.19e+03    |
|    n_updates            | 10          |
|    policy_gradient_loss | -0.0552     |
|    std                  | 1.02        |
|    value_loss           | 1.54e+04    |
----------------------------------

In [3]:
def fixed_parameters_estimate(T, u, h):
    rho_min = 100  # Minimum snow density (kg/m³)
    T_ref = 0  # Reference temperature
    alpha_T_true_fixed = 1.4
    alpha_u_true_fixed = 9.0 #4.0
    alpha_h_true_fixed = 0.2 #0.1
    true_density = rho_min + alpha_T_true_fixed * (T_ref - T) + alpha_u_true_fixed * u + alpha_h_true_fixed * h
    return np.clip(true_density, 100, 600)

def get_real_snow_density(T, u, h):
        """
        Simulate the 'true' snow density based on real-world data or a known relationship.
        You can replace this with a real-world dataset or a more sophisticated model.
        """
        rho_min = 100  # Minimum snow density (kg/m³)
        T_ref = 0  # Reference temperature
        # Example fixed parameters for the true model
        alpha_T_true = 1.5
        alpha_u_true = 10
        alpha_h_true = 0.5
        
        # True snow density based on environmental conditions
        true_density = rho_min + alpha_T_true * (T_ref - T) + alpha_u_true * u + alpha_h_true * h
        
        # Clip the density to physical limits (100 to 600 kg/m³)
        return np.clip(true_density, 100, 600)

# Load the trained model
model = PPO.load("dynamic_snow_density_model")

# Reset the environment to start the evaluation
obs = env.reset()

total_reward = 0
total_fixed_reward = 0

# Run the model for several steps to see how it adjusts the parameters in response to dynamic conditions
for i in range(50):
    # Get action (parameter adjustments) from the trained model
    action, _states = model.predict(obs)
    real_density = get_real_snow_density(obs[0],obs[1],obs[2])
    predicted_fixed_density = fixed_parameters_estimate(obs[0],obs[1],obs[2])
    # Step the environment with the chosen action
    obs, reward, done, info = env.step(action)
    fixed_reward = -abs(predicted_fixed_density - real_density)

    total_reward += reward
    total_fixed_reward += fixed_reward
    # Print out the state, action, and reward
    print(f"State: {obs}, Action (alpha_T, alpha_u, alpha_h): {action}, Reward: {reward}")
    print(f"Total reward: {total_reward}, total fixed reward: {total_fixed_reward}")
    if done:
        obs = env.reset()  # Reset environment if done (though we're not using 'done' in this case)

State: [-2.93363506  9.96611839  3.80367113], Action (alpha_T, alpha_u, alpha_h): [0.8437308 0.        2.275432 ], Reward: -4.317366908606601
Total reward: -4.317366908606601, total fixed reward: -26.500789883170114
State: [-11.47750377   7.44232678  58.50893838], Action (alpha_T, alpha_u, alpha_h): [3.1136553 9.175301  1.6221595], Reward: -0.7831495922036993
Total reward: -5.100516500810301, total fixed reward: -37.90137311222722
State: [-14.27440985  14.11609417  80.51903129], Action (alpha_T, alpha_u, alpha_h): [0.44853973 0.         1.8797525 ], Reward: -5.76355194979385
Total reward: -10.864068450604151, total fixed reward: -64.04413177883359
State: [-9.5860498   1.05716399 54.56860538], Action (alpha_T, alpha_u, alpha_h): [0.8859652 0.        2.3215256], Reward: -3.258450990551978
Total reward: -14.122519441156129, total fixed reward: -103.74337631934262
State: [-10.7793952   16.76313194  26.58601845], Action (alpha_T, alpha_u, alpha_h): [0.        0.        0.7882787], Reward: -