In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
from PIL import Image
import torch
from torchvision import transforms
from torch.utils.data import Dataset
import numpy as np

class VineyardDataset(Dataset):
    def __init__(self, images_dir, masks_dir, transform=None):
        self.images_dir = images_dir
        self.masks_dir = masks_dir
        self.transform = transform
        # Prepare list of files, checking if both image and mask exist
        self.images = []
        for image in os.listdir(images_dir):
            if image.endswith('.png'):
                img_path = os.path.join(images_dir, image)
                mask_name = image.replace('.png', '_instanceIds.png')
                mask_path = os.path.join(masks_dir, mask_name)
                if os.path.exists(mask_path):  # Check if mask file exists
                    self.images.append(img_path)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]
        mask_name = os.path.basename(img_path).replace('.png', '_instanceIds.png')
        mask_path = os.path.join(self.masks_dir, mask_name)

        #try:
        #    image = Image.open(img_path).convert("RGB")
        #    mask = Image.open(mask_path).convert("L")
        #except FileNotFoundError:
        #    return None  # Return None if there's an issue opening the file

        image = np.array(Image.open(img_path).convert("RGB")) # we are using np array since we will be using Albumentations library which req np array
        mask = np.array(Image.open(mask_path).convert("L"), dtype=np.float32)
        mask[mask == 255.0] = 1.0

        if self.transform:
            augmentations = self.transform(image = image, mask = mask)
            image = augmentations["image"]
            mask = augmentations["mask"]

        return image, mask


In [None]:
import torch
# from dataset import VineyardDataset
from torch.utils.data import DataLoader
import torchvision
import time

def get_loaders(
    train_dir,
    train_maskdir,
    val_dir,
    val_maskdir,
    train_transform,
    val_transform,
    batch_size,
    num_workers,
    pin_memory = True
):
    train_data = VineyardDataset(images_dir=train_dir,
                                masks_dir=train_maskdir,
                                transform=train_transform)
    val_data = VineyardDataset(images_dir=val_dir,
                                masks_dir=val_maskdir,
                                transform=val_transform)

    train_dataloader = DataLoader(dataset=train_data,
                                batch_size=batch_size,
                                shuffle=True,
                                num_workers=num_workers,
                                pin_memory=pin_memory)
    val_dataloader = DataLoader(dataset=val_data,
                                batch_size=batch_size,
                                shuffle=False,
                                num_workers=num_workers,
                                pin_memory=pin_memory)

    return train_dataloader, val_dataloader

def save_checkpoint(state, filename = "/content/drive/MyDrive/VineNetData/saved_images/my_checkpoint.pth.tar"):
    print("==> Saving CheckPoint")
    torch.save(state, filename)

def load_checkpoint(checkpoint, model):
    print("==> Loading CheckPoint")
    model.load_state_dict(checkpoint["state_dict"])


# Define the evaluation metrics functions
def calculate_precision(tp, fp):
    if (tp+fp) == 0:
      return 0.0
    return tp / (tp + fp)

def calculate_recall(tp, fn):
    if (tp+fn) == 0:
      return 0.0
    return tp / (tp + fn)

def calculate_mAP(tp, fp, fn):
    precision = calculate_precision(tp, fp)
    recall = calculate_recall(tp, fn)
    if precision == 0 or recall == 0:
        return 0.0
    return (2 * precision * recall) / (precision + recall)

def measure_inference_time(model, data_loader):
    model.eval()
    total_time = 0
    with torch.no_grad():
        for images, masks in val_loader:
            if torch.cuda.is_available():
                images = images.cuda()
            start_time = time.time()
            _ = model(images)
            end_time = time.time()
            total_time += end_time - start_time
    return total_time / len(data_loader)

# Assuming test_loader is defined similar to train_loader
def evaluate_model(model, loss_fn, val_loader):
    model.eval()
    tp, fp, fn = 0, 0, 0
    total_loss = 0
    with torch.no_grad():
        for images, masks in val_loader:
            if torch.cuda.is_available():
                images = images.cuda()
                masks = masks.cuda()

            outputs = model(images)
            masks = masks.reshape(outputs.shape)
            loss = loss_fn(outputs, masks)
            total_loss += loss.item()

            # Calculate true positives, false positives, and false negatives
            predictions = (outputs > 0.5).float()
            tp += torch.sum(predictions * masks).item()
            fp += torch.sum(predictions * (1 - masks)).item()
            fn += torch.sum((1 - predictions) * masks).item()

    precision = calculate_precision(tp, fp)
    recall = calculate_recall(tp, fn)
    mAP = calculate_mAP(tp, fp, fn)
    inference_time = measure_inference_time(model, val_loader)

    print(f'Precision: {precision:.4f}, Recall: {recall:.4f}, mAP: {mAP:.4f}, Inference Time: {inference_time:.4f}, Loss: {total_loss / len(val_loader):.4f}')


