In [66]:
import numpy as np
import pickle
from copy import deepcopy
import pygame
import time
import sys
from win32gui import SetWindowPos

In [2]:
# Default for tolerance()'s `value_at_margin`: the value the falloff curve takes
# at a distance of exactly `margin` outside the bounds.
_DEFAULT_VALUE_AT_MARGIN = 0.1


def _sigmoids(x, value_at_1, sigmoid):
    if sigmoid in ('cosine', 'linear', 'quadratic'):
        if not 0 <= value_at_1 < 1:
            raise ValueError('`value_at_1` must be nonnegative and smaller than 1, '
                                             'got {}.'.format(value_at_1))
    else:
        if not 0 < value_at_1 < 1:
            raise ValueError('`value_at_1` must be strictly between 0 and 1, '
                                             'got {}.'.format(value_at_1))

    if sigmoid == 'gaussian':
        scale = np.sqrt(-2 * np.log(value_at_1))
        return np.exp(-0.5 * (x*scale)**2)

    elif sigmoid == 'hyperbolic':
        scale = np.arccosh(1/value_at_1)
        return 1 / np.cosh(x*scale)

    elif sigmoid == 'long_tail':
        scale = np.sqrt(1/value_at_1 - 1)
        return 1 / ((x*scale)**2 + 1)

    elif sigmoid == 'reciprocal':
        scale = 1/value_at_1 - 1
        return 1 / (abs(x)*scale + 1)

    elif sigmoid == 'cosine':
        scale = np.arccos(2*value_at_1 - 1) / np.pi
        scaled_x = x*scale
        with warnings.catch_warnings():
            warnings.filterwarnings(
                    action='ignore', message='invalid value encountered in cos')
            cos_pi_scaled_x = np.cos(np.pi*scaled_x)
        return np.where(abs(scaled_x) < 1, (1 + cos_pi_scaled_x)/2, 0.0)

    elif sigmoid == 'linear':
        scale = 1-value_at_1
        scaled_x = x*scale
        return np.where(abs(scaled_x) < 1, 1 - scaled_x, 0.0)

    elif sigmoid == 'quadratic':
        scale = np.sqrt(1-value_at_1)
        scaled_x = x*scale
        return np.where(abs(scaled_x) < 1, 1 - scaled_x**2, 0.0)

    elif sigmoid == 'tanh_squared':
        scale = np.arctanh(np.sqrt(1-value_at_1))
        return 1 - np.tanh(x*scale)**2

    else:
        raise ValueError('Unknown sigmoid type {!r}.'.format(sigmoid))


def tolerance(x, bounds=(0.0, 0.0), margin=0.0, sigmoid='gaussian',
                            value_at_margin=_DEFAULT_VALUE_AT_MARGIN):
    """Score how close ``x`` is to the interval ``bounds``.

    Returns 1 wherever ``x`` lies inside ``[lower, upper]``. Outside, the score
    is 0 when ``margin == 0``; otherwise it decays with the distance to the
    nearest bound following ``sigmoid`` and equals ``value_at_margin`` at a
    distance of ``margin``.

    Args:
        x: Scalar or numpy array.
        bounds: ``(lower, upper)`` pair with ``lower <= upper``.
        margin: Non-negative width of the falloff region.
        sigmoid: Falloff type forwarded to ``_sigmoids``.
        value_at_margin: Score at distance ``margin`` from the nearest bound.

    Returns:
        A Python float when ``x`` is a scalar, otherwise a numpy array.

    Raises:
        ValueError: If ``lower > upper`` or ``margin < 0``.
    """
    lo, hi = bounds
    if lo > hi:
        raise ValueError('Lower bound must be <= upper bound.')
    if margin < 0:
        raise ValueError('`margin` must be non-negative.')

    inside = np.logical_and(lo <= x, x <= hi)

    if margin == 0:
        # Hard edge: indicator of the interval.
        score = np.where(inside, 1.0, 0.0)
    else:
        # Distance to the nearest bound, expressed in units of `margin`.
        overshoot = np.where(x < lo, lo - x, x - hi) / margin
        falloff = _sigmoids(overshoot, value_at_margin, sigmoid)
        score = np.where(inside, 1.0, falloff)

    if np.isscalar(x):
        return float(score)
    return score

def wrap(x):
    """Wrap an angle (radians) into the half-open interval [-pi, pi)."""
    full_turn = 2 * np.pi
    shifted = x + np.pi
    return (shifted % full_turn) - np.pi

