# TD3 trainer
this is the notebook I used for executing TD3 in MATLAB. full python/pytorch, no weird dependencies

In [1]:
import torch
print(torch.cuda.is_available())

True


In [2]:
# train_td3_with_matlab_env.py

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# from torch.optim.lr_scheduler import StepLR # Uncomment if you use schedulers
import numpy as np
import random
import time
import matlab.engine # Import the MATLAB Engine API
import os
import sys
import collections # For ReplayBuffer if using deque

In [3]:
print("Starting Python script for TD3 training with MATLAB Environment...")

# --- Device Setup ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using PyTorch device: {device}")

Starting Python script for TD3 training with MATLAB Environment...
Using PyTorch device: cuda


In [4]:
# --- PyTorch Class Definitions ---
# (Copy EXACTLY from your working train_notebook_td3.py/ipynb)

class ReplayBuffer:
    # Using deque for potentially better efficiency than list slicing
    def __init__(self, max_size=5e5):
        self.buffer = collections.deque(maxlen=int(max_size))
        self.max_size = int(max_size)
        # self.size is implicitly len(self.buffer) with deque

    def add(self, transition):
        # transition is tuple of (state, action, reward, next_state, done_float)
        self.buffer.append(transition)

    def sample(self, batch_size):
        indexes = np.random.randint(0, len(self.buffer), size=batch_size)
        state, action, reward, next_state, done = [], [], [], [], []

        for i in indexes:
            s, a, r, s_, d = self.buffer[i]
            state.append(np.array(s, copy=False))
            action.append(np.array(a, copy=False))
            reward.append(np.array(r, copy=False))
            next_state.append(np.array(s_, copy=False))
            done.append(np.array(d, copy=False))

        return np.array(state), np.array(action), np.array(reward), np.array(next_state), np.array(done)

    def __len__(self):
        return len(self.buffer)


class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action_val): # Renamed max_action -> max_action_val
        super(Actor, self).__init__()
        self.l1 = nn.Linear(state_dim, 400)
        self.l2 = nn.Linear(400, 300)
        self.l3 = nn.Linear(300, action_dim)
        # self.max_action_val = max_action_val # Store if needed, but not used in forward

    def forward(self, state):
        a = F.relu(self.l1(state))
        a = F.relu(self.l2(a))
        a = torch.tanh(self.l3(a)) # Output scaled to [-1, 1]
        return a

class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        # Q1 architecture
        self.l1 = nn.Linear(state_dim + action_dim, 400)
        self.l2 = nn.Linear(400, 300)
        self.l3 = nn.Linear(300, 1)
        # Q2 architecture
        self.l4 = nn.Linear(state_dim + action_dim, 400)
        self.l5 = nn.Linear(400, 300)
        self.l6 = nn.Linear(300, 1)

    def forward(self, state, action):
        sa = torch.cat([state, action], 1)
        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)

        q2 = F.relu(self.l4(sa))
        q2 = F.relu(self.l5(q2))
        q2 = self.l6(q2)
        return q1, q2

    def Q1(self, state, action): # Helper for actor loss
        sa = torch.cat([state, action], 1)
        q1 = F.relu(self.l1(sa))
        q1 = F.relu(self.l2(q1))
        q1 = self.l3(q1)
        return q1


