In [None]:
import gym
from gym import error,spaces,utils
from gym.utils import seeding
from typing import Optional
from collections import deque

import os
import pybullet as p
import pybullet_data
from pybullet_utils import bullet_client as bc
import math
import numpy as np
import random
import time

import tensorflow as tf
from tensorflow.keras import layers
import matplotlib.pyplot as plt

from tensorflow import keras

# Pendulum Environment

In [None]:

DEFAULT_X = np.pi
DEFAULT_Y = 1.0

class pendulumEnv(gym.Env):
    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 30,
    }
    def __init__(self, render_mode: Optional[str] = None, g=9.81):
        self.max_speed = 8
        self.max_torque = 0.15
        self.dt = 0.25
        self.g = g
        self.m = 0.4
        self.l = 0.08
        self._render_height = 200
        self._render_width = 320
        self._physics_client_id = -1

        self.render_mode = render_mode

        self.screen_dim = 500
        self.screen = None
        self.clock = None
        self.isopen = True

        high = np.array([1.0, 1.0, self.max_speed], dtype=np.float32)
        self.action_space = spaces.Discrete(3,start=-1)
        self.observation_space = spaces.Box(low=-high, high=high, dtype=np.float32)
        
    def step(self,u):
        p = self._p
        th, thdot = self.state  

        g = self.g
        
        m = self.m
        l = self.l
        dt = self.dt

        if u==-1:
            u = -0.15
        elif u==1:
            u = 0.15
        else:
            u = 0
            
        self.last_u = u  
        #print(u)
        
        p.setJointMotorControl2(self.boxId,0,p.TORQUE_CONTROL,force=u)
        p.stepSimulation()
        
        costs = np.cos(angle_normalize(th)) + 0.01*thdot**2 + 0.001*u**2
        
        th,thdot = p.getJointState(self.boxId,0)[0:2]
        self.state = th,thdot

        if self.render_mode == "human":
            self.render()
        return self._get_obs(), -costs, False, False, {}
    
    def _get_obs(self):
        theta, thetadot = self.state
        return np.array([np.cos(theta), np.sin(theta), thetadot], dtype=np.float32)
        
        
    def reset(self):
        if self._physics_client_id < 0:
            if self.render_mode == "human":
                self._p = bc.BulletClient(connection_mode=p.GUI)
            else:
                self._p = bc.BulletClient()
                
            self._physics_client_id = self._p._client
                
            p2 = self._p
            p2.resetSimulation()
            urdfRootPath=pybullet_data.getDataPath()
            planeUid = p2.loadURDF(os.path.join(urdfRootPath,"plane.urdf"), basePosition=[0,0,0])
            cubeStartPos = [0,0,1]
            cubeStartOrientation = p2.getQuaternionFromEuler([0,0,0])
            self.boxId = p2.loadURDF(os.path.join(urdfRootPath,"sreeram/Pendulum.urdf"),cubeStartPos, cubeStartOrientation)
        
            self.timeStep = 0.25
            p2.setJointMotorControl2(self.boxId, 0, p.VELOCITY_CONTROL, force=0.01)
            p2.setGravity(0, 0, -9.8)
            p2.setTimeStep(self.timeStep)
            p2.setRealTimeSimulation(0)
            
            
        p2 = self._p
        p2.resetJointState(self.boxId, 0, 0, 0)
        self.state = p2.getJointState(self.boxId, 0)[0:2] 
        return self._get_obs()
    
    def _get_obs(self):
        theta, thetadot = self.state
        return np.array([np.cos(theta), np.sin(theta), thetadot], dtype=np.float32)
        
    
    def render(self,mode="human",close=False):
        if mode=="human":
            self._renders=True
        if mode!="rgb_array":
            return np.array([])
    
        base_pos=[0,0,0]
        self._cam_dist = 2
        self._cam_pitch = 0.3
        self._cam_yaw = 0
        if (self._physics_client_id>=0):
            view_matrix = self._p.computeViewMatrixFromYawPitchRoll(
            cameraTargetPosition=base_pos,
            distance=self._cam_dist,
            yaw=self._cam_yaw,
            pitch=self._cam_pitch,
            roll=0,
            upAxisIndex=2)
            proj_matrix = self._p.computeProjectionMatrixFOV(fov=60,
                 aspect=float(self._render_width) /
                 self._render_height,
                 nearVal=0.1,
                 farVal=100.0)
            (_, _, px, _, _) = self._p.getCameraImage(
              width=self._render_width,
              height=self._render_height,
              renderer=self._p.ER_BULLET_HARDWARE_OPENGL,
              viewMatrix=view_matrix,
              projectionMatrix=proj_matrix)
        else:
            px = np.array([[[255,255,255,255]]*self._render_width]*self._render_height, dtype=np.uint8)
        rgb_array = np.array(px, dtype=np.uint8)
        rgb_array = np.reshape(np.array(px), (self._render_height, self._render_width, -1))
        rgb_array = rgb_array[:, :, :3]
        return rgb_array

