# Ping Pong Catching

### Spatial Projectile Matching with SNOPT

_Final Project for 6.843, Fall 2021_

_Viraj Parimi, Cameron Pittman_

In [None]:
import time
import pydot
import numpy as np
from manipulation.meshcat_cpp_utils import (
    StartMeshcat
)
from pydrake.systems.meshcat_visualizer import MeshcatVisualizer
from meshcat.servers.zmqserver import start_zmq_server_as_subprocess

from pydrake.all import (
    BasicVector, DiagramBuilder, Integrator, JacobianWrtVariable, LeafSystem, MeshcatVisualizerCpp,
    MeshcatVisualizerParams, Simulator, MathematicalProgram, SnoptSolver, Solve
)

from iiwa_paddle_station import MakeIiwaPaddleStation
proc, zmq_url, web_url = start_zmq_server_as_subprocess(server_args=[])

In [None]:
meshcat = StartMeshcat()

### Define Controller

In [None]:
class DifferentialIKSystem(LeafSystem):
    """
        Wrapper system for Differential IK. 

        Args:
            plant (MultibodyPlant): multibody plant object of the simulated plant. 
            diffik_fun (function): function object that handles diffik. Must have the signature 
                                   diffik_fun(J_G, V_G_desired).
    """
    
    def __init__(self, plant, diffik_fun):

        LeafSystem.__init__(self)

        self._plant = plant
        self._W = plant.world_frame()
        self._diffik_fun = diffik_fun
        self._plant_context = plant.CreateDefaultContext()
        self._iiwa = plant.GetModelInstanceByName("iiwa7")
        self._G = plant.GetBodyByName("base_link").body_frame()

        # Inputs
        self.DeclareVectorInputPort("spatial_velocity", BasicVector(6))
        self.DeclareVectorInputPort("iiwa_position_measured", BasicVector(7))
        self.DeclareVectorInputPort("iiwa_velocity_measured", BasicVector(7))

        # Outputs
        self.DeclareVectorOutputPort("iiwa_velocity_command", BasicVector(7), self.CalcOutput)
        
    def CalcOutput(self, context, output):

        V_G_desired = self.get_input_port(0).Eval(context)
        
        q_now = self.get_input_port(1).Eval(context)
        v_now = self.get_input_port(2).Eval(context)
        self._plant.SetPositions(self._plant_context, self._iiwa, q_now)

        J_G = self._plant.CalcJacobianSpatialVelocity(
            self._plant_context, 
            JacobianWrtVariable.kQDot, 
            self._G, 
            [0, 0, 0],
            self._W,
            self._W
        )
        J_G = J_G[:, 0:7]
        
        v = self._diffik_fun(J_G, V_G_desired) 
        output.SetFromVector(v)

### Define the Differential Inverse Kinematics Mathematical Program

In [None]:
def DiffIKQP(J_G, V_G_desired):

    prog = MathematicalProgram()
    v = prog.NewContinuousVariables(7, 'v')
    v_max = 3.0

    obj = J_G.dot(v) - V_G_desired
    prog.AddCost(obj.dot(obj))
    prog.AddBoundingBoxConstraint(-v_max * np.ones(7), v_max * np.ones(7), v)
    
    solver = SnoptSolver()
    result = solver.Solve(prog)

    if not (result.is_success()):
        raise ValueError("Could not find the optimal solution.")

    v_solution = result.GetSolution(v)
    return v_solution

### Constants

In [None]:
iiwa_q0 = np.array([-1.57, 0, 0, -np.pi/2, 0, 0, 0])
paddle_state_pos = np.array([0.07659419, -0.75730758, 0.63302061])

g = -9.81
height = 1
offset = 0.5
CATCH_HEIGHT = 0.55
simulation_time = 3
ball_radius = 0.035
max_angle = np.pi / 6
paddle_radius = 0.175
e_z = np.array([0, 0, 1], dtype = float)

### Define Initial States

In [None]:
# Experiment 1 - drop on top of the paddle
ball_q0_exp1 = [1, 1, 1, 1, paddle_state_pos[0], paddle_state_pos[1], paddle_state_pos[2] + height, 0, 0, 0, 0, 0, 0]

# Experiment 2 - drop from (+ve x) offset with (-ve x) velocity
ball_q0_exp2 = [1, 1, 1, 1, paddle_state_pos[0] + offset, paddle_state_pos[1], paddle_state_pos[2] + height, 0, 0, 0, -1, 0, 0]

