In [None]:
import numpy as np
import pybullet as p
import pandas as pd
import time
from robot_descriptions.loaders.pybullet import load_robot_description

In [None]:
p.connect(p.GUI_SERVER)
sim_timestep = 0.0001
p.setTimeStep(sim_timestep)
p.setGravity(0, 0, -9.81)

In [None]:
robot_id = load_robot_description('ur5_description', useFixedBase=True) 
joint_indices = [i for i in range(p.getNumJoints(robot_id)) if p.getJointInfo(robot_id, i)[2] == p.JOINT_REVOLUTE]
for i in range(p.getNumJoints(robot_id)):
  print(p.getJointInfo(robot_id, i))
  print(p.getJointInfo(robot_id, i)[2] == p.JOINT_REVOLUTE)

wrist3_joint = 6
eff_idx = 7
pendulum_joint_idx = 8
immovable_joints = [pendulum_joint_idx]
print("End effector joint: ", eff_idx)
print("Revolute joints: ", joint_indices)

# Disable default motor control

In [None]:
for i in joint_indices:
    p.setJointMotorControl2(
        bodyUniqueId=robot_id,
        jointIndex=i,
        controlMode=p.VELOCITY_CONTROL,
        force=0  # Disable motor forces
    )

# Feedback and Stabilization

In [None]:
def pd_control(robot_id, q_desired, v_desired, Kp, Kd):
  joint_states = p.getJointStates(robot_id, joint_indices)
  q_measured = np.array([state[0] for state in joint_states])
  v_measured = np.array([state[1] for state in joint_states])
  position_error = q_desired - q_measured
  velocity_error = v_desired - v_measured
  
  tau = Kp * position_error + Kd * velocity_error
  # add gravity compensation
  tau += np.array(p.calculateInverseDynamics(robot_id, q_measured.tolist(), [0] * len(joint_indices), [0] * len(joint_indices)))
  for i in range(len(joint_indices)):
    if joint_indices[i] in immovable_joints: # ensure pendulum and wrist 3 joint is commanded with zero torque
      tau[i] = 0
  return tau

Kp = 100 * np.ones_like(joint_indices)
Kd = 5 * np.ones_like(joint_indices)

def control_to_desired_configuration(robot_id, q_desired, v_desired=None, max_steps=5000, sleep=None):
  if v_desired is None:
    v_desired = np.zeros_like(q_desired)
  for step in range(max_steps):  # Simulation steps
    torques = pd_control(robot_id, q_desired, v_desired, Kp, Kd).tolist()
    p.setJointMotorControlArray(
      bodyUniqueId=robot_id,
      jointIndices=joint_indices,
      controlMode=p.TORQUE_CONTROL,
      forces=torques
    )
    p.stepSimulation()
    if sleep is not None:
      time.sleep(sleep)
    

# Resetting to balanced pose

In [None]:
rest_pose_balancing = [-np.pi / 18 * x for x in [0, 4, -6, 3, 0, 0]]
def reset_to_balancing_pose(robot_id):
  for i in range(len(joint_indices)): 
    p.resetJointState(robot_id, joint_indices[i], targetValue=rest_pose_balancing[i], targetVelocity=0)
    p.setJointMotorControl2(
          bodyUniqueId=robot_id,
          jointIndex=joint_indices[i],
          controlMode=p.VELOCITY_CONTROL,
          force=0  # Disable motor forces
      )

In [None]:
reset_to_balancing_pose(robot_id)
print(p.getLinkState(robot_id, eff_idx))

# Moving end-effector along a horizontal line

## Forward movement

In [None]:
# Define the start point and direction of the line
reset_to_balancing_pose(robot_id)
start_point = np.array(p.getLinkState(robot_id, eff_idx)[0])  # [x, y, z] in 3D space
direction_vector = np.array([+0.2, 0, 0])  # Direction along X-axis (horizontal)
target_positions = start_point + np.linspace(0, 1, num=50)[:, np.newaxis] * direction_vector[np.newaxis, :]
print(target_positions[0], target_positions[-1])
print(p.getJointInfo(robot_id, eff_idx))

time.sleep(1)
for target in target_positions:
  joint_angles = p.calculateInverseKinematics(robot_id,eff_idx,target)
  control_to_desired_configuration(robot_id, joint_angles)
  time.sleep(1 / 1000.)

## Backward movement

