In [None]:
from torch import nn
import torch, gym, gym_unbalanced_disk, time, gym.wrappers
import numpy as np
from matplotlib import pyplot as plt
from copy import deepcopy
import os

In [None]:
class Qfunction(nn.Module):
    """One-hidden-layer network that maps an observation to one Q-value per discrete action.

    The input width is taken from ``env.observation_space.shape[0]`` and the
    output width from ``env.action_space.n``; the hidden layer has 40 tanh units.
    """

    def __init__(self, env):
        super().__init__()
        hidden_units = 40
        # Attribute names (lay1, F1, lay2) define the state_dict keys used by saved checkpoints.
        self.lay1 = nn.Linear(env.observation_space.shape[0], hidden_units)
        self.F1 = nn.Tanh()
        self.lay2 = nn.Linear(hidden_units, env.action_space.n)

    def forward(self, obs):
        """Return the Q-values produced for ``obs`` by lay1 -> tanh -> lay2."""
        hidden = self.lay1(obs)
        activated = self.F1(hidden)
        return self.lay2(activated)

In [None]:
def show(Q,env,setting):
    """Render one greedy episode and record the signals read from the environment.

    Parameters
    ----------
    Q : callable
        Q-network; called with a float32 tensor of shape (1, Nobs) and expected
        to return a tensor whose first row holds the Q-values per action.
    env : environment
        Must provide reset/step/render/close and the attributes ``u``, ``th``
        and ``omega``, which are read after every step.
    setting : str
        ``'u'``, ``'omega'`` or ``'theta'`` prints that quantity after every
        step; any other value prints nothing.

    Returns
    -------
    (u, theta, omega) : tuple of lists
        Per-step values of ``env.u``, ``env.th`` and ``env.omega``.

    Notes
    -----
    A KeyboardInterrupt (e.g. interrupting the kernel) stops the episode early
    and still returns what was recorded so far. Any other exception propagates
    to the caller; the previous ``return`` inside ``finally`` swallowed every
    error silently. The environment is closed in all cases.
    """
    u = []
    theta = []
    omega = []
    # Qfun(obs): numpy observation -> numpy array of Q-values (one per action).
    Qfun = lambda x: Q(torch.tensor(x[None,:],dtype=torch.float32))[0].numpy()
    try:
        with torch.no_grad():
            obs = env.reset()
            env.render()
            time.sleep(1)  # short pause so the initial state is visible
            while True:
                action = np.argmax(Qfun(obs))  # greedy action
                obs, reward, done, info = env.step(action)
                time.sleep(1/60)  # throttle rendering
                env.render()
                if setting == "u":
                    print(env.u)
                elif setting == 'omega':
                    print(env.omega)
                elif setting == 'theta':
                    print(env.th)
                u.append(env.u)
                theta.append(env.th)
                omega.append(env.omega)
                if done:
                    time.sleep(0.5)
                    break
    except KeyboardInterrupt:
        # Deliberate: a manual interrupt ends the visualisation but keeps the recorded data.
        pass
    finally:
        # Always release the render window, whether the episode finished, was interrupted or failed.
        env.close()
    return u,theta,omega