def rect_points(center, length, width, ang, scaling, offset):
    """Return the four corners of a rotated rectangle in screen coordinates.

    Corners are placed on a circle of radius half the diagonal around
    ``center``. ``ang`` is measured from the +y axis towards +x: ``ang == 0``
    aligns the ``length`` side with y, ``ang == pi/2`` aligns it with x.

    Args:
        center: Indexable ``(x, y)`` of the rectangle centre (world units).
        length, width: Side lengths (world units).
        ang: Rotation in radians.
        scaling, offset: Forwarded to ``pygame_transform``.

    Returns:
        List of four ``(x, y)`` tuples, consecutive around the rectangle.
    """
    half_diagonal = np.sqrt(length**2+width**2)/2
    # Angles subtended at the centre by the width side and the length side.
    ang1 = 2*np.arctan2(width,length)
    ang2 = 2*np.arctan2(length,width)

    corner_angles = (
        ang+ang1/2,
        ang+ang1/2+ang2,
        ang+ang1*1.5+ang2,
        ang+ang1*1.5+2*ang2,
    )
    corners = [
        (center[0]+np.sin(theta)*half_diagonal, center[1]+np.cos(theta)*half_diagonal)
        for theta in corner_angles
    ]

    return [pygame_transform(corner, scaling, offset) for corner in corners]

def pygame_transform(point, scaling, offset):
    """Convert a world-space point to pygame screen coordinates.

    The point is scaled, its y component is negated (pygame's y axis points
    down), and ``offset`` is then added.
    """
    screen_x = offset[0]+scaling*point[0]
    screen_y = offset[1]-scaling*point[1]
    return (screen_x, screen_y)

def create_background(length, width):
    """Create a pygame surface of size ``(length, width)`` painted black."""
    black = (0,0,0)
    surface = pygame.Surface((length, width))
    whole_area = pygame.Rect(0, 0, length, width)
    pygame.draw.rect(surface, black, whole_area)
    return surface



