In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2
import pathlib

In [None]:
import torchvision.models as models
import torch
from torchsummary import summary

In [None]:
single_img = plt.imread("/kaggle/input/data/images_001/images/00000001_000.png")

In [None]:
plt.imshow(single_img, cmap="gray")

In [None]:
data = pd.read_csv("/kaggle/input/data/Data_Entry_2017.csv")

In [None]:
data

In [None]:
all_columns =list(data.columns)

In [None]:
all_columns

In [None]:
useful_columns = all_columns[0:2]+all_columns[7:11]

In [None]:
useful_columns

In [None]:
useful_data = data[useful_columns]

In [None]:
path_list = list()
file_names = list()
for absolute_path in pathlib.Path("/kaggle/input/data").glob("images_*/images/*.png"):
    
    path_list.append(str(absolute_path))                
    file_names.append(str(absolute_path).split("/")[-1])

In [None]:
path_list.sort()

In [None]:
path_list

In [None]:
useful_data["Image Path"]= path_list

In [None]:
useful_data

In [None]:
unique_diseases = set()

for disease in useful_data["Finding Labels"]:
    unique_diseases.update(set(disease.split("|")))

In [None]:
unique_diseases = list(unique_diseases)

In [None]:
unique_diseases.remove("No Finding")

In [None]:
unique_diseases

In [None]:
diseases2idx = dict(zip(unique_diseases , range(len(unique_diseases))))

In [None]:
diseases2idx

In [None]:
another_data = pd.read_csv("/kaggle/input/data/BBox_List_2017.csv")

In [None]:
another_data

In [None]:
b_box = pd.DataFrame(another_data)

In [None]:
train_data = pd.DataFrame(useful_data)

In [None]:
columns_to_drop = b_box["Image Index"]
    

In [None]:
columns_to_drop

In [None]:
test_data = train_data[train_data['Image Index'].isin(columns_to_drop)]

In [None]:
train_data = train_data[~train_data['Image Index'].isin(columns_to_drop)]

In [None]:
test_data

In [None]:
train_data

In [None]:
y_train =  train_data["Finding Labels"]

In [None]:
y_train


In [None]:
y_train =list()
for single_img_labels in train_data["Finding Labels"]:
    single_img_multi_hot_vector = np.zeros((len(diseases2idx,)))
    diseases = single_img_labels.split("|")
    if "No Finding" not in diseases:
        for single_diseases in diseases:
            single_img_multi_hot_vector[diseases2idx[single_diseases]]=1.0
    y_train.append(single_img_multi_hot_vector)
y_train = np.array(y_train)

In [None]:
y_train

In [None]:
y_train.shape

In [None]:
y_test =list()
for single_img_labels in test_data["Finding Labels"]:
    single_img_multi_hot_vector = np.zeros((len(diseases2idx,)))
    diseases = single_img_labels.split("|")
    if "No Finding" not in diseases:
        for single_diseases in diseases:
            single_img_multi_hot_vector[diseases2idx[single_diseases]]=1.0
    y_test.append(single_img_multi_hot_vector)
y_test = np.array(y_test)

In [None]:
y_test

In [None]:
y_test.shape

