In [1]:
import mmengine
from pycocotools.coco import COCO
import json
import os
import torch
import torch.utils.data
import torchvision
import matplotlib.patches as patches
import matplotlib.pyplot as plt
from PIL import Image
import utils_ObjectDetection as utils
from tqdm import tqdm
import torchvision
from PIL import Image
import utils_ObjectDetection as utils

In [2]:

# Reading annotations.json
TRAIN_ANNOTATIONS_PATH = "./train/_annotations.coco.json"
TRAIN_IMAGE_DIRECTIORY = "./train/"

VAL_ANNOTATIONS_PATH = "./valid/_annotations.coco.json"
VAL_IMAGE_DIRECTIORY = "./valid/"

train_coco = COCO(TRAIN_ANNOTATIONS_PATH)
valid_coco = COCO(VAL_ANNOTATIONS_PATH)

loading annotations into memory...
Done (t=0.01s)
creating index...
index created!
loading annotations into memory...
Done (t=0.01s)
creating index...
index created!


In [3]:
class CavityDataset(torch.utils.data.Dataset):
    def __init__(self, root, annotation, transforms=None):
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        self.ids = list(sorted(self.coco.imgs.keys()))

    def __getitem__(self, index):
        # Own coco file
        coco = self.coco
        # Image ID
        img_id = self.ids[index]
        # List: get annotation id from coco
        ann_ids = coco.getAnnIds(imgIds=img_id)
        # Dictionary: target coco_annotation file for an image
        coco_annotation = coco.loadAnns(ann_ids)
        # path for input image
        path = coco.loadImgs(img_id)[0]['file_name']
        # open the input image
        img = Image.open(os.path.join(self.root, path))

        # number of objects in the image
        num_objs = len(coco_annotation)

        # Bounding boxes for objects
        # In coco format, bbox = [xmin, ymin, width, height]
        # In pytorch, the input should be [xmin, ymin, xmax, ymax]
        boxes = []
        for i in range(num_objs):
            xmin = coco_annotation[i]['bbox'][0]
            ymin = coco_annotation[i]['bbox'][1]
            xmax = xmin + coco_annotation[i]['bbox'][2]
            ymax = ymin + coco_annotation[i]['bbox'][3]
            boxes.append([xmin, ymin, xmax, ymax])
        boxes = torch.as_tensor(boxes, dtype=torch.float32)

        # labels = torch.ones((num_objs,), dtype=torch.int64)  # 충치있으면 1 없으면 2 ### 라벨 문제임
        labels = []
        for i in range(num_objs):
            labels.append(coco_annotation[i]['category_id'])
        labels = torch.as_tensor(labels, dtype=torch.int64)
        


        # Annotation is in dictionary format
        my_annotation = {}
        my_annotation["boxes"] = boxes
        my_annotation["labels"] = labels
 

        if self.transforms is not None:
            img = self.transforms(img)

        return img, my_annotation

    def __len__(self):
        return len(self.ids)

In [4]:
def get_transform():
    custom_transforms = []
    custom_transforms.append(torchvision.transforms.ToTensor())
    return torchvision.transforms.Compose(custom_transforms)

In [5]:
def train_set():
    train_data_dir = './train/'
    train_coco = './train/_train_anno.json'



    cavity_dataset = CavityDataset(root=train_data_dir,
                                annotation=train_coco,
                                transforms=get_transform())




    def collate_fn(batch): # 이거를 사용하면 resize안해도되네? x 상자모양 이상함
        return tuple(zip(*batch))


    train_batch_size = 16

    train_loader = torch.utils.data.DataLoader(dataset=cavity_dataset, 
                                            batch_size=train_batch_size,
                                            shuffle=True,
                                            num_workers=0,
                                            collate_fn=collate_fn
                                            )
    return cavity_dataset,train_loader
cavity_dataset,train_loader = train_set() 
print(cavity_dataset,train_loader)

loading annotations into memory...
Done (t=0.01s)
creating index...
index created!
<__main__.CavityDataset object at 0x000002442A1E42B0> <torch.utils.data.dataloader.DataLoader object at 0x000002441ECC7580>


In [6]:
# torch.save(model.state_dict(),f'model_{num_epochs}.pt')

# INFERENCE

In [7]:
def test_set():
    test_data_dir = './valid/'
    test_coco = './valid/_annotations.coco.json'
    def collate_fn(batch): 
        return tuple(zip(*batch))

    test_cavity_dataset = CavityDataset(root=test_data_dir,
                                        annotation=test_coco,
                                        transforms=get_transform())
    test_loader = torch.utils.data.DataLoader(dataset=test_cavity_dataset,
                                            batch_size=16,
                                            shuffle=True,
                                            collate_fn=collate_fn
                                            )
    return test_cavity_dataset, test_loader

test_cavity_dataset, test_loader=test_set()
print(test_cavity_dataset, test_loader)

loading annotations into memory...
Done (t=0.00s)
creating index...
index created!
<__main__.CavityDataset object at 0x000002442A1E4910> <torch.utils.data.dataloader.DataLoader object at 0x000002441EE21190>


In [8]:
# test_loader.__len__()
len_dataloader = len(test_loader)
print(len_dataloader)

6


In [9]:
def make_prediction(model, img, threshold):
    model.eval()
    preds = model(img)
    for id in range(len(preds)) :
        idx_list = []

        for idx, score in enumerate(preds[id]['scores']) :
            if score > threshold : 
                idx_list.append(idx)

        preds[id]['boxes'] = preds[id]['boxes'][idx_list]
        preds[id]['labels'] = preds[id]['labels'][idx_list]
        preds[id]['scores'] = preds[id]['scores'][idx_list]

    return preds

