# google mount

In [None]:
# Mount Google Drive under /content/drive; the later cells read the dataset
# archives from it and write images, logs and checkpoints back to it.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


# data dir load

In [None]:
# Create the local dataset layout on the Colab VM (images and labels, each split into train / val).
!mkdir -p "/content/dataset/images/train"
!mkdir -p "/content/dataset/images/val"
!mkdir -p "/content/dataset/labels/train_p2"
!mkdir -p "/content/dataset/labels/val_p2"
# Extract the label archives (TL_* -> train labels, VL_* -> val labels) from Drive; -q keeps unzip quiet.
!unzip -q "/content/drive/MyDrive/Colab Notebooks/dataset/labels/TL_KS_LINE.zip" -d "/content/dataset/labels/train_p2"
!unzip -q "/content/drive/MyDrive/Colab Notebooks/dataset/labels/VL_KS_LINE.zip" -d "/content/dataset/labels/val_p2"
# Extract the image archives (TS_* -> train images, VS_* -> val images).
!unzip -q "/content/drive/MyDrive/Colab Notebooks/dataset/images/TS_KS.zip" -d "/content/dataset/images/train"
!unzip -q "/content/drive/MyDrive/Colab Notebooks/dataset/images/VS_KS.zip" -d "/content/dataset/images/val"


