<a href="https://colab.research.google.com/github/cmrn-crmns-phl/abm/blob/master/hdl_v5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install mediapy
!pip install mujoco
!sudo apt-get install -y python3-opengl
!pip install pyvirtualdisplay
!pip install piglet

!pip install xvfb -y

import time
import mediapy as media
import mujoco
import mujoco.viewer
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import xml.etree.ElementTree as ET
import sys

tree = ET.parse("/content/xml/trexml.xml")
root = tree.getroot()

cam = mujoco.MjvCamera()
# opt = mujoco.MjvOption()

""" initialize state size and action size, action size is the number of actuators"""

""" Q NETWORK """


class MultiJointQNetwork(nn.Module):
    def __init__(self, input_size=108, output_size=54):
        super(MultiJointQNetwork, self).__init__()

        self.fc1 = nn.Linear(input_size, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, output_size)

    def forward(self, x):

        x = F.relu(self.fc1(x))
        # print(x)
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        # x = torch.tanh(self.fc4(x))

        return x

def update_q_network(
    state, action, reward, next_state, done, q_network, optimizer, loss_function
):

    state_normalized = (state - state.mean()) / (state.std() + 1e-8)
    next_state_normalized = (next_state - next_state.mean()) / (next_state.std() + 1e-8)

    next_q_values = q_network(next_state_normalized).detach()
    next_action_value = next_q_values.max()

    if not isinstance(reward, torch.Tensor):
        reward = torch.tensor(reward, dtype=torch.float32).view(1, -1)

    control_signal = [0.0] * len(get_actuators())

    # # Compute the target Q-value
    # next_q_values = q_network(next_state).detach()
    # next_action_value = next_q_values.max()

    # if not isinstance(next_action_value, torch.Tensor):
    #     next_action_value = torch.tensor(next_action_value, dtype=torch.float32)

    target = reward + (
        0.99 * next_action_value * (1 - done)
    )  # Discount factor gamma = 0.99

    target = target.view(1, -1).float()

    current_q_values = q_network(state_normalized)

    action = action.view(-1, 1).long()

    action = action.view(1, 54)

    # current_q_value = q_network(state)[0]
    current_q_value = current_q_values

    # Compute loss (difference between predicted Q-value and target Q-value)
    loss = loss_function(current_q_value, target)

    # Perform backpropagation to update Q-network
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss


def select_action(state, q_network):

    with torch.no_grad():
        # print("controls")
        control_signals = q_network(state) *200

        # print(control_signals)

    # exploration_noise = np.random.normal(0, 0.1, size=control_signals.shape)
    # action_with_noise = control_signals + torch.tensor(exploration_noise, dtype=torch.float32)

    return control_signals  # Return the control signal as the chosen action



def check_termination(data, model, state, max_time=15.0):

    # print(f"Gravity: {model.opt.gravity}")
    # print(f"Gravity :  {data.qpos}")
    # print(f"initial joint positions {data.qvel}")

    # print(f"control signals : {data.ctrl}")
    # print(f"solver timestep: {model.opt.timestep}")
    # print(f"Integrator: {model.opt.integrator}")

    # time.sleep(1)
    """
    Checks whether the current episode should terminate.

    Parameters:
    - data: The MuJoCo data object.
    - max_time: Maximum time (in seconds) for the episode.

    Returns:
    - done: Boolean indicating whether the episode should terminate.
    """
    # print(data.geom_xpos[17])
    # mujoco.mj_kinematics(model, data)

    """ this gets the height of the pelvis -- z direction up and down.  in meters?
    ok i need to print the hip height and nothing else, I need to know when
    gravity hits it , when does it begin to fall down
    the videos sometimes fall from gravity AFTER it tests a bunch of random limb movements
    then the limbs go limp, they don't move at all once gravity hits and the bot falls down """
    height = data.geom("HIPS").xpos[2]

    # print(height)

    if height < 1.0:
        print("Terminated : T. rex fell down!!!")
        print(3 - height)
        print(height)

        print("------------------\n\n\n")
        return True

    # Condition 2: Check if the maximum time has been reached
    if data.time >= max_time:
        print(data.time)
        print(max_time)
        print("Terminated: Maximum time reached.")
        return True

    # Additional conditions could include joint angle limits, distance covered, etc.

    # If no termination conditions are met
    return False


