In [11]:
import gymnasium as gym
from gymnasium import spaces

import mujoco

import numpy as np
import matplotlib.pyplot as plt

import torch
from torch import nn
from torch import optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset

import os
import random
from tqdm import tqdm

from collections import deque


In [12]:
class QuadExEnv(gym.Env):
    """Gymnasium environment that simulates a URDF robot with MuJoCo.

    Actions are control signals in ``[-1, 1]``, one entry per MuJoCo actuator
    (``model.nu``). Observations are ``qpos`` followed by ``qvel`` as a
    ``float32`` vector, matching the declared ``observation_space``.

    In ``"human"`` render mode every ``step()`` synchronises a passive MuJoCo
    viewer, so callers do not have to invoke ``render()`` themselves.
    """

    metadata = {"render_modes": ["human"], "render_fps": 60}

    def __init__(self, render_mode=None, model_path="tesbot.urdf"):
        """Load the robot model and declare the action/observation spaces.

        Args:
            render_mode: ``"human"`` to display the simulation in a passive
                viewer, or ``None`` for no rendering.
            model_path: Path to the model file handed to MuJoCo.

        Raises:
            FileNotFoundError: If ``model_path`` does not exist.
        """
        super().__init__()

        # Fail early with a readable message instead of a MuJoCo parser error.
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Missing URDF: {model_path}")

        # MuJoCo compiles the URDF into its own model representation on load.
        self.model = mujoco.MjModel.from_xml_path(model_path)
        self.data = mujoco.MjData(self.model)

        # One action entry per actuator defined in the compiled model.
        self.n_actuators = self.model.nu
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(self.n_actuators,), dtype=np.float32)

        # Observation = generalized positions followed by generalized velocities.
        obs_size = self.model.nq + self.model.nv
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(obs_size,), dtype=np.float32)

        self.render_mode = render_mode
        self.viewer = None

    def reset(self, seed=None, options=None):
        """Restore the simulation to the model's default state.

        Returns:
            Tuple ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)

        # mj_resetData restores qpos to the model defaults and also clears
        # velocities, controls, simulation time and warm-start data, which a
        # manual `qpos[:] = 0` / `qvel[:] = 0` leaves behind (and which would
        # produce an invalid all-zero quaternion on models with a free joint).
        mujoco.mj_resetData(self.model, self.data)
        mujoco.mj_forward(self.model, self.data)

        # Refresh an already-open viewer so it shows the reset pose; never
        # launch a viewer from reset().
        if self.viewer is not None:
            self.viewer.sync()

        return self._get_obs(), {}

    def step(self, action):
        """Apply ``action`` for one simulation step.

        Returns:
            Tuple ``(obs, reward, terminated, truncated, info)``. ``terminated``
            and ``truncated`` are always ``False``; callers must bound episode
            length themselves.
        """
        # Clip to action limits
        action = np.clip(action, self.action_space.low, self.action_space.high)
        self.data.ctrl[:] = action

        mujoco.mj_step(self.model, self.data)

        obs = self._get_obs()

        # Basic reward for staying upright.
        # NOTE(review): qpos[2] is the base height only if the model has a free
        # root joint -- confirm this holds for the loaded URDF.
        reward = 1.0 if self.data.qpos[2] > 0.05 else 0.0
        terminated = False
        truncated = False

        # Gymnasium convention: "human" mode renders as part of step().
        if self.render_mode == "human":
            self.render()

        return obs, reward, terminated, truncated, {}

    def _get_obs(self):
        """Return ``[qpos, qvel]`` as a fresh ``float32`` array."""
        # astype() copies, so the result never aliases MuJoCo's buffers, and
        # the dtype matches the declared observation_space.
        return np.concatenate([self.data.qpos, self.data.qvel]).astype(np.float32)

    def render(self):
        """Synchronise the passive viewer (launching it on first use)."""
        if self.render_mode != "human":
            return
        if self.viewer is None:
            # `import mujoco` alone does not load the viewer submodule, so it
            # is imported lazily here; headless runs never pay for it.
            from mujoco import viewer as mujoco_viewer
            self.viewer = mujoco_viewer.launch_passive(self.model, self.data)
        self.viewer.sync()

    def close(self):
        """Close the viewer if one was opened; safe to call repeatedly."""
        if self.viewer is not None:
            self.viewer.close()
            self.viewer = None
# Instantiate the environment in "human" render mode and reset it. The
# (observation, info) tuple returned by reset() is the last expression of the
# cell, so it is what the notebook displays as the cell output.
env = QuadExEnv(render_mode="human")
env.reset()





(array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]), {})

In [None]:
class DQN(nn.Module):
    """Fully connected network: two ReLU hidden layers and a linear output layer."""

    def __init__(self, input_dim, output_dim, hidden_dim):
        """Create the three linear layers ``fc1`` -> ``fc2`` -> ``fc3``.

        Args:
            input_dim: Size of the input feature vector.
            output_dim: Size of the output vector.
            hidden_dim: Width of both hidden layers.
        """
        super().__init__()
        # Input -> hidden, hidden -> hidden, hidden -> output.
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Map ``x`` through the hidden layers and return the raw output."""
        activations = x
        for hidden_layer in (self.fc1, self.fc2):
            activations = torch.relu(hidden_layer(activations))
        # No activation on the last layer: outputs are unbounded.
        return self.fc3(activations)