[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m


# requirements

In [None]:
# Install the Python dependencies used by the cells below.
# NOTE(review): no versions are pinned, so a fresh runtime may resolve different releases.
!pip install -q torch transformers datasets Pillow numpy matplotlib scikit-learn
!pip install -q tqdm
# albumentations / OpenCV are used by the augmentation and dataset cells (this line is not run with -q).
!pip install albumentations opencv-python-headless



# huggingface login

In [None]:
# Log in to the Hugging Face Hub so model checkpoints can be downloaded.
# NOTE(review): new_session=False presumably reuses an existing login instead of
# prompting again - confirm against the huggingface_hub documentation.
from huggingface_hub import login
login(new_session=False)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

# preprocess

In [None]:

import json
import numpy as np
import torch
from numpy import random
def load_json(json_path:str)->dict:
    """Open the UTF-8 encoded file at ``json_path`` and return its parsed JSON content."""
    with open(json_path, mode="r", encoding="utf-8") as handle:
        return json.load(handle)

def extract_bbox(json:dict)->tuple[list,list]:
    """
    Pull the annotated segments and their height labels out of one label record.

    Arg:
        json: raw label dict (already parsed). Only its first top-level value is read;
            that value must provide ``regions`` and, when regions exist,
            ``file_attributes`` with ``img_width`` / ``img_height``.
            (The parameter name shadows the ``json`` module inside this function only.)
    returns:
        (boxes, heights)
        boxes:   [[xmin, ymin, xmax, ymax]] normalised by the image width / height,
                 built from the FIRST TWO entries of all_points_x / all_points_y
        heights: [float] taken from region_attributes['chi_height_m']
    """
    data = list(json.values())[0]
    regions = data['regions']
    if not regions:
        # Nothing annotated: do not touch file_attributes at all (same result as an empty loop).
        return ([], [])

    # The image size is identical for every region, so parse it once instead of per region.
    img_height = float(data["file_attributes"]["img_height"])
    img_width = float(data["file_attributes"]["img_width"])

    bbox = []
    height = []
    for region in regions:
        shape = region['shape_attributes']
        x_a, x_b = int(shape['all_points_x'][0]), int(shape['all_points_x'][1])
        y_a, y_b = int(shape['all_points_y'][0]), int(shape['all_points_y'][1])
        bbox.append([
            min(x_a, x_b) / img_width,
            min(y_a, y_b) / img_height,
            max(x_a, x_b) / img_width,
            max(y_a, y_b) / img_height,
        ])
        height.append(float(region['region_attributes']['chi_height_m']))
    return (bbox, height)




def get_enclosing_rect(point_pairs, w=60 / 512):
    """
    Turn two-point segments into padded axis-aligned rectangles.

    Arg:
        point_pairs: iterable of (x1, y1, x2, y2) in normalised coordinates
        w: total padding; half of it is added on every side of the segment's extent
    returns:
        [(xmin, ymin, xmax, ymax)] with each coordinate clamped to the range 0..1
    """
    margin = w / 2

    def _expand(segment):
        ax, ay, bx, by = segment
        left = max(min(ax, bx) - margin, 0)
        top = max(min(ay, by) - margin, 0)
        right = min(max(ax, bx) + margin, 1)
        bottom = min(max(ay, by) + margin, 1)
        return (left, top, right, bottom)

    return [_expand(segment) for segment in point_pairs]
import numpy as np

def crop(image: np.ndarray, bbox: list) -> np.ndarray:
    """
    Black out everything outside ``bbox``.

    The result has the same shape and dtype as ``image``; only the pixels inside the
    box are copied over, the rest stays zero. ``bbox`` is (xmin, ymin, xmax, ymax) as
    fractions of the image width / height. An empty or inverted box yields an
    all-zero array.
    """
    canvas = np.zeros_like(image)
    rows, cols = image.shape[:2]

    # Scale to pixels (truncating) and clamp to the image borders.
    left = max(0, int(bbox[0] * cols))
    top = max(0, int(bbox[1] * rows))
    right = min(cols, int(bbox[2] * cols))
    bottom = min(rows, int(bbox[3] * rows))

    if left >= right or top >= bottom:
        return canvas

    window = (slice(top, bottom), slice(left, right))
    canvas[window] = image[window]
    return canvas
def crop_and_adjust_bbox(

    imgs: torch.Tensor,
    bbox:torch.Tensor,
    padding_factor: float = 0.25,
    target_size: tuple = (224, 224)
) -> tuple[torch.Tensor, torch.Tensor]:
    """
    Randomly crop a square window around each box and re-express the box inside that window.

    Args:
        imgs: float image batch indexed as [batch, channel, height, width]
        bbox: [batch, 4] boxes (xmin, ymin, xmax, ymax), normalised to the full image
        padding_factor: the window side is the longer box side enlarged by this fraction
        target_size: (height, width) every crop is resized to (bilinear)
    Returns:
        (crops, boxes): crops is [batch, channel, *target_size]; boxes is [batch, 4],
        the same boxes normalised to their crop window and limited to 0..1.

    The window position is drawn from numpy's global RNG.
    """
    crops = []
    new_bboxes_list = []
    img_h, img_w = imgs.shape[2:]
    # A square window can never be larger than the shorter image side.
    max_side = min(img_h, img_w)

    for i in range(imgs.size(0)):

        nx1, ny1, nx2, ny2 = bbox[i].tolist()

        orig_x1 = int(nx1 * img_w)
        orig_y1 = int(ny1 * img_h)
        orig_x2 = int(nx2 * img_w)
        orig_y2 = int(ny2 * img_h)

        bbox_w = orig_x2 - orig_x1
        bbox_h = orig_y2 - orig_y1

        side_len = max(bbox_w, bbox_h)
        # At least one pixel (degenerate boxes) and at most the image itself, so the
        # slice below always has exactly `padded_side_len` rows and columns.
        padded_side_len = min(max(int(side_len * (1 + padding_factor)), 1), max_side)

        slack_w = max(0, padded_side_len - bbox_w)
        slack_h = max(0, padded_side_len - bbox_h)

        # numpy's randint excludes the upper bound; +1 makes the offset range [0, slack]
        # and avoids the ValueError that randint(0, 0) raises when there is no slack.
        offset_x = int(np.random.randint(0, slack_w + 1))
        offset_y = int(np.random.randint(0, slack_h + 1))

        # Place the window, then slide it back inside the image if it sticks out.
        crop_x1 = min(max(orig_x1 - offset_x, 0), img_w - padded_side_len)
        crop_y1 = min(max(orig_y1 - offset_y, 0), img_h - padded_side_len)
        crop_x2 = crop_x1 + padded_side_len
        crop_y2 = crop_y1 + padded_side_len

        cropped_img = imgs[i:i+1, :, crop_y1:crop_y2, crop_x1:crop_x2]

        # Box corners relative to the window, normalised by the window side. The clamp
        # only matters when the box is larger than the window (box longer than the
        # shorter image side).
        scale = float(padded_side_len)
        final_box = [
            min(max((orig_x1 - crop_x1) / scale, 0.0), 1.0),
            min(max((orig_y1 - crop_y1) / scale, 0.0), 1.0),
            min(max((orig_x2 - crop_x1) / scale, 0.0), 1.0),
            min(max((orig_y2 - crop_y1) / scale, 0.0), 1.0),
        ]
        new_bboxes_list.append(torch.tensor(final_box, device=imgs.device))

        # Fully qualified so this cell does not depend on `nn` being imported by a later cell.
        resized_crop = torch.nn.functional.interpolate(
            cropped_img, size=target_size, mode="bilinear", align_corners=False
        )
        crops.append(resized_crop)

    final_crops = torch.cat(crops, dim=0)
    final_bboxes = torch.stack(new_bboxes_list, dim=0)

    return final_crops, final_bboxes


# augmentation

In [None]:

import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np
from numpy.random import rand

class Transformer():
    """
    Training-time augmentation for an (image, bbox) pair.

    ``bbox`` is (xmin, ymin, xmax, ymax) in normalised coordinates; flips mirror the
    box together with the image. Each step draws one number from ``numpy.random.rand``
    and is applied when that number is below its probability.
    """
    def __init__(self):
        pass

    def __call__(self,image:np.ndarray,bbox:list ):
        """Run horizontal flip (p=0.5), vertical flip (p=0.5) and brightness/contrast (p=0.3) in that order."""
        pipeline = (
            (self.hflip, 0.5),
            (self.vflip, 0.5),
            (self.gaussian_noise, 0.3),
        )
        for step, prob in pipeline:
            image, bbox = step(image, bbox, p=prob)
        return image, bbox

    def vflip(self,image,bbox,p):
        """With probability ``p`` flip the image top-to-bottom and mirror the box's y range."""
        if not (rand() < p):
            return image, bbox
        flipped = A.VerticalFlip(p=1)(image=image)['image']
        # The old bottom edge becomes the new top edge and vice versa.
        mirrored = [bbox[0], 1 - bbox[3], bbox[2], 1 - bbox[1]]
        return flipped, mirrored

    def hflip(self,image,bbox,p):
        """With probability ``p`` flip the image left-to-right and mirror the box's x range."""
        if not (rand() < p):
            return image, bbox
        flipped = A.HorizontalFlip(p=1)(image=image)['image']
        # The old right edge becomes the new left edge and vice versa.
        mirrored = [1 - bbox[2], bbox[1], 1 - bbox[0], bbox[3]]
        return flipped, mirrored

    def gaussian_noise(self,image,bbox,p):
        """
        With probability ``p`` apply a random brightness/contrast change (limits 0.2).

        Despite the name no Gaussian noise is added; the box is returned untouched.
        """
        if not (rand() < p):
            return image, bbox
        jitter = A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=1)
        return jitter(image=image)['image'], bbox


