In [15]:
import os
import shutil
import cv2
import pandas as pd
import tensorflow as tf
from tensorflow.keras.applications.vgg19 import preprocess_input
import matplotlib.pyplot as plt
import tensorflow 
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import InputLayer, BatchNormalization, Dropout, Flatten, Dense, Activation, MaxPool2D, Conv2D, GlobalAveragePooling2D, Concatenate, GlobalMaxPooling2D
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.applications.vgg19 import VGG19
from tensorflow.keras.applications.resnet_v2 import ResNet50V2, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications import ResNet50

from tensorflow.keras import layers
from tensorflow.keras.regularizers import l2

from keras.metrics import Recall,Precision
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
# from keras import models, layers
from keras.utils import to_categorical

from sklearn.model_selection import train_test_split
from pathlib import Path
from sklearn.metrics import classification_report, confusion_matrix

from IPython.core.interactiveshell import InteractiveShell

In [None]:
import os
from pathlib import Path
import pandas as pd

def load_dataset_with_parent_labels(root_path, excluded_classes):
    """Index the PNG images under ``root_path``, labelled by top-level folder.

    Every immediate sub-directory of ``root_path`` is treated as one class and
    is searched recursively for ``*.png`` files. Files whose stem ends with
    ``GT`` are skipped (presumably ground-truth masks -- confirm against the
    dataset layout).

    Args:
        root_path: Dataset root directory (``str`` or ``Path``).
        excluded_classes: Container of top-level directory names to ignore.

    Returns:
        A ``pandas.DataFrame`` with the columns ``path`` (file path as a
        string) and ``label`` (name of the top-level directory), one row per
        image. Both columns are present even when no image is found, and rows
        are ordered by class directory and then by path.
    """
    root_path = Path(root_path)
    records = []

    # iterdir()/glob() yield entries in filesystem order, which differs between
    # machines; sorting keeps the row order (and therefore any seeded split
    # performed on the frame) reproducible.
    for label_dir in sorted(root_path.iterdir()):
        label = label_dir.name
        if not label_dir.is_dir() or label in excluded_classes:
            continue

        for img_path in sorted(label_dir.glob('**/*.png')):
            if img_path.stem.endswith('GT'):
                continue
            records.append({'path': str(img_path), 'label': label})

    # Explicit columns so that an empty scan still supports df['label'].
    return pd.DataFrame(records, columns=['path', 'label'])
    
# Dataset roots (Kaggle input mounts).
root_path = "/kaggle/input/newly-fused-lung-pet-ct-dx3/Newly Fused Dataset"
aug_root_path = "/kaggle/input/augmented-fused-staging-dx"

# The two sources are read with complementary exclusion lists: the first call
# skips the small-cell and squamous-cell folders, the second skips
# 'adenocarcinoma' (plus 'M', 'T', 'N').
# NOTE(review): the exclusion list originally contained only the spelling
# 'smallsellcarcinoma', while the rest of the notebook uses
# 'smallcellcarcinoma'. Both spellings are excluded here so the folder is
# skipped whichever way it is named on disk -- confirm the real folder name.
none_aug_df = load_dataset_with_parent_labels(
    root_path,
    ['smallsellcarcinoma', 'smallcellcarcinoma', 'squamouscellcarcinoma'],
)
aug_df = load_dataset_with_parent_labels(aug_root_path, ['M', 'T', 'N', 'adenocarcinoma'])

# Stack both frames into one index-reset frame.
df = pd.concat([aug_df, none_aug_df], ignore_index=True)

print(f"Total images: {len(df)}")
print("Label distribution:")
print(df['label'].value_counts())

In [17]:
# import numpy as np
# import pandas as pd

# def custom_undersample(df, sampling_dict):
#     """
#     Custom undersampling based on target counts per class.
#     Classes not in the dictionary remain unchanged.
#     """
#     np.random.seed(42)  # For reproducibility
#     dfs = []
    
#     for class_name, target_count in sampling_dict.items():
#         class_samples = df[df['label'] == class_name]
#         if len(class_samples) > target_count:
#             # Undersample if class is larger than target
#             selected_indices = np.random.choice(
#                 class_samples.index,
#                 size=target_count,
#                 replace=False
#             )
#             dfs.append(df.loc[selected_indices])
#         else:
#             # Keep all if class is smaller than target
#             dfs.append(class_samples)
    
