<a href="https://colab.research.google.com/github/atlasrikanth/Real-Time-Multimodal-AI-for-Autonomous-Vehicle-Perception-and-Decision-Making/blob/main/Untitled9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "# Mount Google Drive\n",
        "from google.colab import drive\n",
        "drive.mount('/content/drive')\n",
        "\n",
        "# Install dependencies\n",
        "!pip install torch torchvision opencv-python-headless numpy\n",
        "!pip install ultralytics  # YOLOv8\n",
        "!pip install torch-geometric  # PointNet++\n",
        "!pip install stable-baselines3 gymnasium  # RL\n",
        "\n",
        "# Verify installations\n",
        "import torch\n",
        "print(torch.__version__, torch.cuda.is_available())\n",
        "from ultralytics import YOLO\n",
        "import cv2\n",
        "print(cv2.__version__)\n",
        "\n",
        "# Create directories for datasets and models\n",
        "!mkdir -p /content/drive/MyDrive/autonomous_ai/datasets\n",
        "!mkdir -p /content/drive/MyDrive/autonomous_ai/models\n"
      ],
      "metadata": {},
      "execution_count": None,
      "outputs": []
    }
  ]
}

{'cells': [{'cell_type': 'code',
   'source': ['# Mount Google Drive\n',
    'from google.colab import drive\n',
    "drive.mount('/content/drive')\n",
    '\n',
    '# Install dependencies\n',
    '!pip install torch torchvision opencv-python-headless numpy\n',
    '!pip install ultralytics  # YOLOv8\n',
    '!pip install torch-geometric  # PointNet++\n',
    '!pip install stable-baselines3 gymnasium  # RL\n',
    '\n',
    '# Verify installations\n',
    'import torch\n',
    'print(torch.__version__, torch.cuda.is_available())\n',
    'from ultralytics import YOLO\n',
    'import cv2\n',
    'print(cv2.__version__)\n',
    '\n',
    '# Create directories for datasets and models\n',
    '!mkdir -p /content/drive/MyDrive/autonomous_ai/datasets\n',
    '!mkdir -p /content/drive/MyDrive/autonomous_ai/models\n'],
   'metadata': {},
   'execution_count': None,
   'outputs': []}]}

In [8]:
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "import os\n",
        "import cv2\n",
        "import numpy as np\n",
        "import torch\n",
        "from torch.utils.data import Dataset, DataLoader\n",
        "\n",
        "# Download nuScenes mini dataset\n",
        "!wget -P /content/drive/MyDrive/autonomous_ai/datasets https://www.nuscenes.org/data/nuscenes-mini-v1.0.tar.gz\n",
        "!tar -xzf /content/drive/MyDrive/autonomous_ai/datasets/nuscenes-mini-v1.0.tar.gz -C /content/drive/MyDrive/autonomous_ai/datasets\n",
        "\n",
        "class NuScenesDataset(Dataset):\n",
        "    def __init__(self, data_dir, split='mini_train'):\n",
        "        from nuscenes.nuscenes import NuScenes\n",
        "        self.nusc = NuScenes(version='v1.0-mini', dataroot=data_dir, verbose=True)\n",
        "        self.samples = [s for s in self.nusc.sample if s['scene_token'] in self.nusc.get('scene', split)]\n",
        "        self.camera = 'CAM_FRONT'\n",
        "    \n",
        "    def __len__(self):\n",
        "        return len(self.samples)\n",
        "    \n",
        "    def __getitem__(self, idx):\n",
        "        sample = self.nusc.sample[idx]\n",
        "        # Load camera image\n",
        "        cam_data = self.nusc.get('sample_data', sample['data'][self.camera])\n",
        "        img_path = os.path.join(self.nusc.dataroot, cam_data['filename'])\n",
        "        img = cv2.imread(img_path)\n",
        "        img = cv2.resize(img, (640, 640))\n",
        "        img = torch.tensor(img.transpose(2, 0, 1), dtype=torch.float32) / 255.0\n",
        "        \n",
        "        # Load LiDAR point cloud (limit to 1000 points to fit RAM)\n",
        "        lidar_data = self.nusc.get('sample_data', sample['data']['LIDAR_TOP'])\n",
        "        lidar_path = os.path.join(self.nusc.dataroot, lidar_data['filename'])\n",
        "        points = np.fromfile(lidar_path, dtype=np.float32).reshape(-1, 4)[:1000, :3]\n",
        "        points = torch.tensor(points, dtype=torch.float32)\n",
        "        \n",
        "        # Load annotations\n",
        "        boxes = []\n",
        "        for ann_token in sample['anns']:\n",
        "            ann = self.nusc.get('sample_annotation', ann_token)\n",
        "            if ann['category_name'] in ['vehicle.car', 'human.pedestrian']:\n",
        "                box = ann['bbox']\n",
        "                boxes.append(box)\n",
        "        boxes = torch.tensor(boxes, dtype=torch.float32) if boxes else torch.zeros((0, 4))\n",
        "        \n",
        "        return img, points, boxes\n",
        "\n",
        "# Initialize dataset\n",
        "dataset = NuScenesDataset(data_dir='/content/drive/MyDrive/autonomous_ai/datasets/nuscenes-mini-v1.0')\n",
        "dataloader = DataLoader(dataset, batch_size=4, shuffle=True)  # Smaller batch size for Colab\n"
      ],
      "metadata": {},
      "execution_count": None,
      "outputs": []
    }
  ]
}