In [None]:
def rollout(Q, env, epsilon=0.1, N_rollout=10_000): 
    """Collect ``N_rollout`` transitions with an epsilon-greedy policy derived from ``Q``.

    Parameters
    ----------
    Q : callable
        Q-network; called with a float32 tensor of shape (1, Nobs).
    env : environment
        Must provide ``reset()``, ``step(action)`` returning
        ``(obs, reward, done, info)`` and ``action_space.sample()``.
    epsilon : float
        A random action is taken whenever a uniform [0, 1) draw is not greater
        than ``epsilon``; otherwise the greedy action is taken.
    N_rollout : int
        Number of environment steps to record (episodes are restarted as needed).

    Returns
    -------
    Start_state, Actions, Rewards, End_state, Terminal : numpy arrays
        One entry per step. ``Terminal`` is 1 only when the episode ended
        without ``info['TimeLimit.truncated']`` being set, and 0 otherwise.
    """
    Start_state = [] #hold an array of (x_t)
    Actions = [] #hold an array of (u_t)
    Rewards = [] #hold an array of (r_{t+1})
    End_state = [] #hold an array of (x_{t+1})
    Terminal = [] #hold an array of (terminal_{t+1})
    # Qfun( a numpy array of the obs) -> a numpy array of Q values
    Qfun = lambda x: Q(torch.tensor(x[None,:],dtype=torch.float32))[0].numpy() 
    with torch.no_grad():
        obs = env.reset()
        for _ in range(N_rollout):
            if np.random.uniform()>epsilon:
                action = np.argmax(Qfun(obs)) # exploit: greedy action
            else:
                action = env.action_space.sample() # explore: random action
            Start_state.append(obs)
            Actions.append(action)

            obs_next, reward, done, info = env.step(action)
            # An episode cut off by the time limit is not a true terminal state.
            terminal = done and not info.get('TimeLimit.truncated', False)

            Terminal.append(terminal)
            Rewards.append(reward)
            End_state.append(obs_next)

            # Start a fresh episode whenever the current one ended, for any reason.
            obs = env.reset() if done else obs_next

    #error checking (the message previously referenced an undefined name `Dones`):
    assert len(Start_state)==len(Actions)==len(Rewards)==len(End_state)==len(Terminal), f'error in lengths: {len(Start_state)}=={len(Actions)}=={len(Rewards)}=={len(End_state)}=={len(Terminal)}'
    return np.array(Start_state), np.array(Actions), np.array(Rewards), np.array(End_state), np.array(Terminal).astype(int)

In [None]:
def eval_Q(Q,env):
    """Run one episode with the greedy policy of ``Q`` and return the summed reward.

    The episode starts from ``env.reset()`` and ends the first time
    ``env.step`` reports ``done``.
    """
    def greedy_action(state):
        # numpy observation -> Q-values for that observation -> index of the largest one
        q_values = Q(torch.tensor(state[None,:],dtype=torch.float32))[0].numpy()
        return np.argmax(q_values)

    total_reward = 0
    with torch.no_grad():
        state = env.reset()
        finished = False
        while not finished:
            state, reward, finished, _info = env.step(greedy_action(state))
            total_reward += reward
    return total_reward