def check_accuracy(loader, model, device):
    num_correct = 0
    num_pixels = 0
    dice_score = 0
    model.eval()

    with torch.inference_mode():
        for X, y in loader:
            X = X.to(device)
            y = y.to(device).unsqueeze(1)
            preds = torch.sigmoid(model(X))
            preds = (preds > 0.5).float()
            num_correct += (preds == y).sum()
            num_pixels += torch.numel(preds)
            dice_score += (2*(preds*y).sum())/((preds + y).sum()+ 1e-8)
    print(
        f"Got {num_correct}/{num_pixels} with accuracy {(num_correct/num_pixels)*100: .3f}"
    )
    print(f"Dice score: {dice_score/len(loader)}")
    model.train()

def save_predictions_as_imgs(loader, model,device, folder_dir = "saved_images/"):
    model.eval()

    for idx, (X, y) in enumerate(loader):
        X, y = X.to(device), y.to(device)
        with torch.inference_mode():
            preds = torch.sigmoid(model(X))
            preds = (preds > 0.5).float()

        torchvision.utils.save_image(preds, f"{folder_dir}/pred_{idx}.png")
        torchvision.utils.save_image(y.unsqueeze(1), f"{folder_dir}{idx}.png")

    model.train()

In [None]:
import torch
import torch.nn as nn
import torchvision.transforms.functional as TF
import math

# class Attention(nn.Module):
#     def __init__(self, dim_k):
#         super(Attention, self).__init__()
#         self.query = nn.Linear(dim_k, dim_k)
#         self.key = nn.Linear(dim_k, dim_k)
#         self.value = nn.Linear(dim_k, dim_k)
#         self.dim_k = dim_k

#     def forward(self, q, k, v, mask=None):
#         # Apply linear transformations
#         q = self.query(q)
#         k = self.key(k)
#         v = self.value(v)

#         # Calculate the scale factor for the dot products
#         scale = math.sqrt(self.dim_k)

#         # Perform scaled dot-product attention
#         attention_scores = torch.matmul(q, k.transpose(-1, -2)) / scale

#         # Optional: Apply mask here if not None

#         # Apply softmax to get probabilities
#         attention = torch.softmax(attention_scores, dim=-1)

#         # Multiply by values
#         output = torch.matmul(attention, v)

#         return output

# class BottleNeck(nn.Module):
#     def __init__(self, in_channels, out_channels, dim_k):
#         super(BottleNeck, self).__init__()
#         self.attention = Attention(dim_k)
#         self.down_layer = nn.Sequential(
#             nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias=False),
#             nn.BatchNorm2d(out_channels),
#             nn.ReLU(inplace=True),
#             nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False),
#             nn.BatchNorm2d(out_channels),
#             nn.ReLU(inplace=True)
#         )
#         self.dim_k = dim_k  # Ensure this is used or removed appropriately

#     def forward(self, X):
#         out = self.down_layer(X)

#         # Reshape for attention. Ensure dimensions are correctly aligned
#         B, C, H, W = out.shape
#         q = out.view(B, C, H * W).transpose(1, 2)  # Reshape to (B, H*W, C)
#         k = q  # Same shape for simplicity in this example
#         v = q  # Same shape for simplicity in this example


#         attention_output = self.attention(q, k, v, None)

#         # Reshape attention output to match input feature map dimensions
#         attention_output = attention_output.transpose(1, 2).view(B, C, H, W)

#         # Combine the attention output with the input feature map
#         out = out + attention_output

#         return out

class Down(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(Down, self).__init__()
        self.down_layer = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias = False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias = False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
    def forward(self, X):
        return self.down_layer(X)


class Up(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(Up, self).__init__()
        self.up_conv = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2) # 1024 --> 512
        self.DoubleConv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias = False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias = False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, X, skip_connection):
        X1 = self.up_conv(X)
        if(X1.shape != skip_connection.shape):
            X1 = TF.resize(X1, skip_connection.shape[2:])
        X2 = torch.cat((X1, skip_connection), dim=1)

        return self.DoubleConv(X2)

class FinalConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(FinalConv, self).__init__()
        self.finalConv = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1)

    def forward(self, X):
        return self.finalConv(X)

class UNet(nn.Module):
    def __init__(self, in_channels = 3, out_channels = 1):
        super(UNet, self).__init__()
        self.max_pool = nn.MaxPool2d(2, 2)
        self.down1 = Down(in_channels, 64)
        self.down2 = Down(64, 128)
        self.down3 = Down(128, 256)
        self.down4 = Down(256, 512)

        # self.bottleNeck = Down(512, 1024)
        self.bottleNeck = Down(512, 1024)


        self.up1 = Up(1024, 512)
        self.up2 = Up(512, 256)
        self.up3 = Up(256, 128)
        self.up4 = Up(128, 64)

        self.finalConv = FinalConv(64, out_channels)


    def forward(self, X):

        ### DownSampling
        x1_skip = self.down1(X)
        x1 = self.max_pool(x1_skip)

        x2_skip = self.down2(x1)
        x2 = self.max_pool(x2_skip)

        x3_skip = self.down3(x2)
        x3 = self.max_pool(x3_skip)

        x4_skip = self.down4(x3)
        x4 = self.max_pool(x4_skip)

        ### BottleNeck Layer
        x5 = self.bottleNeck(x4)

        ### UpSampling
        x  = self.up1(x5, x4_skip)
        x  = self.up2(x , x3_skip)
        x  = self.up3(x , x2_skip)
        x  = self.up4(x , x1_skip)
        x  = self.finalConv(x)

        return x


# def test():
#     x = torch.randn(3, 1, 572, 572)
#     model = UNet(in_channels=1, out_channels=1)
#     preds = model(x)
#     print(f"Preds shape: {preds.shape}")
#     print(x.shape == preds.shape)

# if __name__ == "__main__":
#     test()

In [None]:
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import torch.nn as nn
import torch.optim
# from u_net_model import UNet
from torch.utils.data import DataLoader
# from dataset import VineyardDataset

# from utils import(
#     load_checkpoint,
#     save_checkpoint,
#     get_loaders,
#     check_accuracy,
#     save_predictions_as_imgs
# )

LEARNING_RATE = 0.001
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 16
EPOCHS = 30
NUM_WORKERS = 2
IMAGE_HEIGHT = 128
IMAGE_WIDTH = 256
PIN_MEMORY = True
LOAD_MODEL = False
TRAIN_IMG_DIR = "/content/drive/MyDrive/VineNetData/train/images"
TRAIN_MASK_DIR = "/content/drive/MyDrive/VineNetData/train/masks"
VAL_IMG_DIR = "/content/drive/MyDrive/VineNetData/val/images"
VAL_MASK_DIR = "/content/drive/MyDrive/VineNetData/val/masks"

def train_fn(loader, model, optimizer, loss_fn, scaler):
    loop = tqdm(loader)

    train_losses = []
    for batch_idx, (data, targets) in enumerate(loop):
        model.to(device=DEVICE)
        data = data.to(device = DEVICE)
        targets = targets.float().unsqueeze(1).to(device = DEVICE)

        # forward
        model.train()
        predictions = model(data)
        loss = loss_fn(predictions, targets)
        print((loss.to("cpu")).detach().numpy())
        train_losses.append(loss.to("cpu").detach().numpy())

        # backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loop.set_postfix(loss = loss.item())
        return train_losses

### TRAINING
train_transform = A.Compose(
    [
        A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
        A.Rotate(limit=35, p=1.0),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.1),
        A.Normalize(
            mean=[0.0, 0.0, 0.0],
            std = [1.0, 1.0, 1.0],
            max_pixel_value=255.0
        ),
        ToTensorV2()
    ]
)

val_transform = A.Compose(
    [
        A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
        A.Normalize(
            mean=[0.0, 0.0, 0.0],
            std = [1.0, 1.0, 1.0],
            max_pixel_value=255.0
        ),
        ToTensorV2()
    ]
)

new_model = UNet(in_channels=3, out_channels=1).to(device=DEVICE)
loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(params=new_model.parameters(), lr=LEARNING_RATE)

