In [2]:
### make environment for reinforcement learning ###
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
from scipy.integrate import ode
import gym
from gym import spaces

In [5]:
# example of scipy ode solver
def f(t, y, params):
    dydt = np.zeros(2)
    dydt[0] = y[1]
    dydt[1] = -params[0]*y[0] - params[1]*y[1]
    return dydt

def solve_ode(params):
    t0 = 0
    t1 = 0.01
    dt = 0.01
    y0 = [1, 0]
    r = ode(f).set_integrator('dopri5')
    r.set_initial_value(y0, t0).set_f_params(params)
    t = []
    y = []
    while r.successful() and r.t < t1:
        r.integrate(r.t+dt)
        t.append(r.t)
        y.append(r.y)
    return np.array(t), np.array(y)




In [28]:
# python env for mass spring damper from gym
class MassSpringDamperEnv(gym.Env):
    def __init__(self) -> None:
        super().__init__()
        self.spring_constant = 1
        self.damping_constant = 0.1
        self.mass = 1
        self.gravity = 9.8
        self.dt = 0.01
        self.max_t = 10
        self.max_x = 10
        self.max_v = 10
        self.max_u = 10
        self.action_space = spaces.Box(low=-self.max_u, high=self.max_u, shape=(1,))
        self.observation_space = spaces.Box(low=np.array([-self.max_x, -self.max_v]), high=np.array([self.max_x, self.max_v]), shape=(2,))
        self.reset()

    def reset(self):
        self.t = 0
        self.x = 0
        self.v = 0
        return np.array([self.x, self.v])
    
    def step(self, u): # state, reward, done, info
        self.t += self.dt
        self.x += self.v*self.dt
        self.v += (self.gravity - self.damping_constant*self.v - self.spring_constant*self.x/self.mass + u/self.mass)*self.dt

        if self.t >= self.max_t:
            done = True
        else:
            done = False
        
        return np.array([self.x, self.v, 0, done, {}], dtype=object)


    
    def render(self):
        pass

    def close(self):
        pass

    


In [34]:
# python env for mass spring damper check #
if __name__ == "__main__":
    env = MassSpringDamperEnv()
    env.reset()
    for i in range(1000):
        u = np.random.randn(1)
        result = env.step(u)
        print(result)
        state = result[0]
        reward = result[1]
        done = result[2]
        info = result[3]
        print(env.step(u))
    env.close()

In [36]:
# train withh pytorch #
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
from collections import deque
import matplotlib.pyplot as plt
import numpy as np
import gym
import time

# hyperparameters
learning_rate = 0.0005
gamma = 0.98
buffer_limit = 50000
batch_size = 32

# Q-network
class Qnet(nn.Module):
    def __init__(self):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(2, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 1)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        q = self.fc3(x)
        return q

    def sample_action(self, obs, epsilon):
        out = self.forward(obs)
        coin = random.random()
        if coin < epsilon:
            return random.randint(-10, 10)
        else:
            return out.item()
        
# replay buffer
class ReplayBuffer():
    def __init__(self):
        self.buffer = deque(maxlen=buffer_limit)
    
    def put(self, data):
        self.buffer.append(data)
    
    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        
        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])
        
        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
            torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
                torch.tensor(done_mask_lst)

    def size(self):
        return len(self.buffer)
    
# train
def train(q, q_target, memory):
    for i in range(10):
        s,a,r,s_prime,done_mask = memory.sample(batch_size)
        
        q_out = q(s)
        q_a = q_out.gather(1,a)
        max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)
        target = r + gamma * max_q_prime * done_mask
        loss = F.smooth_l1_loss(q_a, target)
        
        q.optimizer.zero_grad()
        loss.backward()
        q.optimizer.step()

# main
def main():
    env = MassSpringDamperEnv()
    q = Qnet()
    q_target = Qnet()
    q_target.load_state_dict(q.state_dict())
    memory = ReplayBuffer()

    score = 0.0
    print_interval = 20

    for n_epi in range(10000):
        epsilon = max(0.01, 0.08 - 0.01*(n_epi/200)) # linear annealing from 8% to 1%
        s = env.reset()
        done = False

        while not done:
            a = q.sample_action(torch.from_numpy(s).float(), epsilon)
            result = env.step(a)
            s_prime = result[0]
            r = result[1]
            done = result[2]
            info = result[3]
            done_mask = 0.0 if done else 1.0
            memory.put((s,a,r/100.0,s_prime, done_mask))
            s = s_prime

            score += r
            if done:
                break
        
        if memory.size()>2000:
            train(q, q_target, memory)
        
        if n_epi%print_interval==0 and n_epi!=0:
            q_target.load_state_dict(q.state_dict())
            print("# of episode :{}, avg score : {:.1f}, buffer size : {}, epsilon : {:.1f}%".format(
                n_epi, score/print_interval, memory.size(), epsilon*100))
            score = 0.0
    env.close()

if __name__ == '__main__':
    main()