In [1]:
import numpy as np
import os
import time
from pydrake.all import (
    AddDefaultVisualization, AddMultibodyPlantSceneGraph, ApplyVisualizationConfig,
    Box, ConstantVectorSource, CoulombFriction, Cylinder, DiagramBuilder,
    FindResourceOrThrow, FixedOffsetFrame, HalfSpace, LinearSpringDamper,
    LoadModelDirectives, LoadModelDirectivesFromString, MeshcatVisualizer,
    MeshcatVisualizerParams, MultibodyPlant, OrientationConstraint, Parser,
    PidController, PiecewisePolynomial, PiecewiseQuaternionSlerp, ProcessModelDirectives,
    Rgba, RigidTransform, RollPitchYaw, RotationMatrix, Simulator, Solve,
    Sphere, SpatialInertia, SpatialVelocity, StartMeshcat, TrajectorySource,
    UnitInertia, VisualizationConfig
)
from pydrake.common import temp_directory
from pydrake.geometry import GeometryId
from pydrake.multibody import inverse_kinematics
from pydrake.systems.analysis import Simulator
from pydrake.visualization import ModelVisualizer

from manipulation import running_as_notebook
from manipulation.meshcat_utils import AddMeshcatTriad, StopButton
from manipulation.scenarios import AddMultibodyTriad
from manipulation.station import LoadScenario, MakeHardwareStation, MakeMultibodyPlant
from manipulation.utils import ConfigureParser, FindResource

import multiprocessing as mp
from tqdm import tqdm

import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import TensorDataset, DataLoader, random_split
import torch.nn.functional as F

import matplotlib.pyplot as plt

In [2]:
meshcat = StartMeshcat()

INFO:drake:Meshcat listening for connections at http://localhost:7010


In [None]:
def add_spring_and_sphere(parser):
    plant = parser.plant()

    X_WG = RigidTransform()
    surface_friction = CoulombFriction(static_friction=.1, dynamic_friction=0.1)
    
    ground_shade = np.array([0.5, 0.5, 0.5, 1.0])
    ground_box = Box(width=10.0, depth=10.0, height=0.01)
    X_WG_visual = RigidTransform([0, 0, -0.005])  
    X_WG = RigidTransform([0, 0, -0.005]) 

    plant.RegisterCollisionGeometry(
        plant.world_body(),
        X_WG,
        HalfSpace(),
        "ground_collision",
        surface_friction)

    plant.RegisterVisualGeometry(
        plant.world_body(),
        X_WG,
        ground_box,
        "ground_visual",
        ground_shade)

    sphere_radius = 0.045
    sphere_mass = .03  
    sphere_inertia = 2.0/5.0 * sphere_mass * sphere_radius**2
    sphere_instance = plant.AddModelInstance("sphere")
    sphere_shape = Sphere(sphere_radius)
    sphere_color = np.array([1.0, 0.0, 0.0, 1.0])  

    sphere_body = plant.AddRigidBody(
        "sphere",
        sphere_instance,
        SpatialInertia(
            mass=sphere_mass,
            p_PScm_E=np.zeros(3),
            G_SP_E=UnitInertia(sphere_inertia, sphere_inertia, sphere_inertia)
        ))
    
    plant.RegisterCollisionGeometry(
        sphere_body,
        RigidTransform(),
        sphere_shape,
        "sphere_collision",
        CoulombFriction(0.7, 0.6))
    
    plant.RegisterVisualGeometry(
        sphere_body,
        RigidTransform(),
        sphere_shape,
        "sphere_visual",
        sphere_color)

    sphere_offset = RigidTransform(
        R=RotationMatrix(),
        p=[0, 0, 1.1]
    )
    plant.SetDefaultFreeBodyPose(sphere_body, sphere_offset)

    # Add springs for bucket
    rest_length = 0.1
    spring_stiffness = 95
    spring_damping = 1
    rotation = RollPitchYaw(0, 0, 0)
    X_WB = RigidTransform(rotation.ToRotationMatrix())
    sling_frame = plant.GetFrameByName("slingshot")
    slingshot_body = plant.GetBodyByName("slingshot")
    X_WS = plant.GetDefaultFreeBodyPose(slingshot_body)

    # Get bucket body for springs
    bucket_body = plant.GetBodyByName("bucket", plant.GetModelInstanceByName("bucket"))

    right_spring = plant.AddForceElement(LinearSpringDamper(
        bodyA=slingshot_body,
        p_AP=np.array([0.20, 0, 1]),
        bodyB=bucket_body,
        p_BQ=[.05, 0, 0.1],
        free_length=rest_length,
        stiffness=spring_stiffness,
        damping=spring_damping))

    left_spring = plant.AddForceElement(LinearSpringDamper(
        bodyA=slingshot_body,
        p_AP=np.array([-0.20, 0, 1]),
        bodyB=bucket_body,
        p_BQ=[-.05, 0, 0.1],
        free_length=rest_length,
        stiffness=spring_stiffness,
        damping=spring_damping))

