In [None]:
%pip install mcap mcap-ros2-support

In [None]:
import sys

from mcap_ros2.decoder import DecoderFactory
from mcap.reader import make_reader
from matplotlib import pyplot as plt
import numpy as np
import io
from PIL import Image

# Preview the first two decoded frames from the depth and colour camera topics:
# print basic metadata and the value range, then draw a histogram and a heat-map.
with open("../../../rosbags/rosbag2_2023_11_29-11_57_09/rosbag2_2023_11_29-11_57_09_0.mcap", "rb") as f:
    reader = make_reader(f, decoder_factories=[DecoderFactory()])

    count = 1
    topics = ["/limo/depth_camera_link/depth/image_raw", "/limo/depth_camera_link/image_raw"]
    for schema, channel, message, ros_msg in reader.iter_decoded_messages(topics=topics):
        print(f"{channel.topic} {schema.name} [{message.log_time}]: {ros_msg.width}, {ros_msg.height}, {ros_msg.encoding}")

        # Pillow raw mode: "F" (32-bit float) for 32FC1 frames; everything else is
        # read as packed RGB -- assumes a 3-bytes-per-pixel encoding, TODO confirm.
        encoding = "F" if ros_msg.encoding == "32FC1" else "RGB"
        image = Image.frombytes(encoding, (ros_msg.width, ros_msg.height), ros_msg.data)
        image_array = np.array(image)
        print(np.min(image_array), np.max(image_array))
        if encoding == "F":
            # Replace every reading of exactly 100.0 with 2 so it does not dominate
            # the plots (presumably 100.0 is a "no return" sentinel -- TODO confirm).
            # A boolean mask does this in one vectorised step instead of a
            # height x width Python loop.
            image_array[image_array == 100.0] = 2
        image = Image.fromarray(image_array)

        # Histogram of the pixel values
        plt.figure()
        n, bins, patches = plt.hist(image_array.flatten(), 30, density=True)
        # Heat-map rendering of the frame itself
        plt.figure()
        plt.imshow(image_array, cmap='hot')
        print()

        # Stop after the second message
        if count == 2:
            break

        count += 1


In [None]:
# Print width, height and field layout of the first two point-cloud messages and
# allocate a zero-filled (height, width, 3) array for each of them.
with open("../../../rosbags/rosbag2_2023_11_29-11_57_09/rosbag2_2023_11_29-11_57_09_0.mcap", "rb") as f:
    reader = make_reader(f, decoder_factories=[DecoderFactory()])

    topics = ["/limo/depth_camera_link/points"]
    count = 1
    for schema, channel, message, ros_msg in reader.iter_decoded_messages(topics=topics):
        print(f"{channel.topic} {schema.name} [{message.log_time}]: {ros_msg.width}, {ros_msg.height}, {ros_msg.fields}")

        # Placeholder buffer; nothing is written into it yet.
        point_cloud = np.zeros((ros_msg.height, ros_msg.width, 3))

        # Keep going only while fewer than two messages have been handled.
        if count < 2:
            count += 1
        else:
            break
        

In [None]:
from datetime import datetime, timezone
from pathlib import Path

# Export every frame of the colour camera topic as a sequentially numbered JPEG
# (1.jpg, 2.jpg, ...) and print each message's log time.
output_dir = Path("../pothole_images")
# Image.save does not create missing directories, so make sure the target exists.
output_dir.mkdir(parents=True, exist_ok=True)

