In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision

from torch.nn import functional as F
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.nn import LazyLinear
from PIL import Image
import requests
from torch.optim import Adam

import warnings
# Suppress every Python warning for the remainder of the session.
warnings.filterwarnings(action='ignore')

# Target size handed to v2.Resize by both the image and the mask pipelines.
image_size = (256, 256)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models

class BasicBlock(nn.Module):
    """Conv2d -> BatchNorm2d -> ReLU -> Dropout(p=0.1) building block.

    The convolution is padded with ``(kernel_size - 1) // 2`` on every side,
    so for odd kernel sizes and ``stride=1`` the spatial size is unchanged.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the convolution.
        stride: Stride of the convolution.
        kernel_size: Size of the (square) convolution kernel.
    """

    def __init__(self, in_channels, out_channels, stride=1, kernel_size = 3):
        super().__init__()
        same_padding = (kernel_size - 1) // 2
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=same_padding,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, x):
        """Apply convolution, normalisation, activation and dropout in that order."""
        return self.dropout(self.relu(self.bn(self.conv(x))))
    
class EncoderBlock(nn.Module):
    """Encoder stage made of four BasicBlocks with two residual additions.

    When ``strided`` is true the stage doubles the channel count and its first
    convolution (and the shortcut projection) use stride 2; otherwise channels
    and stride are left unchanged.

    Args:
        in_channels: Number of channels of the input feature map.
        strided: Whether the stage downsamples and widens the features.
    """

    def __init__(self, in_channels, strided = True):
        super().__init__()
        if strided:
            out_channels, stride = in_channels * 2, 2
        else:
            out_channels, stride = in_channels, 1

        self.layer1 = BasicBlock(in_channels, out_channels, stride=stride)
        self.layer2 = BasicBlock(out_channels, out_channels)
        self.layer3 = BasicBlock(out_channels, out_channels)
        self.layer4 = BasicBlock(out_channels, out_channels)
        # 1x1 projection so the shortcut matches the main path's channels/stride.
        self.downsample = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(out_channels),
        )

    def forward(self, x):
        """Run both residual halves of the stage and return the result."""
        main = self.layer1(x)
        shortcut = self.downsample(x)
        # First half: layer1 -> layer2, added to the projected input.
        main = self.layer2(main) + shortcut
        # Second half: layer3 -> layer4, added to the first half's output.
        return self.layer4(self.layer3(main)) + main
    
class DecoderBlock(nn.Module):
    """Decoder stage: 1x1 reduce -> stride-2 transposed conv -> 1x1 expand.

    Channels go ``in_channels -> in_channels // 4 -> in_channels // 2``. The
    transposed convolution (kernel 3, stride 2, padding 1, output_padding 1)
    is followed by batch normalisation only.

    Args:
        in_channels: Number of channels of the input feature map.
    """

    def __init__(self, in_channels):
        super().__init__()
        squeezed = in_channels // 4
        expanded = in_channels // 2

        self.layer1 = BasicBlock(in_channels, squeezed, kernel_size=1)
        self.layer2 = nn.Sequential(
            nn.ConvTranspose2d(
                squeezed,
                squeezed,
                kernel_size=3,
                stride=2,
                padding=1,
                output_padding=1,
            ),
            nn.BatchNorm2d(squeezed),
        )
        self.layer3 = BasicBlock(squeezed, expanded, kernel_size=1)

    def forward(self, x):
        """Apply the three layers in sequence."""
        for stage in (self.layer1, self.layer2, self.layer3):
            x = stage(x)
        return x
    
class InitialBlock(nn.Module):
    """Network stem: 7x7 stride-2 convolution, batch norm, then 2x2 max-pool.

    No activation function is applied inside this block.

    Args:
        in_channels: Number of channels of the input image.
        out_channels: Number of channels produced by the stem.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=7,
            stride=2,
            padding=3,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Convolve, normalise and pool the input."""
        return self.maxpool(self.bn(self.conv(x)))
    
