Note: this notebook was runned on Colab, because the mjcf of dm_control needs Nvidia GPU

In [None]:
#@title Run to install MuJoCo and `dm_control`
import distutils.util
import os
import subprocess
if subprocess.run('nvidia-smi').returncode:
  raise RuntimeError(
      'Cannot communicate with GPU. '
      'Make sure you are using a GPU Colab runtime. '
      'Go to the Runtime menu and select Choose runtime type.')

# Add an ICD config so that glvnd can pick up the Nvidia EGL driver.
# This is usually installed as part of an Nvidia driver package, but the Colab
# kernel doesn't install its driver via APT, and as a result the ICD is missing.
# (https://github.com/NVIDIA/libglvnd/blob/master/src/EGL/icd_enumeration.md)
NVIDIA_ICD_CONFIG_PATH = '/usr/share/glvnd/egl_vendor.d/10_nvidia.json'
if not os.path.exists(NVIDIA_ICD_CONFIG_PATH):
  with open(NVIDIA_ICD_CONFIG_PATH, 'w') as f:
    f.write("""{
    "file_format_version" : "1.0.0",
    "ICD" : {
        "library_path" : "libEGL_nvidia.so.0"
    }
}
""")

print('Installing dm_control...')
!pip install -q dm_control>=1.0.26

# Configure dm_control to use the EGL rendering backend (requires GPU)
%env MUJOCO_GL=egl

print('Checking that the dm_control installation succeeded...')
try:
  from dm_control import suite
  env = suite.load('cartpole', 'swingup')
  pixels = env.physics.render()
except Exception as e:
  raise e from RuntimeError(
      'Something went wrong during installation. Check the shell output above '
      'for more information.\n'
      'If using a hosted Colab runtime, make sure you enable GPU acceleration '
      'by going to the Runtime menu and selecting "Choose runtime type".')
else:
  del pixels, suite

!echo Installed dm_control $(pip show dm_control | grep -Po "(?<=Version: ).+")

In [None]:
import dm_env
import numpy as np
from dm_control import manipulation, suite
from dm_control.suite.wrappers import action_scale, pixels
from dm_env import StepType, specs
from dm_control import mjcf
from dm_control.mjcf import Physics
from dm_control.utils import io as resources
import matplotlib.pyplot as plt
import os

In [None]:
from google.colab import drive
from pathlib import Path
drive.mount('/content/drive')

#Path to store file on Google Drive
output_dir = "/content/drive/My Drive/Reinforcement Learning/DRAP"
#output_dir = Path(output_dir)

In [None]:
import random
import xml.etree.ElementTree as ET

def random_skybox(xml_input_path, xml_output_path, markrgb_limit=255):
    """
    Modify the parameters of the 'texture' node in XML file.

    1. Change 'builtin' value with 
       - "gradient": 70% of probability
       - "flat": 30% of probability
    2. Change the value of 'markrgb' as ".a .b .c", where a, b, c are random integer values.
       - The upper bound of these value can be defined by `markrgb_limit`.

    Args:
        xml_input_path (str): Path of XML input file.
        xml_output_path (str): Path of modified XML output file.
        markrgb_limit (int): Upper bound for a, b and c.
    """
    #Load XML
    tree = ET.parse(xml_input_path)
    root = tree.getroot()

    #Identify texture node
    texture = root.find(".//texture")
    if texture is None:
        raise ValueError("The node 'texture' was not founded in the XML file.")

    #Modify builtin with defined probabilities
    texture.set("builtin", "gradient" if random.random() < 0.6 else "flat")

    #Random values for 'markrgb'
    a = random.randint(0, markrgb_limit)
    b = random.randint(0, markrgb_limit)
    c = random.randint(0, markrgb_limit)
    texture.set("markrgb", f".{a} .{b} .{c}")

    tree.write(xml_output_path)
    #print(f"Modified XML file saved in: {xml_output_path}")

In [None]:
import random
import xml.etree.ElementTree as ET

