In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0, 1"

import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

from torch.utils.data import Dataset, DataLoader

import seaborn as sns
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix

import numpy as np

from PIL import Image
from tqdm import tqdm

import matplotlib.pyplot as plt
import cv2

from accelerate import Accelerator

from src.models import FPathPredictor
from src.dataloaders import FPathDataset
from src.losses import FocalLoss, SketchMaskLoss

from src.utils import inference, inference_logits

In [4]:
BATCHSIZE = 16

# YAML manifests for the three dataset splits.
train_yaml  = "/home/joono/fpath_infodraw/dataset/train.yaml"
val_yaml    = "/home/joono/fpath_infodraw/dataset/val.yaml"
test_yaml   = "/home/joono/fpath_infodraw/dataset/test.yaml"

# One dataset per split, built in train / val / test order.
train_dset, valid_dset, test_dset = (
    FPathDataset(manifest) for manifest in (train_yaml, val_yaml, test_yaml)
)

# Options shared by the two evaluation loaders; only their worker counts differ.
eval_loader_kwargs = dict(batch_size=BATCHSIZE, shuffle=False, pin_memory=True)

# Training batches are shuffled and the last incomplete batch is dropped.
train_loader = DataLoader(
    train_dset,
    batch_size=BATCHSIZE,
    shuffle=True,
    num_workers=16,
    pin_memory=True,
    drop_last=True,
)
val_loader = DataLoader(valid_dset, num_workers=16, **eval_loader_kwargs)
test_loader = DataLoader(test_dset, num_workers=8, **eval_loader_kwargs)

# Peek at the first training sample to check the shapes and container types.
vtf, target = train_dset[0]
print(f"{vtf.shape=}, {target.shape=}")
print(f"{type(vtf)=}, {type(target)=}")
print(f"{target[:3, :3]}")

vtf.shape=(1024, 1024, 21), target.shape=torch.Size([1024, 1024, 1])
type(vtf)=<class 'numpy.ndarray'>, type(target)=<class 'torch.Tensor'>
tensor([[[1.],
         [1.],
         [1.]],

        [[1.],
         [1.],
         [1.]],

        [[0.],
         [0.],
         [0.]]])


In [6]:
# Training, validation and test loops. The Accelerator itself is created in the next cell.

def train(model, optimizer, objective, train_loader, accelerator):
    """Run one pass over ``train_loader`` and print the mean per-batch loss.

    Args:
        model: network to optimise; called as ``model(batch)``.
        optimizer: optimizer stepped once per batch.
        objective: callable ``objective(prediction, target)`` returning the loss.
        train_loader: iterable of ``(vtf, target)`` batches that supports ``len()``.
        accelerator: object providing ``prepare``, ``device`` and ``backward``.

    Returns:
        None. The accumulated loss is only printed.
    """
    # NOTE(review): prepare() runs on every call (i.e. once per epoch in the
    # training cell) -- confirm that repeatedly preparing the same objects is intended.
    model, optimizer, train_loader = accelerator.prepare(model, optimizer, train_loader)
    model = model.train()

    device = accelerator.device
    epoch_loss = 0
    for features, labels in tqdm(train_loader):
        features = features.to(device)  # [B x W x H x 21]
        labels = labels.to(device)

        batch_loss = objective(model(features), labels)  # prediction is [B x W x H x 1]
        # Each batch contributes loss / num_batches, so the running sum is a mean.
        epoch_loss += batch_loss.detach().item() / len(train_loader)

        optimizer.zero_grad()
        accelerator.backward(batch_loss)
        optimizer.step()
    print(f"train loss: {epoch_loss}")