# Experiment 3 - drop from (-ve x) offset with (+ve x) velocity
ball_q0_exp3 = [1, 1, 1, 1, paddle_state_pos[0] - offset, paddle_state_pos[1], paddle_state_pos[2] + height, 0, 0, 0, 1, 0, 0]

# Experiment 4 - drop from (-ve y) offset with (+ve y) velocity
ball_q0_exp4 = [1, 1, 1, 1, paddle_state_pos[0], paddle_state_pos[1] - offset, paddle_state_pos[2] + height, 0, 0, 0, 0, 1, 0]

# Experiment 5 - drop from (+ve y) offset with (-ve y) velocity
ball_q0_exp5 = [1, 1, 1, 1, paddle_state_pos[0], paddle_state_pos[1] + offset, paddle_state_pos[2] + height, 0, 0, 0, 0, -1, 0]

# Experiment 6 - drop from (-ve x, -ve y) offset with (+ve x, +ve y) velocity
ball_q0_exp6 = [1, 1, 1, 1, paddle_state_pos[0] - offset, paddle_state_pos[1] - offset, paddle_state_pos[2] + height, 0, 0, 0, 1, 1, 0]

# Experiment 7 - drop from (-ve x, +ve y) offset with (+ve x, -ve y) velocity
ball_q0_exp7 = [1, 1, 1, 1, paddle_state_pos[0] - offset, paddle_state_pos[1] + offset, paddle_state_pos[2] + height, 0, 0, 1, -1, 0, 0]

# Experiment 8 - drop from (+ve x, -ve y) offset with (-ve x, +ve y) velocity
ball_q0_exp8 = [1, 1, 1, 1, paddle_state_pos[0] + offset, paddle_state_pos[1] - offset, paddle_state_pos[2] + height, 0, 0, 0, -1, 1, 0]

# Experiment 9 - drop from (+ve x, +ve y) offset with (-ve x, -ve y) velocity
ball_q0_exp9 = [1, 1, 1, 1, paddle_state_pos[0] + offset, paddle_state_pos[1] + offset, paddle_state_pos[2] + height, 0, 0, 0, -1, -1, 0]

scenarios = [ball_q0_exp1, ball_q0_exp2, ball_q0_exp3, ball_q0_exp4, ball_q0_exp5, ball_q0_exp6, ball_q0_exp7, ball_q0_exp8, ball_q0_exp9]

### Utility Functions

In [None]:
def ball_p_from_state(pose):
    """
    Get x, y, z from ball_state

    Params:
        BasicVector(13)
    Returns:
        array[3]
    """
    return pose[4:7]


def ball_v_from_state(pose):
    """
    Get translational velocity from ball_state

    Params:
        BasicVector(13)
    Returns:
        array[3]
    """
    return pose[-3:]


def get_mode(p_ball, v_ball, p_paddle, atol = 1e-03):
    """
    Determine current paddle mode

    Output is one of: "rising", "falling", "contact"
    """
    b_z = p_ball[2]
    vz = v_ball[2]
    p_z = p_paddle[2]

    if np.isclose(b_z, p_z, atol=atol):
        return "contact"
    else:
        return "rising" if vz >= 0 else "falling"

### Define the Catcher class

