# Optimize a modular robot for drilling

In this tutorial we will:
- Define a drilling task
- See how a standard CONCERT robot configuration solves this task
- Use a Genetic Algorithm to find a suitable but smaller robot to perform the same task
- Visualize our results

We start by **defining a drilling task**. For that purpose, we use a BIM model of the Fraunhofer Bolzano Cran Hall that was parsed to .stl.

Again, we use Timor on a development branch. For information on the setup, check the viz_demo notebook.

In [None]:
# The meshcat visualizer does sometimes not terminate properly. You can kill all local processes with "meshcat" in there name by runnning this cell.
! pkill -f meshcat

In [None]:
from datetime import datetime
import json
from pathlib import Path
import os
from time import time

concert_folder = Path(os.path.abspath("")).parent
crane_hall_path = Path(os.path.abspath("")).joinpath('assets/Crane_Hall_V10.stl')
jsons = concert_folder.joinpath('json/concert/')

In [None]:
import numpy as np
from pinocchio.visualize import MeshcatVisualizer

from timor import AtomicModule, Geometry, ModuleAssembly, ModulesDB, Obstacle, Task, Tolerance, Transformation
from timor.task.Goals import At, Follow
from timor.utilities import logging, spatial, visualization
from timor.utilities.tolerated_pose import ToleratedPose
from timor.utilities.trajectory import Trajectory

In [None]:
crane_hall = Obstacle.Obstacle(
    "crane_hall",
    collision=Geometry.Mesh(
        parameters={'package_dir': concert_folder, 'file': crane_hall_path, 'scale': 0.001},
        pose=Transformation.neutral()
    ),
    visual=None  # Defaults to collision
)

In [None]:
%%capture
header = dict(
    ID='CONCERT_demo',
    version="2023",
    taskName="Crane Hall Drilling Demo",
    tags=["trajectory", "CONCERT"],
    date=datetime(2023, 3, 7),  # This was written in the IIT hosted integration week
    author=["Jonathan Külz", "Matthias Mayer"],
    email=["jonathan.kuelz@tum.de", "matthias.mayer@tum.de"],
    affiliation=["TUM"]
)
task = Task.Task(
    header=header,
    obstacles=[crane_hall],
    goals=None,
    constraints=None
)

So far, we have only defined the environment. It is still open what kind of goals we want to achieve.

For CONCERT, modular robots should be used in a construction site environment. We start by defining two different trajectories that reassemble a drilling motion.

In [None]:
drill_tolerance = Tolerance.CartesianXYZ.default() + Tolerance.RotationAxisAngle(n_x=(-0.01, 0.01), n_y=(-0.01, 0.01), n_z=(-1, 1), theta_r=(0, np.pi))
def drilling_trajectory(initial_pose: Transformation) -> Trajectory:
    """We write a little helper function to create a desired drilling trajectory based on a starting position"""
    drilling_offset = Transformation.from_translation((0, 0, .2))  # Drilling for 20cm
    poses = [initial_pose.interpolate(initial_pose @ drilling_offset, i / 100) for i in range(100)]
    with_tolerances = [ToleratedPose(p, tolerance=drill_tolerance) for p in poses]
    return Trajectory(pose=np.array(with_tolerances))

first_position = Transformation.from_roto_translation(spatial.rotX(-np.pi / 2)[:3, :3], (0, 5.7, 1.5))  # That is a position on the hall's wall
second_position = Transformation.from_roto_translation(spatial.rotX(-np.pi / 2)[:3, :3], (2.5, 5.7, 1.5))  # That another position on the hall's wall

T_one = drilling_trajectory(first_position)
T_two = drilling_trajectory(second_position)

Have a look at the interactive task visualization:

In [None]:
task = Task.Task(
    header=header,
    obstacles=[crane_hall],
    goals=[
        Follow('1', T_one),
        Follow('2', T_two)
    ],
    constraints=None
)

task_viz = MeshcatVisualizer()
task_viz.initViewer()
task_viz = task.visualize(task_viz)
# You can also invoke a local visualization using: task_viz.viewer.jupyter_cell()

## Load the modules

We can now start placing a robot in the environment and solving the task with that robot. Note that, for the sake of efficiency, we are using a simplified version of the CONCERT platform. First, let's load all the modules and make create a typical assembly.

In [None]:
colors = {}  # A little hack because Timor does not support body colors natively
db = ModulesDB()  # We are gonna parse them one by one
for f in jsons.iterdir():
    m, c = AtomicModule.from_concert_specification(json.loads(f.open('rb').read()), concert_folder.parent)
    db.add(m)
    colors[m.id] = c

classic_assembly = ModuleAssembly.from_serial_modules(db, ('23_planar', '14', '16', '14', '16', '14', '16'))
visualization.color_visualization(task_viz, classic_assembly, colors)  # You can check the task visualization: The robot is now included there

