In [1]:
import numpy as np
import pickle
import matplotlib.pyplot as plt
from pathlib import Path
from tqdm import trange

from nmf_grooming import NeuromechflyGrooming, plot_state_and_contacts

from flygym.mujoco import Parameters, NeuroMechFly

Overwritte joints description to add abdomen joints.

In [2]:
class AbdomenNMF(NeuroMechFly):
    def _set_joints_stiffness_and_damping(self):
        # Do not forget to call the parent method
        super()._set_joints_stiffness_and_damping() # On appelle l'ancienne fonction avant de l'overwrite pour lui ajouter des trucs

        # Set the abdomen joints stiffness and damping
        for body_name in ["A1A2", "A3", "A4", "A5", "A6"]:
            body = self.model.find("body", body_name)
            # add pitch degree of freedom to bed the abdomen
            body.add(
                "joint",
                name=f"joint_{body_name}",
                type="hinge",
                pos="0 0 0",
                axis="0 1 0",
                stiffness=5.0,
                springref=0.0,
                damping=5.0,
                dclass="nmf",
            )


abd_nmf = AbdomenNMF(Parameters())

Rapide contrôle que les joints sont set up correctement:

In [3]:
try:
    abd_nmf.physics.model.jnt("Animat/joint_A1A2")
except:
    raise ("The abdoment joints were NOT properly defined.")
else:
    print("The abdomen joints were properly defined.")

The abdomen joints were properly defined.


Actuate the new added abdomen joints:

In [4]:
class ActuatedAbdomenNMF(NeuroMechFly):
    def _set_joints_stiffness_and_damping(self):
        # Do not forget to call the parent method
        super()._set_joints_stiffness_and_damping() # rewriting

        # Set the abdomen joints stiffness and damping
        for body_name in ["A1A2", "A3", "A4", "A5", "A6"]:
            body = self.model.find("body", body_name)
            # add pitch degree of freedom to bed the abdomen
            body.add(
                "joint",
                name=f"joint_{body_name}",
                type="hinge",
                pos="0 0 0",
                axis="0 1 0",
                stiffness=5.0,
                springref=0.0,
                damping=5.0,
                dclass="nmf",
            )

            # adding the actuated joints to the list of actuated joints her implies there will be a sensor per leg joint
            # if added later that would not be the case (just be aware of the differences)
            self.actuated_joints.append(f"joint_{body_name}")

    def _add_adhesion_actuators(self, gain):
        for body_name in ["A1A2", "A3", "A4", "A5", "A6"]:
            joint = self.model.find("joint", f"joint_{body_name}")
            actuator = self.model.actuator.add(
                "position",
                name=f"actuator_position_joint_{body_name}",
                joint=joint,
                forcelimited="true",
                ctrlrange="-1000000 1000000",
                forcerange="-10 10",
                kp=self.sim_params.actuator_kp,
                dclass="nmf",
            )

            # this is needed if you do not want to override add joint sensors
            vel_actuator = self.model.actuator.add(
                "velocity",
                name=f"actuator_velocity_joint_{body_name}",
                joint=joint,
                dclass="nmf",
            )
            torque_actuator = self.model.actuator.add(
                "motor",
                name=f"actuator_torque_joint_{body_name}",
                joint=joint,
                dclass="nmf",
            )
            # self.actuated_joints.append(joint)
            self._actuators.append(actuator)

        return super()._add_adhesion_actuators(gain)

Demo of abdomen bending + saving the video

In [5]:
from flygym.mujoco.examples.common import PreprogrammedSteps

preprogrammed_steps = PreprogrammedSteps()
leg_action = []
for leg in preprogrammed_steps.legs:
    # fly is standing
    leg_action.extend(preprogrammed_steps.get_joint_angles(leg, 0.0))

act_abd_nmf = ActuatedAbdomenNMF(Parameters())

run_time = 0.5
target_num_steps = int(run_time / act_abd_nmf.timestep)

abdomen_control = np.sin(
    2 * 2 * np.pi * np.linspace(0, run_time, target_num_steps) / run_time
)
# scale abdomen control between -pi/10 and 3pi/10
abdomen_control = abdomen_control * np.pi / 5 + np.pi / 10

# run a simulation and bend the abdomen
obs, info = act_abd_nmf.reset()
for i in range(target_num_steps):
    # # here, we simply use the recorded joint angles as the target joint angles
    joint_pos = np.hstack([leg_action, abdomen_control[i] * np.ones(5)])
    action = {"joints": joint_pos}
    obs, reward, terminated, truncated, info = act_abd_nmf.step(action)
    act_abd_nmf.render()

act_abd_nmf.save_video("bend_abdomen.mp4", 0)