In [None]:
class ReplayMemory():
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        """Create an empty buffer that keeps at most ``capacity`` transitions."""
        self.capacity = capacity
        # A deque with maxlen drops its oldest entry automatically when full.
        self.memory = deque(maxlen=capacity)
        # Not read by any method of this class.
        self.position = 0

    def append(self, transition):
        """Add ``transition``, evicting the oldest entry if the buffer is full."""
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return a list of ``batch_size`` stored transitions chosen at random.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored items.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

class QuadExAgent():
    """Agent scaffold that rolls out episodes in a ``QuadExEnv``.

    The constructor builds a small tanh-bounded MLP, an Adam optimizer and an
    MSE loss. ``run`` currently drives the environment with uniformly random
    actions and, in training mode, records the transitions it observes.
    """

    # Maximum number of transitions kept by the replay memory created in run().
    REPLAY_CAPACITY = 10000

    def __init__(
            self,
            env: QuadExEnv,
            learning_rate: float,
            hidden_dim: int,
            batch_size: int,

    ):
        """Store hyper-parameters and build the model, optimizer and loss.

        Args:
            env: Environment whose observation/action shapes size the model.
            learning_rate: Step size passed to Adam.
            hidden_dim: Width of the hidden layer(s).
            batch_size: Mini-batch size (stored; not used by ``run`` yet).
        """
        self.env = env
        self.learning_rate = learning_rate
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size

        # Create the neural network model; Tanh bounds outputs to [-1, 1].
        self.model = nn.Sequential(
            nn.Linear(env.observation_space.shape[0], hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, env.action_space.shape[0]),
            nn.Tanh()
        )

        # Optimizer
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        # Loss function
        self.criterion = nn.MSELoss()
        # Replay buffer
        self.buffer = []

    def run(
            self,
            is_training: bool,
            num_episodes: int,
            render: bool = False,
            max_steps_per_episode: int = 1000,
    ):
        """Roll out ``num_episodes`` episodes with uniformly random actions.

        Args:
            is_training: When true, each transition is appended to a replay
                memory as ``(state, action, new_state, reward, terminated)``.
            num_episodes: Number of episodes to run.
            render: When true, the environment is created with
                ``render_mode="human"``.
            max_steps_per_episode: Hard cap on steps per episode. It is needed
                because ``QuadExEnv.step`` as defined in this notebook never
                reports ``terminated`` or ``truncated``, so an uncapped
                rollout would never end.

        Returns:
            List holding the summed reward of each episode.
        """
        env = QuadExEnv(render_mode="human") if render else QuadExEnv()

        # TODO: policy_dqn is constructed but not used yet -- actions below are
        # sampled at random and no optimisation step is performed.
        policy_dqn = DQN(env.observation_space.shape[0], env.action_space.shape[0], self.hidden_dim).to(device)

        memory = ReplayMemory(self.REPLAY_CAPACITY) if is_training else None
        episode_rewards = []

        try:
            for _ in range(num_episodes):
                state, _ = env.reset()
                episode_reward = 0.0

                for _ in range(max_steps_per_episode):
                    action = env.action_space.sample()
                    new_state, reward, terminated, truncated, _ = env.step(action)
                    episode_reward += reward

                    if memory is not None:
                        # Keep the state the action was taken in; without it the
                        # transition cannot be used for a TD update.
                        memory.append((state, action, new_state, reward, terminated))

                    state = new_state

                    # Either flag ends the episode.
                    if terminated or truncated:
                        break

                episode_rewards.append(episode_reward)
        finally:
            # Release the viewer even if the rollout raises.
            env.close()

        return episode_rewards