In [4]:
def define_hardware_station():
    builder = DiagramBuilder()
    scenario = LoadScenario(filename=os.getcwd() + "/sdfs_2/bucket.yaml")
    station = builder.AddSystem(MakeHardwareStation(scenario, meshcat, parser_prefinalize_callback=add_spring_and_sphere, package_xmls=[os.getcwd()+"/package.xml"]))
    plant = station.GetSubsystemByName("plant")
    return builder, station, plant

In [None]:
def calculate_z_rotation(handle_pos, target_pos):
    diff = target_pos - handle_pos
    angle = np.arctan((-diff[0])/diff[1]) 
    return angle

In [6]:
def get_release_pos(handle_pos, target_pos, plane_angle):
    # project points to y-z plane

    rotation_matrix = np.array([
        [np.cos(-plane_angle), -np.sin(-plane_angle), 0],
        [np.sin(-plane_angle), np.cos(-plane_angle), 0],
        [0, 0, 1]
    ])

    rot_initial = rotation_matrix @ handle_pos
    rot_target = rotation_matrix @ target_pos

    dy = rot_target[1] - rot_initial[1]
    dz = rot_target[2] - rot_initial[2]

    k = 2 * 88
    pull_angle = np.pi/4
    mass = .095
    pull_dist = 50*(dy*mass*(9.8)) / (dy*k*np.sin(pull_angle) - dz*k*np.cos(pull_angle))

    rot_initial -= np.array([0, pull_dist*np.cos(pull_angle), pull_dist*np.sin(pull_angle)])

    return rotation_matrix.T @ rot_initial

In [7]:
class net(nn.Module):
    def __init__(self):
        super(net, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(3, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, 2)
        )

    def forward(self, x):
      return self.layers(x)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# device = 'mps'
model_path = "model.pth"

model = net().to(device)
model.load_state_dict(torch.load(model_path))

def get_release_pos_learned(target_pos, num_attempts=100):
    model.eval()
    
    target = torch.FloatTensor(target_pos[:2]).to(device)
    best_params = None
    best_loss = float('inf')

    for attempt in range(num_attempts):
        params = torch.rand(3, device=device, requires_grad=True)
        optimizer = torch.optim.Adam([params], lr=0.01)

        for i in range(100):
            optimizer.zero_grad()

            pred_xy = model(params.unsqueeze(0))
            loss = F.mse_loss(pred_xy.squeeze(), target)

            loss.backward()
            optimizer.step()

            current_loss = loss.item()
            if current_loss < best_loss:
                best_loss = current_loss
                best_params = params.detach().clone()

            if current_loss < 1e-4:  
                break

    return best_params.cpu().numpy()

In [None]:
import numpy as np
from sklearn.linear_model import LinearRegression

data = np.load('slingshot_training_data_10k_3d.npz')
X = data['inputs']  
y = data['outputs'] 

from sklearn.linear_model import LinearRegression
model_lin_reg = LinearRegression(fit_intercept=True, copy_X=True, n_jobs=None, positive=False)
model_lin_reg.fit(X, y)