In [None]:
def DQN_rollout(Q, optimizer, env, gamma=0.98, use_target_net=False, N_iterations=21, N_rollout=20000, \
                N_epochs=10, batch_size=32, N_evals=10, target_net_update_feq=100, \
                checkpoint_path='Q-checkpoint'):
    """Train ``Q`` with DQN on data collected by repeated epsilon-greedy rollouts.

    Each iteration collects ``N_rollout`` transitions with ``rollout``, fits
    ``Q`` on them for ``N_epochs`` passes of consecutive mini-batches, and after
    every epoch scores the greedy policy with ``eval_Q``. The best-scoring
    parameters are checkpointed and reloaded at the end of every iteration and
    when the function exits (including on KeyboardInterrupt).

    Parameters
    ----------
    Q : torch.nn.Module
        Q-network, updated in place.
    optimizer : torch.optim.Optimizer
        Optimizer over ``Q``'s parameters.
    env : environment passed to ``rollout`` and ``eval_Q``.
    gamma : float
        Discount factor of the TD target.
    use_target_net : bool
        If True, the TD target uses a copy of ``Q`` refreshed every
        ``target_net_update_feq`` batches (the counter restarts each iteration).
    N_iterations : int
        Number of rollout/training rounds; epsilon decays linearly from 1 in
        the first round to 0 in the last.
    N_rollout, N_epochs, batch_size : int
        Transitions per rollout, passes over them, and mini-batch size. A
        trailing partial batch is skipped.
    N_evals : int
        Greedy episodes averaged for the score after each epoch.
    target_net_update_feq : int
        Target-network refresh period in batches.
    checkpoint_path : str
        File used to store the best parameters found so far.
    """
    best = -float('inf')
    torch.save(Q.state_dict(),checkpoint_path)
    try:
        for iteration in range(N_iterations):
            # Linear decay 1 -> 0; max(..., 1) avoids a division by zero when N_iterations == 1.
            epsilon = 1.0 - iteration/max(N_iterations-1, 1)
            print(f'rollout iteration {iteration} with epsilon={epsilon:.2%}...')
            
            #2. rollout (Dones holds the terminal flags returned by rollout)
            Start_state, Actions, Rewards, End_state, Dones = rollout(Q, env, epsilon=epsilon, N_rollout=N_rollout)
            
            #Data conversion
            convert = lambda x: [torch.tensor(xi,dtype=torch.float32) for xi in x]
            Start_state, Rewards, End_state, Dones = convert([Start_state, Rewards, End_state, Dones])
            # Long tensor so the taken actions can be used as gather indices below.
            Actions = torch.as_tensor(Actions.astype(int), dtype=torch.long)

            print('starting training on rollout information...')
            t = 0
            for epoch in range(N_epochs): 
                for i in range(batch_size,len(Start_state)+1,batch_size): 
                    # Only pay for the network copy when the target network is actually used.
                    if use_target_net and t%target_net_update_feq==0:
                        Qtarget = deepcopy(Q)
                    t += 1
                    
                    Start_state_batch, Actions_batch, Rewards_batch, End_state_batch, Dones_batch = [d[i-batch_size:i] for d in [Start_state, Actions, Rewards, End_state, Dones]]
                    
                    with torch.no_grad(): # the TD target is treated as a constant
                        Qnext = Qtarget(End_state_batch) if use_target_net else Q(End_state_batch)
                        maxQ = torch.max(Qnext,dim=1)[0]
                    
                    # Q(x_t,u_t): select the Q-value of the action taken in each row.
                    # gather replaces indexing with a stacked (2, batch) numpy array, which
                    # depended on torch treating a non-tuple sequence as a tuple of indices.
                    Qnow = Q(Start_state_batch).gather(1, Actions_batch.unsqueeze(1)).squeeze(1)
                    
                    # Squared TD error; (1-Dones) drops the bootstrap term on terminal transitions.
                    Loss = torch.mean((Rewards_batch + gamma*maxQ*(1-Dones_batch) - Qnow)**2)
                    optimizer.zero_grad()
                    Loss.backward()
                    optimizer.step()
                
                score = np.mean([eval_Q(Q,env) for _ in range(N_evals)])
                
                print(f'iteration={iteration} epoch={epoch} Average Reward per episode:',score)
                if score>best:
                    best = score
                    print('################################# \n new best',best,'saving Q... \n#################################')
                    torch.save(Q.state_dict(),checkpoint_path)
            
            # Continue the next iteration from the best parameters seen so far.
            print('loading best result')
            Q.load_state_dict(torch.load(checkpoint_path))
    finally: #this will always run even when using the a KeyBoard Interrupt. 
        print('loading best result')
        Q.load_state_dict(torch.load(checkpoint_path))

In [None]:
# Environment: unbalanced disk with sin/cos observations, wrapped so episodes are cut off after max_episode_steps.
max_episode_steps = 300
env = gym.make('unbalanced-disk-sincos-v0', dt=0.025, umax=3.)
env = gym.wrappers.time_limit.TimeLimit(env,max_episode_steps=max_episode_steps)

target_angle = np.pi # target set to be balanced on top

# Reward terms (each takes the environment as `self`):
#   angle_reward           - largest (6.0) when th == target_angle, 0.0 at the opposite angle
#   correctVelocity_reward - rewards omega**2, weighted by (cos(th)+1)/2, i.e. fully at th=0 and not at all at th=pi
#   voltage_penalty        - quadratic penalty on the applied input u
#   incorrectAngle_penalty - linear penalty on |th - target_angle|
angle_reward = lambda self: ((np.cos(self.th-target_angle)+1.5)**2 - 0.25)
correctVelocity_reward = lambda self: 0.00125*(((np.cos(self.th)+1)/2)*(self.omega)**2)
voltage_penalty = lambda self: -0.01*((self.u)**2)
incorrectAngle_penalty = lambda self: -0.01*np.abs(self.th-target_angle)