with open("../../../rosbags/rosbag2_2023_11_29-11_57_09/rosbag2_2023_11_29-11_57_09_0.mcap", "rb") as f:
    reader = make_reader(f, decoder_factories=[DecoderFactory()])

    count = 1
    topics = ["/limo/depth_camera_link/image_raw"]
    for schema, channel, message, ros_msg in reader.iter_decoded_messages(topics=topics):
        print(message.log_time)
        # log_time is divided by 10^9 to obtain seconds. A timezone-aware conversion
        # replaces datetime.utcfromtimestamp, which is deprecated since Python 3.12;
        # the formatted UTC string is unchanged.
        str_time = datetime.fromtimestamp(
            message.log_time / 1_000_000_000, tz=timezone.utc
        ).strftime('%Y-%m-%d %H:%M:%S')
        print(f"{str_time}")

        # Pillow raw mode: "F" for 32FC1 frames, packed RGB otherwise.
        # NOTE(review): an "F"-mode image cannot be written as JPEG; this relies on
        # the selected topic carrying RGB frames -- confirm against the bag.
        encoding = "RGB"
        if ros_msg.encoding == "32FC1":
            encoding = "F"
        image = Image.frombytes(encoding, (ros_msg.width, ros_msg.height), ros_msg.data)
        image.save(output_dir / f"{count}.jpg")

        count += 1

In [None]:
%pip install torchvision

In [None]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Two output classes: index 0 is the background, index 1 is "pothole".
num_classes = 2

# Start from the Faster R-CNN (ResNet-50 FPN) detector with its default
# pre-trained weights.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

# Swap the pre-trained box-classification head for a fresh one that keeps the
# same input width but predicts only `num_classes` classes.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

In [None]:
%pip install beautifulsoup4

In [None]:
# define a dataset
from glob import glob
from pathlib import Path
from torchvision.io import read_image
import torch
from bs4 import BeautifulSoup
from torchvision import tv_tensors
from torchvision.transforms.v2 import functional as F