#     # Add classes not in the dictionary (unchanged)
#     other_classes = df[~df['label'].isin(sampling_dict.keys())]
#     if not other_classes.empty:
#         dfs.append(other_classes)
    
#     return pd.concat(dfs).sample(frac=1).reset_index(drop=True)

In [19]:
# Keep only the rows whose label is one of the three carcinoma classes.
filtered_df = df.loc[
    df['label'].isin(['smallcellcarcinoma', 'adenocarcinoma', 'squamouscellcarcinoma'])
]

In [20]:
# Number of images per class, listed in label order.
label_frequencies = filtered_df['label'].value_counts()
class_counts = label_frequencies.sort_index()
print(class_counts)

label
adenocarcinoma           3352
smallcellcarcinoma       3430
squamouscellcarcinoma    2862
Name: count, dtype: int64


In [21]:
from tensorflow.keras.applications.efficientnet import preprocess_input

# Stratified 80/20 split of the filtered frame into train and test rows.
train_df, test_df = train_test_split(
    filtered_df,
    test_size=0.2,
    random_state=2,
    shuffle=True,
    stratify=filtered_df['label'],
)

batch_size = 64

# `preprocess_input` is the EfficientNet variant imported at the top of this
# cell. The training generator also reserves 20% of `train_df` for the
# "validation" subset; neither generator applies any augmentation.
train_generator = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    validation_split=0.2,
)
test_generator = ImageDataGenerator(preprocessing_function=preprocess_input)

# Arguments shared by all three iterators.
flow_options = dict(
    x_col="path",
    y_col="label",
    target_size=(224, 224),
    color_mode="rgb",
    class_mode="categorical",
    batch_size=batch_size,
)

# Shuffled batches from the "training" subset of train_df.
train_imgs = train_generator.flow_from_dataframe(
    dataframe=train_df, shuffle=True, subset="training", **flow_options
)

# Fixed-order batches from the "validation" subset of train_df.
val_imgs = train_generator.flow_from_dataframe(
    dataframe=train_df, shuffle=False, subset="validation", **flow_options
)

# Fixed-order batches from the held-out test rows.
test_imgs = test_generator.flow_from_dataframe(
    dataframe=test_df, shuffle=False, **flow_options
)

Found 6172 validated image filenames belonging to 3 classes.
Found 1543 validated image filenames belonging to 3 classes.
Found 1929 validated image filenames belonging to 3 classes.


In [22]:
# Class-index frequencies of the training subset, then of the validation subset.
for split_iterator in (train_imgs, val_imgs):
    print(pd.Series(split_iterator.classes).value_counts())  # Per-class counts

1    2176
0    2142
2    1854
Name: count, dtype: int64
1    568
0    539
2    436
Name: count, dtype: int64


In [23]:
# Draw one training batch and inspect how its labels are encoded.
first_batch = next(train_imgs)
images, labels = first_batch
print("Labels shape:", labels.shape)
print("One-hot example:", labels[0])

Labels shape: (64, 3)
One-hot example: [0. 1. 0.]


In [24]:
# Draw another training batch and report the value range of its pixels.
images, _ = next(train_imgs)
pixel_low, pixel_high = images.min(), images.max()
print("Min pixel value:", pixel_low)
print("Max pixel value:", pixel_high)

Min pixel value: 0.0
Max pixel value: 255.0


In [26]:
from tensorflow.keras.applications import EfficientNetB0, DenseNet121, DenseNet201, ResNet50
from tensorflow.keras.models import Model

def build_model():
    """Build the EfficientNetB0-based three-class classifier.

    The ImageNet-initialised EfficientNetB0 backbone (no top) is frozen except
    for its last 20 layers. Its output is global-average-pooled and passed
    through a dense head (256 -> 256 with batch normalisation, then 128 -> 64,
    all ReLU) ending in a 3-way softmax.

    Returns:
        An uncompiled ``Model`` taking (224, 224, 3) images and producing a
        single softmax tensor with 3 units.
    """
    base_model = EfficientNetB0(
        input_shape=(224, 224, 3),
        include_top=False,
        weights="imagenet",
    )

    # Freeze the whole backbone, then re-enable training on its last 20 layers
    # only (fewer unfrozen layers -> less capacity to fit).
    base_model.trainable = False
    for layer in base_model.layers[-20:]:
        layer.trainable = True

    x = base_model.output
    x = GlobalAveragePooling2D()(x)

    # Two dense blocks with batch normalisation applied before the activation.
    for units in (256, 256):
        x = Dense(units)(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)

    # Two plain dense blocks.
    for units in (128, 64):
        x = Dense(units)(x)
        x = Activation('relu')(x)

    # Must be a single tensor: a trailing comma here would wrap it in a
    # 1-tuple and turn the model into a multi-output model, so predictions
    # would no longer be a plain (batch, 3) array.
    outputs = Dense(3, activation='softmax')(x)

    return Model(inputs=base_model.input, outputs=outputs)