reward_function = lambda self: angle_reward(self) + correctVelocity_reward(self) + voltage_penalty(self) + incorrectAngle_penalty(self)

# NOTE(review): change_reward_function / set_discrete_values_manual are not defined in this notebook;
# presumably provided by the gym_unbalanced_disk environment - confirm against the installed version.
env.change_reward_function(reward_function)

# env.set_discrete_values(discrete_size = 9, minmax = 3.0, div = 2.5, rnd = 2)
env.set_discrete_values_manual([-3.0, -1.5, -0.75, -0.5, -0.25, -0.15, -0.1, 0, 0.1, 0.15, 0.25, 0.5, 0.75, 1.5, 3.0])

# Training hyper-parameters
gamma = 0.99 #f=)
batch_size = 45 #f=)
N_iterations = 18 #f=)
N_rollout = 20000 #f=)
N_epochs = 7 #f=)
N_evals = 4 #f=)
lr = 0.001 #given

assert isinstance(env.action_space,gym.spaces.Discrete), 'action space requires to be discrete'
Q = Qfunction(env)
optimizer = torch.optim.Adam(Q.parameters(),lr=lr) #low learning rate
# batch_size is now forwarded; it was defined above but never passed, so the default of 32 was used.
DQN_rollout(Q, optimizer, env, use_target_net=True, gamma=gamma, N_iterations=N_iterations, \
            N_rollout=N_rollout, N_epochs=N_epochs, batch_size=batch_size, N_evals=N_evals)

In [None]:
# max_episode_steps = 300
# env = gym.make('unbalanced-disk-sincos-v0', dt=0.025, umax=3.)
# env = gym.wrappers.time_limit.TimeLimit(env,max_episode_steps=max_episode_steps)

# target_angle = np.pi # target set to be balanced on top


# # reward_function =  lambda self: np.exp(-(self.th%(2*np.pi)-np.pi)**2/(2*(np.pi/7)**2))
# # reward_function =  lambda self: (np.cos(self.th - target_angle)+1)**2  - np.cos(self.th-(np.pi+target_angle)) - 0.01*(self.omega)**2 - 0.01*(self.u)**2
# # reward_function =  lambda self: ((np.cos(self.th-target_angle)+1.5)**2 - 0.25) + 0.00125*(((np.cos(self.th)+1)/2)*(self.omega)**2) - 0.0075*((self.u)**2) - 0.0025*((self.omega)**2)

# # Washing machine (Qfunction2)
# # reward_function = lambda self: np.exp(-(self.th % (2 * np.pi) - np.pi) ** 2 / (2 * (np.pi / 7) ** 2)) + (1.0 if np.abs(self.th % (2 * np.pi) - np.pi) < np.pi / 2 else 0.0) - 0.001 * self.u ** 2
# # stuck at u=3
# # reward_function = lambda self: ((np.cos(self.th-target_angle)+1.6)** 2 - 0.25) - 0.0025*(self.omega)** 2 - 0.0075*(self.u)**2

# # reward_function = lambda self: np.exp(-(self.th % (2 * np.pi) - np.pi) ** 2 / (2 * (np.pi / 7) ** 2)) + (1.0 if np.abs(self.th % (2 * np.pi) - np.pi) < np.pi / 2 else -0.5) - 0.001 * self.u ** 2

# # Yann best
# reward_function =  lambda self: ((np.cos(self.th-target_angle)+1.5)**2 - 0.25) + 0.00125*(((np.cos(self.th)+1)/2)*(self.omega)**2) - 0.01*((self.u)**2)

# env.change_reward_function(reward_function)

# env.set_discrete_values(discrete_size = 9, minmax = 3.0, div = 2.5, rnd = 2)
# # env.set_discrete_values_manual([-3.0, -1.5, -0.75, -0.5, -0.2, -0.1, -0.05, 0, 0.05, 0.1, 0.2, 0.5, 0.75, 1.5, 3.0])