In [215]:
class CartPole:
    """Cart-pole environment with pickled dynamics and a pygame renderer.

    The state vector is ``[x, theta, x_dot, theta_dot]``: cart position, pole
    angle and their time derivatives. ``theta`` is kept in ``[-pi, pi)`` and the
    reward's ``upright`` term is largest at ``theta == 0``.
    """

    def __init__(self, model_path="robot.p"):
        """Set physical/rendering parameters and load the model functions.

        Args:
            model_path: Path of a pickle file holding a dict with the callables
                ``'kinematics'`` and ``'dynamics'``. Defaults to ``"robot.p"``.
        """
        m1 = 1
        l1, r1, I1 = 0, 0, 0 # dummy

        m2 = 0.1
        l2 = 1
        r2 = l2/2
        I2 = m2 * l2**2 / 12

        g = 9.8

        m = [m1,m2]
        l = [l1,l2]
        r = [r1,r2]
        I = [I1,I2]

        self.name = "Cart Pole"
        self.n = 2
        self.obs_size = 5
        self.action_size = 1
        # Parameter list prepended to the state when calling the model functions.
        self.inertials = m+l+r+I+[g]
        self.a_scale = np.array([10.0])

        # For rendering
        self.display = False
        self.screen_width = 500
        self.screen_height = 500
        self.offset = [250, 250]
        self.scaling = 75
        self.x_limit = 2.0

        self.link_length = 1.0
        self.link_width = 0.2
        self.link_color = (72,209,204) # medium turquoise

        self.joint_radius = self.link_width/1.8
        self.joint_color = (205,55,0) # orange red

        self.cart_length = 5*self.link_width
        self.cart_width = 2*self.link_width
        self.cart_color = (200,255,0) # yellow

        self.rail_length = 2*self.x_limit
        self.rail_width = self.link_width/2.5
        self.rail_color = (150,150,150) # gray

        self.dt = 0.01 # time per simulation step (in seconds)
        self.t = 0 # elapsed simulation steps
        self.t_max = 10000000 # max simulation steps
        # State holds n positions followed by n velocities, hence 2*n entries.
        self.state = np.zeros(2 * self.n)
        # Geometry returned by the kinematics function; set by reset()/step().
        self.geo = None
        self.ang_vel_limit = 20.0
        # NOTE(review): pickle.load can execute arbitrary code from the file;
        # only load model files that come from a trusted source.
        with open(model_path, "rb") as inf:
            funcs = pickle.load(inf)
        self.kinematics = funcs['kinematics']
        self.dynamics = funcs['dynamics']

    def wrap_state(self):
        """Wrap the pole angle (state[1]) into [-pi, pi) in place."""
        self.state[1] = wrap(self.state[1])

    def reset_state(self):
        """Draw a new start state: cart near 0, angle near pi, zero velocities."""
        self.state = np.array([0.01*np.random.randn(),
                               np.pi + 0.01*np.random.randn(),
                               0,0])

    def get_A(self, a):
        """Map a normalised action to the generalised-force vector.

        ``a`` is clipped to [-1, 1] and scaled by ``a_scale``; the second
        coordinate is not actuated and always receives 0.
        """
        a_1, = np.clip(a, -1.0, 1.0)*self.a_scale
        a_2 = 0.0
        return np.array([a_1,a_2])

    def get_obs(self):
        """Return ``[x, cos(theta), sin(theta), x_dot, theta_dot]``."""
        return np.array([self.state[0],
                        np.cos(self.state[1]),np.sin(self.state[1]),
                        self.state[2],
                        self.state[3]
                        ])

    def get_reward(self):
        """Return the product of the upright, small-velocity and centred terms."""
        upright = (np.array([np.cos(self.state[1])])+1)/2

        if np.abs(self.state[0]) <= self.x_limit:
            centered = tolerance(self.state[0], margin=self.x_limit)
            centered = (1 + centered) / 2
        else:
            centered = 0.1

        qdot = self.state[self.n:]
        ang_vel = qdot
        # ang_vel[1:] skips the cart velocity and keeps the angular velocity.
        small_velocity = tolerance(ang_vel[1:], margin=self.ang_vel_limit/2).min()
        small_velocity = (1 + small_velocity) / 2

        reward = upright.mean() * small_velocity * centered

        return reward

    def reset(self):
        """Start a new episode and return ``(observation, 0.0, False)``."""
        self.reset_state()
        self.wrap_state()
        self.geo = self.kinematics(self.inertials+self.state.tolist())
        self.t = 0

        return self.get_obs(), 0.0, False

    def step(self, a):
        """Advance the simulation by one ``dt`` and return ``(obs, reward, done)``.

        ``done`` is True when ``t_max`` steps have elapsed or the cart position
        reaches ``x_limit`` in absolute value.
        """
        self.state = self.rk4(self.state, self.get_A(a))
        self.wrap_state()
        self.geo = self.kinematics(self.inertials+self.state.tolist())

        self.t += 1
        timed_out = self.t >= self.t_max
        out_of_bounds = np.abs(self.state[0]) >= self.x_limit
        done = bool(timed_out or out_of_bounds)

        return self.get_obs(), self.get_reward(), done

    def F(self, s, a):
        """Return the state derivative ``[qdot, qddot]`` for state ``s`` and force ``a``.

        ``qddot`` solves ``M qddot = a - C qdot - G`` with the matrices returned
        by the dynamics function; 1e-6 is added to M's diagonal before inversion.
        """
        M, C, G = self.dynamics(self.inertials+s.tolist())
        qdot = s[self.n:]
        qddot = np.linalg.inv(M+1e-6*np.eye(self.n)) @ (a - C @ qdot - G.flatten())

        return np.concatenate((qdot,qddot))

    def rk4(self, s, a):
        """Integrate one step of length ``dt`` with classical Runge-Kutta 4.

        ``s`` is not modified; a new array is returned.
        """
        half_dt = self.dt/2
        k1 = self.F(s, a)
        k2 = self.F(s + half_dt * k1, a)
        k3 = self.F(s + half_dt * k2, a)
        k4 = self.F(s + self.dt * k3, a)

        return s + (self.dt/6.0) * (k1 + 2 * k2 + 2 * k3 + k4)

    def _init_display(self):
        """Create the pygame window and background surface."""
        pygame.init()
        self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
        SetWindowPos(pygame.display.get_wm_info()['window'], -1, 200, 200, 0, 0, 1)
        pygame.display.set_caption(self.name)
        self.background = create_background(self.screen_width, self.screen_height)
        self.display = True

    def render(self):
        """Draw the current state, creating the window on first use."""
        if not self.display:
            self._init_display()
        if self.geo is None:
            # No geometry yet (reset()/step() not called): nothing to draw.
            return
        self.screen.blit(self.background, (0, 0))
        self.draw()
        time.sleep(0.006)
        pygame.display.flip()

    def close(self):
        """Shut pygame down and clear ``display`` so render() reopens the window."""
        if self.display:
            pygame.quit()
            self.display = False

    def draw(self):
        """Handle window events and draw rail, cart, links and joints."""
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                self.display = False
                sys.exit()
        # NOTE(review): layout of these arrays is defined by the pickled
        # kinematics function; only the indexing used below is relied on.
        centers, joints, angles = self.geo

        #horizontal rail
        pygame.draw.polygon(self.screen, self.rail_color, rect_points([0,0], self.rail_length, self.rail_width, np.pi/2, self.scaling, self.offset))

        # Wrap the cart's x into [-x_limit, x_limit) so it stays on the rail.
        plot_x = ((centers[0,0] + self.x_limit) % (2 * self.x_limit)) - self.x_limit
        link1_points = rect_points([plot_x,0], self.cart_length, self.cart_width, np.pi/2, self.scaling, self.offset)
        pygame.draw.polygon(self.screen, self.cart_color, link1_points)

        # Shift the remaining bodies by the same amount the cart was wrapped.
        offset = np.array([plot_x-centers[0,0],0])
        for j in range(1,self.n):
            link_points = rect_points(centers[j]+offset, self.link_length, self.link_width, angles[j,0],self.scaling,self.offset)
            pygame.draw.polygon(self.screen, self.link_color, link_points)

            joint_point = pygame_transform(joints[j]+offset,self.scaling,self.offset)
            pygame.draw.circle(self.screen, self.joint_color, joint_point, self.scaling*self.joint_radius)
        pygame.display.update()

