In [1]:
import os
import numpy as np
import matplotlib.patches as patches
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup
from PIL import Image
import torch
import torchvision
from torchvision import transforms, datasets, models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import time

In [2]:
# Select the compute device once for the whole notebook: the first CUDA GPU
# when one is visible to PyTorch, otherwise the CPU.
if not torch.cuda.is_available():
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")
else:
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))

No GPU available, using the CPU instead.


In [4]:
import random
import shutil

# Carve out a fixed 20% hold-out set by MOVING files from images/ and
# annotations/ into test_images/ and test_annotations/.

# os.makedirs(exist_ok=True) instead of `!mkdir`: it does not fail when the
# folders already exist and does not depend on the shell.
os.makedirs('test_images', exist_ok=True)
os.makedirs('test_annotations', exist_ok=True)

if os.listdir('test_images'):
    # The split moves files, so running it a second time would take another
    # 20% out of the remaining training data. Skip when it was already done.
    print('test_images/ already contains files; skipping the train/test split.')
else:
    random.seed(1234)  # fixed seed so the same images are selected every time
    image_files = sorted(os.listdir('images'))
    total_images = len(image_files)
    num_images_to_select = int(total_images * 0.2)
    idx = random.sample(range(total_images), num_images_to_select)

    for img in [image_files[i] for i in idx]:
        # Pair each image with its annotation by file name (same stem, .xml)
        # rather than by position in a second sorted listing, so a missing or
        # extra annotation file cannot silently shift every pair.
        annot = os.path.splitext(img)[0] + '.xml'
        if not os.path.exists('annotations/' + annot):
            raise FileNotFoundError(
                'No annotation file annotations/' + annot + ' for image images/' + img
            )
        shutil.move('images/' + img, 'test_images/' + img)
        shutil.move('annotations/' + annot, 'test_annotations/' + annot)

In [5]:
# Report how many entries each of the four data folders currently holds.
for folder in ('images/', 'annotations/', 'test_annotations/', 'test_images/'):
    print(folder, len(os.listdir(folder)))

images/ 256
annotations/ 256
test_annotations/ 64
test_images/ 64


In [29]:
def generate_box(obj):
    """Return the bounding box of one annotated object as floats.

    obj: a parsed annotation node exposing ``find(tag)``; the text of its
         'xmin', 'ymin', 'xmax' and 'ymax' tags is converted with ``float``.

    Returns a list ``[xmin, ymin, xmax, ymax]``.
    """
    corner_tags = ('xmin', 'ymin', 'xmax', 'ymax')
    return [float(obj.find(tag).text) for tag in corner_tags]

# Integer class id -> human-readable class name. Only id 1 is defined here.
# NOTE(review): id 0 is presumably left free for the detector's background
# class - confirm against the model's num_classes.
classes = {
    1: "Alligator Cracks"
}

def generate_label(obj):
    """Return the integer class id for one annotated object.

    obj: a parsed annotation node exposing ``find('name')``; the tag text is
         stripped of surrounding whitespace and looked up in ``classes``.

    Returns the matching key of ``classes``.

    Raises ValueError when the name is not one of the known classes. Without
    this the function would return None, and the failure would only surface
    later as an unrelated-looking error when the label list is turned into a
    tensor.
    """
    name = obj.find('name').text.strip()
    for class_id, class_name in classes.items():
        if class_name == name:
            return class_id
    raise ValueError(
        f"Unknown class name {name!r}; expected one of {sorted(classes.values())}"
    )
    
def plot_image_from_output(img, annotation):
    """Show an image with its bounding boxes and class captions drawn on top.

    img: image tensor; it is moved to the CPU and its axes are permuted to
         (1, 2, 0) before being handed to ``imshow``.
    annotation: mapping with "boxes" (each entry unpacks to
         xmin, ymin, xmax, ymax) and 'labels' (one entry per box).
    """
    # Edge colour per label id; ids missing from this table fall back to 'r'.
    palette = {
        1: '#ff007c',
        2: '#018477',
        3: '#b4a808',
        4: '#ddff33',
        5: '#ff00cc',
        6: '#fa3253',
        7: '#24b353',
        8: '#bac99a',
        9: '#d3a298',
        10: '#505016',
        11: '#75c50d'
    }

    # Rearrange the tensor for plotting, then open one large figure.
    img = img.cpu().permute(1, 2, 0)
    _, ax = plt.subplots(1, figsize=(20, 20))
    ax.imshow(img)

    for position, box in enumerate(annotation["boxes"]):
        xmin, ymin, xmax, ymax = box
        label = int(annotation['labels'][position])
        edgecolor = palette.get(label, 'r')

        # Box outline only (no fill), from the (xmin, ymin) corner.
        outline = patches.Rectangle(
            (xmin, ymin), (xmax - xmin), (ymax - ymin),
            linewidth=1, edgecolor=edgecolor, facecolor='none',
        )
        ax.add_patch(outline)

        # Caption "<class name> - <id>", centred on the (xmin, ymin) corner
        # (not on the middle of the box), on a badge in the box colour.
        caption = f"{classes.get(label, 'Unknown')} - {label}"
        badge = dict(facecolor=edgecolor, edgecolor=edgecolor, boxstyle='round,pad=0.2')
        ax.text(xmin, ymin, caption, color='white', fontsize=8, ha='center', va='center', bbox=badge)

    plt.show()
    