def get_launch_pos_linreg(target_pos, model, learning_rate=0.01, num_iterations=1000, num_attempts=10):

    target_pos = np.array(target_pos)
    best_launch_pos = None
    best_error = float('inf')
    
    bounds = {
        'x': [-0.1, 0.1],
        'y': [-0.5, 0],
        'z': [0.27, 0.77]
    }
    
    def clip_to_bounds(pos):
        return np.array([
            np.clip(pos[0], bounds['x'][0], bounds['x'][1]),
            np.clip(pos[1], bounds['y'][0], bounds['y'][1]),
            np.clip(pos[2], bounds['z'][0], bounds['z'][1])
        ])
    
    for attempt in range(num_attempts):
        launch_pos = np.array([
            np.random.uniform(bounds['x'][0], bounds['x'][1]),
            np.random.uniform(bounds['y'][0], bounds['y'][1]),
            np.random.uniform(bounds['z'][0], bounds['z'][1])
        ])
        
        for i in range(num_iterations):
            coefficients = model.coef_ 
            
            predicted_pos = model.predict(launch_pos.reshape(1, -1))[0]
            
            error = np.linalg.norm(predicted_pos - target_pos)
            
            if error < best_error:
                best_error = error
                best_launch_pos = launch_pos.copy()
                
            if error < 0.01:  
                break
                
            gradient = 2 * (predicted_pos - target_pos) @ coefficients
            
            launch_pos = launch_pos - learning_rate * gradient
            
            launch_pos = clip_to_bounds(launch_pos)
            
    return best_launch_pos, best_error

target = np.array([0, 3, 0])  
launch_pos, error = get_launch_pos_linreg(target, model_lin_reg)
print(f"Best launch position: {launch_pos}")
print(f"Expected error: {error:.3f} meters")

Best launch position: [-0.1        -0.40290933  0.45125576]
Expected error: 0.064 meters


In [9]:
def CreateIiwaControllerPlant():
    robot_sdf_path = (
        "sdfs_2/iiwa7_no_collision.sdf"
    )
    gripper_sdf_path = (
        "package://manipulation/hydro/schunk_wsg_50_with_tip.sdf"
    )
    plant_robot = MultibodyPlant(0.001)
    parser = Parser(plant=plant_robot)
    ConfigureParser(parser)
    parser.AddModels(robot_sdf_path)
    parser.AddModelsFromUrl(gripper_sdf_path)
    
    plant_robot.WeldFrames(
    frame_on_parent_F=plant_robot.world_frame(),
    frame_on_child_M=plant_robot.GetFrameByName("iiwa_link_0"),
    X_FM=RigidTransform.Identity().multiply(
        RigidTransform(RotationMatrix.MakeZRotation(-np.pi/2), 
                      np.array([0, -0.45, 0]))
        )
    )
    
    plant_robot.WeldFrames(
        frame_on_parent_F=plant_robot.GetFrameByName("iiwa_link_7"),
        frame_on_child_M=plant_robot.GetFrameByName("body"),
        X_FM=RigidTransform(
            RollPitchYaw(np.pi/2, 0, np.pi/2), np.array([0, 0, 0.09])
        )
    )
    
    plant_robot.mutable_gravity_field().set_gravity_vector([0, 0, 0])
    plant_robot.Finalize()

    return plant_robot, []