class TD3:
    def __init__(self, lr_actor, lr_critic, state_dim, action_dim, max_action_val):
        self.actor = Actor(state_dim, action_dim, max_action_val).to(device)
        self.actor_target = Actor(state_dim, action_dim, max_action_val).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr_actor)

        self.critic_1 = Critic(state_dim, action_dim).to(device) # Critic holds Q1 and Q2 layers
        self.critic_1_target = Critic(state_dim, action_dim).to(device)
        self.critic_1_target.load_state_dict(self.critic_1.state_dict())
        self.critic_1_optimizer = optim.Adam(self.critic_1.parameters(), lr=lr_critic)

        # TD3 uses two critics conceptually, but implementation can share layers
        # If Critic class above defines separate layers for Q1/Q2, they are optimized together here.
        # If you intended fully separate critics, you'd need two Critic instances.

        self.max_action_val = max_action_val # Max value for action scaling
        self.action_dim = action_dim

    def select_action(self, state, noise=0.0): # Add noise parameter for exploration
        # state is numpy array
        state_tensor = torch.FloatTensor(state.reshape(1, -1)).to(device)
        # Actor outputs [-1, 1]
        action_unscaled = self.actor(state_tensor)
        # Add noise in the scaled range for exploration
        noise_tensor = torch.normal(0, self.max_action_val * noise, size=(1, self.action_dim)).to(device)
        # Scale actor output, add noise, then clip to physical limits
        action_scaled_noisy = (action_unscaled * self.max_action_val + noise_tensor).clamp(-self.max_action_val, self.max_action_val)
        return action_scaled_noisy.cpu().data.numpy().flatten()

    def update(self, replay_buffer, batch_size, gamma, polyak, policy_noise, noise_clip, policy_delay, iterations):

        if len(replay_buffer) < batch_size:
            return # Not enough samples yet

        # Sample replay buffer
        state_np, action_np, reward_np, next_state_np, done_np = replay_buffer.sample(batch_size)

        # Convert numpy arrays to PyTorch tensors
        state = torch.FloatTensor(state_np).to(device)
        action_scaled = torch.FloatTensor(action_np).to(device) # Action stored IS the scaled one sent to env
        reward = torch.FloatTensor(reward_np).reshape((batch_size, 1)).to(device)
        next_state = torch.FloatTensor(next_state_np).to(device)
        done = torch.FloatTensor(done_np).reshape((batch_size, 1)).to(device) # 0.0 or 1.0

        # --- Critic Update ---
        with torch.no_grad():
            # Select target action according to target policy + clipped noise
            noise = (torch.randn_like(action_scaled) * policy_noise).clamp(-noise_clip, noise_clip)
            # Target actor outputs [-1, 1], scale noise relative to max_action?
            # Let's assume policy_noise and noise_clip are defined in the [-1, 1] space
            # matching the raw actor output.
            next_action_unscaled = self.actor_target(next_state)
            next_action_unscaled_noisy = (next_action_unscaled + noise).clamp(-1.0, 1.0) # Clip raw action

            # Compute target Q value (Clipped Double-Q)
            # Critic expects action in [-1, 1] range
            target_Q1, target_Q2 = self.critic_1_target(next_state, next_action_unscaled_noisy)
            target_Q = torch.min(target_Q1, target_Q2)
            target_Q = reward + (1.0 - done) * gamma * target_Q # done is 0.0 or 1.0

        # Get current Q estimates
        # CRITICAL: Rescale the action from the buffer back to [-1, 1] for the critic input
        action_unscaled_buffer = action_scaled / self.max_action_val
        current_Q1, current_Q2 = self.critic_1(state, action_unscaled_buffer)

        # Compute critic loss
        # Use the combined forward pass which returns Q1, Q2
        critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(current_Q2, target_Q)

        # Optimize the critic
        self.critic_1_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_1_optimizer.step()

        # --- Delayed Actor Update ---
        if iterations % policy_delay == 0:
            # Actor output is [-1, 1]
            actor_actions_unscaled = self.actor(state)
            # Actor loss uses Q1 from the *non-target* critic
            # Pass the unscaled actor action to the critic's Q1 method
            actor_loss = -self.critic_1.Q1(state, actor_actions_unscaled).mean()

            # Optimize the actor
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # --- Update Target Networks (Polyak Averaging) ---
            for param, target_param in zip(self.critic_1.parameters(), self.critic_1_target.parameters()):
                 target_param.data.copy_(polyak * target_param.data + (1 - polyak) * param.data)

            for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
                 target_param.data.copy_(polyak * target_param.data + (1 - polyak) * param.data)

    def save(self, filename):
        # Save actor and critic models
        # Note: Saving TD3 state directly might be better if using schedulers etc.
        torch.save(self.actor.state_dict(), filename + "_actor.pth")
        torch.save(self.critic_1.state_dict(), filename + "_critic.pth") # Save critic Q1/Q2 params
        print(f"Saved Actor and Critic models to {filename}_actor/critic.pth")

    def load(self, filename):
        # Load actor and critic models
        self.actor.load_state_dict(torch.load(filename + "_actor.pth", map_location=device))
        self.actor_target.load_state_dict(self.actor.state_dict()) # Update target too
        self.critic_1.load_state_dict(torch.load(filename + "_critic.pth", map_location=device))
        self.critic_1_target.load_state_dict(self.critic_1.state_dict()) # Update target too
        print(f"Loaded Actor and Critic models from {filename}_actor/critic.pth")

