<a href="https://www.kaggle.com/code/averma111/pytorch-hubmap-cnn?scriptVersionId=131946312" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [68]:
%%capture
!pip install 'git+https://github.com/cocodataset/cocoapi.git#subdirectory=PythonAPI'

In [69]:
import numpy as np
import pandas as pd
from glob import glob
import json
from torch.utils.data import Dataset, DataLoader
from torchmetrics import AveragePrecision
import torch
import torchvision
from torchvision.transforms import transforms
import os
from PIL import Image
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import torch.nn.functional as F
import time
import base64
from pycocotools import _mask as coco_mask
import typing as t
import zlib

ModuleNotFoundError: No module named 'pycocotools'

In [None]:
class Config:
    
    batch_size= 64
    n_epochs = 10
    learning_rate = 0.001
    
    
    
config = Config()

In [None]:
class Acquisition:
    
    def get_datframe(self,path):
        return pd.read_csv(path)
    
    def get_json_dataframe(self, json_file):
        data = []
        with open(json_file, 'r') as file:
            for line in file:
                item = json.loads(line)
                data.append(item)
        
        json_df = pd.DataFrame(data)
        return json_df
    
        
        
acq = Acquisition()      

In [None]:
title=acq.get_datframe(path='/kaggle/input/hubmap-hacking-the-human-vasculature/tile_meta.csv')
title.head()

In [None]:
wsi = acq.get_datframe(path='/kaggle/input/hubmap-hacking-the-human-vasculature/wsi_meta.csv')
wsi.head()

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
polygons_df = acq.get_json_dataframe('/kaggle/input/hubmap-hacking-the-human-vasculature/polygons.jsonl')
polygons_df.head()

In [None]:
class ImageHuBMAPDataset(Dataset):
    
    def __init__(self, image_dir, labels_file, transform=None):
        
        with open(labels_file, 'r') as json_file:
            self.json_labels = [json.loads(line) for line in json_file]
            
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        return len(self.json_labels)

    def __getitem__(self, idx):
        image_path = os.path.join(self.image_dir, f"{self.json_labels[idx]['id']}.tif")
        image = Image.open(image_path)

        mask = np.zeros((512, 512), dtype=np.float32)

        for annot in self.json_labels[idx]['annotations']:
            cords = annot['coordinates']
            if annot['type'] == "blood_vessel":
                for cd in cords:
                    rr, cc = np.array([i[1] for i in cd]), np.asarray([i[0] for i in cd])
                    mask[rr, cc] = 1

        image = torch.tensor(np.array(image), dtype=torch.float32,requires_grad=True).permute(2, 0, 1)  
        mask = torch.tensor(mask, dtype=torch.float32)

        if self.transform:
            image = self.transform(image)

        return image, mask

In [None]:
class get_images_labels:
    def __init__(self, root_path):
        self.root_path = root_path
    
    def set_path(self,filename):
        return os.path.join(self.root_path,filename)
    
    def get_test_path(self,filename):
        return os.path.join(self.root_path,filename)
        

        
img_lble = get_images_labels(root_path='/kaggle/input/hubmap-hacking-the-human-vasculature')      

image_folder = img_lble.set_path(filename='train')
labels_file = img_lble.set_path(filename='polygons.jsonl')
test_image_folder =img_lble.get_test_path(filename='test')

In [None]:
plt.figure(figsize=(12, 8))

dataset = ImageHuBMAPDataset(image_dir=image_folder, labels_file=labels_file)
num_samples = 8

num_rows = (num_samples + 3) // 4  
num_cols = min(num_samples, 4)

for i in range(num_samples):

    image, mask = dataset[i]
    image = image.permute(1, 2, 0).detach().numpy() / 255
    subplot_index = i + 1

    plt.subplot(num_rows, 2 * num_cols, 2 * subplot_index - 1)
    plt.imshow(image)
    plt.axis('off')
    plt.title('Train_Image')

    mask_subplot_index = (subplot_index - 1) % num_samples + 1

    plt.subplot(num_rows, 2 * num_cols, 2 * subplot_index)
    plt.imshow(mask, cmap='gray')
    plt.axis('off')
    plt.title('Train_Mask')

plt.tight_layout(pad=0.2)
plt.show()