## Task solving capabilities

We need to do path planning and collision checks in order to solve this task. For this demo, we only perform very simple checks -- those capabilities exist but are currently still subject to optimization, so a final version will be included towards the end of the CONCERT project. For now, a couple of simple solution capabilities will suffice:

In [None]:
logging.setLevel('DEBUG')  # If not, we will be flooded by the ik solver
def find_follow_configuration(assembly: ModuleAssembly, goal: Follow, task: Task) -> Trajectory:
    """
    A simple solver that moves the assembly to a goal position, performs a drilling motion (without environment collision checks) and moves the assembly back to start.
    
    :param assembly: A module assembly
    :param goal: A drilling goal
    """
    desired = goal.trajectory.pose

    place_base = np.zeros(assembly.robot.njoints)
    place_base[:3] = [desired[0].nominal.translation[0], desired[0].nominal.translation[1] - 0.9, np.pi / 2]
    start = desired[0]
    joint_mask = np.ones((assembly.robot.njoints,))
    joint_mask[0] = 0
    joint_mask[2] = 0
    q_0, success = assembly.robot.ik(start, task=task, q_init=place_base, joint_mask=joint_mask)
    if not success:
        return
    q = [q_0]
    for i, p in enumerate(goal.trajectory.pose[1:]):
        _q, success = assembly.robot.ik(p, q_init=assembly.robot.configuration, joint_mask=joint_mask)
        if not success:
            assembly.robot.update_configuration(_q)
            return
        q.append(_q)
    ret = Trajectory(q=q)
    return ret + ret[::-1]

In [None]:
def get_task_trajectories(assembly, task) -> tuple[list[Trajectory], bool]:
    """
    Loops over goals in the task and tries to find trajectories to solve them.
    
    Returns a tuple with (1) A list of trajectories that solve the goals and (2) A boolean indicator whether all goals could be achieved
    """
    ret = list()
    for goal in task.goals:
        t = find_follow_configuration(assembly, goal, task)
        if t is None:
            return ret, False
        ret.append(t)
    return ret, True

In [None]:
def make_solution_trajectory(t1, t2, assembly):
    """This only works for this specific task!"""
    p_inter = assembly.robot.fk(t1.q[-1]) @ Transformation.from_translation([ second_position.translation[0] / 2, 0, -2])
    mask = np.zeros((assembly.robot.njoints,))
    mask[:2] = 1
    q_inter, suc = assembly.robot.ik(ToleratedPose(p_inter), q_init=t1.q[-1], joint_mask=mask)
    assert suc

    neutral = np.zeros((assembly.robot.njoints,))
    t_start = Trajectory(q=np.linspace(neutral, t1.q[0], 400))
    t_inter = Trajectory(q=np.vstack([
        np.linspace(t1.q[-1], q_inter, 200),
        np.linspace(q_inter, t2.q[0], 200)
    ]))
    t_end = Trajectory(q=np.linspace(t2.q[-1], neutral, 500))
    
    return t_start + t1 + t_inter + t2 + t_end

In [None]:
(t1, t2), suc = get_task_trajectories(classic_assembly, task)
T = make_solution_trajectory(t1, t2, classic_assembly)

visualization.animation(classic_assembly.robot, T.q, .01, task_viz)

# Now, let's optimize the Assembly instead of taking a predefined one

The flexibility of modular robots allows us to tailor our robot to whatever task we have to deal with. The goal of CONCERT WP2.4 was to develop an automatic assembly optimizer -- Timor serves as such.

As we mostly optimize for kinematics, we will deal with a subset of modules only that capture all kinematic capabilities of the CONCERT modules without introducing duplicity. Also, we only consider our simplified base model that can "float" accross the floor.

In [None]:
names_for_optim = ('concert_elbow_joint_A', 'concert_yaw_joint_A', 'concert_link_straight_10', 'concert_link_straight_20', 'concert_link_straight_50', 'concert_mobile_platform_simplified')
modules = ModulesDB(mod for mod in db if mod.name in names_for_optim)
print("There are {} modules which will be used to create an optimized assembly".format(len(modules)))

In the next field, we will define the heart of a genetic optimizer:
- We use the PyGAD library for a GA implementation
- The hyperparameters have been retrieved running an optimization with optuna
- We can optimize for an arbitrary fitness funciton. Here we just take the mass of the robot arm as a proxy for real-world part cost (combined with a fitness value for the capability to solve the task)

In [None]:
import itertools
import pygad
import random

from timor.utilities import module_classification


