In [18]:
from torch_snippets import *
from torch.utils.data import TensorDataset, DataLoader
import selectivesearch
from torchvision import transforms, models, datasets
from torchvision.ops import nms
import pandas as pd

In [7]:
# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# NOTE(review): both paths are absolute and machine-specific; adjust them
# before running this notebook on another machine.
image_root = "/home/nekozo/newSpace/bus-trucks"
DF_RAW = pd.read_csv("/home/nekozo/newSpace/bus-trucks-df.csv")

# Quick look at the first rows of the annotation table.
print(DF_RAW.head())

In [10]:
class OpenImage(Dataset):
    """Dataset that yields one image together with all of its annotations.

    Each item corresponds to one distinct value of the ``ImageID`` column.

    Args:
        df: Annotation table with the columns ``ImageID``, ``LabelName``,
            ``XMin``, ``YMin``, ``XMax`` and ``YMax``. The box columns are
            multiplied by the image width/height, so they are presumably
            normalised to [0, 1] -- confirm against the CSV.
        image_folder: Directory containing ``<ImageID>.jpg`` files.
    """

    def __init__(self, df, image_folder=image_root):
        self.root = image_folder
        self.df = df
        self.unique_images = df["ImageID"].unique()

    def __len__(self):
        """Return the number of distinct images in the annotation table."""
        return len(self.unique_images)

    def __getitem__(self, idx):
        """Load image ``idx`` and its boxes.

        Returns:
            Tuple ``(image, boxes, classes, image_path)`` where ``image`` is
            the decoded image with its channel axis reversed, ``boxes`` is a
            list of ``[x_min, y_min, x_max, y_max]`` pixel coordinates and
            ``classes`` is the matching list of ``LabelName`` values.

        Raises:
            IndexError: If ``idx`` is out of range (this is also what ends a
                plain ``for ... in dataset`` loop).
            FileNotFoundError: If the image file cannot be read.
        """
        image_id = self.unique_images[idx]
        image_path = f"{self.root}/{image_id}.jpg"
        image = cv2.imread(image_path, 1)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = image[..., ::-1]  # reverse the channel order
        h, w, _ = image.shape
        # Boolean indexing already yields a new frame, so no explicit copy of
        # the full annotation table is needed per item.
        rows = self.df[self.df["ImageID"] == image_id]
        boxes = rows[["XMin", "YMin", "XMax", "YMax"]].values
        boxes = (boxes * np.array([w, h, w, h])).astype(np.uint16).tolist()
        classes = rows["LabelName"].values.tolist()
        return image, boxes, classes, image_path


# img, bbs, clss, _ = ds[9]
# show(img, bbs=bbs, texts=clss, sz=10)


def extract_endicates(img, scale=200, min_size=100, min_area_frac=0.05):
    """Return de-duplicated selective-search region proposals for ``img``.

    Args:
        img: Image array; its first two dimensions are taken as the image area.
        scale: ``scale`` argument forwarded to ``selective_search``.
        min_size: ``min_size`` argument forwarded to ``selective_search``.
        min_area_frac: A region is kept only if its ``size`` is strictly
            greater than this fraction of the image area.

    Returns:
        List of ``[x, y, w, h]`` lists, in the order first seen. Regions whose
        ``size`` is not strictly smaller than the whole image are dropped.
    """
    _, regions = selectivesearch.selective_search(
        img, scale=scale, min_size=min_size
    )
    img_area = np.prod(img.shape[:2])
    candidates = []
    # Track rects as tuples: comparing the raw ``rect`` tuple against the
    # list-of-lists output never matches, which let duplicates through.
    seen = set()
    for region in regions:
        rect = tuple(region["rect"])
        if rect in seen:
            continue
        if not (min_area_frac * img_area < region["size"] < img_area):
            continue
        seen.add(rect)
        candidates.append(list(rect))
    return candidates