{'cells': [{'cell_type': 'code',
   'source': ['import os\n',
    'import cv2\n',
    'import numpy as np\n',
    'import torch\n',
    'from torch.utils.data import Dataset, DataLoader\n',
    '\n',
    '# Download nuScenes mini dataset\n',
    '!wget -P /content/drive/MyDrive/autonomous_ai/datasets https://www.nuscenes.org/data/nuscenes-mini-v1.0.tar.gz\n',
    '!tar -xzf /content/drive/MyDrive/autonomous_ai/datasets/nuscenes-mini-v1.0.tar.gz -C /content/drive/MyDrive/autonomous_ai/datasets\n',
    '\n',
    'class NuScenesDataset(Dataset):\n',
    "    def __init__(self, data_dir, split='mini_train'):\n",
    '        from nuscenes.nuscenes import NuScenes\n',
    "        self.nusc = NuScenes(version='v1.0-mini', dataroot=data_dir, verbose=True)\n",
    "        self.samples = [s for s in self.nusc.sample if s['scene_token'] in self.nusc.get('scene', split)]\n",
    "        self.camera = 'CAM_FRONT'\n",
    '    \n',
    '    def __len__(self):\n',
    '        return len(self.sam

In [9]:
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "from ultralytics import YOLO\n",
        "import torch\n",
        "\n",
        "# Load pre-trained YOLOv8 nano model\n",
        "model = YOLO('yolov8n.pt')\n",
        "\n",
        "# Custom training configuration\n",
        "data_config = \"\"\"\n",
        "train: /content/drive/MyDrive/autonomous_ai/datasets/nuscenes-mini-v1.0/train\n",
        "val: /content/drive/MyDrive/autonomous_ai/datasets/nuscenes-mini-v1.0/val\n",
        "nc: 2\n",
        "names: ['car', 'pedestrian']\n",
        "\"\"\"\n",
        "\n",
        "# Save config\n",
        "with open('/content/nuscenes.yaml', 'w') as f:\n",
        "    f.write(data_config)\n",
        "\n",
        "# Train model\n",
        "model.train(data='/content/nuscenes.yaml', epochs=20, imgsz=640, batch=4, device=0)  # Reduced epochs and batch\n",
        "\n",
        "# Save model to Drive\n",
        "model.save('/content/drive/MyDrive/autonomous_ai/models/yolo_autonomous.pt')\n"
      ],
      "metadata": {},
      "execution_count": None,
      "outputs": []
    }
  ]
}