def compute_reward(data, previous_hip_pos):
    """Note ::: id 1 is torso3_1_1, id 2 is torso_3_1_2, skull is 17
    ok if the forward reward is the movement forward of the hips, it can be the difference between
    the previous x position of the hips, and the current x position
    since my model points in the -x direction, we want the torso position to reward increasingly negative positions
    """

    hips_x_coord = data.geom("HIPS").xpos[0]

    hip_height = data.geom("HIPS").xpos[2]

    if previous_hip_pos > hips_x_coord:

        forward_motion_reward = 1 + previous_hip_pos - hips_x_coord

    else:

        forward_motion_reward = 0

    # print("forward motion reward")

    # print(forward_motion_reward)

    stability_penalty = -1 if hip_height < 2.0 else 0

    reward = forward_motion_reward + stability_penalty

    return reward

    # current_x_pos = data.geom("HIPS").xpos[0]
    # print(current_x_pos)

    # forward_motion =  abs(previous_x_pos - current_x_pos)
    # print(forward_motion)
    # mj_forward(model, data) could be real

    # forward_motion = current_x_pos - previous_hip_position

def get_actuators():

    ids = {}
    num = 0
    for motor in root.findall(".//motor"):
        motor_name = motor.get("name")
        if motor_name is not None:
            # print(motor_name)
            num = num + 1
            ids.update({motor_name: data.actuator(motor_name).id})

    if len(ids) < 1:
        for motor in root.findall(".//general"):
            motor_name = motor.get("name")
            if motor_name is not None:
                # print(motor_name)
                num = num + 1
                ids.update({motor_name: data.actuator(motor_name).id})
        # ids.append(data.joint(joint_name).id)
    return ids

def get_joint_positions():
    ids = []
    for joint in root.findall(".//joint"):
        joint_name = joint.get("name")
        if joint_name is not None:
            # data.geom("HIPS").xpos[2]

            # print(joint_name)

            # print(data.joint(joint_name).id)

            ids.append(
                (joint_name, data.joint(joint_name).id, data.joint(joint_name).qpos)
            )
            # ids.append(data.joint(joint_name).id)
    # print(ids)
    return ids

def get_joint_names_positions():

    names = []

    for joint in root.findall(".//joint"):

        joint_name = joint.get("name")

        if joint_name is not None:

            b = (model.joint(joint_name).id, joint_name, model.joint(joint_name).pos)

            """model joint pos == the x y and z coordinates
                model joint range == the -1 to 1 range of joint motion ; """

            names.append(b)

    return names


def get_state(data):
    joint_positions = data.qpos[:54]
    joint_velocities = data.qvel[:54]

    # joint_positions_fn = get_joint_positions()

    # state = torch.tensor(state, joint_positions + joint_velocities, dtype=torch.float32)

    state = np.concatenate([joint_positions, joint_velocities])

    state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    # print("state of the union")
    # print(state.shape)

    # print(data.joint("torso1_joint2").qpos)

    return state


model = mujoco.MjModel.from_xml_path(
    "/content/xml/trexml.xml"
)
data = mujoco.MjData(model)

input_size = 108  # Assuming 54 joints with position and velocity (54x2)
output_size = 54  # One control signal per joint

q_network = MultiJointQNetwork(input_size, output_size)
optimizer = optim.Adam(q_network.parameters(), lr=0.003)
loss_function = nn.MSELoss()

state_size = len(np.concatenate([data.qpos, data.qvel]))
action_size = model.nu

results = []
# all_results = []

scene_option = mujoco.MjvOption()
""" display joint axes and show contact points"""
scene_option.flags[mujoco.mjtVisFlag.mjVIS_JOINT] = True
scene_option.flags[mujoco.mjtVisFlag.mjVIS_CONTACTPOINT] = True
scene_option.flags[mujoco.mjtVisFlag.mjVIS_CONTACTFORCE] = True
scene_option.flags[mujoco.mjtVisFlag.mjVIS_ACTUATOR] = True
scene_option.flags[mujoco.mjtVisFlag.mjVIS_ACTIVATION] = True
# scene_option.flags[mujoco.mjtTextureRole.mjTEXROLE_OPACITY] = False

# print(scene_option.flags[mujoco.mjtTextureRole.mjTEXROLE_OPACITY])
# sys.exit()

num_actions = model.nu
weights = np.random.randn(num_actions)

viewer = mujoco.viewer.launch_passive(model, data)
print("------")

eps = range(1000)
sim_duration = 10
perturbation = 1e-7

n_steps = int(sim_duration / model.opt.timestep)
sim_time = np.zeros(n_steps)
angle = np.zeros(n_steps)
energy = np.zeros(n_steps)


best_model_path = "/content/trained/best_model.pth"
latest_model_path = best_model_path = (
    "/content/trained/latest_model.pth"
)