# --- End of Class Definitions ---

In [18]:
# ================================================
# Main Training Script
# ================================================
if __name__ == "__main__":

    # --- Configuration ---
    matlab_env_creator_func = 'createAttitudeControlEnv' # Function in .m file that returns env object
    matlab_env_path = '.' # Directory containing the .m file

    # TD3 Hyperparameters
    lr_actor = 1e-4        # Learning rate for actor
    lr_critic = 1e-3       # Learning rate for critic
    gamma = 0.99           # Discount factor
    polyak = 0.005         # Target network update rate (tau)
    policy_noise = 0.2     # Std dev for target policy smoothing noise (applied to action in range [-1,1])
    noise_clip = 0.5       # Limit for target policy smoothing noise (applied to action in range [-1,1])
    policy_delay = 2       # Actor update frequency (steps)
    expl_noise = 0.1       # Std dev for exploration noise (relative to max_action)

    # Training loop parameters
    start_timesteps = 1000 # Steps of random actions before training starts
    max_timesteps = 1e5    # Total training timesteps (adjust as needed)
    eval_freq = 5000       # How often to evaluate policy (in timesteps)
    save_model_freq = 50000 # How often to save the model
    batch_size = 100       # Batch size for TD3 update

    # Environment parameters (MUST MATCH VALUES IN CreateAttitudeEnv.m for consistency)
    state_dim = 7
    action_dim = 3
    max_action = 0.5       # Corresponds to ActionInfo limits in MATLAB env

    # ** Define Python-side parameter dictionary **
    params_py = {
        'maxEpisodeSteps': 500,   # Max agent steps (MUST match MATLAB)
        'maxAngVelo': 0.5,        # Max angular velocity limit (rad/s)
    }
    # ** Define a simple variable for the fallback max steps **
    DEFAULT_MAX_STEPS = params_py['maxEpisodeSteps']
    
    # Setup
    seed = 0
    save_model_dir = "./matlab_td3_models"
    os.makedirs(save_model_dir, exist_ok=True)
    file_name = f"TD3_MATLAB_Env_{seed}" # Base name for saved models

    # Set random seeds
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