{'cells': [{'cell_type': 'code',
   'source': ['from ultralytics import YOLO\n',
    'import torch\n',
    '\n',
    '# Load pre-trained YOLOv8 nano model\n',
    "model = YOLO('yolov8n.pt')\n",
    '\n',
    '# Custom training configuration\n',
    'data_config = """\n',
    'train: /content/drive/MyDrive/autonomous_ai/datasets/nuscenes-mini-v1.0/train\n',
    'val: /content/drive/MyDrive/autonomous_ai/datasets/nuscenes-mini-v1.0/val\n',
    'nc: 2\n',
    "names: ['car', 'pedestrian']\n",
    '"""\n',
    '\n',
    '# Save config\n',
    "with open('/content/nuscenes.yaml', 'w') as f:\n",
    '    f.write(data_config)\n',
    '\n',
    '# Train model\n',
    "model.train(data='/content/nuscenes.yaml', epochs=20, imgsz=640, batch=4, device=0)  # Reduced epochs and batch\n",
    '\n',
    '# Save model to Drive\n',
    "model.save('/content/drive/MyDrive/autonomous_ai/models/yolo_autonomous.pt')\n"],
   'metadata': {},
   'execution_count': None,
   'outputs': []}]}

In [10]:
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "import torch\n",
        "import torch.nn as nn\n",
        "from torch_geometric.nn import PointNetConv, global_max_pool\n",
        "from torch_geometric.data import Data, DataLoader as GeoDataLoader\n",
        "\n",
        "class PointNetSeg(nn.Module):\n",
        "    def __init__(self, num_classes=2):\n",
        "        super(PointNetSeg, self).__init__()\n",
        "        self.conv1 = PointNetConv(local_nn=nn.Sequential(\n",
        "            nn.Linear(3, 32), nn.ReLU(), nn.Linear(32, 32)))  # Reduced size\n",
        "        self.conv2 = PointNetConv(local_nn=nn.Sequential(\n",
        "            nn.Linear(35, 64), nn.ReLU(), nn.Linear(64, 64)))\n",
        "        self.fc = nn.Linear(64, num_classes)\n",
        "    \n",
        "    def forward(self, data):\n",
        "        x, pos, batch = data.x, data.pos, data.batch\n",
        "        x = self.conv1(x, pos, batch)\n",
        "        x = self.conv2(x, pos, batch)\n",
        "        x = global_max_pool(x, batch)\n",
        "        x = self.fc(x)\n",
        "        return x\n",
        "\n",
        "def create_pointnet_data(points, labels):\n",
        "    return Data(pos=torch.tensor(points, dtype=torch.float32), y=torch.tensor(labels, dtype=torch.long))\n",
        "\n",
        "# Training loop\n",
        "model = PointNetSeg(num_classes=2).cuda()\n",
        "optimizer = torch.optim.Adam(model.parameters(), lr=0.001)\n",
        "criterion = nn.CrossEntropyLoss()\n",
        "\n",
        "for epoch in range(20):  # Reduced epochs\n",
        "    model.train()\n",
        "    for batch in dataloader:\n",
        "        points, labels = batch[1].cuda(), batch[2].cuda()\n",
        "        data_list = [create_pointnet_data(p, l[:1]) for p, l in zip(points, labels)]  # Simplified labels\n",
        "        batch_data = GeoDataLoader(data_list, batch_size=4)\n",
        "        optimizer.zero_grad()\n",
        "        out = model(batch_data)\n",
        "        loss = criterion(out, batch_data.y)\n",
        "        loss.backward()\n",
        "        optimizer.step()\n",
        "    print(f'Epoch {epoch}, Loss: {loss.item()}')\n",
        "\n",
        "torch.save(model.state_dict(), '/content/drive/MyDrive/autonomous_ai/models/pointnet_seg.pt')\n"
      ],
      "metadata": {},
      "execution_count": None,
      "outputs": []
    }
  ]
}