train_loader, val_loader = get_loaders(train_dir=TRAIN_IMG_DIR,
                                        train_maskdir=TRAIN_MASK_DIR,
                                        val_dir=VAL_IMG_DIR,
                                        val_maskdir=VAL_MASK_DIR,
                                        train_transform=train_transform,
                                        val_transform=val_transform,
                                        batch_size=BATCH_SIZE,
                                        num_workers=NUM_WORKERS,
                                        pin_memory=PIN_MEMORY)

if LOAD_MODEL:
    load_checkpoint(torch.load("my_checkpoint.pth.tar"), new_model)
scaler = torch.cuda.amp.GradScaler()

train_losses = []
# loop = tqdm(train_loader)
for epoch in range(EPOCHS):
    print(f"------Epoch: {epoch}------")

    # train_losses = train_fn(train_loader, new_model, optimizer, loss_fn, scaler)
    loop = tqdm(train_loader)

    train_loss = []
    for batch_idx, (data, targets) in enumerate(loop):
        new_model.to(device=DEVICE)
        data = data.to(device = DEVICE)
        targets = targets.float().unsqueeze(1).to(device = DEVICE)

        # forward
        new_model.train()
        predictions = new_model(data)
        loss = loss_fn(predictions, targets)
        train_loss.append(loss.to("cpu").detach().numpy())

        # backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loop.set_postfix(loss = loss.item())
    train_losses.append(sum(train_loss)/len(train_loss))
    # if(len(train_losses)>3):
    #   if(abs(train_losses[-1]-train_losses[-2])<0.0001):
    #     checkpoint = {"state_dict": new_model.state_dict(),"optimizer": optimizer.state_dict()}
    #     save_checkpoint(checkpoint)
    #     break

    # save the new_model
    checkpoint = {
        "state_dict": new_model.state_dict(),
        "optimizer": optimizer.state_dict()
    }
    save_checkpoint(checkpoint)

    check_accuracy(val_loader, new_model, device= DEVICE)
    # evaluate_new_model(new_model, loss_fn, val_loader)

    # print some examples to a folder
    save_predictions_as_imgs(
        val_loader,
        new_model,
        folder_dir="/content/drive/MyDrive/VineNetData/saved_images",
        device=DEVICE
    )

# def test_fn():
#     pass

# if __name__ == "__main__":
#     main()

------Epoch: 0------


  self.pid = os.fork()
100%|██████████| 36/36 [07:30<00:00, 12.51s/it, loss=0.225]


==> Saving CheckPoint


KeyboardInterrupt: 

In [None]:

import matplotlib.pyplot as plt

# Plot losses versus epochs
plt.plot(range(1, len(train_losses) + 1), train_losses, marker='o', linestyle='-')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss vs. Epoch')
plt.grid(False)
plt.show()


In [None]:
def validate(model, val_loader):
    model.eval()  # Set model to evaluation mode
    with torch.no_grad():
        total_loss = 0
        for images, masks in val_loader:
            if torch.cuda.is_available():
                images = images.cuda()
                masks = masks.cuda()
            outputs = model(images)
            print(outputs.shape)
            loss = loss_fn(outputs, masks)
            total_loss += loss.item()
    print(f'Validation Loss: {total_loss / len(val_loader):.4f}')

    model.train()  # Set model back to training mode


In [None]:
import torch
import torch.nn as nn
import torchvision.transforms.functional as TF
from torch.utils.data import DataLoader
import time

# Define the evaluation metrics functions
def calculate_precision(tp, fp):
    return tp / (tp + fp)

def calculate_recall(tp, fn):
    return tp / (tp + fn)

def calculate_mAP(tp, fp, fn):
    precision = calculate_precision(tp, fp)
    recall = calculate_recall(tp, fn)
    return (2 * precision * recall) / (precision + recall)

#------------------------------------

def compute_iou_and_miou(outputs, labels, num_classes):
    intersection = torch.zeros(num_classes)
    union = torch.zeros(num_classes)

    for cls in range(num_classes):
        intersection[cls] = torch.logical_and(outputs == cls, labels == cls).sum()
        union[cls] = torch.logical_or(outputs == cls, labels == cls).sum()

    iou = torch.div(intersection, union)
    valid_classes = union.nonzero().size(0)
    miou = iou.sum() / valid_classes

    return iou, miou

#----------------------------------------


def jaccard_index(gt, pred):
    pred = pred.to("cpu").numpy()
    gt = gt.to("cpu").numpy()
    pred = np.where(pred >= 0.5, 1, 0)
    gt = (gt >= 0.5).astype(int)
    intersection = np.sum(gt * pred)
    union = np.sum(gt) + np.sum(pred) - intersection
    jaccard = intersection / (union + 1e-8)
    return jaccard