def val(model, objective, val_loader, epoch, accelerator, save_path='checkpoints/best_model.pth'):
    """Evaluate on ``val_loader`` and checkpoint the model only when the loss improves.

    The best loss seen so far is stored on the function object
    (``val.best_val_loss``) so that it survives between calls. It used to be a
    local variable reset to ``1e4`` on every call, which made the "is this the
    best so far" check pass every epoch. The stored value is reset whenever
    ``epoch == 0`` so that restarting a training loop in the same kernel does
    not compare against a previous run.

    Args:
        model: network to evaluate; called as ``model(batch)``.
        objective: callable ``objective(prediction, target)`` returning the loss.
        val_loader: iterable of ``(vtf, target)`` batches that supports ``len()``.
        epoch: epoch index; used in the checkpoint file name and to reset the
            stored best loss when it is ``0``.
        accelerator: object providing ``prepare`` and ``device``.
        save_path: kept for backward compatibility; currently unused. Checkpoints
            are written to ``checkpoints/best_model_epoch{epoch}.pt``.

    Returns:
        float: mean per-batch validation loss for this call.
    """
    model, val_loader = accelerator.prepare(model, val_loader)
    model = model.eval()

    cum_loss = 0
    with torch.no_grad():
        for vtf, target in tqdm(val_loader):
            vtf = vtf.to(accelerator.device)  # [B x W x H x 21]
            target = target.to(accelerator.device)

            pred_target = model(vtf)  # [B x W x H x 1]

            loss = objective(pred_target, target)
            cum_loss += loss.detach().item() / len(val_loader)

    print(f"val loss: {cum_loss}")

    # A new run starts at epoch 0: forget the best loss of any earlier run.
    if epoch == 0:
        val.best_val_loss = float("inf")
    best_val_loss = getattr(val, "best_val_loss", float("inf"))

    # Save only on a strict improvement (a NaN loss never compares as smaller).
    if cum_loss < best_val_loss:
        val.best_val_loss = cum_loss
        # torch.save does not create missing directories.
        os.makedirs("checkpoints", exist_ok=True)
        # NOTE(review): if prepare() wraps the model (e.g. for distributed runs),
        # the saved keys may carry a wrapper prefix -- confirm before loading elsewhere.
        torch.save(model.state_dict(), f"checkpoints/best_model_epoch{epoch}.pt")
        print(f"Model saved with validation loss: {cum_loss:.4f}")

    return cum_loss

def test(model, test_loader, accelerator):
    """Score the model on ``test_loader`` and print precision, recall and F1.

    Predictions come from ``inference(model, batch)`` (imported from
    ``src.utils``). Predictions and targets of all batches are flattened into
    one long vector each before the metrics are computed.

    Args:
        model: network to evaluate.
        test_loader: iterable of ``(vtf, target)`` batches.
        accelerator: object providing ``prepare`` and ``device``.

    Returns:
        None. The three metrics are only printed.
    """
    model, test_loader = accelerator.prepare(model, test_loader)
    model = model.eval()

    pred_chunks, target_chunks = [], []
    with torch.no_grad():
        for features, labels in tqdm(test_loader):
            features = features.to(accelerator.device)
            labels = labels.to(accelerator.device)

            # NOTE(review): presumably `inference` returns binarised predictions,
            # since the sklearn metrics below need discrete labels -- confirm.
            pred_chunks.append(inference(model, features).cpu().numpy())
            target_chunks.append(labels.cpu().numpy())

    # Invert both label sets: sketch (black == 0) and background (white == 1)
    # swap values, so sketch pixels become 1.
    y_pred = 1 - np.concatenate(pred_chunks).flatten()
    y_true = 1 - np.concatenate(target_chunks).flatten()

    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)

    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

In [7]:
model = FPathPredictor().to('cuda')
optimizer = optim.Adam(model.parameters(), lr=5e-4)
accelerator = Accelerator()

# Alternative objectives tried during development (kept for reference):
# objective = nn.CrossEntropyLoss().to('cuda')
# objective = FocalLoss(alpha=10000, gamma=100).to('cuda') # gamma = focusing factor; original note: gives a larger penalty to easy negative cases (NOTE(review): confirm against src.losses.FocalLoss)
# objective = nn.MSELoss().to('cuda')
# objective = nn.BCEWithLogitsLoss().to('cuda')
objective = SketchMaskLoss().to('cuda')


EPOCHS = 10
# Evaluate on the test set every TEST_EVERY epochs. This replaces the
# hard-coded `epoch % 1 == 0`, which was always true.
TEST_EVERY = 1

# Tracks whether the most recent epoch was already scored on the test set, so the
# final evaluation below does not repeat the same run on unchanged weights.
last_epoch_tested = False
for epoch in range(EPOCHS):
    train(model, optimizer, objective, train_loader, accelerator)
    val(model, objective, val_loader, epoch, accelerator)

    last_epoch_tested = epoch % TEST_EVERY == 0
    if last_epoch_tested:
        test(model, test_loader, accelerator)

# Always report metrics for the final weights, but only once.
if not last_epoch_tested:
    test(model, test_loader, accelerator)

  0%|          | 0/45 [00:12<?, ?it/s]


KeyboardInterrupt: 

In [None]:
# Run the model on a single location: the entry at index [100, 100] of the
# first test sample, with a leading batch dimension added.
vtf, _ = test_dset[0]
pixel_vtf = torch.tensor(vtf[100, 100]).to('cuda')
pred_target = inference_logits(model, pixel_vtf.unsqueeze(0))

In [None]:
# Run the model on the whole first test sample and look at the distribution of its outputs.
vtf, _ = test_dset[0]

vtf = torch.tensor(vtf).to('cuda')

pred_target = inference_logits(model, vtf.unsqueeze(0))

