# 1. Test Random Environment with OpenAI Gym

In [1]:
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random

In [2]:
class ShowerEnv(Env):
    def __init__(self):
        # Actions we can take, down, stay, up
        self.action_space = Discrete(3)
        # Temperature array
        self.observation_space = Box(low=np.array([0]), high=np.array([100]))
        # Set start temp
        self.state = 38 + random.randint(-3,3)
        # Set shower length
        self.shower_length = 60
        
    def step(self, action):
        # Apply action
        # 0 -1 = -1 temperature
        # 1 -1 = 0 
        # 2 -1 = 1 temperature 
        self.state += action -1 
        # Reduce shower length by 1 second
        self.shower_length -= 1 
        
        # Calculate reward
        if self.state >=37 and self.state <=39: 
            reward =1 
        else: 
            reward = -1 
        
        # Check if shower is done
        if self.shower_length <= 0: 
            done = True
        else:
            done = False
        
        # Apply temperature noise
        #self.state += random.randint(-1,1)
        # Set placeholder for info
        info = {}
        
        # Return step information
        return self.state, reward, done, info

    def render(self):
        # Implement viz
        pass
    
    def reset(self):
        # Reset shower temperature
        self.state = 38 + random.randint(-3,3)
        # Reset shower time
        self.shower_length = 60 
        return self.state
    

In [113]:
__credits__ = ["Carlos Luis"]

from os import path
from typing import Optional

import numpy as np

import gym
from gym import spaces
from gym.envs.classic_control import utils
from gym.error import DependencyNotInstalled
from numba import jit

DEFAULT_X = np.pi
DEFAULT_Y = 1.0


class PMSMEnv(gym.Env):

    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 30,
    }

    def __init__(self, render_mode: Optional[str] = None, parameters: Optional[dict] = None):

        
        self.Rs = parameters.get("Rs") if "Rs" in parameters else np.array(1.) 
        self.Ld = parameters.get("Ld") if "Ld" in parameters else np.array(1.)
        self.Lq = parameters.get("Lq") if "Lq" in parameters else np.array(1.)
        self.p = parameters.get("p") if "p" in parameters else np.array(1.)
        self.psi_f = parameters.get("psi_f") if "psi_f" in parameters else np.array(1.)
        self.Bm = parameters.get("Bm") if "Bm" in parameters else np.array(1.)
        self.J = parameters.get("J") if "J" in parameters else np.array(1.)

        self.max_speed = parameters.get("max_speed") if "max_speed" in parameters else np.array(1.)
        self.max_torque = parameters.get("max_torque") if "max_torque" in parameters else np.array(1.)
        self.max_voltage = parameters.get("max_voltage") if "max_voltage" in parameters else np.array(1.)
        self.max_current = parameters.get("max_current") if "max_current" in parameters else np.array(1.)
        
        self.dt = parameters.get("sample_time") if "sample_time" in parameters else np.array(0.001)
        self.simTime = parameters.get("simulation_time") if "simulation_time" in parameters else np.array(1.) 
        
        self.render_mode = render_mode

        self.screen_dim = 500
        self.screen = None
        self.clock = None
        self.isopen = True

        min_action = np.array([-self.max_voltage, -self.max_voltage], dtype=np.float32)
        max_action = np.array([self.max_voltage, self.max_voltage], dtype=np.float32)

        min_observation = np.array([-self.max_current, -self.max_current, -self.max_speed], dtype=np.float32)
        max_observation = np.array([self.max_current, self.max_current, self.max_speed], dtype=np.float32)

        self.action_space = spaces.Box(low=min_action, high=max_action, shape=(2,), dtype=np.float32)
        self.observation_space = spaces.Box(low=min_observation, high=max_observation, shape=(3,), dtype=np.float32)

        self.state = None
    
    def rungekutta4_step(self):
        k1 = self.Xdot(self.x_prev)
        k2 = self.Xdot(self.x_prev + k1 * self.dt/2.)
        k3 = self.Xdot(self.x_prev + k2 * self.dt/2.)
        k4 = self.Xdot(self.x_prev + k3 * self.dt)
        return self.x_prev + (self.dt/6.) * (k1 + 2*k2 + 2*k3 + k4)
        
    def step(self, u, id_ref, omega_ref, T_load):
        
        self.T_load = np.clip(T_load, -self.max_torque, self.max_torque)
        self.vd = np.clip(u[0], -self.max_voltage, self.max_voltage)
        self.vq = np.clip(u[1], -self.max_voltage, self.max_voltage)
        self.x_prev = np.array([self.id,self.iq,self.omega])

        # Get new states
        self.id, self.iq, self.omega = self.rungekutta4_step().tolist()
        self.x_current = np.array([self.id,self.iq,self.omega])
        self.id_dot, self.iq_dot, self.omega_dot = self.Xdot(self.x_current).tolist()

        self.lambda_d = self.Ld*self.id + self.psi_f
        self.lambda_q = self.Lq*self.iq
        self.Te = (3/2)*(self.p)*(self.lambda_d*self.iq - self.lambda_q*self.id)
        
        if self.render_mode == "human":
            self.render()
        
        # reduce time by one stimestep
        self.simTime -= self.dt

        # calculate error
        self.id_error = self.id - np.array(id_ref)
        self.omega_error = self.omega - np.array(omega_ref)

        # calculate error
        reward = -np.abs( self.id_error) -np.abs(self.omega_error)

        # check if episode is done
        done = self.simTime<=0

        # set placeholder for info
        info = {}


        return self.get_observations, reward, done, info

    
    def Xdot(self, xdot):
        id_, iq_, omega_ = xdot.tolist()
        omegae_ = omega_[0]*self.p

        A = np.array([[-self.Rs/self.Ld,  omegae_*self.Lq/self.Ld], 
                      [-omegae_*self.Ld/self.Lq, -self.Rs/self.Lq]])

        B = np.array([[1/self.Ld,  0], 
                      [0,  1/self.Lq]])

        v = np.array([[0],[-omegae_*self.psi_f/self.Lq]])
        x = np.array([id_, iq_])
        u = np.array([[self.vd],[self.vq]])
       
        idq_dot = np.matmul(A,x) + np.matmul(B,u) + v

        id_dot = idq_dot[0]
        iq_dot = idq_dot[1]

        lambda_d_ = self.Ld*id_ + self.psi_f
        lambda_q_ = self.Lq*iq_

        Te_ = (3/2)*(self.p)*(lambda_d_*iq_ - lambda_q_*id_)
        omega_dot = (Te_-(self.T_load + self.Bm*omega_))/self.J

        return np.array([id_dot, iq_dot, omega_dot])


    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)
        if options is None:
            self.id_dot = np.array([0.])
            self.iq_dot = np.array([0.])
            self.omega_dot = np.array([0.])            
            self.id = np.array([0.])
            self.iq = np.array([0.])
            self.omega = np.array([0.])
        else:
            # Note that if you use custom reset bounds, it may lead to out-of-bound
            # state/observations.
            self.id = options.get("id_init") if "id_init" in options else np.array([0.])
            self.iq = options.get("iq_init") if "iq_init" in options else np.array([0.])
            self.omega = options.get("omega_init") if "omega_init" in options else np.array([0.])
            self.id_dot = np.array([0.])
            self.iq_dot = np.array([0.])
            self.omega_dot = np.array([0.])

        if self.render_mode == "human":
            self.render()
        return self.get_observations(), {}

    def get_observations(self):
        # sensor measurement noise can be implemented here
         observations = np.array([self.id, self.iq, self.omega], dtype=np.float32)
         return observations
   
    def render(self):
        # Implement visualization
        pass
        
    def close(self):
        pass