class PotholeDataset(torch.utils.data.Dataset):
    """Object-detection dataset built from a folder of images plus XML annotations.

    Every ``*.xml`` file in ``root`` describes one sample. The XML is expected to
    contain an ``<annotation>`` root with a ``<filename>`` element naming the image
    (looked up in the same folder) and one ``<object>`` child per box, each with a
    ``<bndbox>`` holding ``xmin``/``ymin``/``xmax``/``ymax``.

    Args:
        root: Folder containing the images and their XML files. A trailing path
            separator is optional.
        transforms: Callable ``(img, target) -> (img, target)`` applied to every
            sample, or ``None`` to return samples untouched.
    """

    def __init__(self, root, transforms):
        self.transforms = transforms
        self.pothole_images_folder_path = root
        # Join through Path so that roots without a trailing slash still match, and
        # sort so the index -> sample mapping is the same on every run and across
        # separately constructed instances of this dataset.
        self.pothole_xml_paths = sorted(glob(str(Path(root) / "*.xml")))

    @staticmethod
    def _box_area(box):
        """Return the area of an ``[xmin, ymin, xmax, ymax]`` box."""
        xmin, ymin, xmax, ymax = box
        return (xmax - xmin) * (ymax - ymin)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(img, target)``.

        ``target`` is a dict with the keys ``boxes`` (XYXY bounding boxes),
        ``labels`` (all ones: every object is a pothole), ``image_id`` (integer
        parsed from the part of the image file name before the first dot),
        ``area`` (one value per box) and ``iscrowd`` (all zeros).
        """
        xml_path = self.pothole_xml_paths[idx]
        with open(xml_path, 'r') as f:
            data = f.read()

        soup = BeautifulSoup(data, "xml")
        image_file_name = soup.filename.string
        img_tensor = read_image(str(Path(self.pothole_images_folder_path) / image_file_name))

        boxes = []
        areas = []
        # Only direct <object> children of <annotation> are annotations.
        for elem in soup.annotation.find_all("object", recursive=False):
            box = [
                float(elem.bndbox.xmin.string),
                float(elem.bndbox.ymin.string),
                float(elem.bndbox.xmax.string),
                float(elem.bndbox.ymax.string),
            ]
            boxes.append(box)
            areas.append(self._box_area(box))

        num_objs = len(boxes)
        # reshape keeps the (N, 4) layout even when the image has no objects.
        box_tensor = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        target = {
            "boxes": tv_tensors.BoundingBoxes(box_tensor, format="XYXY", canvas_size=F.get_size(img_tensor)),
            "labels": torch.ones((num_objs,), dtype=torch.int64),
            "image_id": int(image_file_name.split(".")[0]),
            "area": torch.tensor(areas, dtype=torch.float32),
            "iscrowd": torch.zeros((num_objs,), dtype=torch.uint8)
        }

        img = tv_tensors.Image(img_tensor)
        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        """Return the number of annotated samples (one per XML file)."""
        return len(self.pothole_xml_paths)

In [None]:
%pip install pycocotools
# import os
# os.system("wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/engine.py")
# os.system("wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/utils.py")
# os.system("wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/coco_utils.py")
# os.system("wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/coco_eval.py")
# os.system("wget https://raw.githubusercontent.com/pytorch/vision/main/references/detection/transforms.py")

In [None]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.transforms import v2 as T
import utils


def get_transform(train):
    """Build the preprocessing pipeline for the dataset.

    Args:
        train: When truthy, a random horizontal flip (probability 0.5) is placed
            in front of the common steps.

    Returns:
        A ``T.Compose`` that (optionally) flips, converts to ``torch.float`` with
        value scaling, and finally converts to pure tensors.
    """
    augmentations = [T.RandomHorizontalFlip(0.5)] if train else []
    common_steps = [
        T.ToDtype(torch.float, scale=True),
        T.ToPureTensor(),
    ]
    return T.Compose(augmentations + common_steps)

import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def get_model(num_classes=2):
    """Create a Faster R-CNN detector with a freshly initialised box head.

    Args:
        num_classes: Number of output classes including the background class.
            The default of 2 means one foreground class (pothole) + background.

    Returns:
        A ``fasterrcnn_resnet50_fpn`` model loaded with its default pre-trained
        weights whose box predictor has been replaced by a new
        ``FastRCNNPredictor`` with ``num_classes`` outputs.
    """
    # Load the detector with its default pre-trained weights.
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

    # The new head must accept the same feature width as the one it replaces.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

# Dataset with the training transforms, served in shuffled batches of two by
# four worker processes; utils.collate_fn assembles each batch.
dataset = PotholeDataset('../pothole_images/', get_transform(train=True))
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=2, shuffle=True, num_workers=4, collate_fn=utils.collate_fn
)

# Detector instance used by the following cell.
model = get_model()


In [None]:
# Training-mode forward pass on one batch.
# The mode is set explicitly so this cell gives the same kind of result when it is
# re-run after the inference part below has switched the model to eval mode.
# In training mode torchvision detection models return a dict of losses.
model.train()
images, targets = next(iter(data_loader))
images = list(images)
targets = [dict(t) for t in targets]
output = model(images, targets)
print(output)

# Inference-mode forward pass on two random images of different sizes.
# no_grad avoids building an autograd graph that is never used.
model.eval()
x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)]
with torch.no_grad():
    predictions = model(x)  # one prediction entry per input image
print(predictions[0])

In [None]:
from engine import train_one_epoch, evaluate

# train on the GPU or on the CPU, if a GPU is not available
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# our dataset has two classes only - background and pothole
num_classes = 2
# The same folder is wrapped twice so the training split gets the augmenting
# transforms while the test split gets the plain ones.
dataset = PotholeDataset('../pothole_images/', get_transform(train=True))
dataset_test = PotholeDataset('../pothole_images/', get_transform(train=False))

# split the dataset in train and test set
indices = torch.randperm(len(dataset)).tolist()
if len(indices) < 2:
    raise ValueError(
        f"Need at least 2 annotated samples to build a train/test split, found {len(indices)}"
    )
# Hold out 20 samples for testing. With 20 or fewer samples that would leave the
# training split empty, so fall back to roughly 20% (at least one sample).
num_test = 20 if len(indices) > 20 else max(1, len(indices) // 5)
dataset = torch.utils.data.Subset(dataset, indices[:-num_test])
dataset_test = torch.utils.data.Subset(dataset_test, indices[-num_test:])

# define training and validation data loaders
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=2,
    shuffle=True,
    num_workers=4,
    collate_fn=utils.collate_fn
)

data_loader_test = torch.utils.data.DataLoader(
    dataset_test,
    batch_size=1,
    shuffle=False,
    num_workers=4,
    collate_fn=utils.collate_fn
)

# get the model using our helper function
model = get_model()

# move model to the right device
model.to(device)

# construct an optimizer over the trainable parameters only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005
)

# and a learning rate scheduler: multiply the learning rate by 0.1 every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1
)

# number of passes over the training split
num_epochs = 20

for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
    # update the learning rate
    lr_scheduler.step()
    # evaluate on the test dataset
    evaluate(model, data_loader_test, device=device)

print("That's it!")

In [None]:
# Run the detector on one shuffled batch of un-augmented images.
dataset = PotholeDataset('../pothole_images/', get_transform(train=False))
data_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=2,
    shuffle=True,
    num_workers=4,
    collate_fn=utils.collate_fn
)

# Set inference mode explicitly instead of relying on what an earlier cell left
# behind, and follow the model's own device instead of assuming "cuda" so the cell
# also runs on CPU-only machines.
model.eval()
model_device = next(model.parameters()).device

images, targets = next(iter(data_loader))
images = [image.to(model_device) for image in images]
targets = [dict(t) for t in targets]
with torch.no_grad():
    output = model(images)  # one prediction dict per image


In [None]:
from matplotlib import pyplot as plt
import matplotlib.patches as patches
from torchvision.utils import draw_bounding_boxes

def draw_image_with_boxes(img, pred):
    """Render predicted boxes and their scores on top of an image.

    Args:
        img: Image tensor in channels-first layout; only the first three channels
            are drawn. Values are min-max rescaled to 0..255 for display.
        pred: Prediction dict providing ``"boxes"`` and ``"scores"``.

    Returns:
        A ``uint8`` CPU tensor in height x width x channels layout, ready for
        ``plt.imshow``.
    """
    # Work on CPU copies so image and boxes can never sit on different devices.
    img = img.detach().to("cpu", torch.float32)
    lowest = img.min()
    # A constant image has zero range; clamping avoids a 0/0 division (NaN pixels)
    # and renders such an image as all zeros instead.
    value_range = (img.max() - lowest).clamp(min=1e-8)
    image = (255.0 * (img - lowest) / value_range).to(torch.uint8)
    image = image[:3, ...]

    pred_labels = [f"pothole: {score:.3f}" for score in pred["scores"].tolist()]
    pred_boxes = pred["boxes"].detach().to("cpu").long()
    output_image = draw_bounding_boxes(image, pred_boxes, pred_labels, colors="red")
    # channels-first -> channels-last for matplotlib
    return output_image.to("cpu").permute(1, 2, 0)

# One figure per image of the batch, each showing that image with its predictions
# drawn on top.
for img, pred in zip(images, output):
    annotated = draw_image_with_boxes(img, pred)
    plt.figure()
    plt.imshow(annotated)

plt.show()

In [None]:
from glob import glob
# Run the detector directly on the first five exported JPEG files and plot the
# predictions. Sorting makes the selection of files deterministic.
image_paths = sorted(glob("../pothole_images/*.jpg"))

# The evaluation transform is the same for every file, so build it once.
transforms = get_transform(False)

# Explicit inference mode; follow the model's device instead of assuming "cuda".
model.eval()
model_device = next(model.parameters()).device

for image in image_paths[:5]:
    img = read_image(image)
    x = transforms(img)
    print(x.size())
    with torch.no_grad():
        pred = model([x.to(model_device)])
    plt.figure()
    plt.imshow(draw_image_with_boxes(x, pred[0]))
    

In [None]:
# Export the trained detector to TorchScript.
# Put the model in inference mode explicitly so the exported module does not
# depend on which cell was executed last.
model.eval()

export_path = Path("../models/pothole_detector.pt")
# Saving does not create missing folders, so make sure the target folder exists.
export_path.parent.mkdir(parents=True, exist_ok=True)

model_scripted = torch.jit.script(model)  # Export to TorchScript
model_scripted.save(str(export_path))  # Save

In [None]:
# Display the current value of `pred` (in top-to-bottom execution this is the
# model output assigned in the last iteration of the image-file loop above).
pred