# Bricklayer Robot

In [1]:
# Extensions: magic to avoid re-loading modified .py modules all the time
%load_ext autoreload
%autoreload 2

# Bookkeeping
import sys
import shutil
import os as os
import matplotlib.pyplot as plt

# Custom project classes and functions
from iiwa import *
from Trajectory import *
from TrajectoryBuilder import *
from Visualizer import *
from BrickPlanner import *
from HighPerformanceTrajectoryBuilder import *

# Some paths
kProjectDir = os.getcwd()
print("Working in directory ", kProjectDir)

# Hack: move our model packages to the manipulation/ folder to allow Drake to find them
shutil.copyfile(kProjectDir + "/models/brick.dmd.yaml", kProjectDir + "/../manipulation/manipulation/models/brick.dmd.yaml")
shutil.copyfile(kProjectDir + "/models/real_brick.sdf", kProjectDir + "/../manipulation/manipulation/models/real_brick.sdf")
shutil.copyfile(kProjectDir + "/models/ground_model.sdf", kProjectDir + "/../manipulation/manipulation/models/ground_model.sdf")

# Start the visualizer.
meshcat = StartMeshcat()

  from distutils.version import LooseVersion
INFO:drake:Meshcat listening for connections at http://localhost:7000


Working in directory  /home/nikita/MIT_6_4212/project


In [2]:
# System configs
# Brick configuration
kNumberOfBricks =  300 # @param
# Brick dimensions, must be consistent with the geometry in models/real_brick.sdf
kBrickGeom = np.array([0.075, 0.1, 0.05]) # @param

# Brick warehouse config
kWhLocation = np.array([2.5, 2.5, 0.15]) # location of the warehouse (corner), # @param
kWhSize = np.array([6, 6, 10]) # size of the warehouse grid in cells, # @param
if (kWhSize.prod() < kNumberOfBricks):
    assert False, "The warehouse is too small to fit all bricks"

In [3]:
# Generate bricks
floorplan = """
- - - - - - - 
"""

floorplan_0 = """
|- - - - - |
|          |
|          |
|          |
|- - - - - |
"""

floorplan_1 = """
|- - - - - - - - |
|                |
|                |
|                |
|                |
|                |
|                |
|                |
|                |
|- - - - - - - - |
"""

floorplan_2 = """
|- - - - - - - - |
|                |
|                |
|                |
|                |- - - |
|                |      |
|                |      |
|                |- - - |
|                |
|- - - - - - - - |
"""

kHeight = 6 # @param
kGap = 0.005 # @param
planer = Planner(floorplan_2, kBrickGeom, kGap, kHeight)
planer.shift(-0.55, -0.45)
#planer.shift(-0.55, -0.2)
total_bricks_needed = planer.get_total_bricks_n()
if (total_bricks_needed > kNumberOfBricks):
    assert False, "There is not enough bricks to cover the building"
print("Bricks planned, total bricked needed: ", total_bricks_needed)

# Visualize floorplan
vis = Visualizer(meshcat, kBrickGeom)
vis.clear_vis()
vis.visualize_plan(np.array(planer.get_brick_poses()))

Parsing floorplan:  
|- - - - - - - - |
|                |
|                |
|                |
|                |- - - |
|                |      |
|                |      |
|                |- - - |
|                |
|- - - - - - - - |

Bricks planned, total bricked needed:  276