In [114]:
env = PMSMEnv(dt=0.001, simTime=20)

In [115]:
env.observation_space.sample()

array([0.43773526, 0.03539525, 1.0288391 ], dtype=float32)

In [117]:
env.action_space.sample()

array([0.7867692 , 0.50231093], dtype=float32)

In [None]:

Ts = 1e-5;  
Vm = 230;
f  = 50;
T_load = 0;


Rs = 0.5;

Ld0 = 3.5e-3;
Ld1 = 3.5e-3;

Lq0 = 5e-3;
Lq1 = 5e-3;

P = 6;
p = 6/2;

phi_f0 = 0.33;

    
J  = 0.004;
Bm = 0.0028;

kp_i = 100;
ki_i = 4000;

fsat_d = 1e-3;
fsat_q = 1e-3;


kp_s = 1;
ki_s = 3;

%observer design
Q = [1 0 0 0;
     0 1 0 0;
     0 0 100 0;
     0 0 0 100;];

R = diag([1 1]); %[1 0;0 1;];

C = [1 0 0 0;
    0 1 0 0;];

    
P_init = 1e-3*eye(4)

% open('PMSM_Torque_Observer_Project.slx')
% sim('PMSM_Torque_Observer_Project.slx')

In [116]:
episodes = 10
for episode in range(1, episodes+1):
    initial_conditions = {  "id_init": np.array([0.]),
                            "iq_init": np.array([3.]),                            
                            "omega_init": np.array([0.])
                         }
    state = env.reset(options=initial_conditions)
    done = False
    score = 0 
    
    while not done:
        #env.render()
        action = env.action_space.sample()
        id_ref = 0
        omega_ref = 2
        T_load = 0

        n_state, reward, done, info = env.step(action, id_ref, omega_ref, T_load)
        score+=reward
    print('Episode:{} Score:{}'.format(episode, score))

Episode:1 Score:[-40056.723438]
Episode:2 Score:[-1.99679729]
Episode:3 Score:[-1.99646119]
Episode:4 Score:[-1.99571589]
Episode:5 Score:[-1.99571358]
Episode:6 Score:[-1.99613282]
Episode:7 Score:[-1.99662713]
Episode:8 Score:[-1.99633854]
Episode:9 Score:[-1.9966823]
Episode:10 Score:[-1.99572764]


# 2. Create a Deep Learning Model with Keras