def extract_iou(box1, box2, epsilon=1e-5):
    """Compute the intersection-over-union of two axis-aligned boxes.

    Args:
        box1: Box as ``(x_min, y_min, x_max, y_max)``.
        box2: Box in the same format as ``box1``.
        epsilon: Small constant added to the denominator to avoid division
            by zero.

    Returns:
        ``0.0`` when the boxes do not overlap, otherwise
        ``intersection / (area1 + area2 - intersection + epsilon)``.
    """
    # Corners of the intersection rectangle.
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])
    w = x2 - x1
    h = y2 - y1
    if w < 0 or h < 0:
        return 0.0
    area_overlap = w * h
    area_a = (box1[2] - box1[0]) * (box1[3] - box1[1])
    # Height of box2 is y_max - y_min; the previous code subtracted y_max from
    # itself, which made this area always zero.
    area_b = (box2[2] - box2[0]) * (box2[3] - box2[1])
    area_combined = area_a + area_b - area_overlap
    iou = area_overlap / (area_combined + epsilon)
    return iou

In [11]:
ds = OpenImage(df=DF_RAW)

# Per-image accumulators; entry i of every list describes the same image.
FPATHS, GTBBS, CLSS, DELTAS, ROIS, IOUS = [], [], [], [], [], []
N = 500  # stop after scanning this many images
for ix, (im, bbs, labels, fpath) in enumerate(ds):
    if ix == N:
        break
    H, W, _ = im.shape
    size_scale = np.array([W, H, W, H])
    candidates = extract_endicates(im)
    if len(candidates) == 0:
        # No proposals: there is nothing to crop or label for this image, and
        # an empty entry would later give the collate step nothing to stack.
        continue
    # Convert proposals from (x, y, w, h) to (x_min, y_min, x_max, y_max).
    candidates = np.array([(x, y, x + w, y + h) for x, y, w, h in candidates])
    # After the transpose, row j holds the IoU of candidate j with every
    # ground-truth box.
    ious = np.array(
        [[extract_iou(candidate, _bb_) for candidate in candidates] for _bb_ in bbs]
    ).T
    rois, clss, deltas = [], [], []
    for jx, candidate in enumerate(candidates):
        cx, cy, cX, cY = candidate
        candidate_ious = ious[jx]
        best_iou_idx = np.argmax(candidate_ious)
        best_iou = candidate_ious[best_iou_idx]
        _x, _y, _X, _Y = bbs[best_iou_idx]
        # A proposal inherits the label of its best-matching ground-truth box
        # only when the overlap exceeds 0.3; otherwise it is background.
        if best_iou > 0.3:
            clss.append(labels[best_iou_idx])
        else:
            clss.append("background")
        # Offset from the proposal to its best ground-truth box, expressed as
        # a fraction of the image size.
        delta = np.array([_x - cx, _y - cy, _X - cX, _Y - cY]) / size_scale
        deltas.append(delta)
        rois.append(candidate / size_scale)
    FPATHS.append(fpath)
    IOUS.append(ious)
    ROIS.append(rois)
    CLSS.append(clss)
    DELTAS.append(deltas)
    GTBBS.append(bbs)

# Normalise every path to "<image_root>/<stem>.jpg". The result used to be
# bound to a misspelled name (FPATH) and was therefore never used.
FPATHS = [f"{image_root}/{stem(f)}.jpg" for f in FPATHS]



In [15]:
# One row per region proposal, across all images.
targets = pd.DataFrame(flatten(CLSS), columns=["label"])

# Integer ids are assigned in order of first appearance of each label.
label2target = {
    name: index for index, name in enumerate(targets["label"].unique())
}
# Inverse lookup: integer id -> label name.
target2label = dict(zip(label2target.values(), label2target.keys()))

# Id of the "background" label (raises KeyError if no proposal was background).
background_class = label2target["background"]

In [16]:
# Per-channel normalisation applied to every crop before it reaches the model.
# NOTE(review): these look like the usual ImageNet statistics -- confirm.
normalize = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
)


def preprocess_image(img):
    """Turn a channels-last image array into a normalised float tensor.

    The array is converted to a tensor, its last axis is moved to the front,
    the module-level ``normalize`` transform is applied, and the result is
    moved to ``device`` and cast to float32.
    """
    channels_first = torch.tensor(img).permute(2, 0, 1)
    normalised = normalize(channels_first)
    return normalised.to(device).float()


def decode(_y):
    """Return the index of the largest value along the last axis of ``_y``."""
    # ``max`` over a dimension returns (values, indices); keep the indices.
    return _y.max(-1)[1]