result = pred_target.squeeze().detach().cpu().numpy()
print(f"{np.min(result)=}, {np.max(result)=}")
# _min, _max = np.min(result), np.max(result)
# result -= _min
# result /= (_max - _min)
# result = (result > 0.9)

plt.ylim(-1, 100)
plt.xlim(0, 1)
# Flatten first: given a 2-D array, plt.hist treats every column as a separate
# dataset instead of building one histogram over all values.
plt.hist(result.flatten(), bins=10)

In [None]:
# Threshold the transposed prediction map at 0.5 to get a boolean sketch mask.
pred_sktch = result.transpose((1, 0)) > 0.5

# Cast to uint8 before scaling: `bool * 255` produces a platform-integer array,
# while cv2.imwrite expects 8-bit image data for PNG output.
cv2.imwrite("sktch.png", pred_sktch.astype(np.uint8) * 255)

In [None]:
# Display the thresholded (> 0.5), transposed prediction map without axes.
sketch_mask = np.transpose(result, (1, 0)) > 0.5
plt.imshow(sketch_mask, cmap='gray')
plt.axis('off')
plt.show()

In [None]:
# Single-file inference experiment: load one target image and one fpath .npz,
# run the model on the fpath data, and write a black/white result to "res.png".
# NOTE(review): RGBtoGray_conversion_weights is not used in this cell, and
# `threshold` / `output_png_file_name` are only referenced by the commented-out code at the bottom.
RGBtoGray_conversion_weights = np.array([0.2989, 0.5870, 0.1140], dtype=np.float32)
BGRtoGray_conversion_weights = np.array([0.1140, 0.5870, 0.2989], dtype=np.float32)

# NOTE(review): the sample ids differ between these paths (901 / 902 / 903) -- confirm they belong together.
input_fpath_test = "/home/joono/fpath_infodraw/dataset/test/fpath_npzs/color_901_fpath_of_infodraw.npz"
# info_test = "/home/joono/fpath_infodraw/dataset/test/imgs/color_902/color_902_infodraw.png"
target_test = "/home/joono/fpath_infodraw/dataset/test/targets/line_903.png"
threshold = 0.8

output_png_file_name = f"circle_903_{threshold}.png"


# load target image (converted to single-channel 'L' mode, then to a numpy array)
target_img = Image.open(target_test).convert('L')
target_img = np.array(target_img)

# NOTE(review): a numpy image array is usually (rows, cols) = (height, width), so the
# W/H names may be swapped; here they are only multiplied together and printed.
W, H = target_img.shape
# Fraction of pixels with value below 0.01.
raio_of_black = (np.sum(target_img < 0.01) / (W * H))

print(f"{np.sum(target_img < 0.01)}, {W=}, {H=} {raio_of_black=}")

# load fpath (the "data" array of the .npz archive)
input_fpath = np.load(input_fpath_test)
fpath_tensor = torch.tensor(input_fpath["data"])

# The data is expected to be 4-D; W and H are rebound here and used for the canvas below.
W, H, L, C = fpath_tensor.shape
if C == 3:
    # Collapse a 3-value last axis into one value using the BGR grayscale weights.
    input_fpath_gray = np.dot(fpath_tensor, BGRtoGray_conversion_weights)
    fpath_tensor = torch.tensor(input_fpath_gray)

# Drop size-1 axes, add a leading batch axis, and move the tensor to the GPU.
fpath_tensor = fpath_tensor.squeeze()
print(fpath_tensor.shape)
fpath_tensor = fpath_tensor.unsqueeze(0)
print(fpath_tensor.shape)
fpath_tensor = fpath_tensor.to('cuda')

# input = torch.tensor(input).to('cuda')
# info_img = torch.tensor(info_img).to('cuda')

# inference result
# NOTE(review): the forward pass runs outside torch.no_grad(); the result is detached further down.
model = model.eval()
pred_target = model(fpath_tensor)
# Softmax over axis 3; the code below reads channels 0 and 1 of that axis.
# NOTE(review): the comments in train()/val() describe the model output as
# [B x W x H x 1]; with a single channel, index 1 would be out of range --
# confirm which model version this cell targets.
pred_target = nn.functional.softmax(pred_target, dim=3)

print(f"{pred_target.shape=}")

# Min / max of channel 0, then min / max of channel 1.
print(f"{torch.min(pred_target[0, :, :, 0]).item()}, {torch.max(pred_target[0, :, :, 0]).item()}, {torch.min(pred_target[0, :, :, 1]).item()}, {torch.max(pred_target[0, :, :, 1]).item()}")
    
pred_target = pred_target.squeeze()
pred_target = pred_target.detach().cpu().numpy()

print(f"{pred_target.shape=}")