{'cells': [{'cell_type': 'code',
   'source': ['import torch\n',
    'import torch.nn as nn\n',
    'from torch_geometric.nn import PointNetConv, global_max_pool\n',
    'from torch_geometric.data import Data, DataLoader as GeoDataLoader\n',
    '\n',
    'class PointNetSeg(nn.Module):\n',
    '    def __init__(self, num_classes=2):\n',
    '        super(PointNetSeg, self).__init__()\n',
    '        self.conv1 = PointNetConv(local_nn=nn.Sequential(\n',
    '            nn.Linear(3, 32), nn.ReLU(), nn.Linear(32, 32)))  # Reduced size\n',
    '        self.conv2 = PointNetConv(local_nn=nn.Sequential(\n',
    '            nn.Linear(35, 64), nn.ReLU(), nn.Linear(64, 64)))\n',
    '        self.fc = nn.Linear(64, num_classes)\n',
    '    \n',
    '    def forward(self, data):\n',
    '        x, pos, batch = data.x, data.pos, data.batch\n',
    '        x = self.conv1(x, pos, batch)\n',
    '        x = self.conv2(x, pos, batch)\n',
    '        x = global_max_pool(x, batch)\n',
    '   

In [11]:
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "import torch\n",
        "import torch.nn as nn\n",
        "from ultralytics import YOLO\n",
        "\n",
        "class FusionTransformer(nn.Module):\n",
        "    def __init__(self, input_dim=68, hidden_dim=128, num_heads=4):  # Reduced size\n",
        "        super(FusionTransformer, self).__init__()\n",
        "        self.encoder = nn.TransformerEncoder(\n",
        "            nn.TransformerEncoderLayer(d_model=input_dim, nhead=num_heads, dim_feedforward=hidden_dim),\n",
        "            num_layers=1  # Single layer for Colab\n",
        "        )\n",
        "        self.fc = nn.Linear(input_dim, 64)\n",
        "    \n",
        "    def forward(self, yolo_features, lidar_features):\n",
        "        combined = torch.cat((yolo_features, lidar_features), dim=-1)\n",
        "        fused = self.encoder(combined)\n",
        "        return self.fc(fused)\n",
        "\n",
        "# Load models\n",
        "yolo_model = YOLO('/content/drive/MyDrive/autonomous_ai/models/yolo_autonomous.pt')\n",
        "pointnet = PointNetSeg().cuda()\n",
        "pointnet.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/pointnet_seg.pt'))\n",
        "fusion_model = FusionTransformer().cuda()\n",
        "\n",
        "optimizer = torch.optim.Adam(fusion_model.parameters(), lr=0.001)\n",
        "criterion = nn.MSELoss()\n",
        "\n",
        "for epoch in range(10):  # Reduced epochs\n",
        "    for batch in dataloader:\n",
        "        img, points, boxes = batch\n",
        "        img, points = img.cuda(), points.cuda()\n",
        "        yolo_out = yolo_model(img)\n",
        "        yolo_features = yolo_out[0].boxes.xywh\n",
        "        point_data = create_pointnet_data(points, boxes)\n",
        "        lidar_features = pointnet(point_data)\n",
        "        fused = fusion_model(yolo_features, lidar_features)\n",
        "        target = torch.zeros_like(fused)  # Dummy target\n",
        "        loss = criterion(fused, target)\n",
        "        optimizer.zero_grad()\n",
        "        loss.backward()\n",
        "        optimizer.step()\n",
        "    print(f'Epoch {epoch}, Loss: {loss.item()}')\n",
        "\n",
        "torch.save(fusion_model.state_dict(), '/content/drive/MyDrive/autonomous_ai/models/fusion_transformer.pt')\n"
      ],
      "metadata": {},
      "execution_count": None,
      "outputs": []
    }
  ]
}

