In [1]:
import importlib
import labs

importlib.reload(labs)
from labs import *

# INIT. Processor 

In [2]:
from transformers import LayoutLMv3Processor

In [3]:
# Load the pretrained LayoutLMv3 processor with its built-in OCR turned off
# (apply_ocr=False); prepare_example supplies the words and boxes explicitly.
processor = LayoutLMv3Processor.from_pretrained("microsoft/layoutlmv3-base", apply_ocr=False)

# DEF. Augmentation
### 주의
- augmentation은 LayoutLM processor 이전 단계에서 수행되며, processor는 픽셀 값이 음수가 아닌(0 이상인) 이미지를 기대함
- aug 단계에서 이미지 normalize를 해버리면 Processor에서 받는 픽셀 intensity가 음수가 될 수 있어 에러가 발생

In [4]:
def make_augmentation(aug_prob=0.8, target_size=224):
    """Build the Albumentations pipeline applied before the LayoutLM processor.

    Args:
        aug_prob: Weight given to the "augment" branch; the "do nothing"
            branch gets ``1 - aug_prob``.
        target_size: Side length, in pixels, of the square random crop.

    Returns:
        An ``A.Compose`` that either runs the candidate transforms in a random
        order or leaves the sample untouched, then converts the image with
        ``ToTensorV2``. Bounding boxes use the ``pascal_voc`` format and carry
        the ``words`` field along with them.

    Note:
        No normalisation step is included on purpose: the notebook notes that
        the processor expects non-negative pixel intensities.
    """
    # Geometric jitter: translation, scale, rotation and shear in one op.
    affine = A.Affine(
        translate_percent=0.2,   # shift range, as a fraction of the image size
        scale=(0.8, 1.2),        # zoom factor range
        rotate=(-85, 85),        # rotation range in degrees
        shear=(-10, 10),         # shear range in degrees
        p=0.7,
    )

    # Pixel-level degradations.
    gauss_noise = A.GaussNoise(std_range=(0.1, 0.2), p=0.3)
    gauss_blur = A.GaussianBlur(blur_limit=(3, 7), p=0.2)

    # Randomly filled rectangular holes.
    coarse_dropout = A.CoarseDropout(
        num_holes_range=(3, 6),
        hole_height_range=(10, 20),
        hole_width_range=(10, 20),
        fill="random_uniform",
        p=0.2,
    )

    # Square crop; no explicit probability is passed here.
    random_crop = A.RandomCrop(height=target_size, width=target_size)

    # Colour-space perturbations.
    to_gray = A.ToGray(p=0.15)
    hue_sat_val = A.HueSaturationValue(
        hue_shift_limit=10, sat_shift_limit=20, val_shift_limit=10, p=0.3
    )
    brightness_contrast = A.RandomBrightnessContrast(
        brightness_limit=0.2, contrast_limit=0.2, p=0.3
    )

    # Mirror flips.
    horizontal_flip = A.HorizontalFlip(p=0.5)
    vertical_flip = A.VerticalFlip(p=0.2)

    candidates = [
        affine,
        gauss_noise,
        gauss_blur,
        coarse_dropout,
        random_crop,
        to_gray,
        hue_sat_val,
        brightness_contrast,
        horizontal_flip,
        vertical_flip,
    ]

    # Exactly one branch is chosen per sample: augment, or pass through.
    augment_or_skip = A.OneOf(
        [
            A.RandomOrder(candidates, p=aug_prob),
            A.NoOp(p=1-aug_prob),
        ],
        p=1.0,
    )
    steps = [augment_or_skip, ToTensorV2()]

    box_rules = A.BboxParams(
        format='pascal_voc',       # boxes are (x0, y0, x1, y1)
        label_fields=['words'],    # per-box fields that follow their box
        min_area=0,
        min_visibility=0.0,
        check_each_transform=True,
        clip=True,
    )

    return A.Compose(steps, bbox_params=box_rules)

# DEF. Dataset and DataModule 
### 주의
- 또한, albumentations(pascal_voc 포맷)는 bbox 좌표가 LayoutLMv3의 (0, 1000) 스케일이 아닌 픽셀 스케일이기를 기대함
  - 이에 따라, norm_box 변환 시점을 augment 뒤로 미뤄야 함

```
return_tensors (str, optional, defaults to "pt") — The type of Tensor to return. Allowable values are “np”, “pt” and “tf”.
```