In [31]:
# Smoke test: roll the environment for 100 steps with a tiny oscillating
# action, printing the reward of every step and rendering each frame.
cp = CartPole()
cp.reset_state()
# Pin cart position and pole angle to 0 and the angular velocity to 0.1;
# reset_state() already leaves the cart velocity (index 2) at 0.
cp.state[[0, 1, 3]] = [0, 0, 0.1]

print(cp.state[0])
for i in range(100):
    step_result = cp.step(0.001*np.cos(i))
    print(step_result[1])
    cp.render()

pygame.quit()
# print(cp.step(0)[0])
# print(cp.step(0)[0])
# print(cp.step(0)[0])
# print(cp.step(0)[0])
# print(cp.step(0)[0])
# print(cp.step(0)[0])
# print(cp.step(0)[0])
# print(cp.step(-1)[0])
# print(cp.step(-1)[0])




0.0
0.9999710075929191
0.99997016722535
0.9999686499946226
0.9999664757631976
0.999963706114497
0.9999603773549706
0.999956451414823
0.9999518299099678
0.9999464193351973
0.9999401890148919
0.9999331697765069
0.9999253923092746
0.9999168184845828
0.9999073286554322
0.999896778446854
0.9998850747430672
0.9998722021832472
0.9998581732440035
0.9998429429326232
0.9998263652524819
0.999808234003744
0.9997883732750561
0.9997666959132209
0.999743173495856
0.9997177388882377
0.999690207583729
0.9996602926726034
0.9996277038774397
0.9995922419454584
0.9995537989965914
0.9995122562804135
0.9994673668617733
0.9994187324079477
0.9993659009211315
0.9993084993943834
0.999246277322881
0.9991790112193497
0.999106347438129
0.9990277262670012
0.9989424632207716
0.9988499186504378
0.9987495987230106
0.9986410836805315
0.9985238340415729
0.9983970464677832
0.9982596971135625
0.998110740622423
0.9979492826055737
0.997774554747989
0.997585694642416
0.9973815190523169
0.9971605010655917
0.9969209817001267
0.

In [9]:
# Shut pygame down. Presumably kept as a manual way to close a window left open
# by an interrupted cell -- confirm before removing.
pygame.quit()

In [249]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random
from collections import deque

# Hyperparameters
STATE_DIM = 5  # [cart position, cos(angle), sin(angle), velocity, angular velocity]
ACTION_SPACE = [-1.0, 0.0, 1.0]  # Discretized action space. NOTE(review): not referenced by the visible training code.
GAMMA = 0.99  # discount factor applied to the bootstrapped target
LR = 1e-3  # learning rate passed to Adam
BATCH_SIZE = 64  # transitions sampled per gradient step
EPSILON_START = 1.0  # initial exploration level; the training loop passes it to select_action as noise_std
EPSILON_END = 0.1  # floor of the exploration level
EPSILON_DECAY = 5000  # number of environment steps for the linear decay from START to END
MEMORY_CAPACITY = 10000  # replay buffer capacity
TARGET_UPDATE_FREQ = 500  # environment steps between target-network syncs
MAX_EPISODES = 500  # NOTE(review): the training loop uses a literal range(200) instead of this value
MAX_STEPS = 400  # per-episode step cap