In [20]:

    # --- Start MATLAB Engine ---
    print("Starting MATLAB engine...")
    eng = None # Initialize
    try:
        eng = matlab.engine.start_matlab("-nodisplay")
        print("MATLAB engine started successfully.")
        eng.addpath(matlab_env_path, nargout=0)
        print(f"Added '{matlab_env_path}' to MATLAB path.")
    except Exception as e:
        print(f"FATAL: Error starting MATLAB engine or adding path: {e}")
        sys.exit(1)

    try:
        # --- Instantiate MATLAB Environment ---
        print(f"Instantiating MATLAB environment by calling '{matlab_env_creator_func}'...")
        matlab_env = eng.eval(matlab_env_creator_func, nargout=1)
        print("MATLAB environment handle obtained.")

        # --- Initialize Policy and Replay Buffer ---
        policy = TD3(lr_actor, lr_critic, state_dim, action_dim, max_action)
        replay_buffer = ReplayBuffer(max_size=int(1e6))
        print("Python TD3 policy and Replay Buffer initialized.")

        # --- Training Loop ---
        print(f"Starting training loop for {max_timesteps} timesteps...")
        total_timesteps = 0
        episode_num = 0
        evaluations = []

        while total_timesteps < max_timesteps:
            # --- Reset episode ---
            episode_reward = 0
            episode_timesteps = 0
            done = False

            matlab_obs, logged_signals = eng.reset(matlab_env, nargout=2)
            obs = np.array(matlab_obs).astype(np.float32).flatten()

            while True: # Inner loop for steps within an episode
                episode_timesteps += 1
                total_timesteps += 1

                # --- Select Action ---
                if total_timesteps < start_timesteps:
                    action = np.random.uniform(-max_action, max_action, action_dim)
                else:
                    # Pass the noise standard deviation directly
                    action = policy.select_action(obs, noise_std=max_action * expl_noise)

                # --- Step MATLAB Environment ---
                action_ml = matlab.double(action.reshape(-1, 1))
                try:
                    next_matlab_obs, reward_ml, done_ml, logged_signals = eng.step(matlab_env, action_ml, nargout=4)
                except Exception as e_step:
                    print(f"\nError during MATLAB eng.step at T={total_timesteps}: {e_step}")
                    done = True
                    break

                # --- Convert results ---
                next_obs = np.array(next_matlab_obs).astype(np.float32).flatten()
                reward = float(reward_ml)
                done_bool = bool(done_ml)

                # --- Refine 'done' flag for Replay Buffer ---
                max_steps_reached = False
                # Use the DEFAULT_MAX_STEPS defined earlier for fallbacks
                max_steps_default = DEFAULT_MAX_STEPS
                if isinstance(logged_signals, dict):
                    try:
                        current_matlab_steps = int(logged_signals.get('nSteps', -1))
                        # Try to get max steps from MATLAB, fallback to default
                        max_episode_matlab_steps = int(logged_signals.get('MaxSteps', max_steps_default))

                        if current_matlab_steps >= 0:
                             max_steps_reached = bool(current_matlab_steps >= max_episode_matlab_steps)
                        else:
                             # Estimate based on Python step counter if 'nSteps' missing
                             max_steps_reached = done_bool and episode_timesteps >= max_steps_default
                    except (ValueError, KeyError, TypeError) as e_log:
                        print(f"Warning: Error processing logged_signals at T={total_timesteps}: {e_log}. Estimating truncation.")
                        max_steps_reached = done_bool and episode_timesteps >= max_steps_default
                else:
                    print(f"Warning: logged_signals not dict ({type(logged_signals)}) at T={total_timesteps}. Estimating truncation.")
                    max_steps_reached = done_bool and episode_timesteps >= max_steps_default

                real_done_for_buffer = 1.0 if (done_bool and not max_steps_reached) else 0.0

                # --- Store transition ---
                replay_buffer.add((obs, action, reward, next_obs, real_done_for_buffer))

                # Update state
                obs = next_obs
                episode_reward += reward

                # --- Train Policy ---
                if total_timesteps >= start_timesteps:
                    # Pass policy_noise_std instead of policy_noise variable name if different
                    policy.update(replay_buffer, batch_size, gamma, polyak, policy_noise_std, noise_clip, policy_delay)

                # --- Check episode end ---
                if done_bool:
                    break # Exit inner while loop

            # --- End of Episode ---
            print(f"Ep {episode_num + 1}: T={total_timesteps}, Steps={episode_timesteps}, Reward={episode_reward:.3f}")
            episode_num += 1

            # --- Periodic Evaluation ---
            if total_timesteps >= start_timesteps and total_timesteps // eval_freq > (total_timesteps - episode_timesteps) // eval_freq :
                print(f"\n--- Evaluating Policy at Timestep {total_timesteps} (Placeholder) ---")
                # Add evaluation logic here
                print(f"--- Evaluation Complete ---")

            # --- Periodic Model Saving ---
            if total_timesteps >= start_timesteps and total_timesteps // save_model_freq > (total_timesteps - episode_timesteps) // save_model_freq:
                 policy.save(f"{save_model_dir}/{file_name}_ts{total_timesteps}")

            # Check outer loop condition
            if total_timesteps >= max_timesteps:
                print(f"Reached maximum total timesteps ({max_timesteps}). Stopping training.")
                break

    except KeyboardInterrupt:
        print("\nTraining interrupted by user.")
    except Exception as e:
        print(f"\n!!!!!!!! An error occurred during training !!!!!!!!")
        print(e)
        traceback.print_exc() # Print detailed traceback

    finally:
        # --- Stop MATLAB Engine ---
        if eng: # Check if engine started successfully
            print("\nStopping MATLAB engine...")
            try:
                eng.quit()
                print("MATLAB engine stopped.")
            except Exception as e_quit:
                print(f"Error stopping MATLAB engine: {e_quit}")

    print("Training finished.")