def generate_target(file):
    """Parse one annotation file into a detection target.

    file: path to an annotation file containing zero or more <object> tags.

    Returns a dict with
      "boxes":  float32 tensor of shape (N, 4), one [xmin, ymin, xmax, ymax]
                row per <object> (built by generate_box);
      "labels": int64 tensor of length N (built by generate_label).
    N is 0 when the file has no <object> tag.
    """
    # Only the read needs the file handle; parse after it is closed.
    with open(file) as f:
        data = f.read()

    soup = BeautifulSoup(data, "html.parser")

    boxes = []
    labels = []
    for obj in soup.find_all("object"):
        boxes.append(generate_box(obj))
        labels.append(generate_label(obj))

    # reshape(-1, 4) is a no-op for N >= 1 but turns the shape-(0,) tensor
    # produced by an empty list into shape (0, 4), which is what torchvision
    # detection models require for images without objects.
    boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
    labels = torch.as_tensor(labels, dtype=torch.int64)

    return {"boxes": boxes, "labels": labels}

In [30]:
class MaskDataset(object):
    """Map-style dataset pairing every file in an image folder with its XML annotation."""

    def __init__(self, transforms, path):
        '''
        transforms: callable applied to each loaded PIL image, or None
        path: path to train folder or test folder
        '''
        self.transforms = transforms
        self.path = path
        # Sorted so that a given index always refers to the same file.
        self.imgs = list(sorted(os.listdir(self.path)))

    def _label_path(self, file_image):
        """Return the annotation path that belongs to image file name `file_image`."""
        # splitext handles extensions of any length; slicing off the last
        # three characters produced e.g. "x.jxml" for "x.jpeg".
        file_label = os.path.splitext(file_image)[0] + '.xml'

        # The annotation folder is chosen from the image folder's name.
        if 'test' in self.path:
            return os.path.join("test_annotations/", file_label)
        return os.path.join("annotations/", file_label)

    def __getitem__(self, idx):
        """Return (image, target) for position `idx`.

        The image is loaded as RGB and passed through `self.transforms` when
        that is not None; the target is whatever generate_target returns for
        the matching annotation file.
        """
        file_image = self.imgs[idx]
        img_path = os.path.join(self.path, file_image)
        label_path = self._label_path(file_image)

        # convert() returns a new, fully loaded image, so the file handle can
        # be released as soon as the with-block ends.
        with Image.open(img_path) as opened:
            img = opened.convert("RGB")

        target = generate_target(label_path)

        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        """Number of files found in the image folder."""
        return len(self.imgs)

# transforms.Compose calls the transforms in its list one after another;
# the pipeline here has a single step, ToTensor.
data_transform = transforms.Compose([transforms.ToTensor()])