In [None]:
# Define the start point and direction of the line
reset_to_balancing_pose(robot_id)
start_point = np.array(p.getLinkState(robot_id, eff_idx)[0])  # [x, y, z] in 3D space
direction_vector = np.array([-0.3, 0, 0])  # Direction along X-axis (horizontal)
target_positions = start_point + np.linspace(0, 1, num=50)[:, np.newaxis] * direction_vector[np.newaxis, :]
print(target_positions[0], target_positions[-1])
print(p.getJointInfo(robot_id, eff_idx))

time.sleep(1)
for target in target_positions:
  joint_angles = p.calculateInverseKinematics(robot_id,eff_idx,target)
  control_to_desired_configuration(robot_id, joint_angles)
  time.sleep(1 / 1000.)

# Balancing the pendulum

In [None]:
from tqdm import tqdm
from math import fmod

class EnvRealistic:
  axis_vector = np.array([0.1, 0, 0])
  action_space = "Continuous [-1, 1]"
  def __init__(self, TIME_HORIZON=1000, sleep=None):
    self.TIME_HORIZON = TIME_HORIZON
    self.sleep=sleep
    self.print_dead = True
    self.reference_zero = 0
    self.reset()

  def rand_action(self):
    return np.random.rand() * 2 - 1

  def convert_3d_coords_to_line_position(self, world_pos):
    return world_pos[0] - self.start_point[0]

  def convert_3d_velo_to_line_velo(self, world_vel):
    return np.dot(EnvRealistic.axis_vector, world_vel)

  def get_state(self):
    (world_pos, _, _, _, _, _, world_velo, _) = p.getLinkState(robot_id, eff_idx, computeLinkVelocity=True)
    x = self.convert_3d_coords_to_line_position(np.array(world_pos))
    x_dot = self.convert_3d_velo_to_line_velo(np.array(world_velo))
    theta = fmod(p.getJointState(robot_id, pendulum_joint_idx)[0] - self.reference_zero, 2 * np.pi)
    if theta > np.pi:
      theta = theta - 2 * np.pi
    elif theta < -np.pi:
      theta = 2 * np.pi + theta
    # print(p.getJointState(robot_id, pendulum_joint_idx)[0], theta)
    theta_dot = p.getJointState(robot_id, pendulum_joint_idx)[1]
    return (x, x_dot, theta, theta_dot)

  def reset(self, randomize_theta_dot=False):
    """ 
    Returns state
    """
    self.timestep = 0
    reset_to_balancing_pose(robot_id)
    if randomize_theta_dot:
      max_theta_dot = 0.4
      max_theta_deviation = 0.03
      p.resetJointState(robot_id, pendulum_joint_idx, 
      targetValue=rest_pose_balancing[-1] + np.random.rand() * 2 * max_theta_deviation - max_theta_deviation,
      targetVelocity=np.random.rand() * 2 * max_theta_dot - max_theta_dot)
    self.start_point = p.getLinkState(robot_id, eff_idx)[0]
    self.back_point = start_point - 3 * EnvRealistic.axis_vector
    self.front_point = start_point  + 2 * EnvRealistic.axis_vector
    return (0, 0, 0, 0)

  
  def step(self, action):
    """
    Returns tuple (state, dead, truncated)
    """
    if action < -1 or action > 1:
      raise Exception("Action not in range [-1, 1]")
    current_position = p.getLinkState(robot_id, eff_idx)[0]
    target = np.array(current_position) + action * EnvRealistic.axis_vector
      
    joint_angles = p.calculateInverseKinematics(robot_id,eff_idx,target)
    control_to_desired_configuration(robot_id, joint_angles, max_steps=100, sleep=self.sleep)
    (x, x_dot, theta, theta_dot) = self.get_state()
    self.timestep += 1
    dead = False
    truncated = False
    if np.abs(theta) > 0.3:
      if self.print_dead:
        print(theta, " theta DEAD")
      dead = True
    if self.timestep == self.TIME_HORIZON:
      truncated = True
    return ((x, x_dot, theta, theta_dot), dead, truncated)


def generate_sample_of_system_dynamics_realistic(env, nb_samples):
  samples = []
  s = env.reset(randomize_theta_dot=True)
  for _ in tqdm(range(nb_samples)):
    a = env.rand_action()
    next_s, dead, trunc = env.step(a)
    samples.append({
      'x': s[0], 
      'x_dot': s[1], 
      'theta': s[2], 
      'theta_dot': s[3], 
      'u': a, # mapping inputs to {-1, 1} rather than {0, 1} is important, because otherwise the linear regression needs a non-zero constant to fit well
      'evolution_x': next_s[0], 
      'evolution_x_dot': next_s[1], 
      'evolution_theta': next_s[2],
      'evolution_theta_dot': next_s[3]})
    if dead or trunc:
      s = env.reset(randomize_theta_dot=True)
    else:
      s = next_s
  return pd.DataFrame(samples)