In [28]:
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, Callback
from tensorflow.keras.optimizers.schedules import ExponentialDecay

# Take the epoch length from the training iterator itself instead of
# estimating it as 0.8 * len(train_df) with floor division: that estimate
# gives 96 steps, while the training log shows 97 batches per epoch, so the
# staircase boundary landed mid-epoch.
training_samples_num = train_imgs.samples
steps_per_epoch = len(train_imgs)

initial_learning_rate = 5e-3
lr_schedule = ExponentialDecay(
    initial_learning_rate,
    decay_steps=steps_per_epoch * 2,  # step down once every two epochs
    decay_rate=0.4,  # each step multiplies the rate by 0.4 (a 60% cut)
    staircase=True)

model = build_model()

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

class LrValLogger(tf.keras.callbacks.Callback):
    """Callback that prints the optimizer's current learning rate after each epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Print the learning rate; ``epoch`` and ``logs`` are not used."""
        optimizer = self.model.optimizer
        lr = optimizer.learning_rate

        # Some optimizer implementations expose the schedule object itself
        # rather than its current value; evaluate it at the current step.
        if isinstance(lr, tf.keras.optimizers.schedules.LearningRateSchedule):
            lr = lr(optimizer.iterations)

        if hasattr(lr, 'numpy'):
            lr_value = lr.numpy()
        else:
            lr_value = tf.keras.backend.get_value(lr)

        print(f"            learning rate ={lr_value:.6f}")

# Train for 15 epochs with per-epoch validation on `val_imgs`; the only
# callback is LrValLogger, which prints the learning rate after every epoch.
history = model.fit(train_imgs, validation_data=val_imgs, epochs=15, callbacks=[LrValLogger()])

  self._warn_if_super_not_called()