def measure_inference_time(model, data_loader):
    model.eval()
    total_time = 0
    with torch.no_grad():
        for images, masks in val_loader:
            if torch.cuda.is_available():
                images = images.cuda()
            start_time = time.time()
            _ = model(images)
            end_time = time.time()
            total_time += end_time - start_time
    return total_time / len(data_loader)

# Assuming test_loader is defined similar to train_loader
def evaluate_model(model, criterion, val_loader):
    model.eval()
    tp, fp, fn = 0, 0, 0
    total_loss = 0
    with torch.no_grad():
        for images, masks in val_loader:
            if torch.cuda.is_available():
                images = images.cuda()
                masks = masks.cuda()

            outputs = model(images)
            masks = masks.unsqueeze(1)
            # print(masks.shape)
            loss = criterion(outputs, masks)
            total_loss += loss.item()

            # Calculate true positives, false positives, and false negatives
            predictions = (outputs > 0.5).float()
            tp += torch.sum(predictions * masks).item()
            fp += torch.sum(predictions * (1 - masks)).item()
            fn += torch.sum((1 - predictions) * masks).item()

    precision = calculate_precision(tp, fp)
    recall = calculate_recall(tp, fn)
    mAP = calculate_mAP(tp, fp, fn)
    inference_time = measure_inference_time(model, val_loader)
    iou, miou = compute_iou_and_miou(predictions, masks, 1)
    jacc_list = []
    for i in range(len(masks)):
      jacc_list.append((jaccard_index(masks[i], predictions[i])))
    jacc_list = np.array(jacc_list)

    print(f'Precision: {precision:.4f}, Recall: {recall:.4f}, mAP: {mAP:.4f}, miou: {miou}, Jaccard_index: {np.mean(jacc_list)}, Inference Time: {inference_time:.4f}, Loss: {total_loss / len(val_loader):.4f}')

evaluate_model(new_model, loss_fn, val_loader)

In [None]:
import matplotlib.pyplot as plt
import torch

def visualize_test_results(images, masks, outputs, num_images=3):
    fig, axs = plt.subplots(nrows=3, ncols=num_images, figsize=(10, 10), sharex=True, sharey=True)
    for i in range(num_images):
        axs[0, i].imshow(images[i].permute(1, 2, 0).cpu().numpy())
        axs[0, i].set_title('Input Image')
        axs[0, i].axis('off')

        # print(masks.shape)
        axs[1, i].imshow(masks[i].cpu(), cmap='gray')
        axs[1, i].set_title('True Mask')
        axs[1, i].axis('off')

        axs[2, i].imshow(outputs[i][0].sigmoid().detach().cpu().numpy() > 0.5, cmap='gray')
        axs[2, i].set_title('Predicted Mask')
        axs[2, i].axis('off')

    plt.tight_layout()
    plt.show()

# Assuming test_loader is set up similarly to train_loader
new_model.eval()
with torch.no_grad():
    for images, masks in val_loader:
        if torch.cuda.is_available():
            images = images.cuda()
            masks = masks.cuda()

        outputs = new_model(images)
        visualize_test_results(images, masks, outputs)
        break  # Only display one batch


In [None]:
# Assuming you have a trained model and a test dataset (test_loader)
new_model.eval()  # Set the model to evaluation mode

from torch.utils.data import DataLoader

# Assuming you have a test dataset named 'test_dataset'
# and batch_size is set to 10
from torch.utils.data import Dataset
from PIL import Image
import os

class TestDataset(Dataset):
    def __init__(self, images_dir, transform=None):
        self.images_dir = images_dir
        self.image_paths = [os.path.join(image_dir, img_name) for img_name in os.listdir(images_dir) if img_name.endswith('.png')]
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image

image_dir = '/content/drive/MyDrive/vinenet_testdata/images'
test_dataset = TestDataset(image_dir, transform=None)

# Define the test loader
# test1_loader = DataLoader(test_dataset, batch_size=10, shuffle=False)
import torch
from torchvision.transforms import ToTensor

def custom_collate(batch):
    # Convert PIL.Image.Image objects to tensors
    transformed_batch = [ToTensor()(item) for item in batch]
    return torch.utils.data.dataloader.default_collate(transformed_batch)

# Example usage:
# Assuming test1_loader is your DataLoader
test1_loader = torch.utils.data.DataLoader(test_dataset, batch_size=10, collate_fn=custom_collate)