In [None]:
class Catcher(LeafSystem):
    """
    Catcher class that computes the trajectory of ball and outputs desired velocity estimates of the
    paddle to reach the ball at a fixed height. This approach does not compute the desired angular 
    velocity estimates of the paddle.
    """

    def __init__(self, plant, exp_idx):

        LeafSystem.__init__(self)
        
        self.plant = plant
        self.t_catch = None
        self.made_contact = False
        self.W = plant.world_frame()
        self.P = plant.GetBodyByName("base_link")
        self.iiwa = plant.GetModelInstanceByName("iiwa7")
        self.plant_context = plant.CreateDefaultContext()
        self.p_b_0 = np.array(ball_p_from_state(scenarios[exp_idx]))
        self.v_b_0 = np.array(ball_v_from_state(scenarios[exp_idx]))

        # Inputs
        self.DeclareVectorInputPort("ball_state", BasicVector(13))
        self.DeclareVectorInputPort("iiwa_pos_measured", BasicVector(7))
        self.DeclareVectorInputPort("iiwa_velocity_estimated", BasicVector(7))
        
        # Outputs
        self.DeclareVectorOutputPort("paddle_desired_spatial_velocity", BasicVector(6), self.CalcDesSpatialVelOutput)

    def compute_time_of_impact(self, p_bz, v_bz, catch_height):
        g_pos = abs(g)
        return (1. / g_pos) * (v_bz + np.sqrt(v_bz**2 + 2 * g_pos * (p_bz - catch_height)))

    def CalcDesSpatialVelOutput(self, context, output):

        q = self.GetInputPort("iiwa_pos_measured").Eval(context)
        v = self.GetInputPort("iiwa_velocity_estimated").Eval(context)
        self.plant.SetPositionsAndVelocities(self.plant_context, self.iiwa, np.hstack([q, v]))
        
        state_Ball = np.array(self.GetInputPort("ball_state").Eval(context))
        p_Ball = ball_p_from_state(state_Ball)
        v_Ball = ball_v_from_state(state_Ball)

        p_Paddle = self.plant.EvalBodyPoseInWorld(self.plant_context, self.P).translation()
        w_Paddle = self.plant.EvalBodySpatialVelocityInWorld(self.plant_context, self.P).rotational()
        v_Paddle = self.plant.EvalBodySpatialVelocityInWorld(self.plant_context, self.P).translational()

        mode = get_mode(p_Ball, v_Ball, p_Paddle, atol=1e-01)

        if not self.made_contact and mode == "contact":
            self.made_contact = True        

        gain = np.array([1., 1., 1.])
        
        # Useful when trying to match the positions of the ball and paddle
        dist_ball_and_paddle = p_Ball - p_Paddle - np.array([0, 0, ball_radius])

        [x_offset_percent, y_offset_percent, z_offset_percent] = (dist_ball_and_paddle) / paddle_radius

        w_p_x = y_offset_percent * max_angle
        w_p_y = x_offset_percent * max_angle * -1
        w_p_z = 0

        w_P_desired = np.array([w_p_x, w_p_y, w_p_z] - w_Paddle) * gain
        # w_P_desired = np.array([w_p_x, w_p_y, w_p_z]) * gain

        if not self.made_contact:
            
            # Try to beat the ball to the catch point
            if self.t_catch is None:
                self.t_catch = self.compute_time_of_impact(p_Ball[2], v_Ball[2], CATCH_HEIGHT)

            # Tell the iiwa to get to the location of the catch a split second early
            t_pre_catch = self.t_catch * 0.75

            # Assume ball is only acting under gravity
            a_b = np.array([0, 0, g])

            p_catch = self.p_b_0 + self.v_b_0 * self.t_catch + (a_b / 2) * self.t_catch**2
            v_b_catch = self.v_b_0 + a_b * self.t_catch

            v_p_0 = (p_catch - paddle_state_pos - 0.5 * v_b_catch * t_pre_catch) / (0.5 * t_pre_catch)
            a_p = (v_b_catch - v_p_0) / t_pre_catch

            v_P_desired = v_p_0 + a_p * context.get_time()

        else:

            # Ball should be moving slowly so switch to PD behavior after the intial contact
            kwp = 3
            kwd = 7
            w_P_desired = kwp * w_P_desired + kwd * (v_Ball - v_Paddle)

            kvp = 3
            kvd = 0.1
            v_P_desired = kvp * (dist_ball_and_paddle) + kvd * (v_Ball - v_Paddle)
 
        V_P = np.concatenate([w_P_desired, v_P_desired])
        output.SetFromVector(V_P)


### Experimental Setup

