<a href="https://colab.research.google.com/github/SJin765/class_AI4dl/blob/main/Team_project/code_220523_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Loading File

In [None]:
import os

# Path to open.zip under drive/MyDrive, resolved against the current working directory
file_path = os.path.join(os.getcwd(), 'drive', 'MyDrive','open.zip')

# Print the path
print(file_path)

/content/drive/MyDrive/open.zip


In [None]:
# Mount Google Drive (required!!) so that the /content/drive/... paths used below exist
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Extract open.zip (already extracted once and stored on Drive; re-running extracts again)

import zipfile

zip_file_path = '/content/drive/MyDrive/open.zip'
extract_path = '/content/drive/MyDrive/extract/folder'

# Open the archive
with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
    # Read the member list once; it does not change while the archive is open for reading
    file_list = zip_file.namelist()

    # Directory entries are the members whose name ends with '/'
    folder_list = [name for name in file_list if name.endswith('/')]

    # Print every folder followed by the files stored beneath it
    for folder_name in folder_list:
        print(f"Folder: {folder_name}")

        for file_name in file_list:
            if file_name.startswith(folder_name) and not file_name.endswith('/'):
                print(f"File: {file_name}")

    # Extract the whole archive
    zip_file.extractall(extract_path)

# Using Pytorch : Base code

## Import Packages

In [None]:
# Ignore warning messages
# NOTE(review): no category is given, so this silences every warning, including
# deprecation notices from the libraries used below.
import warnings
warnings.filterwarnings(action='ignore')

In [None]:
# 패키지 설치 (Pytorch 이용)
import random
import pandas as pd
import numpy as np
import os
import glob
import cv2
import matplotlib.pyplot as plt

import torch
import torch.nn as nn

import torchvision
import torchvision.models as models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone

import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import cv2
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from tqdm.auto import tqdm

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

## Hyperparameter Setting

In [None]:
# Hyperparameters shared by the cells below
CFG = {
    'NUM_CLASS':34, # build_model() adds 1 to this for the background class
    'IMG_SIZE':512, # fixed at 512; images are resized to IMG_SIZE x IMG_SIZE
    'EPOCHS':10, # number of training epochs run by train()
    'LR':3e-4, # learning rate
    'BATCH_SIZE':16, # batch size of both DataLoaders
    'SEED':41 # value passed to seed_everything()
}

위 하이퍼파라미터는 baseline 값이며, 추후 최적화가 필요함

## Fixed Random_seed

In [None]:
def seed_everything(seed):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices) and
    configures cuDNN for deterministic behaviour.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # NOTE: hash randomisation is fixed when the interpreter starts, so this
    # only affects Python processes launched after this point.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU; this is a no-op when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may pick a different algorithm on each run, which
    # breaks run-to-run reproducibility, so it must stay disabled here.
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED']) # Fix the seed

## Visualization

### Box 그리기

