In [2]:
from torch_snippets import *
from PIL import Image
import glob
images_path = '.././data/aug_img/'
df = pd.read_csv('.././data/df.csv',encoding = 'utf-8',index_col=0)

In [7]:
label2target = {l:t+1 for t,l in enumerate(df['Label'].unique())}
label2target['background'] = 0
target2label = {t:l for l,t in label2target.items()}
background_class = label2target['background']
num_classes = len(label2target)
print('class 개수 : {}'.format(num_classes))

class 개수 : 750


In [6]:
# 이미지 형태가 w,h,channel 형태로 되어있어서 channerl,w,h로 바꿔줘야한다.
def preprocess_image(img):
    img = torch.tensor(img).permute(2,0,1)
    return img.to(device).float()

In [19]:
# 데이터셋 열어주는 class인데, torch.utils.data.Dataset를 상속받아준다.
class OpenDataset(torch.utils.data.Dataset):
    # OpenDataset 속성
    w, h = 224, 224
    
    def __init__(self, df : pd.DataFrame, image_dir : str = images_path):
        """ 객체 생성 함수

        Args:
            df (pd.DataFrame): 이미지 정보들이 저장되어 있는 데이터 프레임을 넣어주면 된다.
            image_dir (str, optional): 이미지 경로를 넣어주면 된다.. Defaults to images_path.
        """        
        self.image_dir = image_dir

        # 이미지 경로를 받아오면 경로에 있는 이미지 파일들을 self.files 라는 객체 속성값으로 받아준다.
        self.files = glob.glob(self.image_dir+'/*')
        self.df = df

        # image_infos 객체 속성값으로 데이터 프레임의 ImageID 의 unique 값을 넣어준다. 
        self.image_infos = df.Image_Name.unique()

    def __getitem__(self, ix : int) -> torch.tensor :
        """ index 접근 함수

        Args:
            ix (int): instance에 접근하고 싶은 인덱스

        Returns:
            torch.tensor: 해당 instance 의 index 번째에 위치한 이미지 
            torch.tensor: 해당 instance 의 index 번째에 위치한 target
        """        
        # 객체 instance의 ix 번째에 위치한 이미지 이름을 먼저 받아온다.
        image_id = self.image_infos[ix]

        # 예를 들어, image_id 가 '100.jpg' 라면 self.files 안에서 '100.jpg' 가 포함된 '.././data/aug_img/100.jpg' 를 출력해준다.
        img_path = find(image_id, self.files)

        # 이미지를 열어주는 RGB값 변환 후 열어준다.
        img = Image.open(img_path).convert("RGB")

        # 이미지 크기를 class 속성에 미리 선언해준 224, 224로 변환해주고 resampling 은 BILINEAR 기법을 이용한다.
        # 이후 이미지 색상 값들을 255로 나눠 정규화 실시
        img = np.array(img.resize((self.w, self.h), resample=Image.BILINEAR))/255.

        # 데이터 프레임에서 ImageID가 imgae_id(예를 들어, '100.jpg') 인 것들을 찾아 data에 할당한다.
        data = df[df['ImageID'] == image_id]

        # data에 할당된 데이터프레임에서 'Label'에 해당하는 것들을 찾아 labels에 담아준다.
        labels = data['Label'].values.tolist()

        # data에서 Xmin, Ymin, XMax, YMax 를 찾아 값들을 data에 재할당한다.
        data = data[['Xmin','Ymin','XMax','YMax']].values

        # 현재 data에는 resize 되기 전 값이 들어가 있기 때문에 사이즈에 맞게 변형해줘야한다.
        data[:,[0,2]] *= (self.w/1500)
        data[:,[1,3]] *= (self.h/1000)

        # data는 어차피 양의 정수이기 때문에 uint32로 변형하고 list에 담아준다.
        boxes = data.astype(np.uint32).tolist() 

        # torch에서 bbbox 정보는 dictionary 형식을 받는다.
        target = {}

        # torch['boxes'] 는 boundingbox의 Xmin, Ymin, XMax, YMax 값들이 들어가 있다.
        target["boxes"] = torch.Tensor(boxes).float()

        # torch['labels'] 는 labels에 있는 값들을 숫자형태로 변형시켜준다.
        target["labels"] = torch.Tensor([label2target[i] for i in labels]).long()

        # 현재 이미지는 w,h,channel로 들어가있기 때문에 channel, w, h 의 형태로 바꿔준다.
        img = preprocess_image(img)

        return img, target

    def collate_fn(self, batch):
        return tuple(zip(*batch)) 

    #instance의 길이정보를 원하면 이미지들의 개수를 출력해준다.
    def __len__(self):
        return len(self.image_infos)

In [20]:
df

Unnamed: 0,Image_Name,Json_Name,Label,Xmin,Ymin,XMax,YMax
0,0.jpg,0.json,신용카드전표,1501.68640,826.04950,1992.25500,1095.79480
1,0.jpg,0.json,고객용,2019.32010,863.77844,2269.65280,1115.33030
2,0.jpg,0.json,영수증,1248.48500,1145.79300,1478.32530,1276.87150
3,0.jpg,0.json,20211226,1568.83960,1160.13260,1884.71330,1285.59910
4,0.jpg,0.json,01,1901.62120,1186.74160,1983.29300,1287.05030
...,...,...,...,...,...,...,...
4107182,66933.jpg,66933.json,1,481.45120,747.88367,485.40433,757.10864
4107183,66933.jpg,66933.json,층,486.41376,747.62000,493.54416,758.26890
4107184,66933.jpg,66933.json,대표자,443.89963,758.25130,469.56470,769.21027
4107185,66933.jpg,66933.json,이진희,477.60703,757.58620,503.55720,769.28810