In [None]:
class Demo:
    def __init__(self, time_step=0.001, exp_idx = 0):
        """
        Robotic Kuka iiwa with paddle end effector

        Args:
            time_step (float, optional): time step for internal manipulation station controller. Defaults to 0.001.
            exp_idx (int, optional): experiment index to run. Defaults to 0.
        """
        
        self.time = 0
        self.metric = 0
        
        diagram, plant, scene_graph = MakeIiwaPaddleStation(time_step=time_step)
        
        self.plant = plant
        self.builder = DiagramBuilder()
        self.station = self.builder.AddSystem(diagram)
        
        self.visualizer = MeshcatVisualizerCpp.AddToBuilder(
            self.builder,
            diagram.GetOutputPort("geometry_query"),
            meshcat,
            MeshcatVisualizerParams(delete_prefix_initialization_event=False)
        )

        self.plant.mutable_gravity_field().set_gravity_vector([0, 0, g])

        diff_ik_sys = self.builder.AddSystem(DifferentialIKSystem(self.plant, DiffIKQP))
        diff_ik_sys.set_name("diff_ik_sys")
        catcher = self.builder.AddSystem(Catcher(self.plant, exp_idx))
        catcher.set_name("catcher")
        integrator = self.builder.AddSystem(Integrator(7))
        integrator.set_name("integrator")
        
        self.builder.Connect(integrator.get_output_port(), self.station.GetInputPort("iiwa_position"))
        self.builder.Connect(self.station.GetOutputPort("ball_state"), catcher.GetInputPort("ball_state"))
        self.builder.Connect(diff_ik_sys.GetOutputPort("iiwa_velocity_command"), integrator.get_input_port())
        self.builder.Connect(self.station.GetOutputPort("iiwa_position_measured"), catcher.GetInputPort("iiwa_pos_measured"))
        self.builder.Connect(catcher.GetOutputPort("paddle_desired_spatial_velocity"), diff_ik_sys.GetInputPort("spatial_velocity"))
        self.builder.Connect(self.station.GetOutputPort("iiwa_velocity_estimated"), catcher.GetInputPort("iiwa_velocity_estimated"))
        self.builder.Connect(self.station.GetOutputPort("iiwa_position_measured"), diff_ik_sys.GetInputPort("iiwa_position_measured"))
        self.builder.Connect(self.station.GetOutputPort("iiwa_velocity_estimated"), diff_ik_sys.GetInputPort("iiwa_velocity_measured"))
        
        self.diagram = self.builder.Build()
        self.simulator = Simulator(self.diagram)
        self.simulator.set_target_realtime_rate(1.0)

        self.context = self.simulator.get_context()
        self.station_context = self.station.GetMyContextFromRoot(self.context)
        self.plant_context = self.plant.GetMyContextFromRoot(self.context)

        self.plant.SetPositions(self.plant_context, self.plant.GetModelInstanceByName("iiwa7"), np.array(iiwa_q0))
        self.plant.SetPositionsAndVelocities(self.plant_context, self.plant.GetModelInstanceByName("ball"), np.array(scenarios[exp_idx]))

        self.station.GetInputPort("iiwa_feedforward_torque").FixValue(self.station_context, np.zeros((7,1)))
        iiwa_q = self.plant.GetPositions(self.plant_context, self.plant.GetModelInstanceByName("iiwa7"))
        integrator.GetMyContextFromRoot(self.context).get_mutable_continuous_state_vector().SetFromVector(iiwa_q)

    def step(self, simulate=True, duration=0.1, final=True):
        """
        Step the closed loop system

        Args:
            simulate (bool, optional): whether or not to visualize the command. Defaults to True.
            duration (float, optional): duration to complete command in simulation. Defaults to 0.1.
            final (bool, optional): whether or not this is the final command in the sequence; relevant for recording. Defaults to True.
        """
        if simulate:

            self.visualizer.StartRecording()
            self.simulator.AdvanceTo(self.time + duration)
            self.visualizer.StopRecording()

            # Compute the stability metric
            contact_results = self.plant.get_contact_results_output_port().Eval(self.plant_context)
            for i in range(contact_results.num_point_pair_contacts()):
                info = contact_results.point_pair_contact_info(i)
                bodyA = self.plant.get_body(info.bodyA_index()).name()
                bodyB = self.plant.get_body(info.bodyB_index()).name()
                if (bodyA == "ball" and bodyB == "base_link") or (bodyA == "base_link" and bodyB == "ball"):
                    self.metric += duration

            if final:
                self.visualizer.PublishRecording()
                return self.time + duration

        self.time += duration

### Experiments

In [None]:
time_step = .0005

for idx, experiment in enumerate(scenarios):
    
    start_time = time.perf_counter()
    demo = Demo(time_step=time_step, exp_idx=idx)

    seconds = simulation_time
    for i in range(int(seconds * 20)):
        t = demo.step(duration=0.05, final=i==seconds * 20 - 1)
        if t is not None and t != seconds:
            break
    end_time = time.perf_counter()

    metric_val = demo.metric
    print("Experiment: ", idx)
    print("Stability Metric: ", metric_val)
    print("Total Time(s): ", (end_time - start_time))

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=b95b60bf-1e32-44f8-a07c-9240db62d20a' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>