In [None]:
env = EnvRealistic(sleep=0.0001)
env.print_dead = False
samples = generate_sample_of_system_dynamics_realistic(env, 5000)

In [None]:
print(samples.describe())

## Fit a Linear regression on the data

In [None]:
from statsmodels.api import OLS

small_angle_samples = samples
predictors = small_angle_samples[['x', 'x_dot', 'theta', 'theta_dot', 'u']]
outcomes = ['evolution_x_dot', 'evolution_theta_dot', 'evolution_x', 'evolution_theta']
params = {}
for o in outcomes:
  model = OLS(small_angle_samples[o], predictors)
  fit = model.fit()
  print(f"{o} linear model:")
  print(fit.params)
  print(f"Mean squared error of {o} fit", fit.mse_resid)
  params[o] = fit.params.to_numpy()

## Computing a state-feedback controller according to a discrete-time linear quadratic regulator design

In [None]:
from control import dlqr

dT = 0.0001
A = np.array([
  params['evolution_x'][:4], #x
  params['evolution_x_dot'][:4], #x_dot
  params['evolution_theta'][:4], #theta
  params['evolution_theta_dot'][:4] #theta_dot
])
B = np.array([
  [params['evolution_x'][4]], #x
  [params['evolution_x_dot'][4]], #x_dot
  [params['evolution_theta'][4]], #theta
  [params['evolution_theta_dot'][4]] #theta_dot
])
Q = np.diag([125, 50, 1250, 25])
R = np.diag([1.5])

K, S, E = dlqr(A, B, Q, R)
print(K)

# Trying out the learned policy

In [None]:
s = env.reset()
time.sleep(1)
while True:
  a = np.clip(np.dot(-K, s), -1, 1)
  print(a)
  s, dead, trunc = env.step(a)
  if dead or trunc:
    print(f"Successfull balancing for {env.timestep} timesteps")
    break

## Reseting to swing pose

In [None]:
rest_pose_swing = rest_pose_balancing.copy()
rest_pose_swing[-1] = np.pi
def reset_to_swing_pose(robot_id):
  for i in range(len(joint_indices)): 
    p.resetJointState(robot_id, joint_indices[i], targetValue=rest_pose_swing[i], targetVelocity=0)
    p.setJointMotorControl2(
          bodyUniqueId=robot_id,
          jointIndex=joint_indices[i],
          controlMode=p.VELOCITY_CONTROL,
          force=0  # Disable motor forces
      )

In [None]:
reset_to_swing_pose(robot_id)

## Read imitations file

In [None]:
imitation = []
with open("u_opt.txt", "r") as f:
  for line in f.readlines():
    imitation.append(float(line))

## Perform imitation

In [None]:
reset_to_swing_pose(robot_id)
for a in imitation:
  (x, x_dot, theta, theta_dot), _, _ = env.step(a)

## Hard coded imitation

In [None]:
hard_coded_imitation = [-0.7] * 30 + [0] * 50 + [0.75] * 30 + [0] * 35 + [-1] * 25 + [0] * 30 + [0.575] * 41 + [0] * 100
reset_to_swing_pose(robot_id)
for a in hard_coded_imitation:
  (x, x_dot, theta, theta_dot), _, _ = env.step(a)


## Perform swing up + balancing

In [None]:
def should_switch_to_balancing(x, x_dot, theta, theta_dot):
  return abs(theta) < 0.5 and abs(theta_dot) < 0.27

reset_to_swing_pose(robot_id)
env.print_dead = False
s = None
for a in hard_coded_imitation:
  (x, x_dot, theta, theta_dot), _, _ = env.step(a)
  if should_switch_to_balancing(x, x_dot, theta, theta_dot):
    s = (x, x_dot, theta, theta_dot)
    print(x, x_dot, theta, theta_dot)
    break

env.reference_zero = p.getJointState(robot_id, pendulum_joint_idx)[0]
print(env.reference_zero)
time.sleep(1)
while True:
  a = np.clip(np.dot(-K, s), -1, 1)
  s, dead, trunc = env.step(a)