In [5]:
# Build trajectories for robots
def build_trajectories_for_robot(ilyich, robot_id, bricks, kBrickSources, kArms, verbose=True):
    X_WBrickSource = kBrickSources[robot_id]
    robot_pose = RigidTransform(RollPitchYaw(0, 0, 0), kArms[robot_id])
    X_WG = ilyich.get_X_WG()[robot_id]

    # Build initial trajectory and get q0 in the calibration point
    trj_builder = TrajectoryBuilder(X_WBrickSource)
    trj_builder.gen_initial_traj(X_WG)
    res, q0 = trj_builder.solve_IK(np.array([ 0., 0.0, 0., 0.0, 0.0, 0.0, 0.0, 0., 0.]), robot_pose)
    assert res == True, "Failed to solve IK for the initial trajectory"
    
    if verbose:
        print("q0 in calibration point is ", q0)

    # Build other trajectories
    kNumWorkers = 20 # @Param
    hpc_trj_builder = HighPerformanceTrajectoryBuilder(planer,
                                                       X_WBrickSource,
                                                       bricks,
                                                       kNumWorkers,
                                                       q0,
                                                       robot_pose)
    covered_bricks, failed_bricks = hpc_trj_builder.solve(trj_builder)
    
    if verbose:
        print("Covered bricks: ", covered_bricks)
        print("Failed bricks:", failed_bricks)

    # Speed-up a bit
    trj_builder.get_trajectories().slow_down(0.5)

    # Make trajectories for iiwa with IK
    return trj_builder, covered_bricks, failed_bricks

# Visualize coverage
def visualize_coverage(trj_builder, covered_bricks, failed_bricks, covered_color, clear=True):
    # Visualize coverage
    vis = Visualizer(meshcat, kBrickGeom)
    if clear:
        vis.clear_vis()
    vis.visualize_coverage(planer.get_brick_poses()[covered_bricks],
                           planer.get_brick_poses()[failed_bricks],
                           covered_color)

    # Visualize all trajectories
    for b_id in covered_bricks:
        points = trj_builder.get_trajectory_points_for_brick(b_id)
        vis.visualize_traj(points, b_id)

In [4]:
# Implement greedy search for robot positions
def greedy_search(total_bricks_needed, initial_guess, num_directions, num_of_iterations, delta, eps):
    stat = []
    bricks_covered_best = 0
    full_solution_found = False
    solutions = []
    i = 0
    while i < num_of_iterations and full_solution_found == False:
        found = False
        for j in range(num_directions):
            # Guess a new position
            phi = 2*np.pi*np.random.rand(1)[0]
            new_guess = np.array([initial_guess[0] + delta * np.cos(phi),
                                  initial_guess[1] + delta * np.sin(phi),
                                  initial_guess[2]])
            kArms = [
                     np.array([0, 0, 0]),
                     new_guess
                    ]
            print("Iteration #", str(i), ", positions: ", kArms)
            # Test this new position
            brick_sources = [RigidTransform(arm + np.array([0.6, 0.0, kBrickGeom[2]/2])) for arm in kArms]
            ilyich = IIWA_Ilyich(meshcat, 0, kBrickGeom, kArms, None)
            # Do 1st robot
            trj_builder, covered_bricks, failed_bricks = build_trajectories_for_robot(ilyich,
                                                                                      0,
                                                                                      range(0, total_bricks_needed),
                                                                                      brick_sources,
                                                                                      kArms,
                                                                                      verbose=False)
            # Do 2nd robot
            trj_builder_1, covered_bricks_1, failed_bricks_1 = build_trajectories_for_robot(ilyich,
                                                                                            1,
                                                                                            failed_bricks,
                                                                                            brick_sources,
                                                                                            kArms,
                                                                                            verbose=False)
            # Do stat
            bricks_covered = len(covered_bricks + covered_bricks_1)
            stat.append(bricks_covered)
            print("   bricks_covered: ", bricks_covered)
            i = i + 1

            # Append stuff
            if (bricks_covered > bricks_covered_best):
                solutions.append(new_guess)
                bricks_covered_best = bricks_covered
                initial_guess = new_guess
                found = True
                break

            if (bricks_covered == total_bricks_needed):
                solutions.append(new_guess)
                found = True
                full_solution_found = True
                initial_guess = new_guess
                break

        if found == False:
            delta = delta - eps
            if delta <= 0:
                return None

    # Return
    return (solutions, stat)

solutions, stat = greedy_search(total_bricks_needed,
                                np.array([0.0, 1.5, 0]),
                                num_directions=10,
                                num_of_iterations=100,
                                delta=0.2,
                                eps=0.005)