# transpose(2, 1, 0) reverses the order of all three axes.
# NOTE(review): if the squeezed prediction is (W, H, C), this yields (C, H, W), and
# `result[:, :, 1]` below then indexes the last (W) axis rather than the channel axis -- confirm.
result = pred_target.transpose(2, 1, 0)

# Paint 0 where the selected score is above 0.5 and 255 everywhere else.
canvas = np.zeros((W, H))
canvas[np.where(result[:, :, 1] > 0.5)] = 0
canvas[np.where(result[:, :, 1] <= 0.5)] = 255
print(f"{np.min(canvas)=}, {np.max(canvas)=}")

plt.imshow(canvas, cmap='gray')
cv2.imwrite("res.png", canvas)

# Earlier experiment, kept commented out for reference:
# load infodraw img
# infodraw_test_img = Image.open(info_test).convert('L')
# infodraw_test_img = target_transforms(infodraw_test_img) 
# infodraw_test_img = infodraw_test_img.squeeze().numpy()

# # load target img
# target_img = Image.open(target_test).convert('L')
# target_img = target_transforms(target_img) 
# target_img = target_img.squeeze().numpy()

# # calculate pred target
# info_plus_error = infodraw_test_img + output
# output = normalize(output)
# mask_for_noise_filtering = infodraw_test_img < threshold

# result = output * mask_for_noise_filtering
# canvas = np.ones_like(output, np.float32)
# canvas[np.where(mask_for_noise_filtering)] = result[np.where(mask_for_noise_filtering)]

# error = target_img - info_plus_error

# canvas = np.concatenate([output, infodraw_test_img, info_plus_error, error], axis=1)
# plt.imshow(canvas, cmap="gray")
# cv2.imwrite(output_png_file_name, canvas * 255)
# cv2.imwrite("res.png", info_plus_error * 255)

In [None]:
# Indices at which index 0 of the last axis of `result` exceeds 0.5.
channel0_above_half = result[:, :, 0] > 0.5
np.where(channel0_above_half)

In [None]:
# Build a dataset from the fpath/target training manifest, passing a ToTensor
# transform as the second positional argument.
fpath_target_train_yaml = "/home/joono/fpath_infodraw/dataset/fpath_target_train.yaml"
dset = FPathDataset(fpath_target_train_yaml, transforms.ToTensor())

In [None]:
# Stack the fpath data and the infodraw image along axis 2.
# NOTE(review): `info_test` is only defined in a commented-out line of an earlier
# cell and `target_transforms` is not defined anywhere in the visible notebook --
# confirm where they come from before running this cell.
infodraw_test_img = target_transforms(Image.open(info_test).convert('L'))
# Move axis 0 of the transformed image to the end so it can be concatenated on axis 2.
infodraw_test_img = infodraw_test_img.numpy().transpose(1, 2, 0)

input_fpath = np.load(input_fpath_test)['data'].squeeze()

# Not the last expression of the cell, so this line displays nothing.
input_fpath.shape, infodraw_test_img.shape

# NOTE(review): the name `input` shadows the Python builtin for the rest of the session.
input = np.concatenate([input_fpath, infodraw_test_img], axis=2)

input.shape

In [None]:
# Scratch check: the shape of a freshly created 4-D zero array.
np.zeros(shape=(2, 3, 4, 5)).shape

In [None]:
# Toy inputs for checking a mask-weighted binary cross-entropy by hand.
a = torch.rand((2, 2)) *  10  # random values in [0, 10); the next cell passes them through sigmoid
target = torch.tensor([[1., 0.], [1., 1.]])
mask = torch.tensor([[1, 0], [1, 1]])

# Use `reduction='none'` to keep the per-element losses. The legacy `reduce`
# argument is a deprecated boolean flag: the truthy string 'none' was resolved
# to mean reduction, the opposite of what was intended.
loss = nn.BCEWithLogitsLoss(reduction='none')

In [None]:
print(a)

# Store the probabilities under a new name and leave `a` untouched. The original
# `a = torch.sigmoid(a)` overwrote its own input, so re-running the cell applied
# the sigmoid a second time and changed every number below.
probs = torch.sigmoid(a)

print(probs)

# Element-wise binary cross-entropy, written out by hand.
elementwise_bce = -(target * torch.log(probs) + (1-target) * torch.log(1-probs))
print(elementwise_bce)

# Mean of the loss over mask == 1 plus mean of the loss over mask == 0.
# NOTE(review): if the mask is all ones or all zeros, one of the two counts is 0
# and the division produces inf/nan -- confirm whether that case can occur.
l = (1/torch.sum(mask)) * torch.sum(mask * elementwise_bce) + (1/torch.sum(1-mask)) * torch.sum((1-mask) * elementwise_bce)
print(l)