Starting MATLAB engine...
MATLAB engine started successfully.
Added '.' to MATLAB path.
Instantiating MATLAB environment by calling 'createAttitudeControlEnv'...
MATLAB environment handle obtained.
Python TD3 policy and Replay Buffer initialized.
Starting training loop for 100000.0 timesteps...
Ep 1: T=500, Steps=500, Reward=-28.400

!!!!!!!! An error occurred during training !!!!!!!!
TD3.select_action() got an unexpected keyword argument 'noise_std'

Stopping MATLAB engine...
MATLAB engine stopped.
Training finished.


Traceback (most recent call last):
  File "C:\Users\srira\AppData\Local\Temp\ipykernel_51084\516060712.py", line 48, in <module>
    action = policy.select_action(obs, noise_std=max_action * expl_noise)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: TD3.select_action() got an unexpected keyword argument 'noise_std'


In [14]:

    # --- Start MATLAB Engine ---
    print("Starting MATLAB engine...")
    eng = None # Initialize
    try:
        # Start engine (specify '-nodisplay' if you don't need MATLAB UI)
        eng = matlab.engine.start_matlab() # Or matlab.engine.start_matlab("-nodisplay")
        print("MATLAB engine started.")
        # Add path to the environment .m file
        eng.addpath(matlab_env_path, nargout=0)
        print(f"Added '{matlab_env_path}' to MATLAB path.")
    except Exception as e:
        print(f"FATAL: Error starting MATLAB engine or adding path: {e}")
        sys.exit(1)

    try:
        # --- Instantiate MATLAB Environment ---
        print(f"Instantiating MATLAB environment by calling '{matlab_env_creator_func}'...")
        # nargout=1 because createAttitudeControlEnv returns one variable (the env handle)
        matlab_env = eng.eval(matlab_env_creator_func, nargout=1)
        print("MATLAB environment handle obtained.")

        # --- Initialize Policy and Replay Buffer ---
        policy = TD3(lr_actor, lr_critic, state_dim, action_dim, max_action)
        replay_buffer = ReplayBuffer(max_size=1e6) # Example buffer size
        print("Python TD3 policy and Replay Buffer initialized.")

        # --- Training Loop ---
        print(f"Starting training loop for {int(max_timesteps)} timesteps...")
        total_timesteps = 0
        episode_num = 0
        evaluations = [] # To store evaluation results

        while total_timesteps < max_timesteps:
            # --- Reset episode ---
            episode_reward = 0
            episode_timesteps = 0
            done = False

            print(f"Starting Episode {episode_num + 1}...")
            matlab_obs, logged_signals = eng.reset(matlab_env, nargout=2)
            obs = np.array(matlab_obs).astype(np.float32).flatten()

            while not done:
                episode_timesteps += 1
                total_timesteps += 1

                # --- Select Action ---
                if total_timesteps < start_timesteps:
                    # Take random action scaled to environment limits
                    action = np.random.uniform(-max_action, max_action, action_dim)
                else:
                    # Select action according to policy, add exploration noise
                    # select_action includes scaling and returns numpy
                    action = policy.select_action(obs, noise=expl_noise) # Pass exploration noise factor

                # --- Step MATLAB Environment ---
                action_ml = matlab.double(action.reshape(-1, 1)) # Convert to MATLAB column vector
                # Call step method on the environment handle
                next_matlab_obs, reward_ml, done_ml, logged_signals = eng.step(matlab_env, action_ml, nargout=4)
                
                # Convert MATLAB results to Python types
                next_obs = np.array(next_matlab_obs).astype(np.float32).flatten()
                reward = float(reward_ml)
                done_bool = bool(done_ml) # Overall done flag from MATLAB

                # Refine done flag for replay buffer (distinguish termination/truncation)
                # Access fields directly from the logged_signals dict
                max_steps_reached = False
                is_velocity_limit = False
                try:
                    # logged_signals returned from MATLAB Engine is usually a dict
                    if isinstance(logged_signals, dict):
                        current_matlab_steps = int(logged_signals.get('nSteps', -1)) # Use dict.get for safety
                        max_episode_matlab_steps = int(logged_signals.get('MaxSteps', params_py['maxEpisodeSteps'])) # Use Python param as default
                        max_steps_reached = bool(current_matlab_steps >= max_episode_matlab_steps)
                        # You could also check specific flags if you added them:
                        # is_velocity_limit = bool(logged_signals.get('IsDoneVelocity', False))
                    else:
                        # Fallback if logged_signals is not a dict (shouldn't happen)
                        print("Warning: logged_signals not a dictionary.")
                        current_matlab_steps = episode_timesteps + 1 # Estimate
                        max_steps_reached = done_bool and current_matlab_steps >= params_py['maxEpisodeSteps'] # Use Python param

                except Exception as e_log:
                    print(f"Warning: Error processing logged_signals: {e_log}")
                    current_matlab_steps = episode_timesteps + 1 # Estimate
                    max_steps_reached = done_bool and current_matlab_steps >= params_py['maxEpisodeSteps'] # Use Python param

                # Determine the 'done' flag for the replay buffer (1.0 = termination, 0.0 = truncation/ongoing)
                real_done_for_buffer = 1.0 if (done_bool and not max_steps_reached) else 0.0

                # Store data in replay buffer
                replay_buffer.add((obs, action, reward, next_obs, real_done_for_buffer))

                # Update state
                obs = next_obs
                episode_reward += reward

                # Check if MATLAB step indicated episode end
                if done_bool:
                    print(f"Episode {episode_num + 1} finished after {episode_timesteps} steps. Reward: {episode_reward:.3f}. Reason: {'Velocity/Steps Limit' if done_bool else 'Unknown'}")
                    break # Exit inner while loop

            # --- End of Episode ---
            episode_num += 1

            # --- Evaluate Policy Periodically ---
            if total_timesteps // eval_freq > (total_timesteps - episode_timesteps) // eval_freq:
                print(f"\n--- Evaluating at Timestep {total_timesteps} ---")
                # avg_reward_eval = evaluate_policy(policy, matlab_env_creator_func, matlab_env_path, seed) # Placeholder
                # print(f"Evaluation Average Reward: {avg_reward_eval:.3f}")
                # evaluations.append(avg_reward_eval)
                # np.save(f"{save_model_dir}/evaluations_{file_name}.npy", evaluations)
                print(f"--- Evaluation Complete ---")

            # --- Save Model Periodically ---
            if total_timesteps // save_model_freq > (total_timesteps - episode_timesteps) // save_model_freq:
                policy.save(f"{save_model_dir}/{file_name}_ts{total_timesteps}")


    except KeyboardInterrupt:
        print("\nTraining interrupted by user.")
    except Exception as e:
        print(f"\n!!!!!!!! An error occurred during training !!!!!!!!")
        print(e)
        import traceback
        traceback.print_exc()

    finally:
        # --- Stop MATLAB Engine ---
        if eng: # Check if engine started successfully
            print("\nStopping MATLAB engine...")
            try:
                eng.quit()
                print("MATLAB engine stopped.")
            except Exception as e_quit:
                print(f"Error stopping MATLAB engine: {e_quit}")

    print("Training finished.")