In [10]:

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
from torch.utils.data import Dataset, DataLoader

def get_model_instance_segmentation(num_classes):
    # load an instance segmentation model pre-trained pre-trained on COCO
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    # get number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

def list2text(data_dic):
    with open('dictionary.txt', 'w') as file:
        for key, value in data_dic.items():
            file.write(key + ': ' + str(value) + '\n')
            
def main():
    num_classes = 3  #### Faster R-CNN 사용 시 주의할 점은 background 클래스를 포함한 개수를 num_classes에 명시해야됨 ㅡㅡ 
    num_epochs = 50
    model = get_model_instance_segmentation(num_classes)
    
    #model.load_state_dict(torch.load(f'model_{num_epochs}.pt'))  
    # move model to the right device
    model.to(device)
    # parameters
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

    train_dataset, train_loader = train_set()
    test_dataset, test_loader = test_set()
    
    data_dic = {
        'train_loss':[],
        'test_loss':[],
        'train_map':[],
        'test_map':[]
    }
    
    for epoch in range(num_epochs):
        train_loss, train_mAP = train(model, train_loader, optimizer)
        test_loss, test_mAP = test(model, test_loader)

        data_dic['train_loss'].append(train_loss)
        data_dic['test_loss'].append(test_loss)
        data_dic['train_map'].append(train_mAP)
        data_dic['test_map'].append(test_mAP)

        print('[================================================={0} epoch=================================================]'.format(epoch+1))
        print('++++++++++RESULT+++++++++')
        print('train l: ',train_loss.item(), 'train m: ',train_mAP.item())
        print('test l: ',test_loss.item(), 'test m:', test_mAP.item())
        print('[===========================================================================================================]')
    
    
    list2text(data_dic)

    torch.save(model.state_dict(),f'model_{num_epochs}.pt')
    
def train(model, train_loader, optimizer):
    len_dataloader = train_loader.__len__()
    i = 0    
    labels = []
    preds_adj_all = []
    annot_all = []
    sample_metrics = []
    train_total_losses = 0

    for imgs, annotations in train_loader:
        model.train()
        i += 1
        imgs = list(img.to(device) for img in imgs)
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
        loss_dict = model(imgs, annotations)
        losses = sum(loss for loss in loss_dict.values())
        train_total_losses += losses
        
        for t in annotations:
            labels += t['labels']
        
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        
        with torch.no_grad():
            preds_adj = make_prediction(model, imgs, 0.5)
            preds_adj = [{k: v.to(device) for k, v in t.items()} for t in preds_adj]
            preds_adj_all.append(preds_adj)
            annot_all.append(annotations)
            
        print(f'TRAIN Iteration: {i}/{len_dataloader}, Loss: {losses}')    
        
    for batch_i in range(len(preds_adj_all)):
        sample_metrics += utils.get_batch_statistics(preds_adj_all[batch_i], annot_all[batch_i], iou_threshold=0.5) 
        
    true_positives, pred_scores, pred_labels = [torch.cat(x, 0) for x in list(zip(*sample_metrics))]  # 배치가 전부 합쳐짐
    precision, recall, AP, f1, ap_class = utils.ap_per_class(true_positives, pred_scores, pred_labels, torch.tensor(labels))

    train_total_losses /= len_dataloader
    train_mAP = torch.mean(AP)
    return train_total_losses, train_mAP


def test(model, test_loader):
    print("###########################################################")
    len_dataloader = test_loader.__len__()
    labels = []
    preds_adj_all = []
    annot_all = []
    sample_metrics = []
    test_total_losses = 0

    with torch.no_grad():
        i = 0 
        for imgs, annotations in test_loader:
            model.train()
            i += 1
            imgs = list(img.to(device) for img in imgs)
            annota = [{k: v.to(device) for k, v in t.items()} for t in annotations]
            loss_dict = model(imgs, annota)
            losses = sum(loss for loss in loss_dict.values())
            test_total_losses += losses
            
            for t in annota:
                labels += t['labels']

            preds_adj = make_prediction(model, imgs, 0.5)
            preds_adj = [{k: v.to(device) for k, v in t.items()} for t in preds_adj]
            preds_adj_all.append(preds_adj)
            annot_all.append(annota)  
            
            print(f' Iteration: {i}/{len_dataloader}, Loss: {losses}')
            
    for batch_i in range(len(preds_adj_all)):
        sample_metrics += utils.get_batch_statistics(preds_adj_all[batch_i], annot_all[batch_i], iou_threshold=0.5) 

    true_positives, pred_scores, pred_labels = [torch.cat(x, 0) for x in list(zip(*sample_metrics))]  # 배치가 전부 합쳐짐
    precision, recall, AP, f1, ap_class = utils.ap_per_class(true_positives, pred_scores, pred_labels, torch.tensor(labels))
    
    test_total_losses /= len_dataloader
    test_mAP = torch.mean(AP)
    return test_total_losses, test_mAP
    
    
main()

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to C:\Users\asiclab/.cache\torch\hub\checkpoints\fasterrcnn_resnet50_fpn_coco-258fb6c6.pth


  0%|          | 0.00/160M [00:00<?, ?B/s]

loading annotations into memory...
Done (t=0.01s)
creating index...
index created!
loading annotations into memory...
Done (t=0.00s)
creating index...
index created!
