## Mask R-CNN 

#### 라이브러리 import
 - **torch, torchvision** : pytorch 및 이미지 처리 위한 라이브러리
 - **pycocotools.coco** : COCO 형식의 데이터를 처리
 - **PIL, matplotlib** : 세그멘테이션 결과를 시각
 - **cv2** : 실시간 컴퓨터 비전을 위한 라이브러리
 - **sklearn.metrics** : 모델의 성능 평가를 위한 라이브러리
 - **tqdm** : 학습 진행 척도, 지표를 표시
 - **os, numpy, random** : 기본적인 데이터 처리를 위한 라이브러리

In [None]:
import os
import torch
import torchvision
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.models.detection import maskrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from pycocotools.coco import COCO
from PIL import Image
from sklearn.metrics import precision_recall_curve, auc
import numpy as np
import random
import matplotlib.pyplot as plt
import cv2
from tqdm import tqdm

#### 난수 고정
 - **set_seed** 함수 : 난수를 고정하여 결과의 재현성 보장
     - `Pytorch` 에서 재현성을 위해 사용하는 난수를 고정
         - torch.manual_seed(seed)
         - torch.cuda.manual_seed(seed)
         - torch.cuda.manual_seed_all(seed)
     - `넘파이 및 내장 랜덤` 시드 고정 : 넘파이의 랜덤 시드, 내장 랜덤 시드의 난수 고정
         - np.random.seed(seed)
         - random.seed(seed)
     - `CUDA convolution 연산`의 알고리즘 제어
         - deterministic : non-deterministic 알고리즘을 제어하고 deterministic 알고리즘만 허용(True)
         - benchmark : CUDA에서 자동으로 알고리즘을 탐색하여 가장 빠른 알고리즘을 적용하는데 이를 해제함(False)
 - **seed** : 42로 고정(일반적으로 널리 사용되는 수)

In [None]:
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


set_seed(42)

#### 데이터셋 클래스 정의 <br>
 전체 데이터셋을 모델에 적용하기 위해 전처리, 텐서 적용한 후 최종적으로 img, target 으로 반환<br>
 **ParkingDataset** 클래스는 초기화하는 `'__init__'`, 이미지와 어노테이션을 전처리하는 `'__getitem__'`, 데이터셋 길이를 반환하는 `'__len__'` 함수로 구성되어 있음

 - **\_\_init__** : 데이터셋 초기화 함수
     - **root** : 사용할 이미지 디렉터리
     - **COCO(annotation)** : 어노테이션을 COCO 데이터형식으로 처리
     - **ids** : 전체 이미지 리스트 
     - **max_images** : 최대 사용 이미지 수 지정
     - **remove_images** : 제거할 이미지(손상 등) 있으면 번호 지정
     - **transforms** : transforms 적용
 - **\_\_getitem__** : 주어진 인덱스에 대한 이미지와 어노테이션을 target으로 전처리
     - 이미지를 컬러로 사용("RGB")
     - `target` : 아래 항목이 텐서 변환되어 반환
         - "image_id" 
         - "boxes" : 어노테이션의 'bbox' 좌표를 COCO 형식으로 전환
         - "labels" : 어노테이션 클래스 번호(1~4)
         - "masks" : 어노테이션으로 생성된 mask
     -  transforms: 증강 또는 추가로 필요한 변형 작업, 입력된 경우 사용   
 - **\_\_len__** : 데이터셋의 전체 길이 반환

In [None]:
class ParkingDataset(Dataset):
    def __init__(self, root, annotation, transforms=None, max_images=None, remove_images=None):
        self.root = root
        self.coco = COCO(annotation)
        self.ids = list(self.coco.imgs.keys())

        if max_images is not None:
            self.ids = self.ids[:max_images]  # max번째 이미지까지만 사용
        if remove_images is not None:
            for remove_image in remove_images:
                self.ids.remove(remove_image)  # remove_image 제거

        self.transforms = transforms

    def __getitem__(self, index):
        coco = self.coco
        img_id = self.ids[index]
        ann_ids = coco.getAnnIds(imgIds=img_id)
        anns = coco.loadAnns(ann_ids)
        img_info = coco.loadImgs(img_id)[0]
        path = os.path.join(self.root, img_info['file_name'])

        img = Image.open(path).convert("RGB")
        boxes = []
        labels = []
        masks = []

        for ann in anns:
            xmin, ymin, width, height = ann['bbox']
            xmax = xmin + width
            ymax = ymin + height
            boxes.append([xmin, ymin, xmax, ymax])
            labels.append(ann['category_id'])
            masks.append(self.coco.annToMask(ann))

        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        masks = torch.as_tensor(np.array(masks), dtype=torch.uint8)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["masks"] = masks
        target["image_id"] = torch.tensor([img_id])

        if self.transforms:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        return len(self.ids)