# dataset

In [None]:

import torch
from torch.utils.data import Dataset
from pathlib import Path
import cv2
from PIL import Image
class ResDataset(Dataset):
    """
    One sample per annotated region: the full image, the region's padded box and its height label.

    Every ``*.json`` file in ``label_dir`` is parsed with ``load_json`` / ``extract_bbox``,
    the boxes are padded with ``get_enclosing_rect`` and each (box, height) pair is stored
    together with the path ``img_dir / <label stem>.jpg``.
    """
    def __init__(self, img_dir , label_dir,preprocessor,device,transformer = None,w = 60/512):
        """
        Args:
            img_dir: folder holding the ``.jpg`` images
            label_dir: folder holding the ``.json`` label files
            preprocessor: callable used as ``preprocessor(img, return_tensors='pt', device='cpu')``
                whose result provides ``['pixel_values']``
            device: device the ``bbox`` and ``height`` tensors are moved to in ``__getitem__``
            transformer: optional callable ``(image=..., bbox=...) -> (image, bbox)``
            w: padding forwarded to ``get_enclosing_rect``
        """
        self.img_dir = Path(img_dir)
        self.label_dir = Path(label_dir)
        self.preprocessor =preprocessor
        self.transformer = transformer
        self.device = device
        self.datas =[] ##(img_path ,  bbox,height)
        # sorted(): glob order depends on the filesystem; sorting makes sample indices reproducible.
        for file in sorted(self.label_dir.glob('*.json')):
            # Named `label`, not `json`, so the json module is not shadowed.
            label = load_json(file)
            bboxes,heights = extract_bbox(label)
            bboxes = get_enclosing_rect(bboxes,w)
            img_path = self.img_dir / (file.stem + '.jpg')
            self.datas.extend((img_path,bbox,height) for bbox,height in zip(bboxes,heights))
        print('dataset:',len(self.datas))
    def __len__(self):
        """Number of (image, region) samples."""
        return len(self.datas)
    def __getitem__(self,index)->dict[str,torch.Tensor]:
        """
        Arg:
            index
        Returns:
            {'img': preprocessed pixel tensor (on CPU),
             'bbox': tensor[4] float32 on ``self.device``,
             'height': scalar float32 tensor on ``self.device``}
        Raises:
            FileNotFoundError: when the image cannot be read.
        """
        img_path, bbox, height = self.datas[index]

        # str(): older OpenCV builds reject pathlib.Path. imread signals failure by
        # returning None rather than raising, so check it explicitly.
        img = cv2.imread(str(img_path)) #[h,w,channel]
        if img is None:
            raise FileNotFoundError(f'could not read image: {img_path}')
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        if self.transformer:
            img,bbox = self.transformer(image=img , bbox=bbox)

        # pixel_values carries a leading batch axis of size 1; drop only that axis.
        img = self.preprocessor(img,return_tensors = 'pt',device = 'cpu')['pixel_values'].squeeze(0)
        # NOTE(review): creating tensors on a CUDA device here prevents using DataLoader
        # worker processes; kept because callers may rely on the returned device.
        bbox = torch.tensor(bbox,dtype=torch.float32).to(self.device)
        height = torch.tensor(height,dtype=torch.float32).to(self.device)

        return {
            'img':img,
            'bbox': bbox,
            'height':height
        }