def random_materials(xml_input_path, xml_output_path, rgb_limit=255):
    """
    Modify the parameters of the 'texture' and 'material nodes in XML file.
    
    Modifications:
    1. 'builtin' in 'texture': random value between "gradient", "flat", "checker".
    2. 'rgb1', 'rgb2' e 'markrgb' in 'texture': random value.
    3. 'rgba' in ('self', 'decoration', 'effector'): random value.

    Args:
        xml_input_path (str): Path of XML input file.
        xml_output_path (str): Path of modified XML output file.
        rgb_limit (int): Upper bound for rgb and rgba (default: 255).
    """
    #Load XML
    tree = ET.parse(xml_input_path)
    root = tree.getroot()

    #Identify texture node
    texture = root.find(".//texture")
    if texture is not None:
        # random 'builtin'
        texture.set("builtin", random.choices(["gradient", "flat", "checker"],weights=[0.2, 0.5, 0.3])[0])

        #Random values for rgb1, rgb2, e markrgb
        r1, g1, b1 = [random.randint(0, rgb_limit) for _ in range(3)]
        r2, g2, b2 = [random.randint(0, rgb_limit) for _ in range(3)]
        mr, mg, mb = [random.randint(0, rgb_limit) for _ in range(3)]

        texture.set("rgb1", f".{r1} .{g1} .{b1}")
        texture.set("rgb2", f".{r2} .{g2} .{b2}")
        texture.set("markrgb", f".{mr} .{mg} .{mb}")
    else:
        raise ValueError("The node 'texture' was not founded in the XML file.")

    #Modify the materials colors
    materials_to_modify = ["self", "decoration", "effector"]
    for material_name in materials_to_modify:
        material = root.find(f".//material[@name='{material_name}']")
        if material is not None:
            #Random values for rgba
            r = random.randint(0, rgb_limit)
            g = random.randint(0, rgb_limit)
            b = random.randint(0, rgb_limit)
            a = 1.0  # Keep transparency to 1.0 to avoid unusual transparency 
            material.set("rgba", f".{r} .{g} .{b} {a}")

    tree.write(xml_output_path)
    #print(f"Modified XML file saved in: {xml_output_path}")

In [None]:
#Set the random initial state 
def set_random_initial_state(physics):
    #qpos shapes
    num_positions = physics.data.qpos.shape[0]
    num_velocities = physics.data.qvel.shape[0]

    #Position and velocity limits
    pos_limit_sup = np.array([1.0] * num_positions)
    pos_limit_inf = np.array([0.0] * num_positions)
    vel_limit = np.array([1.0] * num_velocities)

    #Random values for position and velocity
    random_pos = np.random.uniform(pos_limit_inf, pos_limit_sup)
    random_vel = np.random.uniform(-vel_limit, vel_limit)

    #Assign position and velocity
    physics.data.qpos[:] = random_pos
    physics.data.qvel[:] = random_vel

    return random_pos, random_vel

#Function to synchronize the initial state in the other 3 physics
def sync_initial_state(physics_1, physics_2, physics_3, initial_pos, initial_vel):
    physics_1.data.qpos[:] = initial_pos
    physics_1.data.qvel[:] = initial_vel

    physics_2.data.qpos[:] = initial_pos
    physics_2.data.qvel[:] = initial_vel

    physics_3.data.qpos[:] = initial_pos
    physics_3.data.qvel[:] = initial_vel

#Function to apply an action modifying the joints values 
def apply_action(physics, action):
    physics.data.ctrl[:] = action

In [None]:
#Defining XML files path
agent_xml= output_dir + "/walker/walker.xml"
skybox_xml = output_dir + "/walker/common/skybox.xml"
materials_xml = output_dir + "/walker/common/materials.xml"

skybox_xml_random1 = output_dir + "/walker/common/skybox_random1.xml"
skybox_xml_random2 = output_dir + "/walker/common/skybox_random2.xml"
skybox_xml_random3 = output_dir + "/walker/common/skybox_random3.xml"

