In [12]:
import pybullet_data
import pybullet as p
import pybullet_industrial as pi
import os
import time
import numpy as np

p.connect(p.GUI)  # Use p.DIRECT for headless
#p.connect(p.DIRECT)
data_path = pybullet_data.getDataPath()
p.setAdditionalSearchPath(pybullet_data.getDataPath())
print(pybullet_data.getDataPath())
plane_id = p.loadURDF("plane.urdf")

# Get the path to the PyBullet data directory
data_path = pybullet_data.getDataPath()

# List the files in the PyBullet data directory
files = os.getcwd()
dirname = os.path.join(files,'robots')
assets = os.path.join(files,'assets')

urdf_file1 = os.path.join(dirname,'rb5_850e.urdf')
asset1 = os.path.join(assets,'specimen.urdf')
#endeffector = os.path.join(assets,'endeffector.urdf')
endeffector = os.path.join(assets,
                                'gripper_cylinder.urdf')

start_orientation = p.getQuaternionFromEuler([0, 0, 0])

cube1 = p.loadURDF("cube.urdf", np.array(
        [0, 0, 0.5]), useFixedBase=True)
robot = pi.RobotBase(urdf_file1, np.array(
        [0, 0, 1]), start_orientation)
cube2 = p.loadURDF("cube.urdf", np.array(
        [1, 0, 0.5]), useFixedBase=True)
specimen = p.loadURDF(asset1, np.array(
[1.0, 0, 1.3]), useFixedBase=True)
meshScale = [1, 1, 1]

pi.draw_robot_frames(robot, life_time=0)

tool = pi.EndeffectorTool(endeffector, np.array([2.0, 0, 1.5]), start_orientation, robot)
p.setGravity(0, 0, -9.81)

c:\Users\b34b3\AppData\Local\Programs\Python\Python312\Lib\site-packages\pybullet_data


In [13]:
def initialize():
    position_list = [1,0.16175214925291828,0.49468914613294146,1.4254185043970449,-0.350008612250527,1.5717769960272012,-0.1634941307782464,0]
    gripper_position = [0.5, 0, 1.1]
    gripper_orientation = [1, 1, 1, 1]
    #update_gripper(gripper_position, gripper_orientation)

    pose_dict = {
    "base": 0.16175214925291828,
    "shoulder": 0.49468914613294146,
    "elbow": 1.4254185043970449,
    "wrist1": -0.3500086122505276,
    "wrist2": 1.5717769960272012,
    "wrist3": -0.1634941307782464

    }

    pi.RobotBase.reset_robot(robot,[0, 0, 1], start_orientation, position_list)
    pi.RobotBase.set_joint_position(robot,pose_dict)
    for i in range(1000):
      p.stepSimulation()

def check_collision(visualize=False):
    contacts = p.getContactPoints() 
    contacts = [c for c in contacts if c[1] >= 4] # 로봇의 베이스나, 엔드이펙터와 플랜지사이의 충돌은 무시
    if contacts:
        print("TCP 링크가 무언가와 충돌 발생!")
        for contact in contacts:
            print(contact)
    
    if visualize:
        for c in contacts:
            pos_onB = c[6]
            p.removeAllUserDebugItems()
            p.addUserDebugPoints(
                pointPositions=[pos_onB],
                pointColorsRGB=[[1, 0, 0]],   # 빨간색
                pointSize=50,
                lifeTime=10
            )

def update_camera(camera_position, camera_orientation):
    camera_id = None
    for body_id in range(p.getNumBodies()):
        body_name = p.getBodyInfo(body_id)[1].decode()
        if "camera" in body_name:  # 카메라 URDF 파일명이 포함된 경우
            camera_id = body_id
            break
    pos_offset = np.array([0.1, 0, 0.04])
    ori_offset = np.array([0.5, 0.5, 0, 0])
    p.resetBasePositionAndOrientation(camera_id, camera_position + pos_offset, camera_orientation + ori_offset)