In [5]:
def prepare_example(image_path, processor, transform=None):
    """Load one image with its sidecar JSON and encode it for LayoutLMv3.

    Args:
        image_path: Path to the image; the metadata is read from the file with
            the same name and a ``.json`` suffix.
        processor: Callable processor invoked with ``images``, ``text`` and
            ``boxes``.
        transform: Optional augmentation callable taking ``image``, ``bboxes``
            and ``words`` keyword arguments and returning a mapping with the
            same keys. When ``None`` the image is loaded with PIL and boxes
            are requested already normalised (``use_norm=True``).

    Returns:
        The processor output, padded/truncated to 512 tokens, as PyTorch
        tensors.

    Raises:
        FileNotFoundError: If ``transform`` is given and OpenCV cannot read
            ``image_path``.
    """
    # Local import keeps this cell self-contained; torch is already a
    # dependency of the notebook, so this is only a cached lookup.
    import torch

    # One flag for every branch, so loading, box scaling and augmentation
    # always agree (the old mix of `not transform` / `is not None` could
    # diverge for a falsy-but-not-None transform).
    use_transform = transform is not None

    # load image
    if not use_transform:
        image = Image.open(image_path).convert("RGB")
        image = ImageOps.exif_transpose(image)  # correct orientation
    else:
        image = cv2.imread(str(image_path))
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # load metas
    json_path = Path(image_path).with_suffix(".json")
    meta = load_json(json_path)

    # Words and boxes. Boxes stay in pixel units when augmenting, because the
    # normalisation has to happen after the geometry has been changed.
    words, boxes = get_words_and_boxes(image, meta, use_norm=not use_transform)

    if use_transform:
        augmented = transform(image=image, bboxes=boxes, words=words)
        image = augmented['image']
        words = augmented['words']
        boxes = augmented['bboxes']

        # The pipeline ends with ToTensorV2, which yields a channel-first
        # tensor, so height/width are the LAST two dims. `shape[:2]` would
        # return (channels, height) and mis-scale every box.
        if isinstance(image, torch.Tensor):
            h, w = image.shape[-2:]
        else:
            h, w = image.shape[:2]
        boxes = [to_norm_box_with_size(b, h, w) for b in boxes]

    encoding = processor(
        images=image,
        text=words,
        boxes=boxes,
        max_length=512,
        padding="max_length",
        truncation=True,
        return_tensors="pt"
    )

    return encoding