materials_xml_random1 = output_dir + "/walker/common/materials_random1.xml"
materials_xml_random2 = output_dir + "/walker/common/materials_random2.xml"
materials_xml_random3 = output_dir + "/walker/common/materials_random3.xml"

agent_xml_randomized_1= output_dir + "/walker/walker1.xml"
agent_xml_randomized_2= output_dir + "/walker/walker2.xml"
agent_xml_randomized_3= output_dir + "/walker/walker3.xml"

def create_canonical_randomization():
  #Create randomized features
  random_skybox(skybox_xml, skybox_xml_random1, markrgb_limit=10)
  random_skybox(skybox_xml, skybox_xml_random2, markrgb_limit=10)
  random_skybox(skybox_xml, skybox_xml_random3, markrgb_limit=10)
  #Create randomized features
  random_materials(materials_xml, materials_xml_random1, rgb_limit=100)
  random_materials(materials_xml, materials_xml_random2, rgb_limit=100)
  random_materials(materials_xml, materials_xml_random3, rgb_limit=100)

  #Canonical
  mjcf_model = mjcf.from_path(agent_xml)
  physics_canon = Physics.from_mjcf_model(mjcf_model)

  #randomized 1
  mjcf_model = mjcf.from_path(agent_xml_randomized_1)
  physics_rd_1 = Physics.from_mjcf_model(mjcf_model)

  #randomized 2
  mjcf_model = mjcf.from_path(agent_xml_randomized_2)
  physics_rd_2 = Physics.from_mjcf_model(mjcf_model)

  #randomized 3
  mjcf_model = mjcf.from_path(agent_xml_randomized_3)
  physics_rd_3 = Physics.from_mjcf_model(mjcf_model)

  return physics_canon,physics_rd_1,physics_rd_2, physics_rd_3

In [None]:
import os
import torch
import gc

def create_part_dataset(total_sequence, n):
  sequences_per_randomization = 1000 
  steps_between_randomization = 1000   #To simulate one environment step
  sampling_interval = 5
  sequence_count = 0
  sequence_length = 4  # 3 images + 1 next target 

  #dataset_dir = os.path.join(output_dir, "dataset/finger")
  dataset_dir = os.path.join(output_dir, "dataset/walker")
  os.makedirs(dataset_dir, exist_ok=True)

  final_dataset = []

  while sequence_count < total_sequence:

    #New randomization every 'sequences_per_randomization'
    if sequence_count % sequences_per_randomization == 0:
      physics_canon,physics_rd_1,physics_rd_2, physics_rd_3 = create_canonical_randomization()
      print(f"New randomizations generated at step {sequence_count}.")


    initial_pos, initial_vel = set_random_initial_state(physics_canon)
    #Sync initial state
    sync_initial_state(physics_rd_1,physics_rd_2,physics_rd_3, initial_pos, initial_vel)

    #Simulate for 1000 steps
    for step in range(steps_between_randomization):

      phi = np.random.uniform(0, np.pi)
      a = np.random.randint(1,70)
      b = np.random.randint(1,50)
      action = [np.cos(phi)*a, -np.cos(phi)*a, np.cos(phi)*a, np.cos(phi)*b, -np.cos(phi)*b, -np.cos(phi)*b]

      apply_action(physics_canon, action)
      apply_action(physics_rd_1, action)
      apply_action(physics_rd_2, action)
      apply_action(physics_rd_3, action)

      physics_canon.step()
      physics_rd_1.step()
      physics_rd_2.step()
      physics_rd_3.step()

      #Sample the sequence
      if step % sampling_interval == 0:
        images_canon, images_rd_1, images_rd_2, images_rd_3, actions = [], [], [], [], []

        x=0
        for i in range(sequence_length - 1):
          images_canon.append(physics_canon.render(height=84, width=84, camera_id=0))
          images_rd_1.append(physics_rd_1.render(height=84, width=84, camera_id=0))
          images_rd_2.append(physics_rd_2.render(height=84, width=84, camera_id=0))
          images_rd_3.append(physics_rd_3.render(height=84, width=84, camera_id=0))

          phi = np.random.uniform(0, np.pi)
          a = np.random.randint(1,70)
          b = np.random.randint(1,50)
          action = [np.cos(phi)*a, -np.cos(phi)*a, np.cos(phi)*a, np.cos(phi)*b, -np.cos(phi)*b, -np.cos(phi)*b]

          actions.append(action)

          apply_action(physics_canon, action)
          apply_action(physics_rd_1, action)
          apply_action(physics_rd_2, action)
          apply_action(physics_rd_3, action)

          physics_canon.step()
          physics_rd_1.step()
          physics_rd_2.step()
          physics_rd_3.step()

        images_combined = []
        for i in range(len(images_rd_1)):
          random_choice = random.choice([images_rd_1, images_rd_2, images_rd_3])
          images_combined.append(random_choice[i])


        img_canon_future = physics_canon.render(height=84, width=84, camera_id=0)

        #Save sequence informations
        sequence_data = {
            "canonical": images_canon,           # Canonical sequence [t-2, t-1, t0]
            "randomized": images_combined,       # Random sequence [t-2, t-1, t0]
            "actions": actions,                  # Actions [a-2, a-1, a0]
            "future_canon": img_canon_future     # Future canon image (t+1)
        }

        final_dataset.append(sequence_data)

        del sequence_data
        gc.collect()

        sequence_count += 1
        #print(f"Sequence {sequence_count}/{total_sequence} saved.")

        if sequence_count >= total_sequence:
                  break

  print("Sequences collected!")

  num = "dataset_sequences_" + n + ".pt"
  dataset_path = os.path.join(dataset_dir, num)
  torch.save(final_dataset, dataset_path)
  print(f"Dataset with {len(final_dataset)} sequences saved in {dataset_path}")
  del final_dataset
  gc.collect()