class DQNetwork(nn.Module):
    """MLP with two 128-unit ReLU layers and one tanh-squashed output unit."""

    def __init__(self, state_dim):
        super().__init__()
        hidden_units = 128
        self.fc1 = nn.Linear(state_dim, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.out = nn.Linear(hidden_units, 1)  # single scalar output

    def forward(self, state):
        """Map a ``(batch, state_dim)`` tensor to ``(batch, 1)`` values in [-1, 1]."""
        features = state
        for layer in (self.fc1, self.fc2):
            features = F.relu(layer(features))
        # tanh bounds the output to [-1, 1].
        return torch.tanh(self.out(features))

# Replay Buffer
class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A bounded deque discards the oldest entry once `capacity` is reached.
        self.buffer = deque(maxlen=capacity)

    def add(self, experience):
        """Append one ``(state, action, reward, next_state, done)`` tuple."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them column-wise.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of numpy
            arrays; ``rewards`` and ``dones`` are float32.
        """
        picked = random.sample(self.buffer, batch_size)
        state_col, action_col, reward_col, next_state_col, done_col = zip(*picked)
        states = np.array(state_col)
        actions = np.array(action_col)
        rewards = np.array(reward_col, dtype=np.float32)
        next_states = np.array(next_state_col)
        dones = np.array(done_col, dtype=np.float32)
        return (states, actions, rewards, next_states, dones)

    def size(self):
        """Return the number of stored transitions."""
        return len(self.buffer)

def select_action(state, noise_std=0.1):
    """Return the global ``policy_net`` output for ``state`` plus Gaussian noise.

    Args:
        state: Observation accepted by ``torch.tensor`` (one unbatched state).
        noise_std: Standard deviation of the zero-mean exploration noise.

    Returns:
        The noisy action clipped to [-1, 1].
    """
    state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    # Inference only: no_grad avoids building an autograd graph per env step.
    with torch.no_grad():
        action = policy_net(state).item()
    action += np.random.normal(0, noise_std)  # Add Gaussian noise for exploration
    return np.clip(action, -1, 1)  # Clamp action to [-1, 1]


# Build the environment and two identically shaped networks.
env = CartPole()
policy_net = DQNetwork(STATE_DIM)
target_net = DQNetwork(STATE_DIM)

# Start the target network as an exact copy of the policy network and put it in
# eval mode; only policy_net's parameters are handed to the optimizer below.
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=LR)
replay_buffer = ReplayBuffer(MEMORY_CAPACITY)
# Exploration level and its per-step decrement (linear decay over EPSILON_DECAY steps).
epsilon = EPSILON_START
epsilon_decay = (EPSILON_START - EPSILON_END) / EPSILON_DECAY



In [251]:
MAX_STEPS = 400

steps_done = 0

# NOTE(review): the TD target below bootstraps from the same tanh-bounded
# network output that select_action uses as the action itself, so the regressed
# quantity is confined to [-1, 1] and cannot represent a discounted return.
# This looks like a mix of value-learning and actor-only code -- confirm the
# intended algorithm before trusting the training results.
for episode in range(200):
    state, reward, done = env.reset()
    total_reward = 0

    for t in range(MAX_STEPS):
        # Select action (epsilon is used as the exploration-noise std)
        action = select_action(state, epsilon)

        # Perform action and store the transition
        next_state, reward, done = env.step(action)
        replay_buffer.add((state, action, reward, next_state, done))
        state = next_state
        total_reward += reward
        steps_done += 1

        # Decrease epsilon
        epsilon = max(EPSILON_END, epsilon - epsilon_decay)

        # Train the network once enough transitions are available
        if replay_buffer.size() >= BATCH_SIZE:
            # The sampled actions are not used by the loss below.
            states, actions, rewards, next_states, dones = replay_buffer.sample(BATCH_SIZE)

            states_tensor = torch.as_tensor(states, dtype=torch.float32)
            rewards_tensor = torch.as_tensor(rewards, dtype=torch.float32)
            next_states_tensor = torch.as_tensor(next_states, dtype=torch.float32)
            dones_tensor = torch.as_tensor(dones, dtype=torch.float32)

            # squeeze(-1) drops only the output axis, so the batch axis
            # survives even for a batch of one.
            predicted_actions = policy_net(states_tensor).squeeze(-1)
            with torch.no_grad():
                target_actions = target_net(next_states_tensor).squeeze(-1)
                target_values = rewards_tensor + GAMMA * (1 - dones_tensor) * target_actions

            loss = F.mse_loss(predicted_actions, target_values)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Update target network
        if steps_done % TARGET_UPDATE_FREQ == 0:
            target_net.load_state_dict(policy_net.state_dict())

        if done:
            break

    print(f"Episode {episode + 1}, Total Reward: {total_reward}")