In [23]:
# 데이터 나눠주기
from sklearn.model_selection import train_test_split

# 데이터 프레임의 df.Image_Name 들의 unique 값들을 기준으로 test_size는 0.1로 하여 나눈다.
trn_ids, val_ids = train_test_split(df.Image_Name.unique(), test_size=0.1, random_state=99)

# train_df, val_df 를 나눠준다.
trn_df, val_df = df[df['Image_Name'].isin(trn_ids)], df[df['Image_Name'].isin(val_ids)]

print(len(trn_df), len(val_df))

train_ds = OpenDataset(trn_df)
test_ds = OpenDataset(val_df)

train_loader = DataLoader(train_ds, batch_size=4, collate_fn=train_ds.collate_fn, drop_last=True)
test_loader = DataLoader(test_ds, batch_size=4, collate_fn=test_ds.collate_fn, drop_last=True)

3702977 404210


In [24]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

device = 'cuda' if torch.cuda.is_available() else 'cpu'

def get_model():
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return model

In [25]:
# Defining training and validation functions for a single batch
def train_batch(inputs, model, optimizer):
    model.train()
    input, targets = inputs
    input = list(image.to(device) for image in input)
    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
    optimizer.zero_grad()
    losses = model(input, targets)
    loss = sum(loss for loss in losses.values())
    loss.backward()
    optimizer.step()
    return loss, losses

@torch.no_grad() # this will disable gradient computation in the function below
def validate_batch(inputs, model):
    model.train() # to obtain the losses, model needs to be in train mode only. # #Note that here we are not defining the model's forward method 
#and hence need to work per the way the model class is defined
    input, targets = inputs
    input = list(image.to(device) for image in input)
    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

    optimizer.zero_grad()
    losses = model(input, targets)
    loss = sum(loss for loss in losses.values())
    return loss, losses

In [26]:
model = get_model().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.005,
                            momentum=0.9, weight_decay=0.0005)
n_epochs = 5
log = Report(n_epochs)

In [None]:
for epoch in range(n_epochs):
    _n = len(train_loader)
    for ix, inputs in enumerate(train_loader):
        loss, losses = train_batch(inputs, model, optimizer)
        loc_loss, regr_loss, loss_objectness, loss_rpn_box_reg = \
            [losses[k] for k in ['loss_classifier','loss_box_reg','loss_objectness','loss_rpn_box_reg']]
        pos = (epoch + (ix+1)/_n)
        log.record(pos, trn_loss=loss.item(), trn_loc_loss=loc_loss.item(), 
                   trn_regr_loss=regr_loss.item(), trn_objectness_loss=loss_objectness.item(),
                   trn_rpn_box_reg_loss=loss_rpn_box_reg.item(), end='\r')

    _n = len(test_loader)
    for ix,inputs in enumerate(test_loader):
        loss, losses = validate_batch(inputs, model)
        loc_loss, regr_loss, loss_objectness, loss_rpn_box_reg = \
          [losses[k] for k in ['loss_classifier','loss_box_reg','loss_objectness','loss_rpn_box_reg']]
        pos = (epoch + (ix+1)/_n)
        log.record(pos, val_loss=loss.item(), val_loc_loss=loc_loss.item(), 
                  val_regr_loss=regr_loss.item(), val_objectness_loss=loss_objectness.item(),
                  val_rpn_box_reg_loss=loss_rpn_box_reg.item(), end='\r')
    if (epoch+1)%(n_epochs//5)==0: log.report_avgs(epoch+1)

In [None]:
log.plot_epochs(['trn_loss','val_loss'])

In [None]:
from torchvision.ops import nms
def decode_output(output):
    'convert tensors to numpy arrays'
    bbs = output['boxes'].cpu().detach().numpy().astype(np.uint16)
    labels = np.array([target2label[i] for i in output['labels'].cpu().detach().numpy()])
    confs = output['scores'].cpu().detach().numpy()
    ixs = nms(torch.tensor(bbs.astype(np.float32)), torch.tensor(confs), 0.05)
    bbs, confs, labels = [tensor[ixs] for tensor in [bbs, confs, labels]]

    if len(ixs) == 1:
        bbs, confs, labels = [np.array([tensor]) for tensor in [bbs, confs, labels]]
    return bbs.tolist(), confs.tolist(), labels.tolist()

In [None]:
model.eval()
for ix, (images, targets) in enumerate(test_loader):
    if ix==3: break
    images = [im for im in images]
    outputs = model(images)
    for ix, output in enumerate(outputs):
        bbs, confs, labels = decode_output(output)
        info = [f'{l}@{c:.2f}' for l,c in zip(labels, confs)]
        show(images[ix].cpu().permute(1,2,0), bbs=bbs, texts=labels, sz=5)