In [None]:
def draw_boxes_on_image(image_path, annotation_path):
    """Show an image with its annotated bounding boxes and class ids drawn on it.

    Every non-empty annotation line holds whitespace-separated numbers: a class
    id followed by four (x, y) corner points. The drawn rectangle is the
    axis-aligned box enclosing all four points.

    Args:
        image_path: Path of the image file to display.
        annotation_path: Path of the matching annotation ``.txt`` file.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """
    # cv2.imread returns None instead of raising when the file cannot be read
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Read the class id and corner points from the txt file
    with open(annotation_path, 'r') as file:
        lines = file.readlines()

    for line in lines:
        fields = line.split()
        if not fields:
            # Skip blank lines (e.g. a trailing newline at the end of the file)
            continue

        values = list(map(float, fields))
        class_id = int(values[0])
        # Use all four corner points so the box is correct regardless of the
        # order in which the corners are listed.
        xs, ys = values[1:9:2], values[2:9:2]
        x_min, y_min = int(round(min(xs))), int(round(min(ys)))
        x_max, y_max = int(round(max(xs))), int(round(max(ys)))

        # Draw the bounding box and its class id on the image
        cv2.rectangle(image, (x_min, y_min), (x_max, y_max), (255, 0, 0), 2)
        cv2.putText(image, str(class_id), (x_min, y_min - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

    # Display the annotated image
    plt.figure(figsize=(25, 25))
    plt.imshow(image)
    plt.show()
    

In [None]:
# Set file paths - first sample
train_path = '/content/drive/MyDrive/extract/folder/train'
test_path = '/content/drive/MyDrive/extract/folder/test'
img_name = '/syn_00000.png'
ant_name = '/syn_00000.txt'
image_file = train_path + img_name
annotation_file = train_path + ant_name
# Run the function
draw_boxes_on_image(image_file, annotation_file)

In [None]:
# Set file paths - second sample
train_path = '/content/drive/MyDrive/extract/folder/train'
img_name = '/syn_00001.png'
ant_name = '/syn_00001.txt'
image_file = train_path + img_name
annotation_file = train_path + ant_name
# Run the function
draw_boxes_on_image(image_file, annotation_file)

## Custom dataset

In [None]:
#collate_fn : 주어진 이미지 - target box, target label추출
def collate_fn(batch):
    """Collate ``(image, boxes, labels)`` samples into a detection batch.

    Args:
        batch: Sequence of ``(image, boxes, labels)`` triples.

    Returns:
        A pair ``(images, targets)``: the images stacked along a new leading
        dimension, and one ``{"boxes": ..., "labels": ...}`` dict per sample.
    """
    sample_images, sample_boxes, sample_labels = tuple(zip(*batch))
    stacked = torch.stack(sample_images, 0)

    # One target dict per sample, in batch order
    per_sample_targets = [
        {"boxes": box_tensor, "labels": label_tensor}
        for box_tensor, label_tensor in zip(sample_boxes, sample_labels)
    ]
    return stacked, per_sample_targets

In [None]:
class CustomDataset(Dataset):
    """Detection dataset built from the ``*.png`` images of one folder.

    In training mode each image is paired with the ``*.txt`` annotation file at
    the same position in sorted order, and samples are ``(image, boxes, labels)``.
    In test mode samples are ``(file_name, image, width, height)``, where width
    and height are the size of the image before any transform.
    """

    def __init__(self, root, train=True, transforms=None):
        """Collect the image (and, when training, annotation) paths under ``root``.

        Args:
            root: Folder containing the ``.png`` (and ``.txt``) files.
            train: Whether annotations are loaded and returned.
            transforms: Optional callable applied to every sample; called with
                ``image``/``bboxes``/``labels`` keywords when training and with
                ``image`` only otherwise.
        """
        self.root = root  # folder path
        self.train = train
        self.transforms = transforms
        self.imgs = sorted(glob.glob(root+'/*.png'))  # every png file

        if train:
            # NOTE(review): images and annotations are matched purely by their
            # position in sorted order; this assumes one .txt per .png with the
            # same stem — verify against the data folder.
            self.boxes = sorted(glob.glob(root+'/*.txt'))  # every txt file

    def parse_boxes(self, box_path):
        """Read one annotation file into box and label tensors.

        Every non-empty line holds whitespace-separated numbers: a class id
        followed by four (x, y) corner points. Each box is the axis-aligned
        rectangle enclosing all four points, rounded to whole pixels.

        Args:
            box_path: Path of the annotation file.

        Returns:
            ``(boxes, labels)``: a float32 tensor of shape ``(N, 4)`` holding
            ``[x_min, y_min, x_max, y_max]`` rows and an int64 tensor of ``N``
            class ids. ``N`` is 0 for an empty file.
        """
        with open(box_path, 'r') as file:
            lines = file.readlines()

        boxes = []
        labels = []

        for line in lines:
            fields = line.split()
            if not fields:
                # Skip blank lines (e.g. a trailing newline)
                continue

            values = list(map(float, fields))
            # Use all four corners so the box does not depend on corner order
            xs, ys = values[1:9:2], values[2:9:2]
            boxes.append([
                int(round(min(xs))), int(round(min(ys))),
                int(round(max(xs))), int(round(max(ys))),
            ])
            labels.append(int(values[0]))

        # reshape keeps the (N, 4) layout even when there are no boxes
        return (
            torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            torch.tensor(labels, dtype=torch.int64),
        )

    def __getitem__(self, idx):
        """Load the sample at position ``idx``.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_path = self.imgs[idx]
        # cv2.imread returns None instead of raising on an unreadable file
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        # Convert to RGB and scale the pixel values to the 0..1 range as float32
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)
        img /= 255.0
        height, width = img.shape[0], img.shape[1]

        if self.train:
            boxes, labels = self.parse_boxes(self.boxes[idx])
            labels += 1  # shift class ids by one: 0 is reserved for background

            if self.transforms is not None:
                # Hand plain Python lists to the transform pipeline
                transformed = self.transforms(image=img, bboxes=boxes.tolist(), labels=labels.tolist())
                img, boxes, labels = transformed["image"], transformed["bboxes"], transformed["labels"]

            # as_tensor accepts both the tensors from parse_boxes and the
            # sequences returned by the transform; reshape keeps (N, 4) when
            # no box is left.
            return (
                img,
                torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4),
                torch.as_tensor(labels, dtype=torch.int64),
            )

        if self.transforms is not None:
            transformed = self.transforms(image=img)
            img = transformed["image"]
        file_name = os.path.basename(img_path)
        return file_name, img, width, height

    def __len__(self):
        """Return the number of images found under ``root``."""
        return len(self.imgs)

In [None]:
def get_train_transforms():
    """Build the training pipeline: resize to a square of CFG['IMG_SIZE'], then convert to a tensor.

    Bounding boxes are passed in 'pascal_voc' format and their class labels
    travel in the ``labels`` field.
    """
    side = CFG['IMG_SIZE']
    steps = [
        A.Resize(side, side),
        ToTensorV2(),
    ]
    box_settings = A.BboxParams(format='pascal_voc', label_fields=['labels'])
    return A.Compose(steps, bbox_params=box_settings)

def get_test_transforms():
    """Build the test pipeline: resize to a square of CFG['IMG_SIZE'], then convert to a tensor."""
    side = CFG['IMG_SIZE']
    steps = [
        A.Resize(side, side),
        ToTensorV2(),
    ]
    return A.Compose(steps)

In [None]:
# Training set yields (image, boxes, labels); test set yields (file_name, image, width, height)
train_dataset = CustomDataset(train_path, train=True, transforms=get_train_transforms())
test_dataset = CustomDataset(test_path, train=False, transforms=get_test_transforms())

In [None]:
# Create the DataLoaders
# Training batches go through collate_fn because the number of boxes differs per image;
# the test loader relies on the default collation.
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

## Define Model : Faster-RCNN

In [None]:
# Faster-RCNN 모델
def build_model(num_classes=CFG['NUM_CLASS']+1):
    """Create a pretrained Faster R-CNN (ResNet-50 FPN) with a new box-prediction head.

    Args:
        num_classes: Number of outputs of the new head. The default is
            CFG['NUM_CLASS'] + 1 and is evaluated once, when this function is
            defined.

    Returns:
        The detection model with its box predictor replaced.
    """
    # NOTE(review): recent torchvision releases prefer `weights=` over
    # `pretrained=`; confirm against the installed version.
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # Replace the head while keeping the feature width of the original one
    predictor_cls = torchvision.models.detection.faster_rcnn.FastRCNNPredictor
    feature_width = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = predictor_cls(feature_width, num_classes)
    return detector

## Train & Validation

In [None]:
def train(model, train_loader, optimizer, scheduler, device):
    """Train ``model`` for CFG['EPOCHS'] epochs and return it with the best weights.

    After every epoch the mean training loss is printed. The weights of the
    epoch with the lowest mean loss are snapshotted and loaded back into
    ``model`` before returning, so the returned model holds the best weights
    rather than the final-epoch weights.

    Args:
        model: Detection model; called as ``model(images, targets)`` and
            expected to return a dict of losses in training mode.
        train_loader: Iterable of ``(images, targets)`` batches.
        optimizer: Optimizer over the parameters of ``model``.
        scheduler: Learning-rate scheduler stepped once per epoch, or ``None``.
        device: Device the model and every batch are moved to.

    Returns:
        ``model`` (the same object) with the best weights loaded, or ``None``
        when no epoch improved on the initial loss (for example zero epochs, or
        a loss that is NaN in every epoch).
    """
    import copy  # local import: only needed to snapshot the best weights

    model.to(device)

    best_loss = float('inf')
    best_state = None

    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for images, targets in tqdm(train_loader):
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            optimizer.zero_grad()

            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

            losses.backward()
            optimizer.step()

            train_loss.append(losses.item())

        if scheduler is not None:
            scheduler.step()

        tr_loss = np.mean(train_loss)

        print(f'Epoch [{epoch}] Train loss : [{tr_loss:.5f}]\n')

        if best_loss > tr_loss:
            best_loss = tr_loss
            # Keeping a reference to `model` would follow later updates, so
            # copy the weights themselves.
            best_state = copy.deepcopy(model.state_dict())

    if best_state is None:
        return None

    model.load_state_dict(best_state)
    return model

In [None]:
# Build the detector with the default number of classes
model = build_model()

# Adam over all model parameters, starting at CFG['LR']
optimizer = torch.optim.Adam(model.parameters(), lr=CFG['LR'])
# Halve the learning rate every 5 scheduler steps (train() steps it once per epoch)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|██████████| 160M/160M [00:01<00:00, 99.0MB/s]


In [None]:
# Train the model; the returned model is the one used for inference below
infer_model = train(model, train_loader, optimizer, scheduler, device)

  0%|          | 0/406 [00:00<?, ?it/s]

## Inference & Submission

In [None]:
import torch
import torchvision.models as models

# Save the model
# NOTE(review): this stores the whole model object (not just its state_dict) under a
# path relative to the current working directory; a Drive path would survive the Colab session.
torch.save(model, 'base_model.pth')

In [None]:
# 추후 loading (위에서 저장한 파일명과 동일해야 함)
#loaded_model = torch.load('base_model.pth')

In [None]:
def box_denormalize(x1, y1, x2, y2, width, height):
    """Scale box corners from the CFG['IMG_SIZE'] square back to the original image size.

    Args:
        x1, y1, x2, y2: Box corners in resized-image coordinates.
        width, height: Size of the original image.

    Returns:
        ``(x1, y1, x2, y2)`` in original-image coordinates, each obtained by
        calling ``.item()`` on the scaled value.
    """
    # x coordinates scale with the width, y coordinates with the height
    pairs = ((x1, width), (y1, height), (x2, width), (y2, height))
    rescaled = [(coord / CFG['IMG_SIZE']) * extent for coord, extent in pairs]
    return tuple(value.item() for value in rescaled)

def inference(model, test_loader, device):
    """Run the detector over the test set and write the submission CSV.

    Every detection becomes one row appended to the sample submission: file
    name, class id (model label minus one, undoing the background offset added
    for training), confidence, and the four box corners in original-image
    coordinates. The result is written to ``baseline_submit.csv``.

    Args:
        model: Trained detection model; in eval mode it is expected to return
            one dict with "boxes", "labels" and "scores" per image.
        test_loader: Iterable of ``(file_names, images, widths, heights)`` batches.
        device: Device the model and the images are moved to.
    """
    model.eval()
    model.to(device)

    # Absolute path, consistent with the output path below
    results = pd.read_csv('/content/drive/MyDrive/extract/folder/sample_submission.csv')

    # Rows are collected here and concatenated once at the end:
    # DataFrame.append was removed in pandas 2.0 and copies the frame on every call.
    rows = []

    for img_files, images, img_width, img_height in tqdm(test_loader):
        images = [img.to(device) for img in images]

        with torch.no_grad():
            outputs = model(images)

        for idx, output in enumerate(outputs):
            boxes = output["boxes"].cpu().numpy()
            labels = output["labels"].cpu().numpy()
            scores = output["scores"].cpu().numpy()

            for box, label, score in zip(boxes, labels, scores):
                x1, y1, x2, y2 = box
                x1, y1, x2, y2 = box_denormalize(x1, y1, x2, y2, img_width[idx], img_height[idx])
                rows.append({
                    "file_name": img_files[idx],
                    # Plain Python scalars keep the column dtypes predictable
                    "class_id": int(label) - 1,
                    "confidence": float(score),
                    "point1_x": x1, "point1_y": y1,
                    "point2_x": x2, "point2_y": y1,
                    "point3_x": x2, "point3_y": y2,
                    "point4_x": x1, "point4_y": y2
                })

    if rows:
        results = pd.concat([results, pd.DataFrame(rows)], ignore_index=True)

    # Save the results as a CSV file
    results.to_csv('/content/drive/MyDrive/extract/folder/baseline_submit.csv', index=False)
    print('Done.')

In [None]:
# Run inference with the trained model and write the submission CSV
inference(infer_model, test_loader, device)

 데이콘에 저장된 .csv 파일을 제출하여 accuracy 확인

# Using keras tensorflow?

수업에서 TensorFlow를 이용한 object detection을 다뤘던 것 같음 (확인 필요)

# Test dataset test

In [None]:
# Load the submission file for inspection
# NOTE(review): relative path — presumably resolves to the file written by inference()
# when the working directory is /content; confirm.
predict = pd.read_csv('drive/MyDrive/extract/folder/baseline_submit.csv')

In [None]:
predict

Unnamed: 0,file_name,class_id,confidence,point1_x,point1_y,point2_x,point2_y,point3_x,point3_y,point4_x,point4_y
0,064442001.png,7,0.974970,1139.299072,187.208511,1434.458862,187.208511,1434.458862,488.342621,1139.299072,488.342621
1,064507368.png,27,0.719952,1051.268066,0.000000,1242.251831,0.000000,1242.251831,156.810089,1051.268066,156.810089
2,064507368.png,7,0.639494,495.682922,138.948288,758.156860,138.948288,758.156860,423.630127,495.682922,423.630127
3,064507368.png,19,0.613908,503.497742,121.485382,750.992310,121.485382,750.992310,417.015381,503.497742,417.015381
4,064507368.png,15,0.346194,503.294159,124.433746,752.900879,124.433746,752.900879,421.978149,503.294159,421.978149
...,...,...,...,...,...,...,...,...,...,...,...
15401,183928410.png,21,0.954212,1093.692383,30.119907,1351.610474,30.119907,1351.610474,314.890411,1093.692383,314.890411
15402,183928410.png,28,0.945929,683.260925,47.868477,923.469788,47.868477,923.469788,317.335480,683.260925,317.335480
15403,183928410.png,27,0.282097,1122.819092,52.531029,1344.326050,52.531029,1344.326050,302.190582,1122.819092,302.190582
15404,183928410.png,27,0.252784,682.231384,56.113010,912.275879,56.113010,912.275879,315.737701,682.231384,315.737701


# Memo

## Colab 데이터 저장하기 (Pytorch 패키지 이용)

In [None]:
import torch
import torchvision.models as models

# Save the model
# NOTE(review): stores the whole model object under a path relative to the current working directory.
torch.save(model, 'base_model.pth')

## Train & Validation 데이터세트 분리

In [None]:
# Example code?
# NOTE(review): neither train_test_split nor df is defined in the visible notebook;
# presumably sklearn.model_selection.train_test_split and a DataFrame of samples — confirm before running.

train_df, test_df = train_test_split(df, test_size = 0.2, shuffle = False)

## Hyperparameter 최적화
- GridSearch CV ?
- RandomSearch CV ?

In [None]:
## GridSearchCV example code

from sklearn.model_selection import GridSearchCV
from sklearn import svm

# Model used in this example: SVM
# SVM hyperparameters searched: C, gamma

# Grid of candidate values for each hyperparameter
param_grid = {'C': [0.1, 1, 10], 'gamma': [0.001, 0.01, 0.1]}

# Build the GridSearchCV object from the estimator and the grid.
# The estimator gets its own name so that running this cell does not overwrite
# the Faster R-CNN stored in the global `model`.
svm_model = svm.SVC()
grid_search = GridSearchCV(svm_model, param_grid)

# Search for the best hyperparameters
# NOTE(review): X_train / y_train are not defined in the visible notebook — placeholders.
grid_search.fit(X_train, y_train)

# Print the best hyperparameters and their score
print("Best Parameters: ", grid_search.best_params_)
print("Best Score: ", grid_search.best_score_)

In [None]:
## RandomizedSearchCV example code

from sklearn.model_selection import RandomizedSearchCV
from sklearn import svm
from scipy.stats import uniform

# Distributions to sample each hyperparameter from (uniform, as an example)
param_dist = {'C': uniform(loc=0, scale=4), 'gamma': uniform(loc=0, scale=0.1)}

# Build the RandomizedSearchCV object from the estimator and the distributions.
# The estimator gets its own name so that running this cell does not overwrite
# the Faster R-CNN stored in the global `model`.
svm_model = svm.SVC()
random_search = RandomizedSearchCV(svm_model, param_distributions=param_dist, n_iter=10)

# Search for the best hyperparameters
# NOTE(review): X_train / y_train are not defined in the visible notebook — placeholders.
random_search.fit(X_train, y_train)

# Print the best hyperparameters and their score
print("Best Parameters: ", random_search.best_params_)
print("Best Score: ", random_search.best_score_)


## 이미지 전처리

### 사진 선명도 보정?

In [None]:
# Test files -> the real photos are noticeably blurry
import cv2
import numpy as np
from matplotlib import pyplot as plt

# Read the sample image selected in the visualization cells above.
# (`image_path` only exists as a parameter of draw_boxes_on_image, not as a
# notebook-level variable, so `image_file` is used here.)
image = cv2.imread(image_file)
if image is None:
    # cv2.imread returns None instead of raising on an unreadable file
    raise FileNotFoundError(f"Could not read image: {image_file}")
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# 3x3 sharpening kernel
kernel = np.array([[0, -1, 0],
                   [-1, 5,-1],
                   [0, -1, 0]])

# Sharpen the image by convolving it with the kernel
image_sharp = cv2.filter2D(image, -1, kernel)

# Show the sharpened image. No cmap is passed: it is ignored for RGB data, and
# "JET" is not a valid colormap name (the registered name is lowercase "jet").
plt.imshow(image_sharp)
plt.axis("off")
plt.show()

### 픽셀 정규화

- 픽셀 값 범위 일정하게 조정: 다양한 이미지에서 동일한 범위로 표현 -> 모델에 이미지 입력 용이

- 데이터 분포 조정: 픽셀 값의 분포를 조정(통계적 특성을 개선)


Test data를 살펴보니 이미지 크기가 모두 1920*1080인데 정규화를 하는 이유가 따로 있나?

In [None]:
# Min-max normalization
# NOTE(review): pixel_value, min_value and max_value are not defined in the visible
# notebook — this line reads as a formula sketch; max_value == min_value would divide by zero.
normalized_value = (pixel_value - min_value) / (max_value - min_value)

In [None]:
# Z-score normalization
# NOTE(review): pixel_value, mean_value and std_deviation are not defined in the visible
# notebook — this line reads as a formula sketch; a zero std_deviation would divide by zero.
normalized_value = (pixel_value - mean_value) / std_deviation

 NumPy, OpenCV, scikit-image 등의 라이브러리에서 제공하는 함수를 쓰는 게 좋겠음

## Ensemble method 위한 모델 선택

- Faster RCNN : 예시 코드로 나와서 사용이 용이함
- YOLOv6
- InternImage-H
- TridentNet
- 그외?