In [None]:
import os
import cv2
import torch
import torchvision.transforms as transforms
import torch.nn.functional as F
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import shutil

#### Testing stage

In [None]:
# test the training data split by ourselves
test_img_dir = "Training_dataset/test/imgs"
test_label_dir = "Training_dataset/test/gts"

test_imgs = sorted([os.path.join(test_img_dir, f) for f in os.listdir(test_img_dir) if f.endswith(".jpg")])
test_labels = sorted([os.path.join(test_label_dir, f) for f in os.listdir(test_label_dir) if f.endswith(".png")])

test_imgs_length = len(test_imgs)
test_labels_length = len(test_labels)
print("test_imgs 長度:", test_imgs_length)
print("test_labels 長度:", test_labels_length)

In [None]:
class TestlDataset(Dataset):  # Used for the testing set dataset
    def __init__(self, images, img_size):
        self.images = images
        self.img_size = img_size
        self.img_transform = transforms.Compose([
            transforms.Resize((self.img_size, self.img_size)),
            transforms.ToTensor(),
            transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
        ])

    def __getitem__(self, index):
        image = Image.open(self.images[index]).convert('RGB') # 打開圖像文件，轉換為RGB
        image = self.img_transform(image)
        name = os.path.basename(self.images[index]).replace('.jpg', '.png')
        return image, name

    def __len__(self):
        return len(self.images)

In [None]:
ts_dataset = TestlDataset(images=test_imgs, img_size=256)
ts_loader = DataLoader(dataset=ts_dataset, batch_size=1, shuffle=False, num_workers=2, pin_memory=False)

In [None]:
from models.EncoderDecoderPatchGAN import VGG16Generator
from models.UnetPatchGAN import UnetGenerator

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# 使用 best_model 資料夾內的模型權重
model_Unet_465 = UnetGenerator().to(device)
model_Unet_645 = UnetGenerator().to(device)
model_Unet_680 = UnetGenerator().to(device)
model_Unet_775 = UnetGenerator().to(device)
model_EncoderDecoder_230 = VGG16Generator().to(device)

model_Unet_465.load_state_dict(torch.load("best_models_weight/UNet_epoch_465.pth"))
model_Unet_645.load_state_dict(torch.load("best_models_weight/UNet_epoch_645.pth"))
model_Unet_680.load_state_dict(torch.load("best_models_weight/UNet_epoch_680.pth"))
model_Unet_775.load_state_dict(torch.load("best_models_weight/UNet_epoch_775.pth"))
model_EncoderDecoder_230.load_state_dict(torch.load("best_models_weight/EncoderDecoder_epoch_230.pth"))

model_Unet_465.eval()
model_Unet_645.eval()
model_Unet_680.eval()
model_Unet_775.eval()
model_EncoderDecoder_230.eval()

In [None]:
pred_save_path = "test_result"
os.makedirs(pred_save_path, exist_ok=True)

# ensemble weights
# 最終 private leaderboard 成績之各模型 ensemble weight 依序為：0.12 ,0.12, 0.33, 0.1, 0.33
weight_Unet_465 = 0.12
weight_Unet_645 = 0.12
weight_Unet_680 = 0.33
weight_Unet_775 = 0.1
weight_EncoderDecoder_230 = 0.33 

with torch.no_grad():
    for images, names in ts_loader:
        images = images.cuda()
        
        result_Unet_465 = model_Unet_465(images) * weight_Unet_465
        result_Unet_645 = model_Unet_645(images) * weight_Unet_645
        result_Unet_680 = model_Unet_680(images) * weight_Unet_680
        result_Unet_775 = model_Unet_775(images) * weight_Unet_775
        result_EncoderDecoder_230 = model_EncoderDecoder_230(images) * weight_EncoderDecoder_230

        results =  result_Unet_465 + result_Unet_645 + result_Unet_680 + result_Unet_775 + result_EncoderDecoder_230

        results = F.interpolate(results, size=(240, 428), mode='bicubic', align_corners=True)  
        results = results.data.cpu().numpy()
        for idx, name in enumerate(names):
            res = results[idx].squeeze()
            success = cv2.imwrite(os.path.join(pred_save_path, name), res * 255)
            if success:
                print(f"File saved successfully: {name}")
            else:
                print(f"Failed to save file: {name}")

##### Calculate mean F-measure score

In [None]:
!pip install git+https://github.com/lartpang/PySODMetrics.git

In [None]:
import py_sod_metrics

# define mean F-measure function
def calculate_mean_F_measure(pred_save_path, test_labels, opening=False):
    FMv2 = py_sod_metrics.FmeasureV2(
        metric_handlers={
            "fm": py_sod_metrics.FmeasureHandler(with_dynamic=True, with_adaptive=False, beta=0.3),
        }
    )
    if opening:
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))

        file_names = os.listdir(pred_save_path)
        for file_name in file_names:
            file_path = os.path.join(pred_save_path, file_name)
            if file_path.lower().endswith(('.png')):
                image = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
                image = cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel)
                cv2.imwrite(file_path, image)

    for i, mask_name in enumerate(test_labels):                    
        mask_path = mask_name
        pred_path = os.path.join(pred_save_path, os.path.basename(mask_name))

        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        pred = cv2.imread(pred_path, cv2.IMREAD_GRAYSCALE)


        FMv2.step(pred=pred, gt=mask)
    fmv2 = FMv2.get_results()
    results = {
        "meanfm": fmv2["fm"]["dynamic"].mean()
    }

    return results