Starting MATLAB engine...
MATLAB engine started.
Added '.' to MATLAB path.
Instantiating MATLAB environment by calling 'createAttitudeControlEnv'...
MATLAB environment handle obtained.
Python TD3 policy and Replay Buffer initialized.
Starting training loop for 200000 timesteps...
Starting Episode 1...

!!!!!!!! An error occurred during training !!!!!!!!
name 'params_py' is not defined

Stopping MATLAB engine...
MATLAB engine stopped.
Training finished.


Traceback (most recent call last):
  File "C:\Users\srira\AppData\Local\Temp\ipykernel_51084\3133956687.py", line 74, in <module>
    max_episode_matlab_steps = int(logged_signals.get('MaxSteps', params_py['maxEpisodeSteps'])) # Use Python param as default
                                                                  ^^^^^^^^^
NameError: name 'params_py' is not defined

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\srira\AppData\Local\Temp\ipykernel_51084\3133956687.py", line 87, in <module>
    max_steps_reached = done_bool and current_matlab_steps >= params_py['maxEpisodeSteps'] # Use Python param
                                                              ^^^^^^^^^
NameError: name 'params_py' is not defined


In [16]:
fname = '20250415_run1_cont.dat'
torch.save(policy.actor.state_dict(), fname)

In [17]:
fname = '20250415_run1_cont_critic1.dat'
torch.save(policy.critic_1.state_dict(), fname)