Iteration # 0 , positions:  [array([0, 0, 0]), array([0.19978978, 1.49083241, 0.        ])]


*/ (Deprecated.)

Deprecated:
    Use ForcedPublish() instead This will be removed from Drake on or
    after 2023-03-01.
  self.diagram.Publish(context)


NameError: name 'build_trajectories_for_robot' is not defined

In [None]:
# Run the best solution
kArms = [
         np.array([0, 0, 0]),
         solutions[-1]
        ]
kBrickSources = [RigidTransform(arm + np.array([0.6, 0.0, kBrickGeom[2]/2])) for arm in kArms]
#
ilyich = IIWA_Ilyich(meshcat, 0, kBrickGeom, kArms, None)
# Do 1st robot
trj_builder, covered_bricks, failed_bricks = build_trajectories_for_robot(ilyich, 0, range(0, total_bricks_needed), kBrickSources, kArms)
visualize_coverage(trj_builder, covered_bricks, failed_bricks, Rgba(0.61, 1, 0.60), clear=True)

# Do 2nd robot
trj_builder_1, covered_bricks_1, failed_bricks_1 = build_trajectories_for_robot(ilyich, 1, failed_bricks, kBrickSources, kArms)
visualize_coverage(trj_builder_1, covered_bricks_1, failed_bricks_1, Rgba(1, 1, 0.60), clear=False)


In [None]:
# Align trajectories and form breakpoints
all_covered_bricks = (covered_bricks + covered_bricks_1)
all_covered_bricks.sort()

trajectories_0 = trj_builder.get_trajectories().get_traj()
trajectories_1 = trj_builder_1.get_trajectories().get_traj()
trj_0 = Trajectory()
trj_1 = Trajectory()
breakpoints = []
wall_clock_time = 0
for b in all_covered_bricks:
    traj_b = list(filter(lambda x: (x[3] == b), trajectories_0))
    if not traj_b == []:
        for bb in traj_b:
            time = bb[0]
            bb[0] = wall_clock_time + bb[0]
            trj_0.append_point_simple(bb)
            wall_clock_time = wall_clock_time + time
        breakpoints.append((0, wall_clock_time))
    else:
        traj_b = list(filter(lambda x: (x[3] == b), trajectories_1))
        if not traj_b == []:
            for bb in traj_b:
                time = bb[0]
                bb[0] = wall_clock_time + bb[0]
                trj_1.append_point_simple(bb)
                wall_clock_time = wall_clock_time + time
            breakpoints.append((1, wall_clock_time))

#trj_0.dump_trajectories([True, False, False, True, False, False, False])
#print("------")
#trj_1.dump_trajectories([True, False, False, True, False, False, False])

final_trajectories = []
final_trajectories.append(trj_0.form_iiwa_traj_q())
final_trajectories.append(trj_1.form_iiwa_traj_q())

In [None]:
# Create robot
ilyich = IIWA_Ilyich(meshcat, kNumberOfBricks, kBrickGeom, kArms, final_trajectories)

# Create default context
context = ilyich.CreateDefaultContext()

# Put all bricks in the warehouse
ilyich.put_bricks_in_warehouse(context, kWhLocation, kWhSize, kNumberOfBricks)

In [None]:
#
# Move
#
sim = None
ctx = context

# Continue for all bricks
for i, b_id in enumerate(all_covered_bricks):
    # Show trajectory
    points = trj_builder.get_trajectory_points_for_brick(b_id)
    vis.clear_traj()
    vis.visualize_traj(points)

    # Teleport the next new brick from the warehouse and unlock it
    ilyich.move_brick(context, kBrickSources[breakpoints[i][0]], i)    
    ilyich.unlock_brick(context, i)

    # Manipulate the brick to target
    sim, ctx = ilyich.work(sim, ctx, sim_duration=breakpoints[i][1])

    # Lock brick
    ilyich.lock_brick(context, i)