class FinalBlock(nn.Module):
    """Output head: transposed conv -> conv -> conv, each followed by batch norm,
    with a final sigmoid.

    The stride-2 transposed convolution (kernel 3, no padding) maps a side of
    length H to 2H + 1; the following kernel-2 convolution trims it to 2H. The
    last 3x3 convolution produces ``num_classes`` channels.

    Args:
        in_channels: Number of channels of the input feature map.
        num_classes: Number of output channels.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()
        half = in_channels // 2

        self.transposeconv1 = nn.ConvTranspose2d(
            in_channels, half, kernel_size=3, stride=2, output_padding=0
        )
        self.bn1 = nn.BatchNorm2d(half)

        self.conv1 = nn.Conv2d(half, half, kernel_size=2)
        self.bn2 = nn.BatchNorm2d(half)

        self.conv2 = nn.Conv2d(half, out_channels=num_classes, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(num_classes)

        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Run the head and return sigmoid-activated outputs."""
        pipeline = (
            self.transposeconv1, self.bn1,
            self.conv1, self.bn2,
            self.conv2, self.bn3,
            self.sigmoid,
        )
        for stage in pipeline:
            x = stage(x)
        return x
    
class LinkNet(nn.Module):
    """LinkNet-style encoder/decoder with additive skip connections.

    Layout: a 3 -> 64 channel stem, three encoder stages (64, 128, 256
    channels), three decoder stages, and a final head that emits
    ``num_classes`` sigmoid-activated channels.

    A fourth stage (``EncoderBlock(256)`` / ``DecoderBlock(512)``) is not
    instantiated in this version of the network.

    Args:
        num_classes: Number of output channels of the final head.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.initblock = InitialBlock(3, 64)

        # Encoder path; the first stage keeps resolution and channel count.
        self.encoder1 = EncoderBlock(64, strided=False)
        self.encoder2 = EncoderBlock(64)
        self.encoder3 = EncoderBlock(128)

        # Decoder path, mirroring the encoder.
        self.decoder3 = DecoderBlock(256)
        self.decoder2 = DecoderBlock(128)
        self.decoder1 = DecoderBlock(64)

        self.finalblock = FinalBlock(32, num_classes)

    def forward(self, x):
        """Encode the input, decode it with skip connections, and apply the head."""
        stem = self.initblock(x)

        skip1 = self.encoder1(stem)
        skip2 = self.encoder2(skip1)
        bottleneck = self.encoder3(skip2)

        # Each decoder output is summed with the matching encoder output.
        up = self.decoder3(bottleneck) + skip2
        up = self.decoder2(up) + skip1
        up = self.decoder1(up)

        return self.finalblock(up)

In [None]:
from torch.utils.data import DataLoader
from transformers import AdamW
from copy import deepcopy
import matplotlib.pyplot as plt

num_classes = 1

# Resolve the device first so the checkpoint can be mapped straight onto it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = LinkNet(num_classes)
model_path = '/kaggle/input/linknet_lip_seg/pytorch/best/1/latest_lips_segmentation_linknet.pt'
# map_location lets a checkpoint that was saved from a GPU load on a CPU-only machine.
state_dict = torch.load(model_path, map_location=device)

# Checkpoints saved from an nn.DataParallel wrapper carry a leading 'module.'
# on every key. Strip only that leading prefix: str.replace would also remove
# 'module.' from the middle of a key (e.g. 'submodule.weight').
dp_prefix = 'module.'
new_state_dict = {
    (k[len(dp_prefix):] if k.startswith(dp_prefix) else k): v
    for k, v in state_dict.items()
}
model.load_state_dict(new_state_dict)

# Assuming that we are on a machine with multiple GPUs.
if torch.cuda.device_count() > 1:
    print(torch.cuda.device_count(), "GPUs!")
    # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
    model = nn.DataParallel(model)

model.to(device)

In [None]:
from PIL import Image
from torch.utils.data import Dataset
import os

class SegmentationDataset(Dataset):
    """Dataset of (image, mask) pairs read from two parallel directories.

    Files are paired by position after sorting the file names of each
    directory, so both directories must contain the same number of files and
    their names must sort into the same order.

    Args:
        img_dir: Directory containing the input images.
        mask_dir: Directory containing the segmentation masks.
        transform_image: Optional callable applied to the RGB PIL image.
        transform_mask: Optional callable applied to the single-channel PIL
            mask; its result has its leading dimension squeezed.

    Raises:
        ValueError: If the two directories hold a different number of files.
    """

    def __init__(self, img_dir, mask_dir, transform_image=None, transform_mask=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.img_names = sorted(os.listdir(img_dir))
        self.mask_names = sorted(os.listdir(mask_dir))
        # Pairing is purely positional, so a count mismatch means at least one
        # image would be matched with the wrong mask (or with none at all).
        if len(self.img_names) != len(self.mask_names):
            raise ValueError(
                f"Found {len(self.img_names)} images in {img_dir!r} but "
                f"{len(self.mask_names)} masks in {mask_dir!r}"
            )
        self.transform_image = transform_image
        self.transform_mask = transform_mask

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.img_names)

    def __getitem__(self, idx):
        """Load the ``idx``-th pair and apply whichever transforms were supplied."""
        img_path = os.path.join(self.img_dir, self.img_names[idx])
        mask_path = os.path.join(self.mask_dir, self.mask_names[idx])
        image = Image.open(img_path).convert('RGB')
        mask = Image.open(mask_path).convert('L')

        # Apply each transform independently so supplying only one of them
        # is not silently ignored.
        if self.transform_image is not None:
            image = self.transform_image(image)
        if self.transform_mask is not None:
            mask = self.transform_mask(mask)
            # Drop the leading channel dimension of the transformed mask.
            mask = mask.squeeze(0)

        return image, mask

In [None]:
from torchvision.transforms import functional as F_tr
from torchvision import transforms
from torchvision.transforms import v2

class ToGrayScaleAndSqueeze:
    """Callable transform that converts an image to single-channel grayscale.

    Despite the name, no squeeze happens here; the call only delegates to
    ``to_grayscale`` with one output channel.
    """

    def __call__(self, img):
        grayscale = F_tr.to_grayscale(img, num_output_channels=1)
        return grayscale

# Image pipeline: resize to image_size, convert to a tensor, then normalise
# each channel with a fixed mean/std.
# NOTE(review): the constants look like the usual ImageNet statistics — confirm.
transform_image = v2.Compose(
    [
        v2.Resize(image_size),
        v2.ToTensor(),
        v2.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

def to_one_hot(mask, num_classes):
    """One-hot encode a batch of label masks, channels first.

    Args:
        mask: 3-D tensor of class indices; it is cast to ``long`` before
            encoding.
        num_classes: Number of classes, i.e. size of the new channel axis.

    Returns:
        Integer tensor with the class axis moved from last position to
        position 1 (a permuted view of the one-hot encoding).
    """
    encoded = torch.nn.functional.one_hot(mask.long(), num_classes=num_classes)
    # one_hot appends the class axis last; move it right after the batch axis.
    return encoded.permute(0, 3, 1, 2)

# Mask pipeline: resize, force single-channel grayscale, convert to a tensor,
# then binarise (any non-zero pixel becomes 1.0).
transform_mask = v2.Compose([
    # Nearest-neighbour resizing keeps the mask's original pixel values. The
    # default bilinear mode creates intermediate grey values along the edges,
    # which the `> 0` threshold below would turn into foreground and thereby
    # dilate every mask.
    v2.Resize((image_size), interpolation=transforms.InterpolationMode.NEAREST),
    ToGrayScaleAndSqueeze(),
    v2.ToTensor(),
    v2.Lambda(lambda mask: (mask > 0).float()),  # Binarize the mask
])

In [None]:
import os

# Set the directories for your image and mask files
train_img_dir = '/kaggle/input/easypotraitlipsonly/EasyPotraitLipsOnly/data/train/images/img'
train_mask_dir = '/kaggle/input/easypotraitlipsonly/EasyPotraitLipsOnly/data/train/masks/img'
val_img_dir = '/kaggle/input/easypotraitlipsonly/EasyPotraitLipsOnly/data/val/images/img'
val_mask_dir = '/kaggle/input/easypotraitlipsonly/EasyPotraitLipsOnly/data/val/masks/img'
test_img_dir = '/kaggle/input/easypotraitlipsonly/EasyPotraitLipsOnly/data/test/images/img'
test_mask_dir = '/kaggle/input/easypotraitlipsonly/EasyPotraitLipsOnly/data/test/masks/img'

# Check if the directories exist and are not empty.
# The loop variable is deliberately not called `dir`: at notebook scope that
# would shadow the builtin dir() for the rest of the kernel session.
for data_dir in [train_img_dir, train_mask_dir, val_img_dir, val_mask_dir, test_img_dir, test_mask_dir]:
    if not os.path.exists(data_dir):
        print(f"Directory does not exist: {data_dir}")
    elif not os.listdir(data_dir):
        print(f"Directory is empty: {data_dir}")

# Create the datasets (the same image/mask transforms are used for every split)
train_dataset = SegmentationDataset(train_img_dir, train_mask_dir, transform_image=transform_image, transform_mask=transform_mask)
val_dataset = SegmentationDataset(val_img_dir, val_mask_dir, transform_image=transform_image, transform_mask=transform_mask)
test_dataset = SegmentationDataset(test_img_dir, test_mask_dir, transform_image=transform_image, transform_mask=transform_mask)


from torch.utils.data import DataLoader

batch_size = 64

# Only the training split is shuffled; validation and test keep file order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import numpy as np
import torch

class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt', trace_func=print):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement.
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            path (str): Path for the checkpoint to be saved to.
                            Default: 'checkpoint.pt'
            trace_func (function): trace print function.
                            Default: print
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # np.inf (lowercase): the np.Inf alias was removed in NumPy 2.0.
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func

    def __call__(self, val_loss, model):
        """Record one validation result.

        Saves a checkpoint when the loss improved by at least ``delta``;
        otherwise increments the patience counter and sets ``early_stop``
        once the counter reaches ``patience``.

        Args:
            val_loss: Validation loss of the current epoch (lower is better).
            model: Model whose ``state_dict()`` is checkpointed on improvement.
        """
        # Work with a score where higher is better.
        score = -val_loss

        if self.best_score is None:
            # First call: always counts as the best result so far.
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [None]:
def mean_iou(output, target, smooth=1):
    """Smoothed soft IoU computed over all elements of the batch at once.

    Both tensors are flattened, so they only need the same number of
    elements, not the same shape. ``output`` is used as-is (no thresholding).

    Args:
        output: Predicted values.
        target: Ground-truth mask with the same number of elements.
        smooth: Constant added to numerator and denominator; it avoids a
            division by zero when both inputs are all zeros.

    Returns:
        0-dim tensor ``(intersection + smooth) / (union + smooth)``.
    """
    # reshape (unlike view) also accepts non-contiguous tensors, e.g. the
    # result of a permute/transpose.
    output_flat = output.reshape(-1)
    target_flat = target.reshape(-1)
    intersection = (output_flat * target_flat).sum()
    union = output_flat.sum() + target_flat.sum() - intersection

    # The ratio is already a 0-dim tensor, so no further reduction is needed.
    return (intersection + smooth) / (union + smooth)

In [None]:
class DiceLoss(nn.Module):
    """Smoothed Dice loss computed over all elements of the batch at once.

    Args:
        smooth: Constant added to numerator and denominator; it avoids a
            division by zero when both inputs are all zeros.
    """

    def __init__(self, smooth=1):
        super(DiceLoss, self).__init__()
        self.smooth = smooth

    def forward(self, output, target):
        """Return ``1 - (2 * |output * target| + smooth) / (|output| + |target| + smooth)``.

        Both tensors are flattened first, so they only need the same number
        of elements, not the same shape.
        """
        # reshape (unlike view) also accepts non-contiguous tensors, e.g. the
        # result of a permute/transpose.
        output_flat = output.reshape(-1)
        target_flat = target.reshape(-1)
        intersection = (output_flat * target_flat).sum()

        return 1 - ((2. * intersection + self.smooth) /
                    (output_flat.sum() + target_flat.sum() + self.smooth))

In [None]:
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import StepLR

# Define the optimizer
optimizer = Adam(model.parameters(), lr=0.005)

# Define the learning rate scheduler (halve the learning rate every 10 epochs)
scheduler = StepLR(optimizer, step_size=10, gamma=0.5)

# Define the number of training epochs and patience for early stopping
num_epochs = 80
patience = 10

# Validation only looks at the first few batches of val_loader each epoch.
max_val_batches = 3

# Initialize variables for early stopping
best_val_loss = float('inf')
best_model = None
epochs_no_improve = 0

criterion = DiceLoss()

# (Vietnamese: "What time is it?" / "It's time to train the model!")
print("Mấy giờ rồi?")
print("Đến giờ train model rồi!")
# Training loop

for epoch in range(num_epochs):
    print(f'Epoch: {epoch+1}')
    # Training phase
    model.train()
    for images, masks in train_loader:
        images = images.to(device)
        masks = masks.to(device)
        masks = masks.long()
        outputs = model(images)
        loss = criterion(outputs, masks)
        miou = mean_iou(outputs, masks)
        print(f'Train loss: {loss}, IoU: {miou}')
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Update the learning rate
    scheduler.step()

    # Validation phase
    model.eval()
    val_losses = []
    iou_values = []
    with torch.no_grad():
        for batch_idx, (images, masks) in enumerate(val_loader):
            if batch_idx == max_val_batches:
                break
            images = images.to(device)
            masks = masks.to(device)
            masks = masks.long()
            outputs = model(images)

            iou = mean_iou(outputs, masks)
            loss = criterion(outputs, masks)

            # Store plain Python floats so nothing stays alive on the device.
            val_losses.append(loss.item())
            iou_values.append(iou.item())

    # An empty validation pass would yield a NaN loss that never "improves",
    # and early stopping would then fail on a best_model that is still None.
    if not val_losses:
        raise RuntimeError('Validation loader produced no batches')

    val_loss = np.mean(val_losses)
    iou_mean = np.mean(iou_values)

    # Check for early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model = deepcopy(model)
        torch.save(best_model.state_dict(),'best_lips_segmentation_linknet.pt')
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1
        if epochs_no_improve == patience:
            print('Early stopping!')
            # Roll back to the best model seen so far before saving and stopping.
            model = best_model
            torch.save(model.state_dict(),'early_lips_segmentation_linknet.pt')
            break
    torch.save(model.state_dict(),'latest_lips_segmentation_linknet.pt')


    # Print losses
    print(f'Epoch {epoch+1}/{num_epochs}: Mean iou: {iou_mean}')

In [None]:
#     # Test phase
#     import torch
#     import matplotlib.pyplot as plt

#     device = torch.device("cuda" if torch.cuda.is_available() else "cpu")



#     # Load the trained model
#     eval_model = LinkNet(num_classes)
#     model_path = '/kaggle/input/linknet_lip_seg/pytorch/best/1/best_lips_segmentation_linknet.pt'
#     state_dict = torch.load(model_path)
#     new_state_dict = {k.replace('module.', ''): v for k, v in state_dict.items()}
#     eval_model.load_state_dict(new_state_dict)

#     # # Assuming that we are on a machine with multiple GPUs.
#     # if torch.cuda.device_count() > 1:
#     #     print(torch.cuda.device_count(), "GPUs!")
#     #     # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
#     #     eval_model = nn.DataParallel(model)

#     eval_model.to(device)

#     eval_model.eval()

In [None]:
# # No gradient calculation
# iou_test = []
# with torch.no_grad():
#     i = 0
#     for images, masks in test_loader:
#         # Move the images and masks to the device
#         images = images.to(device)
#         masks = masks.to(device)

#         # Predict the masks
#         outputs = eval_model(images)
#         iou = mean_iou(outputs, masks)
#         i += 1
#         if i < 4:
#             # Plot original image, true mask, and predicted mask
#             fig, axs = plt.subplots(1, 3, figsize=(20, 20))
#             axs[0].imshow(images[0].permute(1, 2, 0).cpu())  # Assuming image is in (C, H, W) format
#             axs[0].title.set_text('Original Image')
#             axs[1].imshow(masks[0].cpu(), cmap='gray')
#             axs[1].title.set_text('True Mask')
#             axs[2].imshow(outputs[0].squeeze().cpu(), cmap='gray')
#             axs[2].title.set_text('Predicted Mask')
#             plt.show()
#         iou_test.append(iou)
# iou_mean = np.mean([iou.cpu().numpy() for iou in iou_test])
# print(f'Test iou: {iou_mean}')

In [None]:
# import requests
# from PIL import Image
# import matplotlib.pyplot as plt
# import torch
# from torchvision import transforms

# # Load the image
# url = "https://scontent.fhan7-1.fna.fbcdn.net/v/t1.6435-9/129730825_2843581455923669_4319122742225070187_n.jpg?stp=dst-jpg_p640x640&_nc_cat=111&ccb=1-7&_nc_sid=5f2048&_nc_ohc=MOcdN7aZCC4Q7kNvgEl0ux5&_nc_ht=scontent.fhan7-1.fna&oh=00_AYCqBbKCLJgME-1umNSS58LubHThzcld2EMZrLkFQdvZcQ&oe=6681136C"
# response = requests.get(url, stream=True)
# img = Image.open(response.raw)

# # Convert the image to a tensor and add a batch dimension
# img_tensor = transforms.ToTensor()(img).unsqueeze(0).to(device)

# # Predict the mask
# output = eval_model(img_tensor)
# predicted_mask = output[0].squeeze().cpu().detach().numpy()

# # Plot the original image
# plt.imshow(img)

# # Plot the predicted mask as an outer layer
# plt.imshow(predicted_mask, alpha=0.5, cmap='jet')

# plt.show()