fname = '20250415_run1_cont_critic2.dat'
torch.save(policy.critic_2.state_dict(), fname)

In [17]:
policy.actor.load_state_dict(torch.load('20200515_run3_cont_refine.dat'))

FileNotFoundError: [Errno 2] No such file or directory: '20200515_run3_cont_refine.dat'

In [18]:
done = False

obs = env.reset()

reward_list = []
obs_list = []

while not done:
    action = policy.select_action(obs)

    action = action# + np.random.normal(0, exploration_noise, size=env.action_space.shape[0])
    action = action.clip(env.action_space.low, env.action_space.high)

    # no noise--select optimal
    #action = action + np.random.normal(0, exploration_noise, size=env.action_space.shape[0])
    obs, reward, done, _ = env.step(action)
    orn = np.array([obs[1], obs[2], obs[3], obs[0]])
    reward_list.append(reward)
    obs_list.append(obs)

In [19]:
x = np.linspace(0, 500, len(obs_list))
q4 = [i[0] for i in obs_list]
q1 = [i[1] for i in obs_list]
q2 = [i[2] for i in obs_list]
q3 = [i[3] for i in obs_list]

w1 = [i[8] for i in obs_list]
w2 = [i[9] for i in obs_list]
w3 = [i[10] for i in obs_list]

In [20]:
import time
import plotly.graph_objects as go
import pandas as pd
import plotly.express as px

In [21]:
2*np.arccos(np.max(q4))*(180/np.pi)

np.float64(112.36376144920521)

In [25]:
fig = go.Figure()

fig.add_trace(go.Scatter(x=x, y=q4,
                    mode='lines',
                    name='q4'))
fig.add_trace(go.Scatter(x=x, y=q1,
                    mode='lines',
                    name='q1'))
fig.add_trace(go.Scatter(x=x, y=q2,
                    mode='lines',
                    name='q2'))
fig.add_trace(go.Scatter(x=x, y=q3,
                    mode='lines',
                    name='q3'))

fig.add_trace(go.Scatter(x=x, y=reward_list,
                    mode='lines',
                    name='reward'))


fig.update_layout(
    title="Orientation over Time",
    xaxis_title="timestep",
    yaxis_title="N.D.",
    font=dict(
        family="Arial",
        size=18,
        color="#000000"
    )
)

fig.show()

In [24]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=x, y=w1,
                    mode='lines',
                    name='wx'))
fig.add_trace(go.Scatter(x=x, y=w2,
                    mode='lines',
                    name='wy'))
fig.add_trace(go.Scatter(x=x, y=w3,
                    mode='lines',
                    name='wz'))

fig.update_layout(
    title="Angular Velocity over Time",
    xaxis_title="timestep",
    yaxis_title="Ang. Velo. (rad/s)",
    font=dict(
        family="Arial",
        size=18,
        color="#000000"
    )
)

fig.show()