In [10]:
def create_q_knots(pose_lst):
    q_knots = []
    plant, _ = CreateIiwaControllerPlant()
    world_frame = plant.world_frame()
    gripper_frame = plant.GetFrameByName("body")
    q_nominal = np.array([-1.57, 0.1, 0, -1.2, 0, 1.6, 0, 0, 0]) 

    def AddOrientationConstraint(ik, R_WG, bounds):
        ik.AddOrientationConstraint(
            frameAbar=world_frame,
            R_AbarA=R_WG,
            frameBbar=gripper_frame,
            R_BbarB=RotationMatrix(),
            theta_bound=bounds,
        )

    def AddPositionConstraint(ik, p_WG_lower, p_WG_upper):
        ik.AddPositionConstraint(
            frameA=world_frame,
            frameB=gripper_frame,
            p_BQ=np.zeros(3),
            p_AQ_lower=p_WG_lower,
            p_AQ_upper=p_WG_upper,
        )

    for i in range(len(pose_lst)):
        ik = inverse_kinematics.InverseKinematics(plant)
        q_variables = ik.q()  
        prog = ik.prog()

        prog.AddQuadraticErrorCost(
            np.identity(len(q_variables)), 
            q_nominal, 
            q_variables
        )

        if i == 0:
            prog.SetInitialGuess(q_variables, q_nominal)
        else:
            prog.SetInitialGuess(q_variables, q_knots[i - 1])

        R_WG = pose_lst[i].rotation()
        orientation_bound = .025
        AddOrientationConstraint(ik, R_WG, orientation_bound)

        p_WG = pose_lst[i].translation()

        p_WG_lower = p_WG - np.array([0.0, 0.01, 0.02])
        p_WG_upper = p_WG + np.array([0.0, 0.01, 0.03])

        if i >= 17:
            p_WG_lower = p_WG - np.array([0.0, 0.01, 0.02])
            p_WG_upper = p_WG + np.array([0.0, 0.01, 0.02])

        AddPositionConstraint(ik, p_WG_lower, p_WG_upper)

        result = Solve(prog)

        # if result.is_success():
        #     print(f"Waypoint {i} error: {result.get_optimal_cost()}")
        #     print(f"Position: {p_WG}")
        # else:
        #     print(f"Faulire on waypoint {i}")
        
            
        q_knots.append(result.GetSolution(q_variables))

    return q_knots

In [11]:
def generate_bucket_trajectory(initial_pose, handle_pos, target_pos, method):
    approach_offset = np.array([0, 0, -0.3])
    approach_pos = handle_pos + approach_offset
    R_grip = RotationMatrix.MakeXRotation(np.pi/2) @ RotationMatrix.MakeYRotation(np.pi/2)
    
    times = [0, 5, 12, 15, 18, 20, 25, 30]
    z_rot = 0
    release_pos = []
    print(handle_pos)
    if method == "physics":
        z_rot = calculate_z_rotation(handle_pos, target_pos)

    if method == "physics":
        release_pos = get_release_pos(handle_pos, target_pos, z_rot) - [0, 0, .15]
    elif method == "learned":
        release_pos = get_release_pos_learned(target_pos)
    elif method == "linear-regressor":
        release_pos, _ = get_launch_pos_linreg(target_pos, model_lin_reg)
    
    rotations = [
        initial_pose.rotation(),
        R_grip,
        R_grip,
        R_grip,
        R_grip,
        R_grip @ RotationMatrix.MakeYRotation(z_rot),
        R_grip @ RotationMatrix.MakeYRotation(z_rot),
        R_grip @ RotationMatrix.MakeYRotation(z_rot)
    ]

    positions = [
        initial_pose.translation(),
        initial_pose.translation(),
        approach_pos,
        approach_pos + np.array([0, 0, .13]),
        approach_pos + np.array([0, 0, .13]),
        approach_pos + np.array([0, 0, .13]),
        release_pos,
        release_pos
    ]

    orientation_traj = PiecewiseQuaternionSlerp(times, rotations)
    position_knots = np.array(positions).T
    position_traj = PiecewisePolynomial.CubicShapePreserving(times, position_knots)
    
    N = 20
    t_lst = np.linspace(0, times[-1], N)
    pose_lst = []
    
    for t in t_lst:
        R_matrix = orientation_traj.orientation(t).rotation()
        p = position_traj.value(t).flatten()
        pose = RigidTransform(RotationMatrix(R_matrix), p)
        AddMeshcatTriad(meshcat, path=str(t), X_PT=pose, opacity=0.2)
        pose_lst.append(pose)
    
    q_knots = np.array(create_q_knots(pose_lst))
    N = 20
    gripper_knots = np.zeros((N, 1)) 
    
    for i in range(N):
        t = t_lst[i]
        if t <= 14:  
            gripper_knots[i] = 0.08
        elif t <= 17:  
            gripper_knots[i] = 0.08 - (0.06 * (t - 15) / 3)
        elif t <= 25:  
            gripper_knots[i] = .0
        else:
            gripper_knots[i] = .08
            
    q_traj = PiecewisePolynomial.CubicShapePreserving(t_lst, q_knots.T[:7, :])
    g_traj = PiecewisePolynomial.CubicShapePreserving(t_lst, gripper_knots.T)

    return q_traj, g_traj