def update_specimen(specimen_position, specimen_orientation):
    specimen_id = None
    for body_id in range(p.getNumBodies()):
        body_name = p.getBodyInfo(body_id)[1].decode()
        if "my_specimen" in body_name:  # my_specimen URDF 파일명이 포함된 경우
            specimen_id = body_id
            break
    p.resetBasePositionAndOrientation(specimen_id, specimen_position, specimen_orientation)

def update_gripper(gripper_position, gripper_orientation):
    gripper_id = None
    for body_id in range(p.getNumBodies()):
        body_name = p.getBodyInfo(body_id)[1].decode()
        if "gripper" in body_name:  # gripper URDF 파일명이 포함된 경우
            gripper_id = body_id
            break
    p.resetBasePositionAndOrientation(gripper_id, gripper_position, gripper_orientation)

def show_point(tool_pos):
    p.addUserDebugPoints(
    pointPositions=[tool_pos],
    pointColorsRGB=[[1, 0, 1]],  
    pointSize=10,
    lifeTime=1
)
    
        

In [14]:
initialize()

urdf_file3 = os.path.join(assets, 'camera.urdf')
camera_parameters = {'width': 480, 'height': 240, 'fov': 60,
                        'aspect ratio': 1, 'near plane distance': 0.01, 'far plane distance': 2}
camera_orientation = p.getQuaternionFromEuler([1.57, 0, 1.57])
camera_position = tool.get_tool_pose()[0] +  np.array([0, 0, 0.1])
camera = pi.Camera(urdf_file3, camera_position,
                camera_orientation, camera_parameters)

In [15]:
base_pose = np.array([0, 0, 0.5])
set_pose = np.array([0.700, 0, 0.81])
base_orientation = np.array([1.57, 0, 1.57])

target_pose = base_pose + set_pose


specimen_position = np.array([1.0, 0, 1.30])
ring_offset = np.array([0, 0, 0.015])
specimen_orientation_euler = np.array([0, 0, 0])
specimen_orientation = p.getQuaternionFromEuler(specimen_orientation_euler)
update_specimen(specimen_position, specimen_orientation)

for _ in range(24 * 1):  # 240 Hz simulation frequency
    tool.set_tool_pose(target_pose, p.getQuaternionFromEuler(base_orientation))
    img = camera.get_image()
    p.stepSimulation()
    camera_orientation = p.getQuaternionFromEuler([1.57, 0, 1.57])
    camera_position = tool.get_tool_pose()[0] +  np.array([0, 0, 0])
    update_camera(camera_position, camera_orientation)
    show_point(tool.get_tool_pose()[0]+np.array([0.135,0,0]))
    show_point(specimen_position+ring_offset)
    #time.sleep(1./24.)

check_collision(visualize=True)

In [None]:
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Input, Conv2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.python.client import device_lib
import gym
import numpy as np
import random as rand

LOSS_CLIPPING = 0.2

class ModelParam:
    pass

def AI_init():
    model_param = ModelParam()
    # state and action space
    model_param.state_size = 4     # [pos,vel,angle,angular_vel] 
    model_param.img_height = 240
    model_param.img_width = 480
    model_param.img_channels = 3
    model_param.action_size = 12   # [x+,x-,y+,y-,z+,z-,roll+,roll-,pitch+,pitch-,yaw+,yaw-]
    model_param.value_size = 1

    # AI hyperparameters
    model_param.learning_rate_actor = 0.0005
    model_param.learning_rate_critic = 0.0005
    model_param.epochs_cnt = 5
    model_param.discount_rate = 0.98
    model_param.smooth_rate = 0.95
    model_param.penalty = -400
    model_param.episode_num = 500
    model_param.mini_batch_step_size = 32        
    model_param.node_num = 256     # used after flatten

    # Debug variables
    model_param.moving_avg_size = 20

    # model Initialization
    model_param.model_actor = build_model_actor(model_param)
    model_param.model_critic = build_model_critic(model_param)

    # reward buffer
    model_param.reward_list= []
    model_param.count_list = []
    model_param.moving_avg_list = []

    model_param.states, model_param.states_next, model_param.action_matrixs = state =np.zeros(4),[],[]
    model_param.dones, model_param.action_probs, model_param.rewards = [],[],[]

    # dummy data
    model_param.DUMMY_ACTION_MATRIX = np.zeros((1,1,model_param.action_size))
    model_param.DUMMY_ADVANTAGE = np.zeros((1,1,model_param.value_size))
    return model_param