def collate_fn(batch):
    """Transpose a batch of samples into per-field tuples.

    batch: sequence of samples, e.g. [(img0, target0), (img1, target1)].

    Returns a tuple with one tuple per field, e.g.
    ((img0, img1), (target0, target1)); an empty batch gives ().
    Nothing is stacked, so samples may differ in size.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Training and hold-out datasets share the same preprocessing.
dataset = MaskDataset(data_transform, 'images/')
test_dataset = MaskDataset(data_transform, 'test_images/')

# shuffle=True for training only: MaskDataset lists files in sorted order, so
# without shuffling every epoch sees the same batches of neighbouring files.
# The test loader keeps its fixed order.
data_loader = torch.utils.data.DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
test_data_loader = torch.utils.data.DataLoader(test_dataset, batch_size=2, collate_fn=collate_fn)

In [23]:
def get_model_instance_segmentation(num_classes):
    """Build a pretrained Faster R-CNN (ResNet-50 FPN) with a fresh box head.

    num_classes: number of outputs of the new FastRCNNPredictor.
        NOTE(review): torchvision counts the background class in this
        number - confirm it equals len(classes) + 1.

    Returns the model; only the box predictor is newly initialised.
    """
    # weights="DEFAULT" selects the builder's default pretrained weights.
    # The boolean form (weights=True) is the deprecated legacy interface and
    # emits a warning on torchvision >= 0.13.
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

    # Replace the pretrained classification/regression head with one sized
    # for this dataset, reusing the input width of the existing head.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

In [24]:
# Use the GPU when PyTorch can see one, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Detector with a 2-output box head.
# NOTE(review): presumably 1 crack class + background - confirm.
model = get_model_instance_segmentation(2)
model.to(device)

FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

In [31]:
import torch.optim as optim
import torch.nn as nn
import torchvision.models as models
from torch.optim.lr_scheduler import StepLR

# Number of passes over the training loader.
num_epochs = 10

# Optimization 1: Adam over every model parameter.
# (Earlier alternative, kept for reference: SGD over the trainable parameters
#  with lr=0.005, momentum=0.9, weight_decay=0.0005.)
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

# Multiply the learning rate by 0.1 after every 7 scheduler steps.
scheduler = StepLR(optimizer, step_size=7, gamma=0.1)


In [32]:
print('----------------------train start--------------------------')

for epoch in range(num_epochs):
    start = time.time()
    model.train()
    # Plain Python float: the running total is for reporting only.
    epoch_loss = 0.0
    for imgs, annotations in data_loader:
        # Move every image and every target tensor to the training device.
        imgs = list(img.to(device) for img in imgs)
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]

        # Called with targets, the model returns a dict of losses; the
        # optimised quantity is their sum.
        loss_dict = model(imgs, annotations)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        # .item() extracts the number. Adding the tensor itself would chain
        # every batch's autograd history onto epoch_loss and grow memory for
        # the whole epoch.
        epoch_loss += losses.item()

    # One scheduler step per epoch.
    scheduler.step()
    print(f'epoch : {epoch+1}, Loss : {epoch_loss}, time : {time.time() - start}')

print('----------------------train end--------------------------')

----------------------train start--------------------------
reading images/GX010061.MP4_frame_0_part_2_2.png
reading images/GX010061.MP4_frame_0_part_3_1.png
reading images/GX010061.MP4_frame_10_part_2_2.png
reading images/GX010061.MP4_frame_10_part_2_3.png
reading images/GX010061.MP4_frame_10_part_3_3.png
reading images/GX010061.MP4_frame_12_part_2_2.png
reading images/GX010061.MP4_frame_12_part_2_3.png
reading images/GX010061.MP4_frame_12_part_3_1.png
reading images/GX010061.MP4_frame_13_part_2_1.png
reading images/GX010061.MP4_frame_13_part_3_1.png
reading images/GX010061.MP4_frame_14_part_2_2.png
reading images/GX010061.MP4_frame_14_part_3_0.png
reading images/GX010061.MP4_frame_14_part_3_2.png
reading images/GX010061.MP4_frame_14_part_3_3.png
reading images/GX010061.MP4_frame_15_part_2_1.png
reading images/GX010061.MP4_frame_15_part_2_2.png


KeyboardInterrupt: 

In [10]:
# Save the model's state_dict; the file name records the configured epoch count.
checkpoint_path = f'model_tcc7a_{num_epochs}.pt'
torch.save(model.state_dict(), checkpoint_path)

In [11]:
def make_prediction(model, img, threshold):
    """Run the detector and keep only detections scoring above `threshold`.

    model: detection model; it is switched to eval mode here.
    img: input passed to the model unchanged.
    threshold: detections with score <= threshold are dropped.

    Returns the model's list of prediction dicts, with 'boxes', 'labels' and
    'scores' filtered in place. A prediction with no surviving detection keeps
    empty tensors (boxes of shape (0, 4)).

    This function does not disable autograd itself; wrap the call in
    torch.no_grad() when gradients are not needed.
    """
    model.eval()
    preds = model(img)

    for pred in preds:
        # One boolean mask per image instead of a Python loop over scores:
        # same selection, without one tensor comparison per detection.
        keep = pred['scores'] > threshold
        pred['boxes'] = pred['boxes'][keep]
        pred['labels'] = pred['labels'][keep]
        pred['scores'] = pred['scores'][keep]

    return preds

In [12]:
from tqdm import tqdm

# Accumulators filled over the whole test set:
#   labels        - every ground-truth label, flattened across images
#   preds_adj_all - per batch, the thresholded predictions (moved to CPU)
#   annot_all     - per batch, the ground-truth annotations
labels, preds_adj_all, annot_all = [], [], []

for im, annot in tqdm(test_data_loader, position = 0, leave = True):
    im = [image.to(device) for image in im]

    # Extending with a label tensor adds its elements one by one.
    for t in annot:
        labels.extend(t['labels'])

    with torch.no_grad():
        # Keep detections scoring above 0.5, then bring them back to the CPU.
        preds_adj = make_prediction(model, im, 0.5)
        preds_adj = [{k: v.to('cpu') for k, v in t.items()} for t in preds_adj]
        preds_adj_all.append(preds_adj)
        annot_all.append(annot)


100%|██████████| 9/9 [00:02<00:00,  4.23it/s]


In [13]:
from utils import formulas

In [14]:
# Collect per-sample statistics batch by batch (predictions matched to the
# annotations at iou_threshold=0.2 by formulas.getstatistics).
sample_metrics = []
for batch_i, batch_preds in enumerate(preds_adj_all):
    sample_metrics.extend(
        formulas.getstatistics(batch_preds, annot_all[batch_i], iou_threshold=0.2)
    )

# Regroup the per-sample triples field by field and concatenate each field
# across all batches.
true_positives, pred_scores, pred_labels = [torch.cat(field, 0) for field in zip(*sample_metrics)]

precision, recall, AP, f1, ap_class = formulas.ap_per_class(
    true_positives, pred_scores, pred_labels, torch.tensor(labels)
)
mAP = torch.mean(AP)
print(f'mAP : {mAP}')
print(f'AP : {AP}')

mAP : 0.19435285584274276
AP : tensor([0.1944], dtype=torch.float64)