Epoch 1/15
[1m97/97[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 421ms/step - accuracy: 0.5290 - loss: 0.9473            learning rate =0.005000
[1m97/97[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m92s[0m 618ms/step - accuracy: 0.5301 - loss: 0.9455 - val_accuracy: 0.7660 - val_loss: 1.7320
Epoch 2/15
[1m97/97[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 329ms/step - accuracy: 0.7878 - loss: 0.4820            learning rate =0.002000
[1m97/97[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 412ms/step - accuracy: 0.7881 - loss: 0.4814 - val_accuracy: 0.8276 - val_loss: 0.7370
Epoch 3/15
[1m97/97[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 330ms/step - accuracy: 0.9062 - loss: 0.2440            learning rate =0.002000
[1m97/97[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 416ms/step - accuracy: 0.9063 - loss: 0.2436 - val_accuracy: 0.9339 - val_loss: 0.1998
Epoch 4/15
[1m97/97[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 336m

In [None]:
def trainingPlots(history, output_dir="."):
    """Plot the training and validation accuracy/loss curves of a fit run.

    Two stand-alone figures (accuracy, then loss) are shown, after which a
    combined side-by-side figure is written to
    ``<output_dir>/accuracy_loss_plot.png`` without being shown.

    Args:
        history: Object whose ``history`` attribute is a dict holding the
            per-epoch lists 'accuracy', 'val_accuracy', 'loss' and 'val_loss'
            (e.g. the return value of ``model.fit``).
        output_dir: Directory that receives the combined figure; created when
            missing. Defaults to the current working directory.
    """
    metrics = history.history

    # (key in the history dict, human-readable name) for each plotted metric.
    panels = (('accuracy', 'Accuracy'), ('loss', 'Loss'))

    # Stand-alone figures, displayed inline.
    for key, label in panels:
        plt.figure(figsize=(12, 6))
        plt.plot(metrics[key], label=f'Train {label}')
        plt.plot(metrics[f'val_{key}'], label=f'Validation {label}')
        plt.title(f'Model {label}')
        plt.ylabel(label)
        plt.xlabel('Epoch')
        plt.legend()
        plt.show()
        plt.close()

    # Combined figure. It is created explicitly so that it never depends on
    # whichever figure pyplot happens to consider current.
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))
    for ax, (key, label) in zip(axes, panels):
        ax.plot(metrics[key], label=f'Train {label}', marker='o')
        ax.plot(metrics[f'val_{key}'], label=f'Validation {label}', marker='x')
        ax.set_title(f'{label} Over Epochs')
        ax.set_xlabel('Epoch')
        ax.set_ylabel(label)
        ax.legend()
        ax.grid(True)

    fig.tight_layout()
    os.makedirs(output_dir, exist_ok=True)
    acc_loss_plot_path = os.path.join(output_dir, "accuracy_loss_plot.png")
    fig.savefig(acc_loss_plot_path)
    plt.close(fig)

def modelEvaluation(model, test_imgs):
    """Evaluate ``model`` on ``test_imgs`` with a confusion matrix and a report.

    Side effects: shows the confusion-matrix heatmap and saves it as
    'confusion_matrix.png'; prints the classification report and writes it to
    'classification_report.txt'. Both files go to the current directory.

    Args:
        model: Object whose ``predict(test_imgs)`` returns per-class scores of
            shape (n_samples, n_classes), or a list/tuple whose first element
            has that shape.
        test_imgs: Iterator exposing ``labels`` (true class indices) and
            ``class_indices`` (class name -> index). It must not shuffle,
            otherwise predictions and ``labels`` are not aligned.
    """
    class_indices = test_imgs.class_indices
    # Class names ordered by their index, as plain lists (not dict views).
    class_names = sorted(class_indices, key=class_indices.get)
    label_ids = [class_indices[name] for name in class_names]

    scores = model.predict(test_imgs)
    # A model built with a list/tuple of outputs returns a list of arrays.
    if isinstance(scores, (list, tuple)):
        scores = scores[0]
    y_pred = np.argmax(scores, axis=1)

    y_true = test_imgs.labels

    # Pin the label set so the matrix always matches the tick labels, even if
    # some class never occurs in y_true or y_pred.
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.savefig('confusion_matrix.png')
    plt.show()
    plt.close()

    report = classification_report(y_true, y_pred, labels=label_ids, target_names=class_names)
    print(report)
    with open('classification_report.txt', 'w') as f:
        f.write(report)

# Plot the curves of the finished run, evaluate on the test iterator, then
# save the trained model to 'lung_classification_model.h5'.
trainingPlots(history)
modelEvaluation(model, test_imgs)

model.save('lung_classification_model.h5')

In [59]:
import tensorflow as tf
import numpy as np

def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """Compute a Grad-CAM heatmap for the first sample of ``img_array``.

    Args:
        img_array: Batched input accepted by ``model`` (assumed to be a batch
            of one already-preprocessed image -- confirm at the call site).
        model: Keras model to explain.
        last_conv_layer_name: Name of the layer whose output feature maps are
            weighted to form the heatmap.
        pred_index: Class index to explain. Defaults to the class with the
            highest score for the first sample.

    Returns:
        NumPy array holding the non-negative heatmap scaled by its maximum, so
        values lie in [0, 1]; it is all zeros when no location has a positive
        activation.
    """
    # A model built with a list/tuple of outputs exposes `output` as a list.
    model_output = model.output
    if isinstance(model_output, (list, tuple)):
        model_output = model_output[0]

    grad_model = tf.keras.models.Model(
        inputs=model.inputs,
        outputs=[model.get_layer(last_conv_layer_name).output, model_output]
    )

    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        class_output = predictions[:, pred_index]

    # Gradient of the class score with respect to the feature maps, averaged
    # over every axis but the last to get one weight per channel.
    grads = tape.gradient(class_output, conv_outputs)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Channel-weighted sum of the first sample's feature maps.
    conv_outputs = conv_outputs[0]
    heatmap = conv_outputs @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # Clamp negatives first, then scale by the (now non-negative) maximum;
    # epsilon keeps the division defined when the map is all zeros.
    heatmap = tf.maximum(heatmap, 0)
    heatmap = heatmap / (tf.math.reduce_max(heatmap) + tf.keras.backend.epsilon())

    return heatmap.numpy()