In [None]:
total_sequence = 25000   # -> 100,000 environment steps
part_sequence = total_sequence/4  #To deal with Colab memory restrictions
create_part_dataset(part_sequence, "1")
create_part_dataset(part_sequence, "2")
create_part_dataset(part_sequence, "3")
create_part_dataset(part_sequence, "4")

## MuJoCO Tests

In [None]:
agent_xml= output_dir + "/walker/walker.xml"

#Randomize skybox
skybox_xml = output_dir + "/walker/common/skybox.xml"
skybox_xml_random = output_dir + "/walker/common/skybox_random.xml"
random_skybox(skybox_xml, skybox_xml_random, markrgb_limit=10)

#Randomize materials
materials_xml = output_dir + "/walker/common/materials.xml"
materials_xml_random = output_dir + "/walker/common/materials_random.xml"
random_materials(materials_xml, materials_xml_random, rgb_limit=100)


mjcf_model = mjcf.from_path(agent_xml)

#Initialize simulator
physics = Physics.from_mjcf_model(mjcf_model)

#Set random initial state
initial_pos, initial_vel = set_random_initial_state(physics)
#sync_initial_state(physics, initial_pos, initial_vel)
#sync_initial_state(physics2, initial_pos, initial_vel)

#Show the frames
def plot_frame(physics):
    img = physics.render(height=256, width=256, camera_id=0)
    plt.imshow(img)
    plt.axis("off")
    plt.show()

#Function to apply an action
def apply_action(physics, action):
    # Imposta l'azione sui controlli
    physics.data.ctrl[:] = action

#Simulate some steps
for step in range(5):
    phi = np.random.uniform(0, np.pi)
    action = [np.cos(phi), -np.cos(phi), np.cos(phi), np.cos(phi), -np.cos(phi), -np.cos(phi)]
    apply_action(physics, action)
    physics.step()
    plot_frame(physics)
    print(physics.model.camera(0).pos[:])
    print("Sensor data:", physics.data.sensordata)