{'cells': [{'cell_type': 'code',
   'source': ['import torch\n',
    'import torch.nn as nn\n',
    'from ultralytics import YOLO\n',
    '\n',
    'class FusionTransformer(nn.Module):\n',
    '    def __init__(self, input_dim=68, hidden_dim=128, num_heads=4):  # Reduced size\n',
    '        super(FusionTransformer, self).__init__()\n',
    '        self.encoder = nn.TransformerEncoder(\n',
    '            nn.TransformerEncoderLayer(d_model=input_dim, nhead=num_heads, dim_feedforward=hidden_dim),\n',
    '            num_layers=1  # Single layer for Colab\n',
    '        )\n',
    '        self.fc = nn.Linear(input_dim, 64)\n',
    '    \n',
    '    def forward(self, yolo_features, lidar_features):\n',
    '        combined = torch.cat((yolo_features, lidar_features), dim=-1)\n',
    '        fused = self.encoder(combined)\n',
    '        return self.fc(fused)\n',
    '\n',
    '# Load models\n',
    "yolo_model = YOLO('/content/drive/MyDrive/autonomous_ai/models/yolo_autonomous.p

In [12]:
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "import gymnasium as gym\n",
        "import numpy as np\n",
        "from stable_baselines3 import PPO\n",
        "import torch\n",
        "\n",
        "class ProxyDrivingEnv(gym.Env):\n",
        "    def __init__(self):\n",
        "        super(ProxyDrivingEnv, self).__init__()\n",
        "        self.env = gym.make('CarRacing-v2')\n",
        "        self.action_space = self.env.action_space\n",
        "        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(64,), dtype=np.float32)\n",
        "        self.fusion_model = FusionTransformer().cuda()\n",
        "        self.fusion_model.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/fusion_transformer.pt'))\n",
        "    \n",
        "    def reset(self, seed=None):\n",
        "        obs, _ = self.env.reset(seed=seed)\n",
        "        return self._get_obs(obs), {}\n",
        "    \n",
        "    def step(self, action):\n",
        "        obs, reward, done, truncated, info = self.env.step(action)\n",
        "        return self._get_obs(obs), reward, done or truncated, info\n",
        "    \n",
        "    def _get_obs(self, obs):\n",
        "        # Simulate fused features (replace with real YOLO/PointNet inputs if available)\n",
        "        dummy_yolo = torch.ones((1, 4)).cuda()\n",
        "        dummy_lidar = torch.ones((1, 64)).cuda()\n",
        "        fused = self.fusion_model(dummy_yolo, dummy_lidar)\n",
        "        return fused.detach().cpu().numpy().flatten()\n",
        "\n",
        "# Train RL agent\n",
        "env = ProxyDrivingEnv()\n",
        "model = PPO('MlpPolicy', env, verbose=1)\n",
        "model.learn(total_timesteps=50000)  # Reduced timesteps\n",
        "\n",
        "# Save model\n",
        "model.save('/content/drive/MyDrive/autonomous_ai/models/ppo_autonomous.zip')\n"
      ],
      "metadata": {},
      "execution_count": None,
      "outputs": []
    }
  ]
}

{'cells': [{'cell_type': 'code',
   'source': ['import gymnasium as gym\n',
    'import numpy as np\n',
    'from stable_baselines3 import PPO\n',
    'import torch\n',
    '\n',
    'class ProxyDrivingEnv(gym.Env):\n',
    '    def __init__(self):\n',
    '        super(ProxyDrivingEnv, self).__init__()\n',
    "        self.env = gym.make('CarRacing-v2')\n",
    '        self.action_space = self.env.action_space\n',
    '        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(64,), dtype=np.float32)\n',
    '        self.fusion_model = FusionTransformer().cuda()\n',
    "        self.fusion_model.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/fusion_transformer.pt'))\n",
    '    \n',
    '    def reset(self, seed=None):\n',
    '        obs, _ = self.env.reset(seed=seed)\n',
    '        return self._get_obs(obs), {}\n',
    '    \n',
    '    def step(self, action):\n',
    '        obs, reward, done, truncated, info = self.env.step

In [13]:
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "import torch\n",
        "from ultralytics import YOLO\n",
        "import time\n",
        "\n",
        "# Load models\n",
        "yolo_model = YOLO('/content/drive/MyDrive/autonomous_ai/models/yolo_autonomous.pt')\n",
        "pointnet = PointNetSeg().cuda()\n",
        "pointnet.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/pointnet_seg.pt'))\n",
        "fusion_model = FusionTransformer().cuda()\n",
        "fusion_model.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/fusion_transformer.pt'))\n",
        "\n",
        "# Test inference latency\n",
        "x_img = torch.ones((1, 3, 640, 640)).cuda()\n",
        "x_points = torch.ones((1, 1000, 3)).cuda()\n",
        "x_fusion = torch.ones((1, 68)).cuda()\n",
        "\n",
        "start = time.time()\n",
        "yolo_model(x_img)\n",
        "pointnet(create_pointnet_data(x_points, None))\n",
        "fusion_model(x_fusion, x_fusion)\n",
        "latency = (time.time() - start) * 1000\n",
        "print(f'Inference time: {latency:.2f} ms')\n",
        "\n",
        "# Save models (already optimized for size)\n",
        "torch.save(yolo_model.model.state_dict(), '/content/drive/MyDrive/autonomous_ai/models/yolo_optimized.pt')\n",
        "torch.save(pointnet.state_dict(), '/content/drive/MyDrive/autonomous_ai/models/pointnet_optimized.pt')\n",
        "torch.save(fusion_model.state_dict(), '/content/drive/MyDrive/autonomous_ai/models/fusion_optimized.pt')\n"
      ],
      "metadata": {},
      "execution_count": None,
      "outputs": []
    }
  ]
}