In [6]:
class NeuromechflyDust(NeuromechflyGrooming):
    def __init__(
        self,
        sim_params,
        init_dust_level=5.0,
        saturation_level=10.0,
        hindleg_decay=0.01,
        dusted_areas=["hindleg"],
        touch_thr=1.0,
    ):
        self.bodies_in_app = {
            "RH_leg": [
                "RHTarsus1",
                "RHTarsus2",
                "RHTarsus3",
                "RHTarsus4",
                "RHTarsus5",
                "RHTibia",
            ],
            "LH_leg": [
                "LHTarsus1",
                "LHTarsus2",
                "LHTarsus3",
                "LHTarsus4",
                "LHTarsus5",
                "LHTibia",
            ],
            
        }

        self.dusted_bodies = [
            body for area in dusted_areas for body in self.bodies_in_app[area]
        ]

        self.dusted_areas = dusted_areas
        # check if the dust level is a list
        self.saturation_level = saturation_level
        if isinstance(init_dust_level, list):
            self.dust_level = np.array(init_dust_level)
        else:
            self.dust_level = np.ones(len(self.dusted_areas)) * init_dust_level
        self.dust_level = np.clip(self.dust_level, 0.0, self.saturation_level)

        self.touch_thr = touch_thr
        self.hindleg_dust_decay = hindleg_decay

        touch_sensors_locations = []
        self.touch_sensors_ids = []
        self.appendage_touch_sensors = [[]] * len(self.dusted_areas)
        for i, area in enumerate(self.dusted_areas):
            n_touch_sensors_previous = len(touch_sensors_locations)
            if area in self.bodies_in_app:
                touch_sensors_locations += self.bodies_in_app[area]
            else:
                raise ValueError("Unknown dusted area")
            self.appendage_touch_sensors[i] = np.arange(
                n_touch_sensors_previous, len(touch_sensors_locations)
            )

        self.baseline_rgba = np.array([1.0, 1.0, 1.0, 1.0])
        self.no_dust_color = np.array([0.59, 0.39, 0.12, 1.0])
        self.full_dust_color = np.array([0.0, 1.0, 0.0, 1.0])

        super().__init__(
            sim_params=sim_params,
            groom_collision=True,
            touch_sensor_locations=touch_sensors_locations,
        )

        self.update_segment_aspect()

    def _set_geom_colors(self):
        self.areas_materials = {}
        self.areas_materials["RH_leg"] = []
        self.areas_materials["LH_leg"] = []

        for type_, specs in self._mujoco_config["appearance"].items():
            # If part of the dusted bodies, set the color to the baseline
            dusted_bodies_type_match = [
                db[2:-1].lower() in type_ for db in self.dusted_bodies
            ]
            if (
                "antenna" in type_
                or any(dusted_bodies_type_match)
                and not "arista" in type_
            ):
                specs["material"]["rgba"] = self.baseline_rgba
                if type_ == "eye":
                    specs["texture"] = self._mujoco_config["appearance"]["antenna"][
                        "texture"
                    ]
                    specs["texture"]["rgb1"] = self.baseline_rgba[:3]
                    specs["texture"]["rgb2"] = self.baseline_rgba[:3]
                    specs["texture"]["width"] = 1e6
                    specs["texture"]["height"] = 1e6
                    specs["texture"]["random"] = 1e-5
                elif specs["texture"] is not None:
                    specs["texture"]["rgb1"] = self.baseline_rgba[:3]
                    specs["texture"]["rgb2"] = self.baseline_rgba[:3]

            # Define texture and material
            if specs["texture"] is not None:
                self.model.asset.add(
                    "texture",
                    name=f"{type_}_texture",
                    builtin=specs["texture"]["builtin"],
                    mark="random",
                    width=specs["texture"]["size"],
                    height=specs["texture"]["size"],
                    random=specs["texture"]["random"],
                    rgb1=specs["texture"]["rgb1"],
                    rgb2=specs["texture"]["rgb2"],
                    markrgb=specs["texture"]["markrgb"],
                )
            self.model.asset.add(
                "material",
                name=f"{type_}_material",
                texture=f"{type_}_texture" if specs["texture"] is not None else None,
                rgba=specs["material"]["rgba"],
                specular=0.0,
                shininess=0.0,
                reflectance=0.0,
                texuniform=True,
            )
            # Apply to geoms
            for segment in specs["apply_to"]:
                geom = self.model.find("geom", segment)
                if geom is None:
                    geom = self.model.find("geom", f"{segment}")
                geom.material = f"{type_}_material"

    def get_observation(self):
        obs = super().get_observation()

        appendage_touch = [
            np.sum(obs["touch_sensors"][touch_sensors])
            for touch_sensors in self.appendage_touch_sensors
        ]

        RH_leg_contact = np.mean(appendage_touch[self.dusted_areas.index("RH_leg")])
        LH_leg_contact = np.mean(appendage_touch[self.dusted_areas.index("LH_leg")])

        
        is_hindleg_grooming = (
            RH_leg_contact > self.touch_thr
            and LH_leg_contact > self.touch_thr

        )

        self.update_dust_level(
            is_hindleg_grooming,
            LH_leg_contact,
            RH_leg_contact,
        )

        self.update_segment_aspect()

        # add curr behavior
        obs["is_hindleg_grooming"] = is_hindleg_grooming

        assert all(self.dust_level >= 0.0) and all(
            self.dust_level <= self.saturation_level
        ), f"Dust level out of bounds: {self.dust_level}"
        # add dust level
        obs["dust_level"] = self.dust_level

        return obs

    def update_segment_aspect(self):
        # change body rgba depending on dust level
        for app, dl in zip(self.dusted_areas, self.dust_level):
            for body in self.bodies_in_app[app]:
                if not "Arista" in body:
                    norm_dl = (dl.copy() - 0.0) / (self.saturation_level - 0.0)
                    self.physics.named.model.geom_rgba[
                        f"Animat/{body}"
                    ] = self.interp_color(norm_dl)

    def interp_color(self, dust_level):
        return (
            self.no_dust_color
            + (self.full_dust_color - self.no_dust_color) * dust_level
        )

    def update_dust_level(
        self,
        is_hindleg_grooming,
        RH_leg_contact,
        LH_leg_contact,
    ):
        is_saturated = self.dust_level >= self.saturation_level
        saturated_app = [
            self.dusted_areas[i] for i, sat in enumerate(is_saturated) if sat
        ]
        is_zero = self.dust_level <= 0.0
        zero_app = [self.dusted_areas[i] for i, zero in enumerate(is_zero) if zero]

        # update dust level bodies that have no dust should not give dust to the forelegs,
        # when the forelegs are saturated, they can not pick up dust anymore

        if is_hindleg_grooming and(not "RH_leg" in zero_app or not "LH_leg" in zero_app):
            removed_dust = (
                np.mean([RH_leg_contact, LH_leg_contact]) * self.hindleg_dust_decay
            )
            if "RH_leg" in zero_app and "LH_leg" in zero_app:
                removed_dust = 0
            elif "RH_leg" in zero_app:
                self.dust_level[self.dusted_areas.index("LH_leg")] -= removed_dust / 2
            elif "LH_leg" in zero_app:
                self.dust_level[self.dusted_areas.index("RH_leg")] -= removed_dust / 2
            else:
                self.dust_level[self.dusted_areas.index("RH_leg")] -= removed_dust / 2
                self.dust_level[self.dusted_areas.index("LH_leg")] -= removed_dust / 2

        self.dust_level = np.clip(self.dust_level, 0.0, self.saturation_level)

    def update_hindleg_dust_antgroom(self, saturated_app, supposed_removed_dust):
        if "LH_leg" in saturated_app and "RH_leg" in saturated_app:
            return 0.0
        elif "RH_leg" in saturated_app:
            self.dust_level[self.dusted_areas.index("RH_leg")] += (
                supposed_removed_dust / 2
            )
            return supposed_removed_dust / 2
        elif "LH_leg" in saturated_app:
            self.dust_level[self.dusted_areas.index("LH_leg")] += (
                supposed_removed_dust / 2
            )
            return supposed_removed_dust / 2
        else:
            self.dust_level[self.dusted_areas.index("RH_leg")] += (
                supposed_removed_dust / 2
            )
            self.dust_level[self.dusted_areas.index("LH_leg")] += (
                supposed_removed_dust / 2
            )
            return supposed_removed_dust / 2