In [None]:
def training_data_mini_batches_generator(training_data_df, mb_size=10):
    
    for i in range (training_data_df.shape[0]//mb_size):

        img_np_array_mb_list = list()
        
        for j in range(i*mb_size,(i+1)*mb_size):
            
            single_img_path  = train_data["Image Path"].iloc[j]
            img_np_array = plt.imread(single_img_path)

            resized_img_np_array = cv2.resize(img_np_array,(224,224))
            if len(resized_img_np_array.shape) == 2 or resized_img_np_array.shape[2] == 1:
            # grayscale image (H, W) or (H, W, 1)
                three_channel_np_array = cv2.cvtColor(resized_img_np_array, cv2.COLOR_GRAY2RGB)
            elif resized_img_np_array.shape[2] == 4:
            # image with 4 channels (e.g., RGBA) â†’ drop alpha
                three_channel_np_array = resized_img_np_array[:, :, :3]
            else:
            # already 3 channels
                three_channel_np_array = resized_img_np_array

            img_np_array_mb_list.append(three_channel_np_array)

        x_train_mb = np.array(img_np_array_mb_list)
        y_train_mb = y_train[i*mb_size:(i+1)*mb_size,:]

        yield x_train_mb,y_train_mb
        

In [None]:
our_training_datagen = training_data_mini_batches_generator(train_data)

In [None]:
X_train_mb , y_train_mb = our_training_datagen.__next__()

In [None]:
class our_custom_resnet50_cnn(torch.nn.Module):
    def __init__(self, num_classes):
        super().__init__()

        self.resnet50_full_net = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

        for param in self.resnet50_full_net.parameters():
            param.requires_grad = False

        self.resnet50_full_net.fc = torch.nn.Sequential(
            torch.nn.Linear(self.resnet50_full_net.fc.in_features, num_classes),
            torch.nn.Sigmoid()  
        )

    def forward(self, x):
        
        y_pred_mb =  self.resnet50_full_net(x)
        return y_pred_mb

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
our_model = our_custom_resnet50_cnn(num_classes=14)
our_model = our_model.to(device)

In [None]:
summary(our_model, input_size = (3,224,224))

In [None]:
test_data

In [None]:
def test_data_mini_batches_generator(test_data_df, mb_size=10):
    for i in range(test_data_df.shape[0] // mb_size):
        img_mb_list = []
        meta_mb_list = []

        for j in range(i * mb_size, (i + 1) * mb_size):
            # Image path & loading
            img_path = test_data_df["Image Path"].iloc[j]
            img = plt.imread(img_path)

            # Original size
            ow = test_data_df["OriginalImage[Width"].iloc[j]
            oh = test_data_df["Height]"].iloc[j]
            spacing_x = test_data_df["OriginalImagePixelSpacing[x"].iloc[j]
            spacing_y = test_data_df["y]"].iloc[j]

            # Resize
            img_resized = cv2.resize(img, (224, 224))
            img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_GRAY2RGB)
            img_rgb = img_rgb.transpose((2, 0, 1))  # CHW format

            # Scale spacings
            scaled_spacing_x = (ow / 224) * spacing_x
            scaled_spacing_y = (oh / 224) * spacing_y

            img_mb_list.append(img_rgb)
            meta_mb_list.append([scaled_spacing_x, scaled_spacing_y])

        # Convert to tensors
        x_img_mb = np.array(img_mb_list).astype(np.float32) / 255.0
        x_meta_mb = np.array(meta_mb_list).astype(np.float32)
        y_mb = y_test[i * mb_size:(i + 1) * mb_size, :]

        yield (x_img_mb, x_meta_mb), y_mb


In [None]:
our_testing_datagen = test_data_mini_batches_generator(test_data)

In [None]:
X_test_mb , y_test_mb = our_testing_datagen.__next__()

In [None]:
X_test_mb

In [None]:
loss_func = torch.nn.BCELoss()
optimizer = torch.optim.SGD(params = our_model.parameters(),lr=0.01)
epochs = 5
mb_size = 10

for i in range(epochs):

    our_training_data_gen = training_data_mini_batches_generator(train_data)
    
    for x_train_mb,y_train_mb in our_training_data_gen:

        x_train_mb = torch.tensor(x_train_mb, dtype=torch.float32).permute(0, 3, 1, 2).to(device)  # Numpy -> Tensor + channel-first
        y_train_mb = torch.tensor(y_train_mb, dtype=torch.float32).to(device)


    y_pred_mb_train = our_model(x_train_mb)
    training_mb_loss = loss_func(y_pred_mb_train,y_train_mb)

    training_mb_loss.backward()
    optimizer.step()
    optimizer.zero_grad()

    print("Epoch # {}, Training Loss Value = {}".format(i+1,training_mb_loss))
        