In [6]:
class D4Dataset(Dataset):
    """Map-style dataset yielding LayoutLMv3 inputs plus an integer class label.

    Args:
        image_paths: Indexable collection of image file paths.
        targets: Mapping from image file name (basename) to a value
            convertible with ``int``.
        processor: Processor forwarded to ``prepare_example``.
        transform: Optional augmentation forwarded to ``prepare_example``.
    """

    # Encoding fields copied into each sample, with the batch axis removed.
    _ENCODING_KEYS = ("input_ids", "attention_mask", "bbox", "pixel_values")

    def __init__(self, image_paths, targets, processor, transform=None):
        self.image_paths = image_paths
        self.targets = targets
        self.processor = processor
        self.transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Encode the ``idx``-th image and attach its label as a long tensor."""
        path = self.image_paths[idx]
        encoded = prepare_example(path, self.processor, self.transform)

        # Labels are keyed by file name, not by full path.
        label = int(self.targets[os.path.basename(path)])

        sample = {key: encoded[key].squeeze(0) for key in self._ENCODING_KEYS}
        sample["labels"] = torch.tensor(label, dtype=torch.long)
        return sample

In [7]:
class D4DataModule(LightningDataModule):
    """Lightning data module wiring the train/valid/trial splits to ``D4Dataset``.

    Args:
        train_paths: Image paths for training.
        valid_paths: Image paths for validation.
        trial_paths: Image paths for the held-out/test split.
        target_dict: Mapping from image file name to class label.
        processor: LayoutLMv3 processor shared by every dataset.
        batch_size: Batch size used by all dataloaders.
        num_workers: Worker processes used by all dataloaders.
        aug_prob: Forwarded to ``make_augmentation`` (default matches the
            previously hard-coded value).
        target_size: Forwarded to ``make_augmentation`` (default matches the
            previously hard-coded value).
    """

    def __init__(
        self,
        train_paths,
        valid_paths,
        trial_paths,
        target_dict,
        processor,
        batch_size=32,
        num_workers=4,
        aug_prob=0.8,
        target_size=224,
    ):
        super().__init__()
        self.train_paths = train_paths
        self.valid_paths = valid_paths
        self.trial_paths = trial_paths
        self.targets = target_dict
        self.processor = processor
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.aug_prob = aug_prob
        self.target_size = target_size

    def _make_transform(self):
        """Build a fresh augmentation pipeline from the configured settings."""
        return make_augmentation(aug_prob=self.aug_prob, target_size=self.target_size)

    def _make_loader(self, dataset, shuffle):
        """Create a DataLoader with the module-wide batch/worker settings."""
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
            collate_fn=default_data_collator
        )

    def setup(self, stage=None):
        """Create the datasets needed for ``stage``.

        ``None`` means "every stage", and ``"validate"`` needs the validation
        dataset as well; previously both left ``valid_ds`` undefined.
        """
        if stage in ("fit", "validate", None):
            self.train_ds = D4Dataset(self.train_paths,
                                      self.targets,
                                      self.processor,
                                      self._make_transform())
            # NOTE(review): validation samples are augmented with the same
            # random pipeline as training, so `valid_loss` is not
            # deterministic between epochs. Kept as-is because it may be
            # intentional (e.g. to mimic a perturbed test set) - confirm.
            self.valid_ds = D4Dataset(self.valid_paths,
                                      self.targets,
                                      self.processor,
                                      self._make_transform())
        if stage == "test" or stage is None:
            # No transform: images are loaded with PIL and boxes come
            # pre-normalised from get_words_and_boxes.
            self.trial_ds = D4Dataset(self.trial_paths, self.targets, self.processor)

    def train_dataloader(self):
        """Shuffled loader over the training dataset."""
        return self._make_loader(self.train_ds, shuffle=True)

    def val_dataloader(self):
        """Sequential loader over the validation dataset."""
        return self._make_loader(self.valid_ds, shuffle=False)

    def test_dataloader(self):
        """Sequential loader over the trial dataset."""
        return self._make_loader(self.trial_ds, shuffle=False)

# INIT. DM

In [8]:
# Collect the training images, their labels and the class-name mapping.
# NOTE(review): these are absolute, machine-specific paths; consider a
# configurable DATA_DIR so the notebook runs elsewhere.
image_paths = grep_files("/data/ephemeral/home/dataset/dtc/train", exts=['jpg'])
# Presumably maps image file name -> class id (D4Dataset looks labels up by
# basename) - verify against load_csv_targets.
target_dict = load_csv_targets("/data/ephemeral/home/dataset/dtc/train.csv")
label_path = "/data/ephemeral/home/dataset/dtc/doc_classes.json"
# label2id / id2label are later stored on the model config.
label2id, id2label = make_doc_class_mapper(label_path)

0it [00:00, ?it/s]

In [9]:
# 60/40 train/valid split; test_ratio=0, so trial_images is presumably empty
# (verify against split_ds).
train_images, valid_images, trial_images = split_ds(image_paths,  train_ratio=0.6,  valid_ratio=0.4, test_ratio=0)

# Datasets themselves are only created later, inside D4DataModule.setup().
data_module = D4DataModule(
    train_paths=train_images,
    valid_paths=valid_images,
    trial_paths=trial_images,
    target_dict=target_dict,
    processor=processor,
    batch_size=16,
    num_workers=8
)

In [10]:
# s0 = data_module.train_ds[0]
# print(s0.keys())
# print(s0['input_ids'].shape, s0['attention_mask'].shape, s0['bbox'].shape, s0['pixel_values'].shape, s0['labels'].shape)

# DEF) Model
- ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight'] 크기 조절

In [11]:
from transformers import LayoutLMv3ForSequenceClassification as LyLmv3, LayoutLMv3Processor

In [12]:
class Lym(pl.LightningModule):
    """LayoutLMv3 sequence classifier wrapped as a LightningModule.

    Args:
        label2id: Mapping from class name to class index; its length sets the
            number of output classes.
        id2label: Inverse mapping, stored on the model config.
        lr: AdamW learning rate (default matches the previously hard-coded
            value).
    """

    def __init__(self, label2id, id2label, lr=1e-5):
        super().__init__()
        num_classes = len(label2id)
        self.lr = lr
        self.model = LyLmv3.from_pretrained("microsoft/layoutlmv3-base", num_labels=num_classes)
        self.model.config.label2id = label2id
        self.model.config.id2label = id2label

        metrics = MetricCollection({
            "accuracy": Accuracy(task="multiclass", num_classes=num_classes),
            "roc_auc": AUROC(task="multiclass", num_classes=num_classes),
            "precision": Precision(task="multiclass", num_classes=num_classes, average="macro"),
            "recall": Recall(task="multiclass", num_classes=num_classes, average="macro"),
            "F1": F1Score(task="multiclass", num_classes=num_classes, average="macro"),
        })

        # clone() gives each phase its own metric instances. Building two
        # collections from the same dict made train and valid share state, so
        # resetting or updating one silently changed the other.
        self.train_metrics = metrics.clone(prefix="train_")
        self.valid_metrics = metrics.clone(prefix="valid_")

    def forward(self, input_ids, attention_mask, bbox, pixel_values, labels=None):
        """Run the wrapped model; passing ``labels`` makes it return a loss too."""
        return self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            bbox=bbox,
            pixel_values=pixel_values,
            labels=labels
        )

    def feed(self, batch):
        """Unpack a collated batch dict and forward it, labels included."""
        return self(
            batch["input_ids"],
            batch["attention_mask"],
            batch["bbox"],
            batch["pixel_values"],
            batch["labels"]
        )

    def training_step(self, batch, batch_idx):
        """Forward one training batch, accumulate metrics and return the loss."""
        labels = batch["labels"]
        outputs = self.feed(batch)

        # Only accumulate here; the values are computed and logged once per
        # epoch in on_train_epoch_end, under the same keys as before.
        self.train_metrics.update(outputs.logits, labels)

        self.log("train_loss", outputs.loss)

        return outputs.loss

    def validation_step(self, batch, batch_idx):
        """Forward one validation batch, accumulate metrics and return the loss."""
        labels = batch["labels"]
        outputs = self.feed(batch)

        self.valid_metrics.update(outputs.logits, labels)

        # `valid_loss` is what the early-stopping / checkpoint callbacks monitor.
        self.log("valid_loss", outputs.loss)
        return outputs.loss

    def configure_optimizers(self):
        """AdamW over the wrapped model's parameters."""
        return torch.optim.AdamW(self.model.parameters(), lr=self.lr)

    def on_train_epoch_start(self):
        """Start every training epoch with empty metric state."""
        self.train_metrics.reset()

    def on_train_epoch_end(self):
        """Compute and log the epoch-level training metrics."""
        metrics = self.train_metrics.compute()
        for name, value in metrics.items():
            self.log(name, value, prog_bar=True)

    def on_validation_epoch_start(self):
        """Start every validation epoch with empty metric state."""
        self.valid_metrics.reset()

    def on_validation_epoch_end(self):
        """Compute and log the epoch-level validation metrics (best effort)."""
        try:
            metrics = self.valid_metrics.compute()
            for k, v in metrics.items():
                self.log(k, v, prog_bar=True)
        except Exception as e:
            # Deliberately non-fatal: a metric failure should not abort the
            # run; the loss is logged separately in validation_step.
            print(f"Metric compute error: {e}")