In [None]:
# 計算 Mean F-measure
# before binarizing
print("before binarizing")
print(calculate_mean_F_measure(pred_save_path=pred_save_path, test_labels=test_labels))
# ---------------------------------------------------------------------------------------------------------------
# do binarize
file_names = os.listdir(pred_save_path)
for file_name in file_names:
    file_path = os.path.join(pred_save_path, file_name)
    if file_path.lower().endswith(('.png')):
        image = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
        _, binary_image = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        cv2.imwrite(file_path, binary_image)
# ---------------------------------------------------------------------------------------------------------------
# after binarizing
print("after binarizing")
print(calculate_mean_F_measure(pred_save_path=pred_save_path, test_labels=test_labels))
# ---------------------------------------------------------------------------------------------------------------
# opening and after binarizing
print("after binarizing and opening")
print(calculate_mean_F_measure(pred_save_path=pred_save_path, test_labels=test_labels, opening=True))

#### Submission

In [None]:
public_img_dir = "Public_testing_dataset/img"
private_img_dir = "Private_testing_dataset/img"
public_imgs = sorted([os.path.join(public_img_dir, f) for f in os.listdir(public_img_dir) if f.endswith(".jpg")])
private_imgs = sorted([os.path.join(private_img_dir, f) for f in os.listdir(private_img_dir) if f.endswith(".jpg")])

public_ts_dataset = TestlDataset(images=public_imgs, img_size=256)
public_ts_loader = DataLoader(dataset=public_ts_dataset, batch_size=1, shuffle=False, num_workers=2, pin_memory=True)
private_ts_dataset = TestlDataset(images=private_imgs, img_size=256)
private_ts_loader = DataLoader(dataset=private_ts_dataset, batch_size=1, shuffle=False, num_workers=2, pin_memory=True)

In [None]:
# evaluate public and private datasets
submission_save_path = "submission/"
os.makedirs(submission_save_path, exist_ok=True)

with torch.no_grad():
    # public dataset
    for images, names in public_ts_loader:
        images = images.cuda()

        result_Unet_465 = model_Unet_465(images) * weight_Unet_465
        result_Unet_645 = model_Unet_645(images) * weight_Unet_645
        result_Unet_680 = model_Unet_680(images) * weight_Unet_680
        result_Unet_775 = model_Unet_775(images) * weight_Unet_775
        result_EncoderDecoder_230 = model_EncoderDecoder_230(images) * weight_EncoderDecoder_230

        results =  result_Unet_465 + result_Unet_645 + result_Unet_680 + result_Unet_775 + result_EncoderDecoder_230

        results = F.interpolate(results, size=(240, 428), mode='bicubic', align_corners=True)

        results = results.data.cpu().numpy()
        for idx, name in enumerate(names):
            res = results[idx].squeeze()
            success = cv2.imwrite(os.path.join(submission_save_path, name), res * 255)
            if success:
                print(f"File saved successfully: {name}")
            else:
                print(f"Failed to save file: {name}")
    
    # private dataset
    for images, names in private_ts_loader:
        images = images.cuda()

        result_Unet_465 = model_Unet_465(images) * weight_Unet_465
        result_Unet_645 = model_Unet_645(images) * weight_Unet_645
        result_Unet_680 = model_Unet_680(images) * weight_Unet_680
        result_Unet_775 = model_Unet_775(images) * weight_Unet_775
        result_EncoderDecoder_230 = model_EncoderDecoder_230(images) * weight_EncoderDecoder_230

        results =  result_Unet_465 + result_Unet_645 + result_Unet_680 + result_Unet_775 + result_EncoderDecoder_230

        results = F.interpolate(results, size=(240, 428), mode='bicubic', align_corners=True)

        results = results.data.cpu().numpy()
        for idx, name in enumerate(names):
            res = results[idx].squeeze()
            success = cv2.imwrite(os.path.join(submission_save_path, name), res * 255)
            if success:
                print(f"File saved successfully: {name}")
            else:
                print(f"Failed to save file: {name}")


# binarizing and opening
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
file_names = os.listdir(submission_save_path)
for file_name in file_names:
    file_path = os.path.join(submission_save_path, file_name)
    if file_path.lower().endswith(('.png')):
        image = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
        _, binary_image = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        binary_image = cv2.morphologyEx(binary_image, cv2.MORPH_OPEN, kernel)
        
        cv2.imwrite(file_path, binary_image)

In [None]:
# 壓縮檔案成 zip 檔
shutil.make_archive('submission_file', 'zip', submission_save_path)