class MyModel(tf.keras.Model):
        def train_step(self, data):
            in_datas, out_action_probs = data
            states, action_matrixs, advantages = in_datas[0], in_datas[1], in_datas[2]
            with tf.GradientTape() as tape:
                y_pred = self(states, training=True)
                new_policy = K.max(action_matrixs*y_pred, axis=-1)   
                old_policy = K.max(action_matrixs*out_action_probs, axis=-1)   
                r = new_policy/(old_policy + 1e-10)
                clipped = K.clip(r, 1-LOSS_CLIPPING, 1+LOSS_CLIPPING)
                loss = -K.minimum(r*advantages, clipped*advantages)
            
            trainable_vars = self.trainable_variables
            gradients = tape.gradient(loss, trainable_vars)
            self.optimizer.apply_gradients(zip(gradients, trainable_vars))

def build_model_actor(model_param):
     # 1) The main input: an RGB image
    input_states = Input(shape=(model_param.img_height, model_param.img_width, model_param.img_channels),
                         name='input_states')

    # 2) Additional inputs (same as your old code):
    input_action_matrixs = Input(shape=(1, model_param.action_size), name='input_action_matrixs')
    input_advantages = Input(shape=(1, model_param.value_size), name='input_advantages')

    # 3) Build a small CNN
    x = Conv2D(32, kernel_size=8, strides=4, activation='relu')(input_states)
    x = Conv2D(64, kernel_size=4, strides=2, activation='relu')(x)
    x = Conv2D(64, kernel_size=3, strides=1, activation='relu')(x)
    x = Flatten()(x)
    x = Dense(model_param.node_num, activation='relu')(x)

    # 4) Output: policy over model_param.action_size
    out_actions = Dense(model_param.action_size, activation='softmax', name='output')(x)

    # 5) Create a MyModel with multi-input
    model = MyModel(inputs=[input_states, input_action_matrixs, input_advantages],
                    outputs=out_actions)
    model.compile(optimizer=Adam(learning_rate=model_param.learning_rate_actor))

    model.summary()
    return model

def build_model_critic(model_param):
    # Takes image input: (height, width, channels)
    input_states = Input(shape=(model_param.img_height, model_param.img_width, model_param.img_channels),
                         name='input_states')

    x = Conv2D(32, kernel_size=8, strides=4, activation='relu')(input_states)
    x = Conv2D(64, kernel_size=4, strides=2, activation='relu')(x)
    x = Conv2D(64, kernel_size=3, strides=1, activation='relu')(x)
    x = Flatten()(x)
    x = Dense(model_param.node_num, activation='relu')(x)

    out_values = Dense(model_param.value_size, activation='linear', name='output')(x)

    model = tf.keras.models.Model(inputs=[input_states], outputs=[out_values])
    model.compile(optimizer=Adam(learning_rate=model_param.learning_rate_critic),
                  loss='mean_squared_error')
    model.summary()
    return model

def norm(pointA, pointB):
    return np.sqrt((pointA[0] - pointB[0])**2 + (pointA[1] - pointB[1])**2 + (pointA[2] - pointB[2])**2)

