In [None]:
import sys
import Util
sys.path.append('..')
import Env_shy
import numpy as np
from gymnasium.wrappers import TimeLimit 
import matplotlib.pyplot as plt

sys.path.append('../9_Deep_Deterministic_Policy_Gradient_DDPG')
import DDPG

In [None]:
def evaluate(model, total_time, env, n):
    env.reset()
    cumulated_punish = 0
    for i in range(total_time):
        env.state = np.random.uniform(-1, 1, size=(2, 1))
        state = env.state
        ideal_action = - env.K @ state
        action = model.predict(state, deterministic=True)
        cumulated_punish += abs(action[0] - ideal_action).sum()
    return cumulated_punish/total_time

def evaluate_new(actor, total_time, env, n):
    env.reset()
    cumulated_punish = 0
    for i in range(total_time):
        env.state = np.random.uniform(-1, 1, size=(2, 1))
        state = env.state
        ideal_action = -env.K @ state
        action = actor.choose_action(state)
        cumulated_punish += abs(action[0] - ideal_action).sum()
    return cumulated_punish/total_time

def generate_state_transition_matix(n, m):
    is_A_stable = True
    while(is_A_stable):
        A = np.random.rand(n, n)
        eigenvalues = np.linalg.eigvals(A)
        is_A_stable = all(np.abs(eigenvalues) < 1)
    B = np.random.rand(n, m)
    return A, B


Demo how to use different envs run on same model

In [None]:
record = []
cumulated_punish_record = []
n = 2
m = 2
log_file = "./diff_env_a2c/"
total_timesteps = 50000

for i in range(1):

    is_A_stable = True
    while(is_A_stable):
        A = np.random.rand(n, n)
        eigenvalues = np.linalg.eigvals(A)
        is_A_stable = all(np.abs(eigenvalues) < 1)
    B = np.random.rand(n, m)

    record.append((A, B))
    env = TimeLimit(Env_shy.CustomEnv(A, B, n, m, reward_type="Shy", test=False), max_episode_steps=1)
    env.reset()
    DDPG.train(env=env)

    cumulated_punish_record.append(evaluate(DDPG.actor, 1000, env, n))


In [None]:
record = []
cumulated_punish_record = []
n = 2
m = 2
log_file = "./diff_env_a2c/"
total_timesteps = 50000

for i in range(1):

    is_A_stable = True
    while(is_A_stable):
        A = np.random.rand(n, n)
        eigenvalues = np.linalg.eigvals(A)
        is_A_stable = all(np.abs(eigenvalues) < 1)
    B = np.random.rand(n, m)

    record.append((A, B))
    env = TimeLimit(Env_shy.CustomEnv(A, B, n, m, reward_type="Shy", test=False), max_episode_steps=1)
    env.reset()
    model = sb3.A2C("MlpPolicy", env, verbose=1, tensorboard_log=log_file)
    model.learn(total_timesteps=total_timesteps)

    cumulated_punish_record.append(evaluate(model, 1000, env, n))

Demo how to use same env run on same model for multiple times

In [None]:
cumulated_punish_record = []
n = 2
m = 2
log_file = "./diff_env_a2c/"
total_timesteps = 50000

is_A_stable = True
while(is_A_stable):
    A = np.random.rand(n, n)
    eigenvalues = np.linalg.eigvals(A)
    is_A_stable = all(np.abs(eigenvalues) < 1)
B = np.random.rand(n, m)
env = TimeLimit(Env_shy.CustomEnv(A, B, n, m, reward_type="Shy", test=False), max_episode_steps=1)

for i in range(1):
    env.reset()
    model = sb3.A2C("MlpPolicy", env, verbose=1, tensorboard_log=log_file)
    model.learn(total_timesteps=total_timesteps)

    cumulated_punish_record.append(evaluate(model, 1000, env, n))

Demo how to run different max_episode_steps on same model

In [None]:
n = 2
m = 2
log_file = "./diff_env_a2c/"
total_timesteps = 50000

is_A_stable = True
while(is_A_stable):
    A = np.random.rand(n, n)
    eigenvalues = np.linalg.eigvals(A)
    is_A_stable = all(np.abs(eigenvalues) < 1)
B = np.random.rand(n, m)

cumulated_punish_record = []
for i in [1, 2, 5, 10, 20, 50, 100]:
    env = TimeLimit(Env_shy.CustomEnv(A, B, n, m, reward_type="Shy"), max_episode_steps=i)
    env.reset()
    model = sb3.A2C("MlpPolicy", env, verbose=1, tensorboard_log=log_file)
    model.learn(total_timesteps=total_timesteps)
    
    cumulated_punish_record.append(evaluate(model, 3000, env))