best_reward = -float("inf")

for k in eps:

    print(str(k) + " of " + str(len(eps)) + " episodes.")

    mujoco.mj_resetData(model, data)
    # mujoco.mj_
    # print(state)

    """ i am pretty sure qvel = joint velocities , each
        joint should have a velocity set either by index number
        or actuator I think? """

    state = get_state(data)
    model.opt.gravity[:] = [0, 0, -9.81]
    data.qvel[:] += perturbation * model.nv
    # print(state)
    # sys.exit()

    """ reset the whole thing, start a new episode from zero at the beginning of each loop
    I don't understand how the reward is saved or used with a policy, I bet that's what I need to figure out"""

    total_reward = 0
    done = False
    frames = []

    mujoco.mj_resetDataKeyframe(model, data, 0)

    """ resetting the data keyframe to 0 ?"""

    # data.qfrc_actuator

    with mujoco.Renderer(model, height=480, width=640) as renderer:

        renderer.update_scene(data)

        mujoco.mj_forward(model, data)

        # with mujoco.viewer.launch_passive(model, data) as viewer:
        t = 0

        n_steps = 0

        while not done:

            tic = time.time()

            mujoco.mj_step(model, data)
            # mujoco.mj_kinematics(model, data)

            """i think previous_hip_position needs to be a global?"""
            previous_hip_position = data.geom("HIPS").xpos[0]

            action = select_action(state, q_network)  # Select action

            # for joint_idx in range(54):

            #     if joint_idx!=0:



                #     print("joint_idx")
                #     print(joint_idx)
                #     print(data.ctrl[joint_idx])

                    # """ I think data.ctrl is set by action, but why won't this thing work on indices
                    #     """


            data.ctrl = action


            # mujoco.mj_step(model, data)

            reward = compute_reward(data, previous_hip_position)

            if random.randrange(150) == 86:
        #         print(data.qfrc_actuator[joint_idx])
                print("\ntime:")
                print(time.strftime("%H:%M.%S"))
                print("[episode]")
                print(k)
                print("[reward]")
                print(reward)

            total_reward += reward

            renderer.update_scene(data, scene_option=scene_option)

            if total_reward > best_reward:

                best_reward = total_reward

                torch.save(q_network.state_dict(), best_model_path)

                print(f"new best model saved with reward: {best_reward}")

            pixels = renderer.render()

            frames.append(pixels)

            next_state = get_state(data)

            hips_x_coord = data.geom("HIPS").xpos[0]

            # print(hips_x_coord)
            # print(f"Hips x coord: {hips_x_coord}")
            hips_x_coord = data.geom("HIPS").xpos[2]

            done = check_termination(data, model, state)

            state_normalized = (state - state.mean()) / (state.std() + 1e-8)

            update_q_network(
                state,
                action,
                reward,
                next_state,
                done,
                q_network,
                optimizer,
                loss_function,
            )

            state = next_state




    """ next state is new and different than the first state, the loop updates itself i think"""


    print(f"Total Reward: {total_reward}")
    results.append(total_reward)

    media.write_video(
        "/content/results_videos/loop_"
        + str(k)
        + ".mp4",
        frames,
        fps=60
    )

    # if k % 100==0:

    #     media.write_video(
    #         "/Users/cameronpahl/projects/HIPS_DONT_LIE/LearningHumanoidWalking/results_videos/loop_"
    #         + str(k)
    #         + ".mp4",
    #         frames,
    #         fps=60
    #     )

    # /content/feedback

    with open("/content/feedback/feedback.txt","a") as f:

        print(f"Episode {str(k) + str(1)}: Total Reward = {total_reward:.2f}", file=f)
        print(f"Total reward FINAL: {total_reward}", file=f)
        print(f"ALL REWARDS FINAL: {results}", file=f)
        print("HIGHEST REWARD", file=f)
        print(max(enumerate(results), key=lambda x: x[1]), file=f)

        print(f"Episode {str(k) + str(1)}: Total Reward = {total_reward:.2f}", file=f)
        print("Total number of DoFs in the model:", model.nv, file=f)
        # print("Generalized positions:", data.qpos)
        # print("Generalized velocities:", data.qvel)

        # media.show_video(frames, fps=30)
        print(f"Joint Positions: {data.qpos}", file=f)
        print(f"Joint Velocities: {data.qvel}", file=f)
        print(f"Torques: {data.qfrc_actuator}", file=f)






[31mERROR: Could not find a version that satisfies the requirement xvfb (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for xvfb[0m[31m
[0m