def angle_normalize(x):
    return ((x + np.pi) % (2 * np.pi)) - np.pi


In [None]:
env = pendulumEnv(render_mode="human")

print(env.action_space.n)

num_actions = env.action_space.n
num_states = env.observation_space.shape[0]
print("Size of State Space ->  {}".format(num_states))


# DQN Algortihm

In [None]:
def create_q_model():
    
    inputs = layers.Input(shape=(num_states,))
    layer1 = layers.Dense(64,activation="relu")(inputs)
    layer2 = layers.Dense(32,activation="relu")(layer1)
    layer3 = layers.Dense(16,activation="relu")(layer2)
    layer4 = layers.Dense(8,activation="relu")(layer3)
    action = layers.Dense(num_actions,activation="linear")(layer4)

    return keras.Model(inputs=inputs, outputs=action)


model = create_q_model()

model_target = create_q_model()


In [None]:
optimizer = keras.optimizers.Adam(learning_rate=0.001)


action_history = []
state_history = []

sine_angle_history = []
cos_angle_history = []
velocity_history = []

state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0

max_memory_length = 100000

loss_function = keras.losses.Huber()

In [None]:
gamma = 0.99  
epsilon = 1.0  
min_epsilon = 0.01  
max_epsilon = 1.0   
batch_size = 32
decay_rate = 0.001

# Training

In [None]:
def remember(state,action,reward,next_state,done):
    action_history.append(action)
    state_history.append(state)
    state_next_history.append(next_state)
    done_history.append(done)
    rewards_history.append(reward)
    
    sine_angle_history.append(state[1])
    cos_angle_history.append(state[0])
    velocity_history.append(state[2])
    
def update(batch_size):
    if len(done_history)>batch_size:
        indices = np.random.choice(range(len(done_history)), size=batch_size)
    
        state_sample = np.array([state_history[i] for i in indices])
        state_next_sample = np.array([state_next_history[i] for i in indices])
        rewards_sample = [rewards_history[i] for i in indices]
        action_sample = [action_history[i] for i in indices]
        done_sample = tf.convert_to_tensor(
                [float(done_history[i]) for i in indices]
            )
            
        future_rewards = model_target(state_next_sample)
        updated_q_values = rewards_sample + gamma * tf.reduce_max(future_rewards, axis=1)

        updated_q_values = updated_q_values * (1 - done_sample) - done_sample

        masks = tf.one_hot(action_sample, num_actions)

        with tf.GradientTape() as tape:
            q_values = model(state_sample)
            q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
            loss = loss_function(updated_q_values, q_action)

            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        model_target.set_weights(model.get_weights())

    

In [None]:

for _ in range(1000):
    
    state = env.reset()
    episode_reward = 0

    for timestep in range(1,500):
        
        rand_num = random.uniform(0,1)
        if rand_num<=epsilon:
            action = env.action_space.sample()
            
        else:
            state_tensor = tf.convert_to_tensor(state)
            state_tensor = tf.expand_dims(state_tensor, 0)
            action_probs = model(state_tensor)
            action = tf.argmax(action_probs[0]).numpy()
            action = action-1

 
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * episode_count)

        
        state_next, reward, done, _,temp = env.step(action)

        episode_reward += reward

        # Save actions and states in replay buffer
        remember(state,action,reward,state_next,done)
        state = state_next

        if timestep%50 == 0:
            update(batch_size)

        if len(rewards_history) > max_memory_length:
            del rewards_history[:1]
            del state_history[:1]
            del state_next_history[:1]
            del action_history[:1]
            del done_history[:1]

        if done:
            break
            
    print("Reward for episode {} is {}".format(episode_count,episode_reward))

    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        del episode_reward_history[:1]
    running_reward = np.mean(episode_reward_history)

    episode_count += 1


In [None]:
env.close()

# Testing

In [None]:
#env = pendulumEnv(render_mode="human")

In [None]:
"""
for _ in range(5):
    
    state = env.reset()
    episode_reward = 0

    for timestep in range(1,500):
        
        state_tensor = tf.convert_to_tensor(state)
        state_tensor = tf.expand_dims(state_tensor, 0)
        action_probs = model(state_tensor)
        action = tf.argmax(action_probs[0]).numpy()
        action = action-1
        #print(action)
        #time.sleep(0.25)
        
        

        state_next, reward, done, _,temp = env.step(action)

        episode_reward += reward

        state = state_next


        if done:
            break
            
    print("Reward for episode {} is {}".format(episode_count,episode_reward))

    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        del episode_reward_history[:1]
    running_reward = np.mean(episode_reward_history)

    episode_count += 1
"""