# metric calculator

In [None]:

import torch
import numpy as np
import json
import matplotlib.pyplot as plt
from typing import List
from pathlib import Path
class MetricController():
    """
    Per-metric history of per-epoch loss lists plus the best (lowest) epoch mean seen.

    ``state_dic[name]`` holds one numpy array per ``add`` call (the batch losses of that
    epoch); ``best[name]`` is ``(epoch, mean)`` of the lowest epoch mean so far.
    For the metric name "RMSE" the epoch mean is sqrt(mean(values)), i.e. the stored
    values are expected to be MSE losses; every other name uses the plain mean.
    """
    def __init__(self):
        self.state_dic: dict[str,list[np.ndarray]] = {} #[batch , batch ~]
        self.best:dict[str,tuple[int,float]]={}
    def reset(self):
        """Forget all histories and best values."""
        self.state_dic = {}
        self.best = {}
    def add(self,metric_name:str , value:list[float],epoch:int)->None:
        """
        Append one epoch of losses and update the best value if this epoch is lower.

        Args:
            metric_name: mse or mae ~
            value: 1 epoch losses list
            epoch: epoch index stored alongside the best value
        """
        losses = torch.tensor(value).cpu().numpy()
        score = self.mean(metric_name,losses)
        if metric_name not in self.state_dic:
            self.state_dic[metric_name] = [losses]
            # Record the real epoch (not a hard-coded 0) so resumed runs report correctly.
            self.best[metric_name] = (epoch,score)
            return
        self.state_dic[metric_name].append(losses)
        if score < self.best[metric_name][1]:
            self.best[metric_name] = (epoch,score)
    def check(self,metric_name):
        """Return True when the metric has history; otherwise print a hint and return False."""
        if metric_name not in self.state_dic.keys():
            print(metric_name,'is not in ',self.state_dic.keys())
            return False
        return True
    def recent_mean(self,metric_name:str)->float:
        """
        Mean of the most recently added epoch.

        Prints 'error' and returns None when the metric is unknown.
        """
        if self.check(metric_name):
            return self.mean(metric_name,self.state_dic[metric_name][-1])
        print('error')
        return
    def show_keys(self):
        """Print the names of all tracked metrics."""
        print('keys in metriccontroller:',self.state_dic.keys())
    def best_mean(self,metric_name)->tuple[int , float]:
        """
        return:
            (index(epoch) , best loss)
        Raises KeyError when the metric was never added.
        """
        return self.best[metric_name]

    def mean(self,metric_name,losses:np.ndarray)->float:
        """Aggregate one epoch of losses: sqrt(mean) for "RMSE", plain mean otherwise."""
        if(metric_name =="RMSE"):
            return float(np.sqrt(np.mean(losses)))
        return float(np.mean(losses))
    def plot(self,metric_name,path,name =''):
        """
        Save a line plot of the per-epoch means of one metric.

        Args:
            path: img folder path (not file path)
            name: name of the experiment

        This controller stores a single series per metric, so exactly one curve is
        drawn (previously the same series was plotted twice as "train" and "validation").
        """
        if not self.check(metric_name):
            print('error')
            return
        history = [self.mean(metric_name,epoch_losses) for epoch_losses in self.state_dic[metric_name]]
        epochs = range(1,len(history) + 1)
        plt.figure(figsize=(10, 6))
        plt.plot(epochs, history, 'o-', label=metric_name)
        plt.title(metric_name +' loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.grid(True)
        plt.legend()
        path = Path(path) / (name + 'log_img.jpg')
        plt.savefig(path)
        plt.close()
    def save(self,metrci_name,path,name = ''):
        """Dump the raw per-epoch loss lists of one metric to '<name> <metric> log.json' inside ``path``."""
        if self.check(metrci_name):
            path = Path(path) / (name + ' ' + metrci_name + ' log.json')
            temp = [arr.tolist() for arr in self.state_dic[metrci_name]]
            with open(path,'w',encoding='utf-8') as f:
                json.dump(temp,f,indent =4)
                print('trainning log saved at',path)
    def get_log(self,metric_name):
        """Return the list of per-epoch loss arrays, or None when the metric is unknown."""
        if(self.check(metric_name)):
            return self.state_dic[metric_name]






# model

In [None]:

import torch
import torch.nn as nn
from transformers import AutoImageProcessor , AutoModel

class ResNetRegressor(nn.Module):
    """
    Height regressor: pretrained image encoder + small bbox MLP + fully connected head.

    The pooled image feature and a 64-d embedding of the 4 bbox numbers are concatenated
    and mapped to one scalar per sample.
    """
    def __init__(self, checkpoint,freeze_resnet = False ,head_layers = 3,head_dim = 512):
        """
        Args:
            checkpoint: model id / path handed to AutoModel and AutoImageProcessor
            freeze_resnet: when True the encoder parameters get requires_grad=False
            head_layers: number of Linear layers in the head (the last one outputs 1 value)
            head_dim: width of the first head layer; halved for every following hidden layer
        """
        super().__init__()
        # Separate from nn.Module's own train/eval mode: gates the random crop in forward().
        self.is_train = True
        self.image_encoder1 = AutoModel.from_pretrained(checkpoint) #for original image

        self.processor = AutoImageProcessor.from_pretrained(checkpoint)
        if freeze_resnet:
            print('freeze resnet')
            # Fixed attribute name (was `image_encode1r`, which raised AttributeError).
            for param in self.image_encoder1.parameters():
                param.requires_grad = False

        num_image_features = self.image_encoder1.config.hidden_sizes[-1]

        bbox_input_dim = 4
        bbox_embedded_dim = 64
        self.bbox_encoder = nn.Sequential(
            nn.Linear(bbox_input_dim,32),
            nn.ReLU(),
            nn.Linear(32,bbox_embedded_dim)
        )
        combined_dim = num_image_features  + bbox_embedded_dim

        # Build the head in a local list; layer order (and therefore the state_dict keys
        # regressor.0, regressor.3, ...) is unchanged.
        layers = []
        in_dim = combined_dim
        for _ in range(head_layers -1):
            layers.append(nn.Linear(in_dim , head_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(0.5))
            in_dim = head_dim
            head_dim = head_dim //2

        layers.append(nn.Linear(in_dim,1))
        self.regressor = nn.Sequential(*layers)
    def set_is_train(self,t):
        """Enable / disable the random crop augmentation applied inside forward()."""
        self.is_train = t
    def forward(self, image,bbox):
        """
        Args:
            image: image batch fed to the encoder
            bbox: [batch, 4] box per image
        Returns:
            [batch] predicted heights

        While ``is_train`` is set, roughly 30% of the calls (one draw of ``rand()`` per
        batch) first replace the batch by ``crop_and_adjust_bbox(imgs=image, bbox=bbox)``.
        """
        if self.is_train and (rand() < 0.3):
            image,bbox = crop_and_adjust_bbox(imgs=image,bbox=bbox)
        # pooler_output is [batch, features, 1, 1] for ResNet; flatten to [batch, features].
        image_feature = self.image_encoder1(image).pooler_output.flatten(start_dim=1)

        bbox_feature = self.bbox_encoder(bbox) #[batch , 64]

        feature = torch.cat([image_feature,bbox_feature],dim = 1)
        height_prediction = self.regressor(feature)
        return height_prediction.squeeze(-1)





# trainer

In [None]:

import torch
import torch.optim as optim
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from pathlib import Path
import numpy as np
import json
class Trainer():
    """
    Builds model, datasets, loaders and optimizer from ``config`` and runs training
    with validation, best-checkpoint saving and early stopping.

    Config keys read: train_img_dir, train_label_dir, validation_img_dir,
    validation_label_dir, w, lr, batch, epoch, early_stop, img_path, log_path,
    checkpoint, name.
    """
    def __init__(self,device,config,
                checkpoint='microsoft/resnet-50'
                ):
        self.checkpoint = checkpoint
        self.device = device
        self.config  = config
        self.load_items()
        self.start_epoch = 0

    def load_items(self):
        """Create the model, its image processor, both datasets/loaders, loss, optimizer and metric stores."""
        self.model =ResNetRegressor(checkpoint=self.checkpoint).to(self.device)
        self.processor = self.model.processor
        self.train_transform = Transformer()

        train_dataset = ResDataset(img_dir=self.config['train_img_dir'] ,
                                   label_dir= self.config['train_label_dir'],
                                   preprocessor=self.processor,
                                   device = self.device,
                                   transformer=self.train_transform,
                                   w = self.config['w'])
        val_dataset = ResDataset(img_dir=self.config['validation_img_dir'] ,
                                 label_dir= self.config['validation_label_dir'],
                                 preprocessor=self.processor,
                                 device = self.device,
                                 w = self.config['w'])
        self.loss = torch.nn.MSELoss() ## evaluation is reported as RMSE
        self.optimizer = optim.Adam(self.model.parameters(),lr = self.config['lr'],weight_decay=1e-5)
        self.train_dataloader = DataLoader(train_dataset,batch_size=self.config['batch'],shuffle=True)
        # No shuffling for validation: the metric is a mean of per-batch losses, so a fixed
        # batch composition keeps it comparable between epochs.
        self.val_dataloader = DataLoader(val_dataset,batch_size = self.config['batch'],shuffle = False)
        self.train_state = MetricController()
        self.val_state = MetricController()
    def train(self):
        """
        Run epochs ``start_epoch .. config['epoch']-1``.

        After each epoch the validation MSE is compared with the best one so far; a new
        best saves a checkpoint and resets the patience counter, otherwise the counter
        grows and training stops once it reaches ``config['early_stop']``. Finally the
        RMSE curves are plotted and logged.
        """
        cnt = 0
        print('train start\n')
        self.train_state.reset()
        # Also clear validation history so a second train() call does not compare
        # against the best value of a previous run.
        self.val_state.reset()
        for epoch in range(self.start_epoch,self.config['epoch']):
            self.train_epoch(epoch)

            print('epoch:',epoch  , ',RMSE:',self.train_state.recent_mean('RMSE'))
            val_loss = self.validation(epoch)
            if(val_loss <= self.val_state.best_mean('MSE')[1]):
                print('best model at epoch ',epoch)
                self.save(epoch = epoch)
                cnt = 0
            else:
              cnt = cnt+1
            if(cnt >= self.config['early_stop']):
              print('early stop')
              break
        print('trainning end')
        if not self.train_state.check('RMSE'):
            # No epoch ran (start_epoch >= config['epoch']): nothing to report or plot.
            return
        print('best train results(epoch,RMSE loss) = ',self.train_state.best_mean('RMSE'))
        print('best val results(epoch,RMSE loss) = ',self.val_state.best_mean('RMSE'))

        self.plot('RMSE')

        self.save_log('RMSE')
    def train_epoch(self,epoch):
        """One optimisation pass over the training loader; stores the batch MSE losses under 'MSE' and 'RMSE'."""
        MSE_losses = []
        # validation() switches the model's crop augmentation off; switch it back on for
        # every training epoch (previously it stayed off after the first validation).
        self.model.set_is_train(True)
        self.model.train()
        for data in self.train_dataloader:
            self.optimizer.zero_grad()
            img = data['img'].to(self.device)
            bbox = data['bbox'].to(self.device)
            height = data['height'].to(self.device)

            out = self.model(img,bbox)
            # Match the target's shape explicitly; a second squeeze(-1) would turn a
            # batch of one into a 0-d tensor.
            loss = self.loss(out.reshape(height.shape),height)
            MSE_losses.append(loss.item())
            loss.backward()
            self.optimizer.step()
        self.train_state.add('MSE',MSE_losses,epoch)
        self.train_state.add('RMSE',MSE_losses,epoch)
    def validation(self,epoch = 0):
        """Evaluate on the validation loader without gradients or crop augmentation; returns the epoch's mean MSE."""
        self.model.set_is_train(False)
        MSE_losses =[]
        self.model.eval()
        with torch.no_grad():
            for data in self.val_dataloader:
                img = data['img'].to(self.device)
                bbox = data['bbox'].to(self.device)
                height = data['height'].to(self.device)
                out = self.model(img,bbox)
                loss = self.loss(out.reshape(height.shape),height)
                MSE_losses.append(loss.item())

            self.val_state.add("MSE",MSE_losses,epoch)
            self.val_state.add("RMSE",MSE_losses,epoch)
            print('val: ',', RMSE=',self.val_state.recent_mean("RMSE"))

        return self.val_state.recent_mean("MSE")

    def plot(self,metric_name):
        """Save train and validation curves of ``metric_name`` to config['img_path']."""
        train_log = self.train_state.get_log(metric_name)
        val_log = self.val_state.get_log(metric_name)
        train_log = [self.train_state.mean(metric_name,epoch_losses) for epoch_losses in train_log]
        val_log = [self.val_state.mean(metric_name,epoch_losses) for epoch_losses in val_log]
        epochs = range(1,len(train_log) + 1)
        plt.figure(figsize=(10, 6))
        plt.plot(epochs, train_log, 'o-', label='Train Loss')
        plt.plot(epochs, val_log, 'o-', label='Validation Loss')
        plt.title('Training & Validation '+metric_name +' loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.grid(True)
        plt.legend()
        path = Path(self.config['img_path']) / (self.config['name'] + ' ' + metric_name+ ' log_img.jpg')
        # Create the output folder on first use instead of failing in savefig.
        path.parent.mkdir(parents=True, exist_ok=True)
        plt.savefig(path)
        plt.close()

    def save_log(self,metric_name):
        """Write the raw per-epoch train / validation loss lists of ``metric_name`` as JSON to config['log_path']."""
        train_log = self.train_state.get_log(metric_name)
        val_log = self.val_state.get_log(metric_name)
        path = Path(self.config['log_path']) / (self.config['name'] + ' ' + metric_name + ' log.json')
        path.parent.mkdir(parents=True, exist_ok=True)
        t = [arr.tolist() for arr in train_log]
        v = [arr.tolist() for arr in val_log]
        dic = {'train':t,'validation':v}
        with open(path,'w',encoding='utf-8') as f:
                json.dump(dic,f,indent =4)
                print('trainning log saved at',path)
    def save(self,epoch,path = 'best_model.pth'):
        """Store epoch, model weights and optimizer state at config['checkpoint'] / (config['name'] + path)."""
        p = Path(self.config['checkpoint']) /(self.config['name']+ path)
        p.parent.mkdir(parents=True, exist_ok=True)
        torch.save({
        'epoch': epoch,
        'model_state_dict': self.model.state_dict(),
        'optimizer_state_dict': self.optimizer.state_dict(),
    }, p)
    def load_trained_model(self,path = 'best_model.pth'):
        """Restore model and optimizer from a checkpoint written by ``save`` and continue after its epoch."""
        p = Path(self.config['checkpoint']) /(self.config['name']+ path)
        # map_location lets a checkpoint saved on GPU load on whatever device this trainer uses.
        c = torch.load(p, map_location=self.device)
        self.model.load_state_dict(c['model_state_dict'])
        self.optimizer.load_state_dict(c['optimizer_state_dict'])
        # The checkpoint holds the state at the END of the saved epoch, so resume with the next one.
        self.start_epoch = c['epoch'] + 1


# config

In [None]:
# Experiment configuration consumed by Trainer (and by the launch cell for 'model').
config = {
  #  'model':'facebook/dinov3-vitl16-pretrain-sat493m',
  'model':'microsoft/resnet-18',  # checkpoint id passed to Trainer(checkpoint=...)
  'processor':'microsoft/resnet-34',#not used
  'train_img_dir':'/content/dataset/images/train',
  'validation_img_dir':'/content/dataset/images/val',
  'train_label_dir':'/content/dataset/labels/train_p2',
  'validation_label_dir':'/content/dataset/labels/val_p2',
  'img_path':'/content/drive/MyDrive/Colab Notebooks/p2_resnet/img',  # folder for the loss-curve image
  'log_path':'/content/drive/MyDrive/Colab Notebooks/p2_resnet/log',  # folder for the JSON loss log
  'checkpoint':'/content/drive/MyDrive/Colab Notebooks/p2_resnet/checkpoint',  # folder for best_model.pth
  # Prefix of every output file name.
  # NOTE(review): says "resnet34" while 'model' above is resnet-18 - confirm which is intended.
  'name':'p2_resnet34_no_val_crop_001',
  'epoch':60,  # upper bound of the epoch loop
  'lr':0.0001,  # Adam learning rate
  'batch':320,  # batch size of both data loaders
  'early_stop':5,  # stop after this many consecutive epochs without a new best validation MSE
  'w':60/512  # box padding forwarded to get_enclosing_rect
}

# train

In [None]:
# Build the trainer on the GPU and start training.
# NOTE(review): "cuda" is hard-coded, so this cell needs a GPU runtime.
device = torch.device("cuda")
trainer = Trainer(device,config,checkpoint=config['model'])
trainer.train()


dataset: 10590
dataset: 1323
train start



  return self.preprocess(images, **kwargs)


epoch: 0 ,RMSE: 114.3176498413086
val:  , RMSE= 83.4245376586914
best model at epoch  0
epoch: 1 ,RMSE: 99.06150817871094
val:  , RMSE= 93.68115997314453
epoch: 2 ,RMSE: 75.90862274169922
val:  , RMSE= 66.38504791259766
best model at epoch  2
epoch: 3 ,RMSE: 51.03300094604492
val:  , RMSE= 36.83180236816406
best model at epoch  3
epoch: 4 ,RMSE: 33.93246078491211
val:  , RMSE= 40.15113830566406
epoch: 5 ,RMSE: 26.852033615112305
val:  , RMSE= 30.64974594116211
best model at epoch  5
epoch: 6 ,RMSE: 24.913663864135742
val:  , RMSE= 27.964431762695312
best model at epoch  6
epoch: 7 ,RMSE: 23.872955322265625
val:  , RMSE= 30.578548431396484
epoch: 8 ,RMSE: 23.697004318237305
val:  , RMSE= 29.35548973083496
epoch: 9 ,RMSE: 22.60011100769043
val:  , RMSE= 27.75360870361328
best model at epoch  9
epoch: 10 ,RMSE: 22.35025405883789
val:  , RMSE= 26.418031692504883
best model at epoch  10
epoch: 11 ,RMSE: 21.19952964782715
val:  , RMSE= 25.43686294555664
best model at epoch  11
epoch: 12 ,RMS