def get_state(prv_states):
    # init states[4]
    dt = 1/240
    states = np.zeros(4)
    target_position = specimen_position+ring_offset
    #print(target_position)
    end_effector_position = tool.get_tool_pose()[0]+np.array([0.135,0,0])
    #print(end_effector_position)
    end_effector_orientation = p.getEulerFromQuaternion(tool.get_tool_pose()[1]) + np.array([-np.pi/2,0,-np.pi/2])
    states[0] = norm(end_effector_position,target_position) # pos diff
    states[1] = (prv_states[0]-states[0])/dt # velocity
    states[2] = norm(end_effector_orientation,specimen_orientation_euler)# angle diff
    states[3] = (prv_states[2]-states[2])/dt # angular velocity
    #print states in a row
    print("pos diff: ",states[0]," velocity: ",states[1]," angle diff: ",states[2]," angular velocity: ",states[3])
    print("angle: ",end_effector_orientation, "angle_specimen"  ,specimen_orientation_euler)
    return states

def train(model_param):
    for episode in range(model_param.episode_num):
        initialize()
        model_param.states = np.zeros(4)
        model_param.states = get_state(model_param.states)
        count, reward_tot = make_memory(model_param)

def make_memory(model_param):
        reward_tot = 0
        count = 0
        reward = np.zeros(model_param.value_size)
        #advantage = np.zeros(model_param.value_size)
        #target = np.zeros(model_param.value_size)
        action_matrix = np.zeros(model_param.action_size)
        done = False
        
        while not done:
            count+=1

            #state_t = np.reshape(state,[1, 1, model_param.state_size])
            #action_matrix_t = np.reshape(action_matrix,[1, 1, model_param.action_size])

            # 1. take a picture
            img = camera.get_image() # now img is the state
            img_uint8 = img.astype(np.uint8)
            rgb = img_uint8[:,:,:3]
            rgb = np.expand_dims(rgb, axis=0)
            
            # 2. calculate probabilty and predict action
            action_prob = model_param.model_actor.predict([rgb, model_param.DUMMY_ACTION_MATRIX, model_param.DUMMY_ADVANTAGE])
            action = np.random.choice(model_param.action_size, 1, p=action_prob[0][0])[0]
            action_matrix = np.zeros(model_param.action_size) #초기화
            action_matrix[action] = 1

            # 3. move

            # 4. calculate reward

            # 5. judge if finished or not



            state_next, reward, done, none, none2 = model_param.env.step(action)
            
            state_next_t = np.reshape(state_next,[1, 1, model_param.state_size])
            
            if count < 500 and done:
                reward = model_param.penalty 
        
            model_param.states.append(np.reshape(state_t, [1,model_param.state_size]))
            model_param.states_next.append(np.reshape(state_next_t, [1,model_param.state_size]))
            model_param.action_matrixs.append(np.reshape(action_matrix, [1,model_param.action_size]))
            model_param.dones.append(np.reshape(0 if done else 1, [1,model_param.value_size]))
            model_param.action_probs.append(np.reshape(action_prob, [1,model_param.action_size]))
            model_param.rewards.append(np.reshape(reward, [1,model_param.value_size]))
            
            if(count % model_param.mini_batch_step_size == 0):
                model_param.train_mini_batch()
                model_param.clear_memory()

            reward_tot += reward
            state = state_next
            
        return count, reward_tot


model_param = AI_init()


In [23]:


print(action_prob[0][0])


2.0656309e-24


In [9]:
del model_param

In [29]:
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Input, Dense, Flatten
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.python.client import device_lib
import gym
import numpy as np
import random as rand