# # gamma = 0.98 #f=)
# # batch_size = 32 #f=)
# # N_iterations = 21 #f=)
# # N_rollout = 20000 #f=)
# # N_epochs = 10 #f=)
# # N_evals = 5 #f=)
# # lr = 0.0005 #given
# gamma = 0.98 #f=)
# batch_size = 64 #f=)
# N_iterations = 41 #f=)
# N_rollout = 50000 #f=)
# N_epochs = 50 #f=)
# N_evals = 10 #f=)
# lr = 0.0005 #given

# assert isinstance(env.action_space,gym.spaces.Discrete), 'action space requires to be discrete'
# Q = Qfunction(env)
# optimizer = torch.optim.Adam(Q.parameters(),lr=lr) #low learning rate
# DQN_rollout(Q, optimizer, env, use_target_net=False, gamma=gamma, N_iterations=N_iterations, \
#             N_rollout=N_rollout, N_epochs=N_epochs, N_evals=N_evals)

In [None]:
# Render one greedy episode with the trained Q-network (printing env.th every step)
# and keep the recorded u / th / omega sequences for the plots below.
u,theta,omega = show(Q,env,'theta')

In [None]:
# One figure with three side-by-side panels, one per signal recorded by show()
fig, axs = plt.subplots(1, 3, figsize=(12, 4))

# (recorded data, panel title, y-axis label) for each panel, left to right
panels = [
    (theta, 'Theta', 'Angle'),
    (u, 'u', 'Input voltage'),
    (omega, 'omega', 'Speed'),
]
for ax, (signal, title, ylabel) in zip(axs, panels):
    ax.plot(signal, '.')
    ax.set_title(title)
    ax.set_xlabel('instance')
    ax.set_ylabel(ylabel)

# Keep labels and titles from overlapping, then render
plt.tight_layout()
plt.show()

In [None]:
# Summed reward of a single greedy episode under the current Q-network
eval_Q(Q,env)

In [None]:
# Spread of the greedy policy's episode return over repeated evaluation episodes.
# (pyplot is already imported as plt in the notebook's import cell, so it is not re-imported here.)
N_EVAL_EPISODES = 100
Rewards = [eval_Q(Q,env) for _ in range(N_EVAL_EPISODES)]
plt.plot(Rewards,'.')
plt.title(f'mean={np.mean(Rewards)}')
plt.xlabel('instance')
plt.ylabel('Reward per episode')
plt.show()

In [None]:
def visualize_theta(env, theta, basis_fun):
    """Contour-plot the maximum Q-value over a grid of the observation space.

    ``theta`` is the weight matrix (Nbasis, Naction) and ``basis_fun(obs)``
    returns the (Nbasis,) feature vector, so ``basis_fun(obs) @ theta`` gives
    the Q-values of one observation. The grid uses 50 x 60 points between the
    observation-space bounds (first two dimensions only, since the grid
    resolution list has two entries).
    """
    space = env.observation_space
    grid_counts = [50,60]
    # One linspace per observation dimension, from its lower to its upper bound
    axes = [np.linspace(lo, hi, num=count) for lo, hi, count in zip(space.low, space.high, grid_counts)]
    # All grid points as rows: (Npoints, Nobs)
    grid_points = np.stack(np.meshgrid(*axes), axis=-1).reshape(-1, len(axes))

    best_q = np.array([np.max(basis_fun(point)@theta) for point in grid_points])
    # meshgrid's default layout puts the second axis along the rows
    best_q = best_q.reshape((grid_counts[1],grid_counts[0]))

    plt.contour(axes[0],axes[1],best_q)
    plt.xlabel('position')
    plt.ylabel('velocity')
    plt.colorbar()
    plt.show()


