In [None]:
# Install Colab
!pip install -q condacolab
import condacolab
condacolab.install()

In [None]:
# Install Stable Baselines, Gym, and Pytao
!apt-get install ffmpeg freeglut3-dev xvfb
!pip install "stable-baselines3[extra]>=2.0.0a4"
!pip install gymnasium

In [None]:
!conda install -c conda-forge pytao
!conda install -c conda-forge bmad

In [None]:
# Import libraries
import environ as en
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
from stable_baselines3 import SAC
#from stable_baselines3 import PPO
#from stable_baselines3 import TD3
from stable_baselines3.sac.policies import MlpPolicy
#from stable_baselines3.ppo.policies import MlpPolicy
#from stable_baselines3.td3.policies import MlpPolicy
import pytao
from tqdm import tqdm

In [None]:
# Num of updates = max_steps_per_episode / num_steps_in_model

# Hyperparameters (Default)
model_type = "SAC" #############################################################
max_steps_per_episode = 128
num_episodes = 10
steps_to_train = 128
num_steps_in_model = 64 # PPO Hyp
train_freq = 1 # SAC #Hyp
gradient_steps = 1 # SAC  #Hyp
learning_rate = 0.0001 #Hyp
gamma = 0.99 # SAC #Hyp
tau = 0.005 # SAC  #Hyp
gae_lambda = 0.95 # PPO Hyp
batch_size = 64   #Hyp
determ = True
my_convergence_val = 0.005 ##################################################################################
my_bad_convergence_val = 0.01 # Hyp
min_step_value = 5 # Like Training convergence
my_reward_alpha = 1.0 # Hyp
my_reward_beta = 1.0 # Hyp
my_reward_fun = 4 ####################################################################
what_to_do = "y_only"

In [None]:
# Create an enviroment for the model

if what_to_do == "x_and_y":
  env = en.LTBEnv("tao.init",
                  {"orbit.x": "MW", "orbit.y": "MW"},
                  ["correctors_x", "correctors_y"],
                  -0.01,
                  0.01,
                  max_steps = max_steps_per_episode,
                  convergence_val = my_convergence_val,
                  bad_convergence_val = my_bad_convergence_val,
                  reward_alpha = my_reward_alpha,
                  reward_beta = my_reward_beta,
                  reward_fun = my_reward_fun)
elif what_to_do == "x_only":
  env = en.LTBEnv("tao.init",
                  {"orbit.x": "MW"},
                  ["correctors_x"],
                  -0.01,
                  0.01,
                  max_steps = max_steps_per_episode,
                  convergence_val = my_convergence_val,
                  bad_convergence_val = my_bad_convergence_val,
                  reward_alpha = my_reward_alpha,
                  reward_beta = my_reward_beta,
                  reward_fun = my_reward_fun)
elif what_to_do == "y_only":
  env = en.LTBEnv("tao.init",
                  {"orbit.y": "MW"},
                  ["correctors_y"],
                  -0.01,
                  0.01,
                  max_steps = max_steps_per_episode,
                  convergence_val = my_convergence_val,
                  bad_convergence_val = my_bad_convergence_val,
                  reward_alpha = my_reward_alpha,
                  reward_beta = my_reward_beta,
                  reward_fun = my_reward_fun)

# Make the model

## PPO
if model_type == "PPO":
  model = PPO(MlpPolicy, env, verbose=0, n_steps = num_steps_in_model, learning_rate = learning_rate, batch_size = batch_size, gamma = gamma, gae_lambda = gae_lambda)
  random_model = PPO(MlpPolicy, env, verbose=0, n_steps = num_steps_in_model, learning_rate = learning_rate, batch_size = batch_size, gamma = gamma, gae_lambda = gae_lambda)

## SAC
if model_type == "SAC":
  model = SAC(MlpPolicy, env, verbose=0, train_freq = train_freq, gradient_steps = gradient_steps, learning_rate = learning_rate, batch_size = batch_size, gamma = gamma, tau = tau)
  random_model = SAC(MlpPolicy, env, verbose=0, train_freq = train_freq, gradient_steps = gradient_steps, learning_rate = learning_rate, batch_size = batch_size, gamma = gamma, tau = tau)