In [None]:
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam

In [None]:
states = env.observation_space.shape
actions = env.action_space.n

In [None]:
actions

3

In [None]:
def build_model(states, actions):
    model = Sequential()    
    model.add(Dense(24, activation='relu', input_shape=states))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(actions, activation='linear'))
    return model

In [None]:
del model 

In [None]:
model = build_model(states, actions)

In [None]:
model.summary()

Model: "sequential_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_9 (Dense)              (None, 24)                48        
_________________________________________________________________
dense_10 (Dense)             (None, 24)                600       
_________________________________________________________________
dense_11 (Dense)             (None, 3)                 75        
Total params: 723
Trainable params: 723
Non-trainable params: 0
_________________________________________________________________


# 3. Build Agent with Keras-RL

In [None]:
from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

In [None]:
def build_agent(model, actions):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=50000, window_length=1)
    dqn = DQNAgent(model=model, memory=memory, policy=policy, 
                  nb_actions=actions, nb_steps_warmup=10, target_model_update=1e-2)
    return dqn

In [None]:
dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])
dqn.fit(env, nb_steps=50000, visualize=False, verbose=1)

Training for 50000 steps ...
Interval 1 (0 steps performed)
166 episodes - episode_reward: -38.000 [-60.000, 32.000] - loss: 1.235 - mae: 6.439 - mean_q: -8.204

Interval 2 (10000 steps performed)
167 episodes - episode_reward: -30.263 [-60.000, 36.000] - loss: 2.347 - mae: 11.012 - mean_q: -15.812

Interval 3 (20000 steps performed)
167 episodes - episode_reward: -27.964 [-60.000, 36.000] - loss: 2.621 - mae: 11.725 - mean_q: -16.873

Interval 4 (30000 steps performed)
166 episodes - episode_reward: -28.916 [-60.000, 42.000] - loss: 2.326 - mae: 10.960 - mean_q: -15.735

Interval 5 (40000 steps performed)

In [None]:
scores = dqn.test(env, nb_episodes=100, visualize=False)
print(np.mean(scores.history['episode_reward']))

Testing for 100 episodes ...
Episode 1: reward: -56.000, steps: 60
Episode 2: reward: -60.000, steps: 60
Episode 3: reward: -50.000, steps: 60
Episode 4: reward: -60.000, steps: 60
Episode 5: reward: -56.000, steps: 60
Episode 6: reward: -52.000, steps: 60
Episode 7: reward: -60.000, steps: 60
Episode 8: reward: -50.000, steps: 60
Episode 9: reward: -52.000, steps: 60
Episode 10: reward: -56.000, steps: 60
Episode 11: reward: -60.000, steps: 60
Episode 12: reward: -60.000, steps: 60
Episode 13: reward: -52.000, steps: 60
Episode 14: reward: -52.000, steps: 60
Episode 15: reward: -58.000, steps: 60
Episode 16: reward: -50.000, steps: 60
Episode 17: reward: -54.000, steps: 60
Episode 18: reward: -58.000, steps: 60
Episode 19: reward: -60.000, steps: 60
Episode 20: reward: -56.000, steps: 60
Episode 21: reward: -56.000, steps: 60
Episode 22: reward: -52.000, steps: 60
Episode 23: reward: -60.000, steps: 60
Episode 24: reward: -56.000, steps: 60
Episode 25: reward: -58.000, steps: 60
Episo

In [None]:
_ = dqn.test(env, nb_episodes=15, visualize=True)

Testing for 15 episodes ...
Episode 1: reward: 200.000, steps: 200
Episode 2: reward: 200.000, steps: 200
Episode 3: reward: 200.000, steps: 200
Episode 4: reward: 200.000, steps: 200
Episode 5: reward: 200.000, steps: 200
Episode 6: reward: 200.000, steps: 200
Episode 7: reward: 200.000, steps: 200
Episode 8: reward: 200.000, steps: 200
Episode 9: reward: 200.000, steps: 200
Episode 10: reward: 200.000, steps: 200
Episode 11: reward: 200.000, steps: 200
Episode 12: reward: 200.000, steps: 200
Episode 13: reward: 200.000, steps: 200
Episode 14: reward: 200.000, steps: 200
Episode 15: reward: 200.000, steps: 200


# 4. Reloading Agent from Memory

In [None]:
dqn.save_weights('dqn_weights.h5f', overwrite=True)

In [None]:
del model
del dqn
del env

In [None]:
env = gym.make('CartPole-v0')
actions = env.action_space.n
states = env.observation_space.shape[0]
model = build_model(states, actions)
dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])

In [None]:
dqn.load_weights('dqn_weights.h5f')

In [None]:
_ = dqn.test(env, nb_episodes=5, visualize=True)

Testing for 5 episodes ...
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
Episode 1: reward: 200.000, steps: 200
Episode 2: reward: 200.000, steps: 200
Episode 3: reward: 200.000, steps: 200
Episode 4: reward: 200.000, steps: 200
Episode 5: reward: 200.000, steps: 200