In [7]:
grooming_module_path = Path("./data/grooming_modules_provided_slow.pkl")
with open(grooming_module_path, "rb") as f:
    grooming_modules = pickle.load(f)

timestep = grooming_modules["timestep"]

In [8]:
sim_params = Parameters(
    timestep=timestep,
    render_mode="saved",
    render_playspeed=0.1,
    actuator_kp=10.0,
    render_camera="Animat/camera_back",
)

run_time = 1.0
target_num_steps = int(run_time / timestep)

nmf = NeuromechflyDust(
    sim_params=sim_params,
    init_dust_level=5.0,
    hindleg_decay=0.005,
)

np.random.seed(0)

obs_list = []
obs, info = nmf.reset()
obs_list.append(obs)

module_len = {
    module: len(grooming_modules[module][0])
    for module in grooming_modules
    if module != "timestep"
}

transition_period = 200
transit_to = ["hindleg"]
transit_times = [0, 1000, 3400, 4500, 6200, 7500]

k = 0

get_transition_joint_angles = lambda before, after, k: before + (after - before) * min(
    1, (k / transition_period)
)

modules = []

for i in trange(target_num_steps):
    if i in transit_times:
        k = 1
        curr_module = transit_to[transit_times.index(i)]
        print(f"Transit to {curr_module}")
        before_transition_pose = obs["joints"][0]
    else:
        k += 1

    target_joint_angles = grooming_modules[curr_module][:, k % module_len[curr_module]]
    joint_pos = get_transition_joint_angles(
        before_transition_pose, target_joint_angles, k
    )

    action = {"joints": joint_pos}
    obs, reward, terminated, truncated, info = nmf.step(action)

    obs_list.append(obs)
    modules.append(curr_module)

    nmf.render()

modules.append(curr_module)
nmf.save_video("outputs/dust_grooming_sequence.mp4", 0)

KeyError: 'hindleg'