In [None]:
class HuBMAPClassificationNN(torch.nn.Module):
    def __init__(self):
        super(HuBMAPClassificationNN, self).__init__()
        
        self.network_1 = torch.nn.Sequential(
            torch.nn.Conv2d(3, 32, kernel_size=3, padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv2d(32, 32, kernel_size=3, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.network_2 = torch.nn.Sequential(
            torch.nn.Conv2d(32, 32, kernel_size=3, padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv2d(32, 32, kernel_size=3, padding=1),
            torch.nn.ReLU(),
            torch.nn.ConvTranspose2d(32, 1, kernel_size=2, stride=2)
        )

    def forward(self, x):
        x = self.network_1(x)
        x = self.network_2(x)
        x = torch.sigmoid(x)
        return x

In [None]:
model = HuBMAPClassificationNN().to(device)
print(model)

In [None]:
class Trainer:
    
    def train_dataloader(self,image_folder,labels_file):
        dataset = ImageHuBMAPDataset(image_folder,labels_file)
        return DataLoader(dataset, batch_size=config.batch_size, shuffle=True)
        
  
    def fit(self,epochs, lr, model, train_loader,opt_func):
        history =[]
        result = {}
        
        criterion = torch.nn.BCELoss() 
        optimizer = opt_func(model.parameters(), lr=lr)
        
        for epoch in range(epochs):
            model.train() 
            running_loss = 0.0
            start_time = time.time()
            
            for images, masks in train_loader:
                images = images.to(device)
                masks = masks.to(device)
                optimizer.zero_grad()
                outputs = model(images)
                masks = masks.unsqueeze(1) 
                loss = criterion(outputs, masks)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()

            epoch_loss = running_loss / len(train_loader)
            epoch_time = time.time() - start_time
            print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}, Time: {epoch_time:.2f} seconds")
            result['Train_loss'] = epoch_loss
            history.append(result)
        return history 
        
    
trainer = Trainer()

In [None]:
history=trainer.fit(config.n_epochs, config.learning_rate, model, 
                    trainer.train_dataloader(image_folder,labels_file),
                    torch.optim.Adam)

In [None]:
def plot_losses(history):
    """ Plot the losses in each epoch"""
    train_losses = [x.get('Train_loss') for x in history]
    plt.plot(train_losses, '-bx')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend('Training')
    plt.title('Loss vs. No. of epochs');

plot_losses(history)

In [None]:
class ImageHuBMAPDatasetTest(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.image_files = sorted(os.listdir(image_dir))

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image_path = os.path.join(self.image_dir, self.image_files[idx])
        image = Image.open(image_path)

        if self.transform:
            image = self.transform(image)

        return image

In [66]:
class Test:
    
    def encode_binary_mask(mask: np.ndarray) -> t.Text:
        if mask.dtype != bool:
            raise ValueError(
                "encode_binary_mask expects a binary mask, received dtype == %s" %
                mask.dtype)

        mask = np.squeeze(mask)
        if len(mask.shape) != 2:
            raise ValueError(
                "encode_binary_mask expects a 2d mask, received shape == %s" %
                mask.shape)

        mask_to_encode = mask.reshape(mask.shape[0], mask.shape[1], 1)
        mask_to_encode = mask_to_encode.astype(np.uint8)
        mask_to_encode = np.asfortranarray(mask_to_encode)

        encoded_mask = coco_mask.encode(mask_to_encode)[0]["counts"]

        binary_str = zlib.compress(encoded_mask, zlib.Z_BEST_COMPRESSION)
        base64_str = base64.b64encode(binary_str)
        return base64_str

    
    
    def mask_to_rle(self,mask):
        pixels = mask.flatten(order="F")
        pixels = np.concatenate([[0], pixels, [0]])
        runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
        runs[1::2] -= runs[::2]
        rle = " ".join(str(x) for x in runs)
        return rle
    
    def get_test_transforms(self):
        return transforms.Compose([transforms.ToTensor(),transforms.Normalize(
            mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])
        
    
    def test_dataloader(self,image_folder):
        dataset = ImageHuBMAPDatasetTest(test_image_folder,self.get_test_transforms())
        return DataLoader(dataset, batch_size=config.batch_size)
    
    

    def evaluate(self,model):
        model.eval()
        predictions = []
        with torch.no_grad():
            for images in self.test_dataloader(test_image_folder):
                images = images.to(device)
                outputs = model(images)
                predicted_masks = (outputs > 0.5).float()  
                predictions.extend(predicted_masks.cpu().numpy())
        return predictions
    
    
        
test = Test()


NameError: name 't' is not defined

In [None]:
class Submission:
    
    def submit_results(self,predictions,test_dataset):
        submission = pd.DataFrame(columns=["id", "height","width","prediction_string"])
        image_ids = [os.path.splitext(file)[0] for file in test_dataset.image_files]
        for image_id, prediction in zip(image_ids, predictions):
            rle_encoded = test.mask_to_rle(prediction)
            submission = submission.append({"id": image_id,"height":512,"width":512 ,"prediction_string": rle_encoded}, ignore_index=True)
        submission.to_csv("submission.csv", index=False)
        
        
submit = Submission()

predictions = test.evaluate(model)
test_dataset = ImageHuBMAPDatasetTest(test_image_folder,test.get_test_transforms())
submit.submit_results(predictions,test_dataset,)

In [None]:
test = pd.read_csv("/kaggle/working/submission.csv")
test.head()