####  collate_fn 함수 정의

 - 데이터셋 길이가 다양하더라도 처리 가능하게 함(모델에 사용하는 데이터셋 길이가 항상 고정된 것은 아니므로 이 함수를 사용)

In [None]:
def collate_fn(batch):
    return tuple(zip(*batch))

####  이미지 전처리 함수 정의

 - 이미지를 모델에 적용할 수 있게 텐서로 변환하는 함수

In [None]:
def get_transform(train):
    transforms = []
    transforms.append(torchvision.transforms.ToTensor())
    return torchvision.transforms.Compose(transforms)

####  IoU 계산 로직

 - 예측 결과(pred_boxes)와 실제 타겟(gt_boxes)의 IoU를 계산
     - bbox의 intersection/union

In [None]:
def calculate_iou(pred_boxes, gt_boxes):
    intersection = np.logical_and(pred_boxes, gt_boxes)
    union = np.logical_or(pred_boxes, gt_boxes)
    iou = np.sum(intersection) / np.sum(union)
    return iou

####  정밀도-재현율 곡선에서 AUC 계산

 - model 예측의 성능 평가 지표 AUC 계산(AP : bbox 좌표로 계산
 - AP(Average Precision)는 Pprecision-Recall 그래프의 선 아래쪽의 면적으로, AP가 높을수록 그 모델의 성능이 높다고 평가할 수 있음
     - precisions = tp / (tp + fp) (posivite 예측한 것중 실제 positive 의 비율)
     - recalls = tp / (tp + fn) (실제 positive 중 positive 로 예측한 비율)

In [None]:
def calculate_ap(pred_boxes, gt_boxes, scores, iou_threshold=0.5):
    precisions = []
    recalls = []
    
    for threshold in np.linspace(0, 1, num=101):
        tp = np.sum((scores >= threshold) & (calculate_iou(pred_boxes, gt_boxes) >= iou_threshold))
        fp = np.sum((scores >= threshold) & (calculate_iou(pred_boxes, gt_boxes) < iou_threshold))
        fn = np.sum((scores < threshold))
        
        precision = tp / (tp + fp + 1e-6)
        recall = tp / (tp + fn + 1e-6)
        
        precisions.append(precision)
        recalls.append(recall)
    
    precisions = np.array(precisions)
    recalls = np.array(recalls)
    
    ap = auc(recalls, precisions)
    return ap

####  모델 평가 함수 <br>
  검증 데이터셋의 loss 를 계산하여 모델 성능을 평가

**`evaluate_model`**
 - **model.train()** : 모델을 학습 모드로 설정
 - **torch.no_grad()** : 검증 단계에서 gradient 연산 과정을 비활성화, 연산 속도 증가
 - **loss_dict** : 손실 계산을 위해 모델에 이미지와 타겟을 전달
 - total_loss / len(data_loader_test) : 결과로서 반환

In [None]:
def evaluate_model(model, data_loader_test, device):
    model.train()  
    total_loss = 0.0
    with torch.no_grad():
        for images, targets in data_loader_test:
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

          
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
            total_loss += losses.item()

    return total_loss / len(data_loader_test)

####  모델 학습 함수 (베스트 모델만 저장) <br>
  전체 데이터셋을 모델로 학습 및 평가하여 베스트 모델(validation_loss 가 최소인 모델)을 'best_model.pth' 파일로 저장

**`train_model`**
  - model.train(), os.makedirs : 학습 모드 전환, 결과 저장 디렉터리 생성
  - **best_loss** : 첫 epoch 시 저장되도록 초기값을 매우 크게 설정
  - **try** : num_epochs 만큼 학습 시작
      - **pbar** : 학습 진행 척도를 표시
      - **loss_dict** : loss 계산 위해 images, targets을 모델에 넣음
      - 역전파 : 학습 중 gradient 값이 계속 저장되어 가는데, 한 epoch 학습 후 다시 초기화하여 진행하여야 함
          - 아래 과정을 통해 학습동안 저장된 gradient를 0로 초기화, 역전파와 optimizer 를 다시 진행하도록 세팅 
          - **optimizer.zero_grad()** 
          - **losses.backward()** 
          - **optimizer.step()**
      - **pbar.set_postfix** : 학습 진행 정도와 함께 loss, gpu 사용량 표시
      - **evaluate_model** 호출, 검증 데이터셋을 사용하여 성능 확인
      - **validation loss** 출력
      - 현재 모델이 이전까지의 모델보다 나을 경우 파일명 'best_model.pth'으로 모델 가중치를 저장
  - 전체 epoch 학습 후 완료
  - **KeyboardInterrupt** : Ctrl+C 입력 시 중단, model state를 파일명 'interrupted_model.pth' 으로 저장

In [None]:
def train_model(model, data_loader, data_loader_test, optimizer, device, num_epochs=25, save_dir='model_weights'):
    model.train()
    os.makedirs(save_dir, exist_ok=True)
    best_loss = float('inf')  
    
    try:
        for epoch in range(num_epochs):
            print(f"Epoch {epoch + 1}/{num_epochs}")
            pbar = tqdm(enumerate(data_loader), total=len(data_loader), desc="Training", ncols=100)
            running_loss = 0.0
            for i, (images, targets) in pbar:
                images = list(image.to(device) for image in images)
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                loss_dict = model(images, targets)
                losses = sum(loss for loss in loss_dict.values())

                optimizer.zero_grad()
                losses.backward()
                optimizer.step()

                running_loss += losses.item()
                pbar.set_postfix(
                    {"Loss": losses.item(), "memory_allocated(mb)": (torch.cuda.memory_allocated() / (1024 ** 2))})


            # 검증 데이터셋을 사용하여 성능 평가
            validation_loss = evaluate_model(model, data_loader_test, device)

            print(f"Epoch {epoch + 1} Validation Loss: {validation_loss}")


            # 현재 모델이 이전까지의 모델보다 나은지 판단하여 저장
            if validation_loss < best_loss:
                best_loss = validation_loss
                save_path = os.path.join(save_dir, 'best_model.pth')
                torch.save(model.state_dict(), save_path)
                print(f"New best model saved to {save_path}")



    except KeyboardInterrupt:
        print("Training interrupted. Saving current model weights.")
        save_path = os.path.join(save_dir, 'interrupted_model.pth')
        torch.save(model.state_dict(), save_path)
        print(f"Model saved to {save_path}. Proceeding to inference.")
        return False

    return True

####  이미지에 대한 예측 시각화 <br>
  예측하려는 이미지를 모델에 맞게 전처리 후 예측한 결과를 클래스별 지정된 색상의 color mask를 오버레이하여 시각화 함 

**`visualize_predictions`** <br>
  - **예측 이미지 전처리 및 모델 예측**
      - **model.eval()** : 평가 모드 전환, 드롭아웃 비활성화/학습 중 사용하는 평균, 이동평균 업데이트하지 않음
      - **get_transform** : 예측 이미지를 모델에 적용하도록 텐서 변환
      - **GPU(device)** 사용 설정
      - **torch.no_grad()** : gradient 비활성화
      - **prediction** : 모델에 이미지를 넣어서 예측 <br>
  - **시각화**
      - 이미지를 넘파이 배열 형태로 처리 
      - 각 클래스의 색상 지정 (주차 공간: 초록, 도로: 빨강, 사람: 파랑, 탈것: 노랑) 
      - 예측된 이미지의 클래스(0:배경 제외)별 mask에 색상을 적용
      - mask = mask > 0.3 : mask threshold 0.3으로 설정 
      - 원본 이미지에 마스크 오버레이된 이미지 출력 및 저장("result.jpg")

In [None]:
def visualize_predictions(model, img_path, device):
    model.eval()
    img = Image.open(img_path).convert("RGB")
    transform = get_transform(train=False)
    img_tensor = transform(img).to(device)
    with torch.no_grad():
        prediction = model([img_tensor])

    plt.figure(figsize=(10, 10))
    img_np = np.array(img)

    # 각 클래스의 색상 지정 (주차 공간, 도로, 탈것, 사람)
    colors = {
        1: [0, 255, 0],  # 주차 공간: 초록
        2: [255, 0, 0],  # 도로: 빨강
        3: [0, 0, 255],  # 사람: 파랑
        4: [255, 255, 0]  # 탈것: 노랑
    }

    for i in range(len(prediction[0]['masks'])):
        label = prediction[0]['labels'][i].item()
        if label in colors:  # 배경(label == 0)은 제외하고 카테고리 시각화
            mask = prediction[0]['masks'][i, 0].cpu().numpy()
            mask = mask > 0.3  # Threshold 적용
            color_mask = np.zeros_like(img_np)
            for c in range(3):
                color_mask[:, :, c] = np.where(mask, colors[label][c], 0)
            img_np = cv2.addWeighted(img_np, 1.0, color_mask, 0.7, 0)  # 원본 이미지에 색상 마스크 오버레이

    plt.imshow(img_np)
    plt.show()
    plt.savefig("result.jpg")


####  메인 실행 <br>
  데이터셋 전처리, 모델 로드, 학습 및 평가, 예측 후 시각화의 전체 과정을 정의된 함수를 사용하여 실행<br>
  데이터 경로, 사전학습 모델 로드, 출력층 커스터마이즈, 모델 가중치, optimizer, 학습 파라미터 등을 학습 환경과 모델 성능에 따라 변경 가능

1. `데이터셋 로드 및 전처리`<br>
    - device : GPU 사용 가능 여부 확인(아닌 경우 CPU 사용)
    - dataset : 데이터셋 로드(전체 데이터셋 사용)  
    - 학습 데이터셋과 테스트 데이터셋 나누기 (8:2 비율)
        - 랜덤 스플릿을 적용하여 데이터셋을 나눔
        - dataset_train(학습데이터), dataset_test (검증데이터)
    - 데이터 로더 생성
        - batch size, shuffle(셔플 여부), num_workers(프로세싱 수), collate_fn : 데이터셋에 파라미터를 적용하여 로드하도록 함
        - data_loader(학습), data_loader_test(검증)

2. `모델 로드 및 커스터마이즈`<br>
    - 사전 학습된 Mask R-CNN 모델 로드 및 수정 
        - **maskrcnn_resnet50_fpn** 모델 로드 : 사전 학습된 ResNet-FPN backbone의 Mask R-CNN 모델 사용
    - 모델의 출력층을 커스터마이즈
        - num_classes = 5 (배경 포함 5개의 클래스)
        - FastRCNNPredictor : bbox 출력
        - MaskRCNNPredictor : 세그멘테이션 mask 출력
        - model.to(device) : gpu 사용하여 학습하도록 설정
    - 사용자에게 가중치 파일을 직접 지정하여 불러오기
        - **load_weights** : 사용자가 지정한 가중치 파일을 사용할지 묻고 없는 경우 scratch 진행
    - 옵티마이저 정의 (Adam)
        - **optimizer** : Adam 사용(learning rate = 0.001)
        - 전 단계에서 학습 후 gradient를 어느 정도 줄일(늘일)지 학습률(learning rate) 설정

3. `학습 및 예측, 시각화`<br>
   - 모델 학습
        - train_model 함수 호출 : 위에 정의된 model, data_loader, data_loader_test, optimizer, device 와 에폭 수(num_epochs),
          저장 디랙터리(save_dir)를 입력값으로 설정
   - 예측 시각화
        - img_path : 예측할 이미지 경로
        - visualize_predictions 함수 호출 예측 및 시각화
   - 최종 모델 저장 (추가 저장)
        - 최종적인 모델 가중치를 'mask_rcnn_parking_final.pth' 파일로 저장

In [None]:
if __name__ == "__main__":
    # GPU 사용 가능 여부 확인
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    # 데이터셋 로드 (전체 데이터셋 사용)
    dataset = ParkingDataset(root='/home/elicer/merged_data/merged_segmentimage',
                             annotation='/home/elicer/merged_data/merged_coco.json',
                             transforms=get_transform(train=True)
                             )

    # 학습 데이터셋과 테스트 데이터셋 나누기 (8:2 비율)
    train_size = int(0.8 * len(dataset))
    test_size = len(dataset) - train_size
    dataset_train, dataset_test = random_split(dataset, [train_size, test_size])

    # 데이터 로더 생성
    data_loader = DataLoader(dataset_train, batch_size=32, shuffle=True, num_workers=4, collate_fn=collate_fn)
    data_loader_test = DataLoader(dataset_test, batch_size=32, shuffle=False, num_workers=4, collate_fn=collate_fn)

    # 사전 학습된 Mask R-CNN 모델 로드 및 수정
    model = maskrcnn_resnet50_fpn(weights="DEFAULT")

    # 모델의 출력층을 커스터마이즈 (배경 포함 5개의 클래스)
    num_classes = 5  # background + 4 classes
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    model.roi_heads.mask_predictor = MaskRCNNPredictor(
        model.roi_heads.mask_predictor.conv5_mask.in_channels, 256, num_classes)

    model.to(device)

    # 사용자에게 가중치 파일을 직접 지정하여 불러오기
    load_weights = input(
        "Enter the name of the weights file to load (leave blank to start training from scratch): ").strip()

    if load_weights:
        weight_path = os.path.join('model_weights', load_weights)
        if os.path.exists(weight_path):
            model.load_state_dict(torch.load(weight_path))
            print(f"Model loaded from {weight_path}")
        else:
            print(f"File {weight_path} does not exist. Starting training from scratch.")
    else:
        print("Starting training from scratch...")

    # 옵티마이저 정의 (Adam으로 변경)
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.Adam(params, lr=0.001, betas=(0.9, 0.999), eps=1e-08)

    # 모델 학습
    training_complete = train_model(model, data_loader, data_loader_test, optimizer, device, num_epochs=10,
                                    save_dir='model_weights')

    # 예측 시각화
    img_path = '/home/elicer/parking.jpg'  
    visualize_predictions(model, img_path, device)

    # 최종 모델 저장 (추가 저장)
    final_save_path = os.path.join('model_weights', f'mask_rcnn_parking_final.pth')
    torch.save(model.state_dict(), final_save_path)
    print(f"Final model saved to {final_save_path}")