print("Training Complete!")



Episode 1, Total Reward: 7.117726845378057
Episode 2, Total Reward: 7.073973744688231
Episode 3, Total Reward: 7.099217249753773
Episode 4, Total Reward: 7.298702605703624
Episode 5, Total Reward: 6.988197217073682
Episode 6, Total Reward: 7.125907681304353
Episode 7, Total Reward: 6.9289337793996735
Episode 8, Total Reward: 6.991233270456222
Episode 9, Total Reward: 7.290256011958554
Episode 10, Total Reward: 7.116665355714294
Episode 11, Total Reward: 7.186852529143123
Episode 12, Total Reward: 7.049926426751801
Episode 13, Total Reward: 7.215255051750727
Episode 14, Total Reward: 7.033314948448251
Episode 15, Total Reward: 7.088752036651637
Episode 16, Total Reward: 7.173305841726743
Episode 17, Total Reward: 7.103817348188387
Episode 18, Total Reward: 7.1727839109654745
Episode 19, Total Reward: 7.189483936064566
Episode 20, Total Reward: 7.235369060389344
Episode 21, Total Reward: 7.219348625668055
Episode 22, Total Reward: 7.063798217764215
Episode 23, Total Reward: 7.08461237280

In [248]:
import torch
import numpy as np

# Clear the flag so the next env.render() call re-creates the pygame window;
# render() sets it to True and nothing in CartPole resets it after pygame.quit().
env.display = False
# Function to select the best action (exploitation only, no ε-greedy)
def select_best_action(state, policy_net):
    """Return ``policy_net``'s output for ``state`` without exploration noise.

    Args:
        state: Observation accepted by ``torch.tensor`` (one unbatched state).
        policy_net: Callable mapping a ``(1, state_dim)`` float tensor to a
            single-element tensor.

    Returns:
        The network output clipped to [-1, 1].
    """
    state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    # Inference only: no_grad avoids building an autograd graph per env step.
    with torch.no_grad():
        action = policy_net(state).item()
    return np.clip(action, -1, 1)  # Clamp action to [-1, 1]

# Run the model and render the results
def run_trained_model(policy_net, env, max_steps=MAX_STEPS):
    """Roll out one rendered episode with the noise-free policy.

    Args:
        policy_net: Network passed to ``select_best_action``.
        env: Environment exposing ``reset()``, ``render()`` and ``step(action)``.
        max_steps: Upper bound on the number of environment steps.

    Returns:
        Sum of the rewards collected during the rollout.
    """
    observation, reward, done = env.reset()
    total_reward = 0

    for t in range(max_steps):
        env.render()  # Display the environment using pygame
        state_action = select_best_action(observation, policy_net)
        observation, reward, done = env.step(state_action)
        total_reward += reward

        if not done:
            continue
        # Only an episode that terminates early is reported here.
        print(f"Episode finished after {t + 1} steps with total reward: {total_reward}")
        break

    return total_reward

# NOTE(review): run_trained_model() calls env.reset() itself, so this reset
# only draws an extra random start state that is immediately replaced.
env.reset()
# Run the model
total_reward = run_trained_model(policy_net, env)
print(f"Total Reward from the trained model: {total_reward}")
# Shut pygame down after the rollout; env.display is left unchanged by this call.
pygame.quit()

Episode finished after 67 steps with total reward: 7.137649269512246
Total Reward from the trained model: 7.137649269512246


In [51]:
# Shut pygame down. Presumably kept as a manual way to close a leftover
# window -- confirm before removing.
pygame.quit()

In [206]:
# NOTE(review): overrides EPSILON_END (0.1 where the hyperparameters are first
# defined). In top-to-bottom order this runs after the training loop, so it
# only affects cells executed afterwards.
EPSILON_END = 0.01