LOSS_CLIPPING = 0.2
class Agent(object):
    def __init__(self):
        self.env = gym.make('CartPole-v1')
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.value_size = 1
        
        self.node_num = 24
        self.learning_rate_actor = 0.0005
        self.learning_rate_critic = 0.0005
        self.epochs_cnt = 5
        self.model_actor = self.build_model_actor()
        self.model_critic = self.build_model_critic()
        
        self.discount_rate = 0.98
        self.smooth_rate = 0.95
        self.penalty = -400
        
        self.episode_num = 500
        self.mini_batch_step_size = 32        
        
        self.moving_avg_size = 20
        self.reward_list= []
        self.count_list = []
        self.moving_avg_list = []
        
        self.states, self.states_next, self.action_matrixs = [],[],[]
        self.dones, self.action_probs, self.rewards = [],[],[]
        self.DUMMY_ACTION_MATRIX = np.zeros((1,1,self.action_size))
        self.DUMMY_ADVANTAGE = np.zeros((1,1,self.value_size))
        
    class MyModel(tf.keras.Model):
        def train_step(self, data):
            in_datas, out_action_probs = data
            states, action_matrixs, advantages = in_datas[0], in_datas[1], in_datas[2]
            with tf.GradientTape() as tape:
                y_pred = self(states, training=True)
                new_policy = K.max(action_matrixs*y_pred, axis=-1)   
                old_policy = K.max(action_matrixs*out_action_probs, axis=-1)   
                r = new_policy/(old_policy)
                clipped = K.clip(r, 1-LOSS_CLIPPING, 1+LOSS_CLIPPING)
                loss = -K.minimum(r*advantages, clipped*advantages)
            
            trainable_vars = self.trainable_variables
            gradients = tape.gradient(loss, trainable_vars)
            self.optimizer.apply_gradients(zip(gradients, trainable_vars))
            
    def build_model_actor(self):
        input_states = Input(shape=(1,self.state_size), name='input_states')
        input_action_matrixs = Input(shape=(1,self.action_size), name='input_action_matrixs')
        input_advantages = Input(shape=(1,self.value_size), name='input_advantages')

        x = (input_states)
        x = Dense(self.node_num, activation='relu')(x)
        out_actions = Dense(self.action_size, activation='softmax', name='output')(x)
        
        model = self.MyModel(inputs=[input_states, input_action_matrixs, input_advantages], outputs=out_actions)
        model.compile(optimizer=Adam(learning_rate=self.learning_rate_actor))
        
        model.summary()
        return model
    
    def build_model_critic(self):
        input_states = Input(shape=(1,self.state_size), name='input_states')
        x = (input_states)
        x = Dense(self.node_num, activation='relu')(x)
        out_values = Dense(self.value_size, activation='linear', name='output')(x)
        
        model = tf.keras.models.Model(inputs=[input_states], outputs=[out_values])
        model.compile(optimizer=Adam(learning_rate=self.learning_rate_critic),
                      loss='mean_squared_error'
                     )
        model.summary()
        return model

    def train(self):
        for episode in range(self.episode_num):

            state = self.env.reset()
            state = state[0]
            self.env.max_episode_steps = 500

            count, reward_tot = self.make_memory(episode, state)
            self.train_mini_batch()
            self.clear_memory()
            
            if count < 500:
                reward_tot = reward_tot-self.penalty
                
            self.reward_list.append(reward_tot)
            self.count_list.append(count)
            self.moving_avg_list.append(self.moving_avg(self.count_list,self.moving_avg_size))                
            
            if(episode % 10 == 0):
                print("episode:{}, moving_avg:{}, rewards_avg:{}".format(episode, self.moving_avg_list[-1], np.mean(self.reward_list)))
        self.save_model()
        
    def make_memory(self, episode, state):
        reward_tot = 0
        count = 0
        reward = np.zeros(self.value_size)
        advantage = np.zeros(self.value_size)
        target = np.zeros(self.value_size)
        action_matrix = np.zeros(self.action_size)
        done = False
        
        while not done:
            count+=1

            state_t = np.reshape(state,[1, 1, self.state_size])
            action_matrix_t = np.reshape(action_matrix,[1, 1, self.action_size])
            
            action_prob = self.model_actor.predict([state_t, self.DUMMY_ACTION_MATRIX, self.DUMMY_ADVANTAGE])

            action = np.random.choice(self.action_size, 1, p=action_prob[0][0])[0]
            action_matrix = np.zeros(self.action_size) #초기화
            action_matrix[action] = 1

            state_next, reward, done, none, none2 = self.env.step(action)
            
            state_next_t = np.reshape(state_next,[1, 1, self.state_size])
            
            if count < 500 and done:
                reward = self.penalty 
        
            self.states.append(np.reshape(state_t, [1,self.state_size]))
            self.states_next.append(np.reshape(state_next_t, [1,self.state_size]))
            self.action_matrixs.append(np.reshape(action_matrix, [1,self.action_size]))
            self.dones.append(np.reshape(0 if done else 1, [1,self.value_size]))
            self.action_probs.append(np.reshape(action_prob, [1,self.action_size]))
            self.rewards.append(np.reshape(reward, [1,self.value_size]))
            
            if(count % self.mini_batch_step_size == 0):
                self.train_mini_batch()
                self.clear_memory()

            reward_tot += reward
            state = state_next
            
        return count, reward_tot
        
    def make_gae(self, values, values_next, rewards, dones):
        delta_adv, delta_tar, adv, target = 0, 0, 0, 0
        advantages = np.zeros(np.array(values).shape)
        targets = np.zeros(np.array(values).shape)
        for t in reversed(range(0, len(rewards))):
            delta_adv = rewards[t] + self.discount_rate * values_next[t] * dones[t] - values[t]
            delta_tar = rewards[t] + self.discount_rate * values_next[t] * dones[t]
            adv = delta_adv + self.smooth_rate*self.discount_rate * dones[t] * adv
            target = delta_tar + self.smooth_rate*self.discount_rate * dones[t] * target
            advantages[t] = adv
            targets[t] = target
        return advantages, targets

    def train_mini_batch(self):
        
        if len(self.states) == 0:
            return
        
        states_t = np.array(self.states)
        states_next_t = np.array(self.states_next)
        action_matrixs_t = np.array(self.action_matrixs)
        action_probs_t = np.array(self.action_probs)
        rewards_t = np.array(self.rewards)

        values = self.model_critic.predict(states_t)
        values_next = self.model_critic.predict(states_next_t)
        
        advantages, targets = self.make_gae(values, values_next, self.rewards, self.dones)
        advantages_t = np.array(advantages)
        targets_t = np.array(targets)

        self.model_actor.fit([states_t, action_matrixs_t, advantages_t], [action_probs_t], epochs=self.epochs_cnt, verbose=0)
        self.model_critic.fit(states_t, targets_t, epochs=self.epochs_cnt, verbose=0)       
 
    def clear_memory(self):
        self.states, self.states_next, self.action_matrixs = [],[],[]
        self.dones, self.action_probs, self.rewards = [],[],[]
        
    def moving_avg(self, data, size=10):
        if len(data) > size:
            c = np.array(data[len(data)-size:len(data)]) 
        else:
            c = np.array(data) 
        return np.mean(c)
    
    def save_model(self):
        self.model_actor.save("./model/ppo")
        print("*****end learing")

if __name__ == "__main__":
    agent = Agent()
    agent.train()

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 96ms/step


AttributeError: module 'numpy' has no attribute 'bool8'

In [None]:
env = gym.make('CartPole-v1')
env.observation_space.shape[0] # 위치, 속도, 각도, 각속도?
env.action_space.n # 좌우 2개

2

In [None]:
# get distance between pointA and pointB (3dim)


In [79]:
if 'state' not in locals():
    state =np.zeros(4)

state = get_state(state)


# if variable state does not exsit


pos diff:  0.16357298126706452  velocity:  0.0  angle diff:  0.02465193761612129  angular velocity:  0.0
angle:  [-0.02178325 -0.01130558 -0.00232204] angle_specimen [0 0 0]