#----------------------------
# Example:
new_model = UNet(3, 1)

# Load the saved state dictionary
checkpoint_path = "/content/drive/MyDrive/VineNetData/saved_images/my_checkpoint.pth.tar"
checkpoint = torch.load(checkpoint_path)

# Load the state dictionary into your model
new_model.load_state_dict(checkpoint['state_dict'])
new_model.to("cuda")

# Ensure that the model is in evaluation mode
new_model.eval()

#----------------------------
# Create an empty list to store the predicted masks
predicted_masks = []

with torch.no_grad():
    for images in test1_loader:  # Iterate over the test dataset
        if torch.cuda.is_available():
            images = images.cuda()

        # Forward pass to get predictions
        outputs = new_model(images)

        # Convert the output to probabilities by applying the sigmoid function
        # You might need to adjust this depending on your model architecture
        probabilities = torch.sigmoid(outputs)

        # Convert probabilities to binary predictions (0 or 1) based on a threshold
        # You can adjust the threshold based on your model and dataset
        binary_predictions = (probabilities > 0.5).float()

        # Append the predicted masks to the list
        predicted_masks.append(binary_predictions.cpu())  # Convert to CPU if necessary

# Concatenate the predicted masks along the batch dimension
# This assumes that your test_loader batches contain only one image each
predicted_masks = torch.cat(predicted_masks, dim=0)

# predicted_masks


In [None]:
import os
import matplotlib.pyplot as plt
from torchvision.transforms.functional import to_pil_image

# Assuming you want to save the masks to a directory named 'predicted_masks'
output_dir = '/content/drive/MyDrive/vinenet_testdata/masks'
os.makedirs(output_dir, exist_ok=True)

# Save each predicted mask as an image with the same filename as the corresponding test image
for i, (image, mask) in enumerate(zip(test_dataset, predicted_masks)):
    # Get the filename of the corresponding test image
    test_filename = os.path.basename(test_dataset.image_paths[i])

    # Save the predicted mask image with the same filename as the test image
    mask_image = to_pil_image(mask.byte()).resize(1920, 960)
    mask_image.save(os.path.join(output_dir, test_filename))

    # Visualize and save the masks (optional)
#     plt.figure(figsize=(10, 5))

    # Original image
#     plt.subplot(1, 2, 1)
#     plt.imshow(image.permute(1, 2, 0))
#     plt.title('Original Image')
#     plt.axis('off')

#     # Predicted mask
#     plt.subplot(1, 2, 2)
    plt.imshow(mask.squeeze(), cmap='gray')
#     plt.title('Predicted Mask')
    plt.axis('off')

    plt.savefig(os.path.join(output_dir, test_filename))
    plt.close()


In [None]:
# importing required modules
import os
from zipfile import ZipFile

# specifying the zip file name
directory = "/content/drive/MyDrive/vinenet_testdata/masks"
file_name = "P05_B_predictedimages.zip"

# opening the zip file in READ mode
with ZipFile(file_name, 'w') as zip_file:
    for root, _, files in os.walk(directory):
        for file in files:
            file_path = os.path.join(root, file)
            zip_file.write(file_path, os.path.relpath(file_path, os.path.join(directory, '..')))

print(f"Done")

In [None]:
import os
import cv2

# Path to the test data folder
test_data_folder = "/content/drive/MyDrive/vinenet_testdata/images"

# Create a folder to save the segmentation masks
output_folder = "/content/drive/MyDrive/vinenet_testdata/masks"
os.makedirs(output_folder, exist_ok=True)

# Function to perform segmentation and save masks
def generate_masks(image_path, output_folder):
    # Read the image
    image = cv2.imread(image_path)

    mask = new_model(image)
    mask = mask.resize(1920, 960)

    # Assuming a simple thresholding example
    # gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # _, binary_mask = cv2.threshold(gray_image, 128, 255, cv2.THRESH_BINARY)

    # Save the mask
    mask_filename = os.path.basename(image_path).split('.')[0] + '_mask.png'
    mask_path = os.path.join(output_folder, mask_filename)
    cv2.imwrite(mask_path, binary_mask)

# Iterate over each image in the test data folder
for filename in os.listdir(test_data_folder):
    if filename.endswith('.jpg') or filename.endswith('.png'):
        image_path = os.path.join(test_data_folder, filename)
        generate_masks(image_path, output_folder)
