In [None]:
from magpie_control import ur5  
import numpy as np
import time

In [None]:
robot = ur5.UR5_Interface(provide_ft_sensor=True)
robot.start()

In [None]:
robot.start_ft_sensor(ip_address="192.168.0.5", poll_rate=100)
robot.debug = False
print(f"{robot.get_ft_data()=}")
force = np.array([0, 1, -5])
goal_delta = np.array([0, 0, 0.09])
wrench_goal = np.hstack((force, np.zeros(3)))
init_cmd = np.hstack((force / -300, np.zeros(3)))
duration = 4
max_force = 5.0 # maximum force in Newtons
P = 0.0005 # stiffness gain, default is 0.0005 <--play with this for compliance sensitivity
robot.force_position_control(wrench=wrench_goal, init_cmd=init_cmd, goal_delta=goal_delta, 
                             duration=duration, max_force=max_force, p=P, tolerance=0.015)


In [None]:
robot.stop()

In [None]:
pose_peg_prep = np.array([
    [-0.999, -0.019, -0.041, -0.448],
    [ 0.026,  0.499, -0.866, -0.473],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_go = np.array([
    [-0.999, -0.019, -0.041, -0.448],
    [ 0.026,  0.499, -0.866, -0.493],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_n = np.array([
    [-0.999, -0.019, -0.041, -0.448],
    [ 0.026,  0.499, -0.866, -0.487],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_lift = np.array([
    [-0.999, -0.019, -0.041, -0.448],
    [ 0.026,  0.499, -0.866, -0.435],
    [ 0.037, -0.867, -0.498,  0.273],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_put = np.array([
    [-0.999, -0.019, -0.041, -0.448],
    [ 0.026,  0.499, -0.866, -0.487],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

In [None]:
pose_peg_prep = np.array([
    [-0.999, -0.019, -0.041, -0.448],
    [ 0.026,  0.499, -0.866, -0.463],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_go = np.array([
    [-0.999, -0.019, -0.041, -0.448],
    [ 0.026,  0.499, -0.866, -0.483],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_n = np.array([
    [-0.999, -0.019, -0.041, -0.448],
    [ 0.026,  0.499, -0.866, -0.475],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_lift = np.array([
    [-0.999, -0.019, -0.041, -0.448],
    [ 0.026,  0.499, -0.866, -0.475],
    [ 0.037, -0.867, -0.498,  0.450],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_put = np.array([
    [-0.999, -0.019, -0.041, -0.448],
    [ 0.026,  0.499, -0.866, -0.450],
    [ 0.037, -0.867, -0.498,  0.270],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

In [None]:
# Step 2: move back to home
robot.moveL(pose_peg_lift, linSpeed=0.5, linAccel=0.75, asynch=False)

In [None]:
pose_1 = np.array([[-0.582,  0.812, -0.048, -0.249],
          [ 0.813,  0.582, -0.004, -0.395],
          [ 0.025, -0.041, -0.999,  0.263],
          [ 0.   ,  0.   ,  0.   ,  1.   ]])
pose_2 = np.array([[-0.582,  0.812, -0.048, -0.249],
          [ 0.813,  0.582, -0.004, -0.395],
          [ 0.025, -0.041, -0.999,  0.463],
          [ 0.   ,  0.   ,  0.   ,  1.   ]])
pose_3 = np.array([[-0.582,  0.812, -0.048, -0.307],
       [ 0.813,  0.582, -0.004, -0.314],
       [ 0.025, -0.041, -0.999,  0.466],
       [ 0.   ,  0.   ,  0.   ,  1.   ]])

pose_precision_prep = np.array([
    [-0.999,  0.005,  -0.034, -0.431],
    [ 0.033,  0.346,  -0.938, -0.390],
    [ 0.007, -0.938,  -0.346,  0.183],
    [ 0.,     0. ,    0. ,    1.   ]
], dtype=float)

pose_precision_end = np.array([
    [-0.999,  0.005,  -0.034, -0.431],
    [ 0.033,  0.346,  -0.938, -0.390],
    [ 0.007, -0.938,  -0.346,  0.300],
    [ 0.,     0. ,    0. ,    1.   ]
], dtype=float)

pose_precision_shake = np.array([
    [-0.333,  0.943, -0.029, -0.431],
    [ 0.342,  0.091, -0.935, -0.390],
    [ -0.879, -0.321, -0.353,  0.300],
    [ 0.,     0. ,    0. ,    1.   ]
], dtype=float)

# [[-0.333  0.943 -0.029 -0.43 ]
#  [ 0.342  0.091 -0.935 -0.389]
#  [-0.879 -0.321 -0.353  0.3  ]
#  [ 0.     0.     0.     1.   ]]

pose_tripod_prep = np.array([
    [-0.993,  0.091, -0.071, -0.373],
    [ 0.112,  0.621, -0.776, -0.366],
    [-0.026, -0.779, -0.627,  0.275],#close to table 0.246, now higher
    [ 0.,     0.,     0.,     1.   ]
], dtype=float)

pose_tripod_end = np.array([
    [-0.993,  0.091, -0.071, -0.373],
    [ 0.112,  0.621, -0.776, -0.366],
    [-0.026, -0.779, -0.627,  0.400],
    [ 0.,     0.,     0.,     1.   ]
], dtype=float)

pose_tripod_shake = np.array([
    [-0.286,  0.956, -0.065, -0.373],
    [ 0.62,  0.133, -0.773, -0.366],
    [-0.73, -0.262, -0.631,  0.400],
    [ 0.,     0.,     0.,     1.   ]
], dtype=float)
# [[-0.286  0.956 -0.065 -0.376]
#  [ 0.62   0.133 -0.773 -0.367]
#  [-0.73  -0.262 -0.631  0.395]
#  [ 0.     0.     0.     1.   ]]

In [None]:
# Step 0: move to home
# robot.moveL(pose_precision_prep)
# robot.moveL(pose_tripod_prep)

In [None]:
# Step 1: move to grasp
# robot.moveL(pose_precision_end)
# robot.moveL(pose_tripod_end)

In [None]:
# Step 3: shake back and forth
for _ in range(2):
    robot.moveL(pose_tripod_end, linSpeed=1, linAccel=5, asynch=False)
    robot.moveL(pose_tripod_shake, linSpeed=1, linAccel=5, asynch=False)

In [None]:
pose = robot.get_tcp_pose()
# robot.moveL_delta([0.1, 0.0, 0], frame='wrist')
# robot.toggle_teach_mode()
# robot.moveL(pose_1)
print(pose)

In [None]:
pose = robot.get_tcp_pose()
print(pose)

In [1]:
import time

import csv
import threading
import numpy as np
from magpie_control import ur5

robot = ur5.UR5_Interface(provide_ft_sensor=True)
robot.start()
pose_peg_lift = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.445],
    [ 0.037, -0.867, -0.498,  0.290],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)
robot.moveL(pose_peg_lift, linSpeed=0.5, linAccel=0.75)

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [1]:
import time
import csv
import threading
import numpy as np
from magpie_control import ur5

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:


# --- 1. Setup & Connection ---
# Make sure your poses (pose_peg_prep, etc.) are defined in a previous cell!
ROBOT_IP = "192.168.0.5" 
OUTPUT_CSV = f"ur5_task_data_{int(time.time())}.csv"

try:
    # Check if robot is already connected to avoid re-initialization errors
    if 'robot' not in locals():
        robot = ur5.UR5_Interface()
        robot.start()
        robot.start_ft_sensor(ip_address=ROBOT_IP, poll_rate=100)
        print("UR5 Connected & F/T Sensor Started.")
    else:
        print("Using existing robot connection.")
except Exception as e:
    print(f"Connection Error: {e}")

# --- 2. Background Recording Setup ---
data_log = []
recording_active = False
pose_peg_prep = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.443],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_go = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.485],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_n = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.480],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_lift = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.480],
    [ 0.037, -0.867, -0.498,  0.290],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_put = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.450],
    [ 0.037, -0.867, -0.498,  0.270],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)
def background_recorder():
    """Runs in the background to record data while the robot moves."""
    global data_log, recording_active
    print("Background recording started...")
    
    while recording_active:
        # Capture precise Wall-Clock Time (Unix Epoch) for sync
        loop_rate = 0.01 # Hz
        timestamp = time.time()
        
        # Capture Force
        wrist_wrench = robot.get_ft_data()
        if wrist_wrench and len(wrist_wrench) >= 3:
            f_norm = np.linalg.norm(wrist_wrench[:3])
        else:
            f_norm = 0.0
            
        data_log.append([timestamp, f_norm])
        # time.sleep(0.01) # Sample at ~100Hz
        elapsed = time.time() - timestamp
        remaining = loop_rate - elapsed
        if remaining > 0:
            time.sleep(remaining)

# --- 3. Execution Sequence ---
try:
    # A. Start Recording Thread
    recording_active = True
    recorder_thread = threading.Thread(target=background_recorder)
    recorder_thread.start()
    
    # B. Perform Robot Movements (Blocking)
    print("Executing movement sequence...")
    
    print("Moving to Prep...")
    robot.moveL(pose_peg_prep, linSpeed=0.5, linAccel=0.75, asynch=False)
    
    print("Moving to Go...")
    robot.moveL(pose_peg_go, linSpeed=0.5, linAccel=0.75, asynch=False)
    robot.moveL(pose_peg_n, linSpeed=0.5, linAccel=0.75, asynch=False)
    
    print("Pausing 8s for grip...")
    time.sleep(10) # Data recording continues during this pause
    
    print("Moving to N...")
    # 
    
    print("Lifting...")
    robot.moveL(pose_peg_lift, linSpeed=0.5, linAccel=0.75, asynch=False)
    
    print("Prep putting...")
    robot.moveL(pose_peg_put, linSpeed=0.5, linAccel=0.75, asynch=False)

    force = np.array([0, 1, -3.75])
    goal_delta = np.array([0, 0, 0.09])
    wrench_goal = np.hstack((force, np.zeros(3)))
    init_cmd = np.hstack((force / -300, np.zeros(3)))
    duration = 5
    max_force = 5.0 # maximum force in Newtons
    P = 0.0005 # stiffness gain, default is 0.0005 <--play with this for compliance sensitivity
    robot.force_position_control(wrench=wrench_goal, init_cmd=init_cmd, goal_delta=goal_delta, 
                                duration=duration, max_force=max_force, p=P, tolerance=0.015)
    print("Sequence Complete.")
    robot.stop()
    robot.moveL(pose_peg_lift, linSpeed=0.5, linAccel=0.75) # Move back up after putting

finally:
    # C. Stop Recording
    recording_active = False
    if 'recorder_thread' in locals():
        recorder_thread.join() # Wait for thread to finish cleanly
    
    # D. Save Data
    if len(data_log) > 0:
        print(f"Saving {len(data_log)} samples to {OUTPUT_CSV}...")
        with open(OUTPUT_CSV, mode='w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(["timestamp_epoch", "wrist_force_N"])
            writer.writerows(data_log)
        print("File saved successfully.")
    else:
        print("Warning: No data recorded.")

UR5 Connected & F/T Sensor Started.
Background recording started...
Executing movement sequence...
Moving to Prep...
Moving to Go...
Pausing 8s for grip...
Moving to N...
Lifting...
Prep putting...
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from OptoForce!
Error reading from Opt

RTDEControlInterface: RTDE control script is not running!
RTDEControlInterface: RTDE control script is not running!
RTDEControlInterface: RTDE control script is not running!
Exception in thread Thread-4 (background_recorder):
Traceback (most recent call last):
  File "/home/will/miniconda3/envs/octo/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/will/.local/lib/python3.10/site-packages/ipykernel/ipkernel.py", line 761, in run_closure
    _threading_Thread_run(self)
  File "/home/will/miniconda3/envs/octo/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_447139/2392740882.py", line 66, in background_recorder
  File "/home/will/workspace/magpie_control/src/magpie_control/ur5.py", line 152, in get_ft_data
    return self.ft_sensor.recv_datum()
  File "/home/will/workspace/magpie_control/src/magpie_control/ft_sensor.py", line 92, in recv_datum
    data, _ = self.sock_r.recvfrom(RESP

Error reading from OptoForce!
Sequence Complete.
Saving 806 samples to ur5_task_data_1771014622.csv...
File saved successfully.


In [None]:
robot.toggle_teach_mode()

In [None]:
import time

import csv
import threading
import numpy as np
from magpie_control import ur5

robot = ur5.UR5_Interface(provide_ft_sensor=True)
robot.start()
pose_peg_put = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.450],
    [ 0.037, -0.867, -0.498,  0.260],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)
pose_peg_put_1 = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.500],
    [ 0.037, -0.867, -0.498,  0.260],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)
robot.moveL(pose_peg_put, linSpeed=0.05, linAccel=0.05,asynch=False)
robot.moveL(pose_peg_put_1, linSpeed=0.05, linAccel=0.05,asynch=False)
time.sleep(3)
robot.moveL(pose_peg_put, linSpeed=0.05, linAccel=0.05,asynch=False)

In [None]:
import time
import csv
import threading
import numpy as np
from magpie_control import ur5

ROBOT_IP = "192.168.0.5" 
OUTPUT_CSV = f"ur5_task_data_{int(time.time())}.csv"

try:
    # Check if robot is already connected to avoid re-initialization errors
    if 'robot' not in locals():
        robot = ur5.UR5_Interface()
        robot.start()
        robot.start_ft_sensor(ip_address=ROBOT_IP, poll_rate=100)
        print("UR5 Connected & F/T Sensor Started.")
    else:
        print("Using existing robot connection.")
except Exception as e:
    print(f"Connection Error: {e}")

# --- 2. Background Recording Setup ---
data_log = []
recording_active = False
pose_peg_prep = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.443],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_go = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.485],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_n = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.480],
    [ 0.037, -0.867, -0.498,  0.243],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_lift = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.480],
    [ 0.037, -0.867, -0.498,  0.290],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)

pose_peg_put = np.array([
    [-0.999, -0.019, -0.041, -0.450],
    [ 0.026,  0.499, -0.866, -0.450],
    [ 0.037, -0.867, -0.498,  0.270],
    [ 0.    , 0.    , 0.    , 1.   ]
 ], dtype=float)
def background_recorder():
    """Runs in the background to record data while the robot moves."""
    global data_log, recording_active
    print("Background recording started...")
    
    while recording_active:
        # Capture precise Wall-Clock Time (Unix Epoch) for sync
        loop_rate = 0.01 # Hz
        timestamp = time.time()
        
        # Capture Force
        wrist_wrench = robot.get_ft_data()
        if wrist_wrench and len(wrist_wrench) >= 3:
            f_norm = np.linalg.norm(wrist_wrench[:3])
        else:
            f_norm = 0.0
            
        data_log.append([timestamp, f_norm])
        # time.sleep(0.01) # Sample at ~100Hz
        elapsed = time.time() - timestamp
        remaining = loop_rate - elapsed
        if remaining > 0:
            time.sleep(remaining)

# --- 3. Execution Sequence ---
try:
    # A. Start Recording Thread
    recording_active = True
    recorder_thread = threading.Thread(target=background_recorder)
    recorder_thread.start()
    
    # B. Perform Robot Movements (Blocking)
    print("Executing movement sequence...")
    
    # print("Moving to Prep...")
    # robot.moveL(pose_peg_prep, linSpeed=0.5, linAccel=0.75, asynch=False)
    
    # print("Moving to Go...")
    # robot.moveL(pose_peg_go, linSpeed=0.5, linAccel=0.75, asynch=False)
    # robot.moveL(pose_peg_n, linSpeed=0.5, linAccel=0.75, asynch=False)
    
    # print("Pausing 8s for grip...")
    # time.sleep(7) # Data recording continues during this pause
    
    # print("Moving to N...")
    # 
    
    # print("Lifting...")
    # robot.moveL(pose_peg_lift, linSpeed=0.5, linAccel=0.75, asynch=False)
    
    # print("Prep putting...")
    # robot.moveL(pose_peg_put, linSpeed=0.5, linAccel=0.75, asynch=False)

    force = np.array([0, 1, -3.75])
    goal_delta = np.array([0, 0, 0.09])
    wrench_goal = np.hstack((force, np.zeros(3)))
    init_cmd = np.hstack((force / -300, np.zeros(3)))
    duration = 4
    max_force = 5.0 # maximum force in Newtons
    P = 0.0005 # stiffness gain, default is 0.0005 <--play with this for compliance sensitivity
    robot.force_position_control(wrench=wrench_goal, init_cmd=init_cmd, goal_delta=goal_delta, 
                                duration=duration, max_force=max_force, p=P, tolerance=0.015)
    print("Sequence Complete.")
    robot.stop()
    robot.moveL(pose_peg_lift, linSpeed=0.5, linAccel=0.75) # Move back up after putting

finally:
    # C. Stop Recording
    recording_active = False
    if 'recorder_thread' in locals():
        recorder_thread.join() # Wait for thread to finish cleanly
    
    # D. Save Data
    if len(data_log) > 0:
        print(f"Saving {len(data_log)} samples to {OUTPUT_CSV}...")
        with open(OUTPUT_CSV, mode='w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(["timestamp_epoch", "wrist_force_N"])
            writer.writerows(data_log)
        print("File saved successfully.")
    else:
        print("Warning: No data recorded.")