## TD3
if model_type == "TD3":
  model = TD3(MlpPolicy, env, verbose=0, train_freq = train_freq, gradient_steps = gradient_steps, learning_rate = learning_rate, batch_size = batch_size, gamma = gamma, tau = tau)
  random_model = TD3(MlpPolicy, env, verbose=0, train_freq = train_freq, gradient_steps = gradient_steps, learning_rate = learning_rate, batch_size = batch_size, gamma = gamma, tau = tau)


# Train the model
steps_per_episode = []
max_state_per_episode = []
average_rewards = []
final_action_list = []

for episode in tqdm(range(num_episodes)):
  # Train the model
  model.learn(total_timesteps = steps_to_train, progress_bar = True)

  state, _ = env.reset()
  state = state + np.random.uniform(-0.001, 0.001, size = state.shape)
  step = 0

  # Replay Buffer
  states = [state]
  action_list = []
  rewards = []
  dones = []
  truncateds = []

  # Test the convergence of the model
  while True:
      action, _ = model.predict(state, deterministic=determ)
      new_state, reward, done, truncated, _ = env.step(action)
      states.append(new_state), rewards.append(reward)
      dones.append(done), truncateds.append(truncated), action_list.append(action)

      step += 1

      state = new_state
      if done:
        break

  print(step)
  max_state_at_end = np.max(np.abs(states[-1]))
  avg_reward = np.mean(rewards)
  average_rewards.append(avg_reward)
  steps_per_episode.append(step)
  max_state_per_episode.append(max_state_at_end)
  final_action_list.append(action_list[-1])
  print(max_state_at_end)

  # Converged finally
  if max_state_at_end <= my_convergence_val:
    break

episode_array = np.arange(start = 1, stop = episode+2, step = 1)

In [None]:
for i, step_val_epis in enumerate(steps_per_episode):
  print(f"Max Steps To Converge for Episode {i+1} is {step_val_epis}")

plt.plot(episode_array, steps_per_episode)
plt.xlabel("Episode")
plt.ylabel("Number of steps taken")
plt.title("Convergence Rate Plot")
plt.show()

In [None]:
for i, final_action in enumerate(final_action_list):
  print(f"Final Corrector Values for Episode {i+1} are {final_action}")
  print(f"Final Max Orbit for Episode {i+1} are {max_state_per_episode[i]}")

print(f"\nMax Orbit for Initial State is {np.max(np.abs(env.initial_state))}")
print(f"Biggest Difference of Max Orbit is {np.max(np.abs(env.initial_state)) - np.min(max_state_per_episode)}")

plt.plot(episode_array, max_state_per_episode)
plt.xlabel("Episode")
plt.ylabel("Max Orbit Value")
plt.title("Max Orbit Value at end of Episode")
plt.show()

In [None]:
### Only use to combine actions
x_action = [-0.00020242,  0.00029166, -0.00033901,  0.00074605, -0.00028885,  0.00035161]
y_action = [ 1.73087597e-03,  1.42020226e-03, -6.98799491e-04, -1.31446242e-03,
 -1.12302303e-04, -6.85226917e-05, -3.06529999e-04]

combined_action = x_action + y_action
print(combined_action)
tao = env.tao

en.set_new_variable_values(tao, ["correctors_x"], x_action)
en.set_new_variable_values(tao, ["correctors_y"], y_action)
print(f"Max Orbit: {np.max(np.abs(en.get_data_values(tao, {'orbit.x': 'MW', 'orbit

In [None]:
for i, avg_rew_epi in enumerate(average_rewards):
  print(f"Average Reward for Episode {i+1} is {avg_rew_epi}")

plt.plot(episode_array, average_rewards)
plt.xlabel("Episode")
plt.ylabel("Average Reward")
plt.title("Reward Plot")
plt.show()