Demo how to use specific A, B to create  env and train on same model for multiple times

In [None]:
A = np.array([[0.36315674, 0.06881483], [0.83578281, 0.42088278]])
B = np.array([[0.96880357, 0.72537713], [0.14729658, 0.63892631]]) 
cumulated_punish_record = []
n = 2
m = 2
log_file = "./diff_env_a2c/"
total_timesteps = 50000

for i in range(1):
    env = TimeLimit(Env_shy.CustomEnv(A, B, n, m, reward_type="Shy"), max_episode_steps=1)
    env.reset()
    model = sb3.A2C("MlpPolicy", env, verbose=1, tensorboard_log=log_file)
    model.learn(total_timesteps=total_timesteps)

    cumulated_punish_record.append(evaluate(model, 1000, env, n))

Demo how to compare ideal_action with action given by trained model

In [None]:
env.reset()
cumulated_punish = 0
total_timesteps = 3000

for i in range(total_timesteps):
    env.state = np.random.rand(2, 1)
    state = env.state
    ideal_action = - env.K @ state
    action = model.predict(state, deterministic=True)
    cumulated_punish += abs(action[0] - ideal_action).sum()
cumulated_punish

Demo how to compare ideal_action and X_star_k_plus_1 with trained model

In [None]:
total_timesteps = 10000

for i in range(total_timesteps):
    env.state = np.random.rand(2, 1)
    state = env.state
    ideal_action = - env.K @ state
    action = model.predict(state, deterministic=True)[0]
    X_star_k_plus_1 = (env.Acl @ state).astype(np.float32)
    X_k_plus_1 = (A @ state + B @ action).astype(np.float32)
    print(action, ideal_action, X_k_plus_1, X_star_k_plus_1)

Demo how to compare different algorithms on same env

In [None]:
n = 2
m = 2
log_file = "./diff_env_a2c/"
total_timesteps = 50000

is_A_stable = True
while(is_A_stable):
    A = np.random.rand(n, n)
    eigenvalues = np.linalg.eigvals(A)
    is_A_stable = all(np.abs(eigenvalues) < 1)
B = np.random.rand(n, m)
env = TimeLimit(Env_shy.CustomEnv(A, B, n, m, reward_type="Shy", test=False), max_episode_steps=1)

algorithms = ['A2C', 'DDPG', 'PPO', 'SAC']
record_test = []
for algorithm in algorithms:
    if algorithm == 'A2C':
        model = sb3.A2C("MlpPolicy", env, verbose=1, tensorboard_log=log_file)  
    elif algorithm == 'DDPG':
        model = sb3.DDPG("MlpPolicy", env, verbose=1, tensorboard_log=log_file)
    elif algorithm == 'PPO':
        model = sb3.PPO("MlpPolicy", env, verbose=1, tensorboard_log=log_file)
    elif algorithm == 'SAC':
        model = sb3.SAC("MlpPolicy", env, verbose=1, tensorboard_log=log_file)
    elif algorithm == 'TD3':
        model = sb3.TD3("MlpPolicy", env, verbose=1, tensorboard_log=log_file)
    else:
        print("error: ", algorithm)
    model.learn(total_timesteps=total_timesteps)

    record_test.append(evaluate(model, 3000, env, m))

Demo how to draw 3D graph(3D: x, y, reward) given A, B, Acl...

In [None]:
def calculateReward(X_star_k_plus_1, X_k_plus_1, P):
        punish = 0
        for i in range(n):
            for j in range(i, n):
                if i == j:
                    diff = pow(X_k_plus_1[i], 2) - pow(X_star_k_plus_1[i], 2)
                    punish += abs(P[i, j] * diff)
                else:
                    diff = X_k_plus_1[i] * X_k_plus_1[j] - X_star_k_plus_1[i] * X_star_k_plus_1[j]
                    punish += abs(2 * P[i, j] * diff)
        return punish 

In [None]:
x = np.linspace(-1, 1, 100)
y = np.linspace(-1, 1, 100)
X, Y = np.meshgrid(x, y)

A, B = generate_state_transition_matix(2, 2)
env = TimeLimit(Env_shy.CustomEnv(A, B, n, m, reward_type="Shy", test=False), max_episode_steps=1)

Z = np.zeros_like(X)
for i in range(X.shape[0]):
    for j in range(X.shape[1]):
        x_val, y_val = X[i, j], Y[i, j]
        action = np.array([[x_val], [y_val]])
        X_k_plus_1 = (A @ state + B @ action).astype(np.float32)
        X_star_k_plus_1 = np.ones_like(X_k_plus_1)
        z_val = calculateReward(X_k_plus_1, X_star_k_plus_1, env.P)
        Z[i, j] = z_val[0]