In [12]:
def BuildAndSimulateTrajectory(method, target_pos):
    builder, station, plant = define_hardware_station()
    scene_graph = station.GetSubsystemByName("scene_graph")

    AddMultibodyTriad(plant.GetFrameByName("body"), scene_graph)
    AddMultibodyTriad(plant.GetFrameByName("bucket_handle"), scene_graph)

    station_context = station.CreateDefaultContext()
    plant_context = plant.GetMyMutableContextFromRoot(station_context)

    gripper_frame = plant.GetFrameByName("body", plant.GetModelInstanceByName("wsg"))
    initial_pose = plant.CalcRelativeTransform(
        plant_context,
        plant.world_frame(),
        gripper_frame
    )

    handle_frame = plant.GetFrameByName("bucket_handle", plant.GetModelInstanceByName("bucket"))
    X_WHandle = plant.CalcRelativeTransform(
        plant_context,
        plant.world_frame(),
        handle_frame
    )
    
    bucket_instance = plant.GetModelInstanceByName("bucket")
    bucket_body = plant.GetBodyByName("bucket", bucket_instance)
    X_WB = plant.CalcRelativeTransform(
        plant_context,
        plant.world_frame(),
        bucket_body.body_frame()
    )

    AddMeshcatTriad(meshcat, "bucket_frame", X_PT= X_WB)
    AddMeshcatTriad(meshcat, "handle_frame", X_PT=X_WHandle)

    # pig_body = plant.GetBodyByName("pig_body", plant.GetModelInstanceByName("target_pig"))
    # pose = RigidTransform(
    #     RollPitchYaw(0, 0, -np.pi/2),
    #     target_pos                  
    # )
    # plant.SetDefaultFreeBodyPose(pig_body, pose)

    # X_WP = plant.GetFreeBodyPose(plant_context, pig_body)

    q_traj, g_traj = generate_bucket_trajectory(initial_pose, X_WHandle.translation(), target_pos, method)
    
    q_traj_system = builder.AddSystem(TrajectorySource(q_traj))
    g_traj_system = builder.AddSystem(TrajectorySource(g_traj))

    builder.Connect(
        q_traj_system.get_output_port(), station.GetInputPort("iiwa.position")
    )
    builder.Connect(
        g_traj_system.get_output_port(), station.GetInputPort("wsg.position")
    )

    force_limit_source = builder.AddSystem(ConstantVectorSource([320.0]))

    builder.Connect(
        force_limit_source.get_output_port(),
        station.GetInputPort("wsg.force_limit")
    )

    diagram = builder.Build()
    
    simulator = Simulator(diagram)

    meshcat.StartRecording(set_visualizations_while_recording=True)

    # simulator.AdvanceTo(45)

    min_distance = float('inf')
    pose = [];
    sphere_body = plant.GetBodyByName("sphere")
    # pig_body = plant.GetBodyByName("pig_body", plant.GetModelInstanceByName("target_pig"))

    dt = 0.1
    end_time = 45

    for t in np.arange(0, end_time, dt):
        simulator.AdvanceTo(t)
        
        context = simulator.get_context()
        plant_context = plant.GetMyContextFromRoot(context)
        
        sphere_pose = plant.GetFreeBodyPose(plant_context, sphere_body)
        # pig_pose = plant.GetFreeBodyPose(plant_context, pig_body)

        if sphere_pose.translation()[2] <= .05:
            print(f"Sphere hit ground at time {t:.1f}s")
            print(sphere_pose.translation())
            pose = sphere_pose.translation()
            break
        
        distance = np.linalg.norm(sphere_pose.translation() - target_pos)
        min_distance = min(min_distance, distance)

    meshcat.PublishRecording()

    return simulator, plant, min_distance, sphere_pose

In [14]:
sim, plant, dist, _ = BuildAndSimulateTrajectory("physics", [-1, 2, 0])
print(dist)



[0.   0.   0.94]
Sphere hit ground at time 25.3s
[-0.03764602  2.90018637  0.03457529]
1.2028521486938188