In [17]:
class RCNNDataset(Dataset):
    """Dataset that yields an image with the crops of its region proposals.

    Args:
        fpaths: Image file paths, one per image.
        rois: Per image, a list of proposals ``(x_min, y_min, x_max, y_max)``
            expressed as fractions of the image width/height.
        labels: Per image, the class name of every proposal.
        deltas: Per image, the regression target of every proposal.
        gtbbs: Per image, the ground-truth boxes.
    """

    def __init__(self, fpaths, rois, labels, deltas, gtbbs):
        self.fpaths = fpaths
        self.rois = rois
        self.labels = labels
        self.deltas = deltas
        self.gtbbs = gtbbs

    def __len__(self):
        """Return the number of images."""
        return len(self.fpaths)

    def __getitem__(self, idx):
        """Load image ``idx`` and cut out one crop per proposal.

        Returns:
            Tuple ``(image, crops, bbs, labels, deltas, gtbbs, fpath)`` where
            ``bbs`` holds the proposals in pixel coordinates.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        fpath = str(self.fpaths[idx])
        image = cv2.imread(fpath, 1)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {fpath}")
        image = image[..., ::-1]  # reverse the channel order
        H, W, _ = image.shape
        # ROIs are stored as (x, y, X, Y) fractions, so x-coordinates scale
        # with the width and y-coordinates with the height.
        sh = np.array([W, H, W, H])
        gtbbs = self.gtbbs[idx]
        rois = self.rois[idx]
        # Round before the integer cast: the divide/multiply round trip can
        # land just below the original pixel value and would be truncated.
        bbs = np.round(np.array(rois) * sh).astype(np.uint16)
        labels = self.labels[idx]
        deltas = self.deltas[idx]
        # Rows are indexed by y, columns by x.
        crops = [image[y:Y, x:X] for (x, y, X, Y) in bbs]
        return image, crops, bbs, labels, deltas, gtbbs, fpath

    def collate_fn(self, batch):
        """Stack every crop of every image in ``batch`` into model inputs.

        Returns:
            Tuple ``(inputs, labels, deltas)`` of tensors on ``device``:
            the preprocessed 224x224 crops, their integer class ids (looked
            up in ``label2target``) and their regression targets.
        """
        inputs, labels, deltas = [], [], []
        for _image, crops, _bbs, image_labels, image_deltas, _gtbbs, _fpath in batch:
            resized = [cv2.resize(crop, (224, 224)) for crop in crops]
            inputs.extend(preprocess_image(crop / 255.0)[None] for crop in resized)
            labels.extend(label2target[c] for c in image_labels)
            deltas.extend(image_deltas)
        inputs = torch.cat(inputs).to(device)
        labels = torch.Tensor(labels).long().to(device)
        # Stack into one array first; building a tensor from a list of
        # separate numpy arrays is slow.
        deltas = torch.tensor(np.array(deltas)).float().to(device)
        return inputs, labels, deltas

In [19]:
# First 90% of the images are used for training, the remainder for validation.
n_trn = (9 * len(FPATHS)) // 10

trn_ds = RCNNDataset(
    fpaths=FPATHS[:n_trn],
    rois=ROIS[:n_trn],
    labels=CLSS[:n_trn],
    deltas=DELTAS[:n_trn],
    gtbbs=GTBBS[:n_trn],
)
val_ds = RCNNDataset(
    fpaths=FPATHS[n_trn:],
    rois=ROIS[n_trn:],
    labels=CLSS[n_trn:],
    deltas=DELTAS[n_trn:],
    gtbbs=GTBBS[n_trn:],
)

# Two images per batch; each dataset's own collate_fn expands them into crops.
# drop_last discards a trailing batch with fewer than two images.
trn_dl = DataLoader(
    trn_ds, batch_size=2, drop_last=True, collate_fn=trn_ds.collate_fn
)
val_dl = DataLoader(
    val_ds, batch_size=2, drop_last=True, collate_fn=val_ds.collate_fn
)

In [None]:
# Pretrained VGG16 used purely as a frozen feature extractor.
vgg_backone = models.vgg16(pretrained=True)
# Replace the classification head with an empty Sequential so the network
# outputs the features that previously fed the classifier.
vgg_backone.classifier = nn.Sequential()
# Freeze every backbone parameter; only the heads added later are trained.
vgg_backone.requires_grad_(False)
vgg_backone.eval().to(device)


class RCNN(nn.Module):
    """Classification and box-regression heads on top of ``vgg_backone``.

    Attributes:
        lmb: Weight of the regression loss in the combined loss.
        background_class: Integer id of the "background" label; proposals
            with this label are excluded from the regression loss.
    """

    def __init__(self):
        super().__init__()
        # 25088 = 512 * 7 * 7, presumably the flattened VGG16 feature size
        # once the classifier is emptied -- confirm against torchvision.
        feature_dim = 25088
        self.backone = vgg_backone
        self.cls_score = nn.Linear(feature_dim, len(label2target))
        self.bbox = nn.Sequential(
            nn.Linear(feature_dim, 512), nn.ReLU(), nn.Linear(512, 4), nn.Tanh()
        )
        self.cel = nn.CrossEntropyLoss()
        self.sl1 = nn.L1Loss()
        # Set once here (instead of on every calc_loss call) so that a value
        # assigned by the caller is not silently overwritten.
        self.lmb = 10.0
        # label2target ids follow order of first appearance, so background is
        # not guaranteed to be id 0; use the recorded id instead.
        self.background_class = background_class

    def forward(self, input):
        """Return ``(class_scores, box_deltas)`` for a batch of crops."""
        feat = self.backone(input)
        cls_score = self.cls_score(feat)
        bbox = self.bbox(feat)
        return cls_score, bbox

    def calc_loss(self, probs, _deltas, labels, deltas):
        """Combine classification and box-regression losses.

        Args:
            probs: Predicted class scores, passed to cross-entropy.
            _deltas: Predicted box offsets.
            labels: Ground-truth integer class ids.
            deltas: Ground-truth box offsets.

        Returns:
            Tuple ``(total_loss, detection_loss, regression_loss)``; the last
            two are detached. The regression loss is an L1 loss over the
            non-background proposals and a zero tensor when there are none.
        """
        detection_loss = self.cel(probs, labels)
        (ixs,) = torch.where(labels != self.background_class)
        if len(ixs) > 0:
            regression_loss = self.sl1(_deltas[ixs], deltas[ixs])
        else:
            # Keep the return type a tensor in both branches.
            regression_loss = detection_loss.new_zeros(())
        return (
            detection_loss + self.lmb * regression_loss,
            detection_loss.detach(),
            regression_loss.detach(),
        )


def train_batch(inputs, model, opt, criterion):
    """Run one optimisation step on a single batch.

    Args:
        inputs: Tuple ``(crops, class_ids, deltas)``.
        model: Module returning ``(class_scores, box_deltas)``.
        opt: Optimiser over the model's trainable parameters.
        criterion: Callable ``(scores, pred_deltas, class_ids, deltas)``
            returning ``(total_loss, detection_loss, regression_loss)``.

    Returns:
        Tuple ``(total_loss, detection_loss, regression_loss, correct)`` where
        ``total_loss`` is detached and ``correct`` is a numpy boolean array
        marking the correctly classified crops.
    """
    crops, gt_classes, gt_deltas = inputs
    model.train()
    opt.zero_grad()
    pred_scores, pred_deltas = model(crops)
    total, loc_loss, regr_loss = criterion(
        pred_scores, pred_deltas, gt_classes, gt_deltas
    )
    correct = gt_classes == decode(pred_scores)
    total.backward()
    opt.step()
    return total.detach(), loc_loss, regr_loss, correct.cpu().numpy()


def validation_batch(inputs, model, criterion):
    """Evaluate the model on a single batch without tracking gradients.

    Args:
        inputs: Tuple ``(crops, class_ids, deltas)``.
        model: Module returning ``(class_scores, box_deltas)``.
        criterion: Callable ``(scores, pred_deltas, class_ids, deltas)``
            returning ``(total_loss, detection_loss, regression_loss)``.

    Returns:
        Tuple ``(pred_classes, pred_deltas, total_loss, detection_loss,
        regression_loss, correct)`` where ``correct`` is a numpy boolean
        array marking the correctly classified crops.
    """
    crops, gt_classes, gt_deltas = inputs
    with torch.no_grad():
        model.eval()
        pred_scores, pred_deltas = model(crops)
        total, loc_loss, regr_loss = criterion(
            pred_scores, pred_deltas, gt_classes, gt_deltas
        )
        # Index of the highest score along the last axis is the predicted class.
        pred_classes = pred_scores.max(-1)[1]
        correct = gt_classes == pred_classes
    return (
        pred_classes,
        pred_deltas,
        total.detach(),
        loc_loss,
        regr_loss,
        correct.cpu().numpy(),
    )