fig = plt.figure(dpi=100)
ax = plt.axes(projection='3d')
ax.plot_surface(X, Y, Z, cmap='viridis') 
ax.view_init(elev=30, azim=45)

ax.set_xlabel('a_1')
ax.set_ylabel('b_1')
ax.set_zlabel('potential-based reward')

plt.show()

Demo how to draw field given env

In [None]:
x = np.linspace(-1, 1, 100)
y = np.linspace(-1, 1, 100)
X, Y = np.meshgrid(x, y)
n = 2
m = 2

A, B = generate_state_transition_matix(2, 2)
env = TimeLimit(Env_shy.CustomEnv(A, B, n, m, reward_type="Shy", test=False), max_episode_steps=1)

num_points = 30
side_length = 1
colors = plt.cm.viridis(np.linspace(0, 1, num_points))
initial_points = Util.generate_X0_uniform(num_points, side_length)
fig, ax = plt.subplots()
x_values, y_values, U, V = Util.generate_field_grid(env.Acl, initial_points)
ax.quiver(x_values, y_values, U, V, color=colors[0], angles='xy',
            scale_units='xy', scale=3, width=.005)
ax.set_aspect('equal')
plt.show()

Demo how to draw field given trained model

In [None]:
x = np.linspace(-1, 1, 100)
y = np.linspace(-1, 1, 100)
X, Y = np.meshgrid(x, y)
n = 2
m = 2

A, B = generate_state_transition_matix(2, 2)
env = TimeLimit(Env_shy.CustomEnv(A, B, n, m, reward_type="Shy", test=False), max_episode_steps=1)
#train u model
model = sb3.A2C("MlpPolicy", env, verbose=1)
model.learn(10000)

num_points = 30
side_length = 1
colors = plt.cm.viridis(np.linspace(0, 1, num_points))
initial_points = Util.generate_X0_uniform(num_points, side_length)
fig, ax = plt.subplots()
x_values, y_values, U, V = Util.generate_field_grid_for_model(model, initial_points, env)
ax.quiver(x_values, y_values, U, V, color=colors[0], angles='xy',
            scale_units='xy', scale=3, width=.005)
ax.set_aspect('equal')
plt.show()

Demo how to draw trajectory given A

In [None]:
A = np.array([[0.8, -0.5], [0.3, 0.9]]) 
num_points = 5
num_steps = 50
colors = plt.cm.viridis(np.linspace(0, 1, num_points))
initial_points = Util.generate_X0_circle(num_points, (0, 0),5)
fig, ax = plt.subplots()
for i, X0 in enumerate(initial_points):
    x_values, y_values, U, V = Util.generate_trajectory(A, X0, num_steps)
    ax.quiver(x_values, y_values, U, V, color=colors[i], angles='xy',
            scale_units='xy', scale=2, width=.005)
plt.show()

Demo how to draw trajectory given our env

In [None]:
n = 2
m = 2
A = (np.random.rand(n, n) - 1) * 10
B = (np.random.rand(n, m) - 1) * 10
Q = np.eye(n)
R = np.eye(m) * 100
K, P = Util.findPK(A, B, Q, R)
Acl = A - B @ K

A = Acl
num_points = 20
num_steps = 40
colors = plt.cm.viridis(np.linspace(0, 1, num_points))
initial_points = Util.generate_X0_circle(num_points, (0, 0), 5)
fig, ax = plt.subplots()
for i, X0 in enumerate(initial_points):
    x_values, y_values, U, V = Util.generate_trajectory(A, X0, num_steps)
    ax.quiver(x_values, y_values, U, V, color=colors[i], angles='xy',
            scale_units='xy', scale=1, width=.005)
plt.show()

Demo how to draw trajectory given model

In [None]:
n = 2
m = 2
A, B = generate_state_transition_matix(n, m)
env = TimeLimit(Env_shy.CustomEnv(A, B, n, m, reward_type="Shy", test=False), max_episode_steps=1)
Acl = env.Acl
model = sb3.A2C("MlpPolicy", env, verbose=1)
model.learn(10000)

In [None]:
num_points = 10
num_steps = 10
colors = plt.cm.viridis(np.linspace(0, 1, num_points))
initial_points = Util.generate_X0_circle(num_points, (0, 0), 1)
fig, ax = plt.subplots()
for i, X0 in enumerate(initial_points):
    x_values, y_values, U, V = Util.generate_trajectory_for_model(env, model, X0, num_steps)
    ax.quiver(x_values, y_values, U, V, color=colors[i], angles='xy',
            scale_units='xy', scale=1, width=.005)
plt.show()