# Init Dashboard

In [13]:
# Experiment name, also referenced by the (commented-out) model export call
# at the end of the notebook.
exp_name = 'exp-llv3-aug-test'
# Start the W&B run explicitly, then create the Lightning logger.
# NOTE(review): WandbLogger() is given no project/name here, so it presumably
# attaches to the run started above - confirm.
wandb.init(project='docsy', name=exp_name)
wandb_logger = WandbLogger()

[34m[1mwandb[0m: Currently logged in as: [33mcatchy[0m ([33mcat2oon[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


# RUN. Train

In [None]:
# Both callbacks watch `valid_loss`, which Lym.validation_step logs.
early_stopping = EarlyStopping(monitor='valid_loss', patience=5, mode='min')
model_checkpoint = ModelCheckpoint(monitor="valid_loss", mode="min", save_top_k=3)

# GPU training with 16-bit mixed precision; max_epochs is an upper bound,
# early stopping can end the run sooner.
trainer = pl.Trainer(
    accelerator="gpu",
    precision="16-mixed",
    max_epochs=100,
    logger=wandb_logger,
    callbacks=[model_checkpoint, early_stopping]
)

# label2id / id2label come from the data-initialisation cell above.
model = Lym(label2id, id2label)
trainer.fit(model, datamodule=data_module)

Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
Some weights of LayoutLMv3ForSequenceClassification were not initialized from the model checkpoint at microsoft/layoutlmv3-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
You are using a CUDA device ('NVIDIA GeForce RTX 3090') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
/data/ephemeral/home/.pyenv/versions/py12/lib/python3.12/site-packages/pytorch_lightning/loggers/wa

Sanity Checking: |          | 0/? [00:00<?, ?it/s]



Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

In [None]:
# Close the W&B run opened by wandb.init() earlier in the notebook.
wandb.finish()

# 모델 저장

In [None]:
def save_model_tensor(model, name, checkpoint_path="checkpoint.ckpt", **load_kwargs):
    """Write a model's ``state_dict`` to ``{name}.safetensors``.

    Args:
        model: Model to export. When ``None``, a ``Lym`` is restored from
            ``checkpoint_path`` first.
        name: Output file name without the ``.safetensors`` extension.
        checkpoint_path: Checkpoint used only when ``model`` is ``None``
            (default matches the previously hard-coded path).
        **load_kwargs: Extra keyword arguments forwarded to
            ``Lym.load_from_checkpoint``. ``Lym.__init__`` requires
            ``label2id`` and ``id2label`` and the class does not call
            ``save_hyperparameters`` in this notebook, so they most likely
            have to be supplied here - verify against your checkpoint.
    """
    if model is None:
        # The old fallback called `model.load_from_checkpoint(...)` on the
        # missing model itself and could never succeed; load via the class.
        model = Lym.load_from_checkpoint(checkpoint_path, **load_kwargs)
    state_dict = model.state_dict()
    save_file(state_dict, f"{name}.safetensors")

In [None]:
# save_model_tensor(model, exp_name)