{'cells': [{'cell_type': 'code',
   'source': ['import torch\n',
    'from ultralytics import YOLO\n',
    'import time\n',
    '\n',
    '# Load models\n',
    "yolo_model = YOLO('/content/drive/MyDrive/autonomous_ai/models/yolo_autonomous.pt')\n",
    'pointnet = PointNetSeg().cuda()\n',
    "pointnet.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/pointnet_seg.pt'))\n",
    'fusion_model = FusionTransformer().cuda()\n',
    "fusion_model.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/fusion_transformer.pt'))\n",
    '\n',
    '# Test inference latency\n',
    'x_img = torch.ones((1, 3, 640, 640)).cuda()\n',
    'x_points = torch.ones((1, 1000, 3)).cuda()\n',
    'x_fusion = torch.ones((1, 68)).cuda()\n',
    '\n',
    'start = time.time()\n',
    'yolo_model(x_img)\n',
    'pointnet(create_pointnet_data(x_points, None))\n',
    'fusion_model(x_fusion, x_fusion)\n',
    'latency = (time.time() - start) * 1000\n',
    "print(f'Inference

In [14]:
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "import gymnasium as gym\n",
        "from ultralytics import YOLO\n",
        "import torch\n",
        "from stable_baselines3 import PPO\n",
        "\n",
        "# Load models\n",
        "yolo_model = YOLO('/content/drive/MyDrive/autonomous_ai/models/yolo_optimized.pt')\n",
        "pointnet = PointNetSeg().cuda()\n",
        "pointnet.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/pointnet_optimized.pt'))\n",
        "fusion_model = FusionTransformer().cuda()\n",
        "fusion_model.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/fusion_optimized.pt'))\n",
        "rl_model = PPO.load('/content/drive/MyDrive/autonomous_ai/models/ppo_autonomous.zip')\n",
        "\n",
        "# Test in CarRacing-v2\n",
        "env = gym.make('CarRacing-v2', render_mode='human')\n",
        "obs, _ = env.reset()\n",
        "for _ in range(1000):\n",
        "    img = torch.tensor(obs.transpose(2, 0, 1), dtype=torch.float32).unsqueeze(0).cuda()\n",
        "    yolo_out = yolo_model(img)\n",
        "    dummy_points = torch.ones((1, 1000, 3)).cuda()\n",
        "    point_data = create_pointnet_data(dummy_points, None)\n",
        "    lidar_out = pointnet(point_data)\n",
        "    fused = fusion_model(yolo_out[0].boxes.xywh, lidar_out)\n",
        "    action, _ = rl_model.predict(fused.detach().cpu().numpy())\n",
        "    obs, reward, done, truncated, _ = env.step(action)\n",
        "    if done or truncated:\n",
        "        obs, _ = env.reset()\n",
        "env.close()\n"
      ],
      "metadata": {},
      "execution_count": None,
      "outputs": []
    }
  ]
}

{'cells': [{'cell_type': 'code',
   'source': ['import gymnasium as gym\n',
    'from ultralytics import YOLO\n',
    'import torch\n',
    'from stable_baselines3 import PPO\n',
    '\n',
    '# Load models\n',
    "yolo_model = YOLO('/content/drive/MyDrive/autonomous_ai/models/yolo_optimized.pt')\n",
    'pointnet = PointNetSeg().cuda()\n',
    "pointnet.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/pointnet_optimized.pt'))\n",
    'fusion_model = FusionTransformer().cuda()\n',
    "fusion_model.load_state_dict(torch.load('/content/drive/MyDrive/autonomous_ai/models/fusion_optimized.pt'))\n",
    "rl_model = PPO.load('/content/drive/MyDrive/autonomous_ai/models/ppo_autonomous.zip')\n",
    '\n',
    '# Test in CarRacing-v2\n',
    "env = gym.make('CarRacing-v2', render_mode='human')\n",
    'obs, _ = env.reset()\n',
    'for _ in range(1000):\n',
    '    img = torch.tensor(obs.transpose(2, 0, 1), dtype=torch.float32).unsqueeze(0).cuda()\n',
    '    yolo_ou