def make_radial_basis_network(env,nvec,scale):
    """Build a normalised radial-basis feature function on a regular grid of centres.

    Parameters
    ----------
    env : environment whose ``observation_space`` has finite ``low``/``high`` bounds.
    nvec : int or sequence of int
        Number of grid points per observation dimension (an int is used for
        every dimension).
    scale : float
        Width of the basis functions, in units of the grid spacing.

    Returns
    -------
    basis_fun : callable
        ``basis_fun(obs)`` returns the (Nbasis,) vector of basis activations
        for one observation, normalised to sum to 1.
    """
    space = env.observation_space
    if isinstance(nvec,int):
        nvec = [nvec]*space.shape[0]

    lower, upper = space.low, space.high
    assert np.all(np.isfinite(lower)) and np.all(np.isfinite(upper)), f'infinite bounds on obersvation space are not permitted low={lower}, high={upper}'

    # Grid axes from lower to upper bound, and every combination of them as centre rows (Nc, Nobs)
    axes = [np.linspace(lo, hi, num=count) for lo, hi, count in zip(lower, upper, nvec)]
    centers = np.stack(np.meshgrid(*axes), axis=-1).reshape(-1, len(axes))
    # Grid spacing per dimension, used to express distances in grid units
    spacing = np.array([axis[1]-axis[0] for axis in axes])

    def basis_fun(obs):
        """Return the normalised activation of every centre for observation ``obs``."""
        obs = np.array(obs) #(Nobs)
        scaled_offset = (centers-obs[None,:])/spacing[None,:] #(Nbasis, Nobs)
        exponent = np.sum(scaled_offset**2,axis=1)/(2*scale**2)
        # Shifting by the smallest exponent keeps exp() from underflowing everywhere;
        # the shift cancels in the normalisation below.
        weights = np.exp(exponent.min()-exponent)
        return weights/np.sum(weights)

    return basis_fun


In [None]:
# nvec = 10
# scale = 0.5
# basis_fun = make_radial_basis_network(env,nvec,scale=scale) #e)
# visualize_theta(env, theta, basis_fun) #d)

In [None]:
# Display the trained Q-network (its module/layer summary)
Q

In [None]:
# Save the Q-network's parameters (state_dict only, not the module itself) to a file;
# restoring them requires constructing a Qfunction first and calling load_state_dict.
torch.save(Q.state_dict(), 'QfunctionYann5.pt')

In [None]:
# Number of discrete actions in the environment's action space (the Q-network's output width)
env.action_space.n

In [None]:
# max_episode_steps = 200
# env = gym.make('unbalanced-disk-sincos-v0', dt=0.025, umax=3.)
# env = gym.wrappers.time_limit.TimeLimit(env,max_episode_steps=max_episode_steps)
# env.set_discrete_values(discrete_size = 13, minmax = 3.0, div = 2.5, rnd = 2)

# target_angle = np.pi # target set to be balanced on top

# gamma = 0.98 #f=)
# batch_size = 32 #f=)
# N_iterations = 21 #f=)
# N_rollout = 20000 #f=)
# N_epochs = 10 #f=)
# N_evals = 5 #f=)
# lr = 0.0005 #given

# a_list = [0.5,1,5]
# b_list = [0.1, 0.01, 0.001]
# c_list = [0.1, 0.01, 0.001]

# results_table = np.zeros((len(a_list),len(b_list),len(c_list)))

# best_result = -float('inf') 
# best_result_idx = [0,0,0]

# for i, a in enumerate(a_list):
#     for j, b in enumerate(b_list):
#         for k, c in enumerate(c_list):
#             print(f'Currently running a = {a}, b = {b} and c = {c}')
#             env_it = deepcopy(env)
#             reward_function =  lambda self: -(a*(np.abs(self.th)-np.abs(target_angle))**2 + b*(self.omega)**2 + c*(self.u)**2)
#             env_it.change_reward_function(reward_function)
            
#             assert isinstance(env.action_space,gym.spaces.Discrete), 'action space requires to be discrete'
#             Q = Qfunction(env_it)
#             optimizer = torch.optim.Adam(Q.parameters(),lr=lr) #low learning rate
#             DQN_rollout(Q, optimizer, env_it, use_target_net=True, gamma=gamma, N_iterations=N_iterations, \
#                         N_rollout=N_rollout, N_epochs=N_epochs, N_evals=N_evals)
            
#             result = eval_Q(Q,env_it)
#             results_table[i,j,k] = result
            
#             print(f'Result = {result}')
            
#             if result > best_result:
#                 best_result = result
#                 best_result_idx = [i,j,k]
#                 print(f'################################# \n new best',result,'saving result... \n#################################')


# print(best_result)
# print(results_table)
            