def ga_optimize(db, task):
    hyperparameters = {
        'population_size': 50,
        'num_generations': 32,
        'mutation_probability': 0.237,
        'num_parents_mating': 5,
        'keep_parents': 4,
        'crossover_type': 'single_point',
        'keep_elitism': 5,
        'save_solutions': True
    }
    
    bases, links, joints, eefs = map(lambda DB: sorted(DB, key=lambda m: m.id), module_classification.divide_db_in_types(db, strict=False))    
    link_ids = tuple(m.id for m in links)
    joint_ids = tuple(m.id for m in joints)
    
    num2id = {i: m.id for i, m in zip(range(1, len(db) + 1), itertools.chain(bases, eefs, joints, links))}
    id2num = {ID: i for i, ID in num2id.items()}
    id2num['000'] = 0  # this encodes the "no module" option
    
    def get_gene_space(genome: str) -> list[int]:
        """Maps L: link or nothing and J: joint or nothing"""
        return [v for v in id2num.values() if v != 0 and num2id[v] not in (b.id for b in bases)]  # TODO: This is a hack and should eventually be removed
        if genome == 'L':
            return [0] + list(map(id2num.get, link_ids))
        elif genome == 'J':
            return [0] + list(map(id2num.get, joint_ids))
    
    def fitness_func(solution, solution_idx) -> float:
        solution = [1] + [i for i in solution if i != 0] # + [2]  # add B and E
        robot = map(num2id.get, solution)
        solution = ModuleAssembly.from_serial_modules(db, robot)
        results, suc = get_task_trajectories(solution, task)
        
        mass_arm = solution.mass - 127.57599
        mass_fitness = 10 / mass_arm
        
        if len(results) > 2:
            return len(results)
        
        return len(results) + mass_fitness
    
    J = id2num[joint_ids[0]]
    L = lambda: id2num[random.choice(link_ids + ('000',))]

    genes = 'L-J-L-J-L-J-L-J-L-J-L-J-L-L'.split('-')
    def make_random(gene_encoding):
        if gene_encoding == 'L':
            return L()
        elif gene_encoding == 'J':
            return J
        raise ValueError()
        
    initial_population = [  # Create a couple random solutions that do have modules at all positions
        [make_random(g) for g in genes] for _ in range(hyperparameters.pop('population_size'))
    ]
    
    last_fitness = 0
    def callback_generation(ga_instance):
        nonlocal last_fitness
        if ga_instance.generations_completed % 4 != 0 and ga_instance.generations_completed != 1:
            return
        print("Generation = {generation}".format(generation=ga_instance.generations_completed))
        print("Fitness    = {fitness}".format(fitness=ga_instance.best_solution()[1]))
        print("Change     = {change}".format(change=ga_instance.best_solution()[1] - last_fitness))
        print('---\n')
        last_fitness = ga_instance.best_solution()[1]

    ga_instance = pygad.GA(
        num_genes=len(initial_population[0]),
        fitness_func=fitness_func,
        gene_space=[get_gene_space(g) for g in genes],
        gene_type=int,
        initial_population=initial_population,
        on_generation=callback_generation,
        **hyperparameters
    )
    
    try:
        ga_instance.run()
    except Exception as exe:
        print(exe)
        raise exe

    return ga_instance

In [None]:
# This runs the actual optimization
print(f"The classical assembly has a fitness value of: {2 + 10 / (classic_assembly.mass - 127.57599):.3f} (higher is better)")
t0 = time()
optimizer = ga_optimize(modules, task)
t1 = time()
print(f"The optimization took {t1 - t0:.0f}s, which is {(t1 - t0) / optimizer.generations_completed:.2f}s/generation")

You can see the optimization history above. The plot below provides a visual interface to the fitness history.

In [None]:
fig = optimizer.plot_fitness()

Of course a good visualization helps interpreting the results. Let's see how the new solution, optimized for a small mass, looks like:

In [None]:
bases, links, joints, eefs = map(lambda DB: sorted(DB, key=lambda m: m.id), module_classification.divide_db_in_types(modules, strict=False))
num2id = {i: m.id for i, m in zip(range(1, len(modules) + 1), itertools.chain(bases, eefs, joints, links))}
id2num = {ID: i for i, ID in num2id.items()}
id2num['000'] = 0  # this encodes the "no module" option
best_as_chain = [bases[0].id] + [num2id[m] for m in optimizer.best_solution()[0] if m != 0]

best_assembly = ModuleAssembly.from_serial_modules(modules, best_as_chain)
(t1, t2), suc = get_task_trajectories(best_assembly, task)
T = make_solution_trajectory(t1, t2, best_assembly)

In [None]:
optimize_viz = MeshcatVisualizer()
optimize_viz.initViewer()
optimize_viz = task.visualize(optimize_viz)

In [None]:
visualization.color_visualization(optimize_viz, best_assembly, colors)
visualization.animation(best_assembly.robot, T.q, .01, optimize_viz)