# In [5]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Conv2DTranspose, Concatenate, Dropout
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.metrics import MeanIoU

# Load and preprocess data
def load_data(
    csv_path="C:/Users/sk731/OneDrive/Desktop/MLDSA2/train.csv",
    img_dir="C:/Users/sk731/OneDrive/Desktop/MLDSA2/train/my_train_img",
    mask_dir="C:/Users/sk731/OneDrive/Desktop/MLDSA2/train/my_train_seg",
):
    """Load the image/mask pairs listed in a CSV file.

    The CSV must have an ``id`` column holding PNG file names. Each name is
    looked up in both ``img_dir`` and ``mask_dir`` (the mask shares the
    image's file name).

    Args:
        csv_path: Path of the CSV file listing the samples.
        img_dir: Directory containing the input images.
        mask_dir: Directory containing the segmentation masks.

    Returns:
        A tuple ``(images, masks)`` of two lists of float32 tensors, in CSV
        row order. Images are the decoded pixel values divided by 255; masks
        are additionally thresholded at 0.5 so they contain only 0.0 and 1.0.

    Raises:
        KeyError: If the CSV has no ``id`` column.
    """
    df = pd.read_csv(csv_path)
    filenames = list(df['id'])

    def _read_scaled_png(directory, fname):
        # Decode with TensorFlow's default channel handling, then divide the
        # pixel values by 255.
        raw = tf.io.read_file(os.path.join(directory, fname))
        return tf.cast(tf.image.decode_png(raw), tf.float32) / 255.0

    images = [_read_scaled_png(img_dir, fname) for fname in filenames]

    # Binarize the masks: anything brighter than mid-gray counts as foreground.
    masks = [
        tf.where(_read_scaled_png(mask_dir, fname) > 0.5, 1.0, 0.0)
        for fname in filenames
    ]

    return images, masks

# Dice coefficient metric
def dice_coefficient(y_true, y_pred):
    """Return the Dice score between ``y_true`` and ``y_pred``.

    Computed as ``2 * sum(y_true * y_pred) / (sum(y_true) + sum(y_pred))``,
    with every sum taken over all elements of the tensors (no per-sample
    averaging). A small constant is added to the denominator so that two
    all-zero inputs yield 0 instead of a division by zero.
    """
    epsilon = 1e-7
    overlap = tf.reduce_sum(y_true * y_pred)
    total = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred)
    numerator = 2.0 * overlap
    denominator = total + epsilon
    return numerator / denominator

# Define the segmentation model architecture
def unet_plus_plus(input_shape, out_channels=3):
    """Build a two-level encoder/decoder segmentation model.

    Note: despite the name, the graph built here has one skip connection per
    resolution level and no nested intermediate nodes.

    The input is downsampled twice by 2x2 max pooling and upsampled twice by
    stride-2 transposed convolutions, so height and width need to be
    multiples of 4; otherwise the skip concatenations fail on mismatched
    spatial sizes.

    Args:
        input_shape: Shape of a single input sample (without the batch axis).
        out_channels: Number of sigmoid output channels. Defaults to 3, the
            value that was previously hard-coded.

    Returns:
        An uncompiled ``Model`` mapping the input to a per-pixel sigmoid map
        with ``out_channels`` channels.
    """
    def conv_pair(x, filters):
        # Two consecutive 3x3 ReLU convolutions that keep the spatial size.
        x = Conv2D(filters, 3, activation='relu', padding='same')(x)
        return Conv2D(filters, 3, activation='relu', padding='same')(x)

    inputs = Input(input_shape)

    # Encoder
    enc1 = conv_pair(inputs, 64)
    pool1 = MaxPooling2D(pool_size=(2, 2))(enc1)

    enc2 = conv_pair(pool1, 128)
    pool2 = MaxPooling2D(pool_size=(2, 2))(enc2)

    # Bottleneck
    bottleneck = conv_pair(pool2, 256)

    # Decoder: upsample, merge with the encoder features of the same
    # resolution, then refine.
    up1 = Conv2DTranspose(128, 2, strides=(2, 2), padding='same')(bottleneck)
    merged1 = Concatenate()([enc2, up1])
    dec1 = conv_pair(merged1, 128)

    up2 = Conv2DTranspose(64, 2, strides=(2, 2), padding='same')(dec1)
    merged2 = Concatenate()([enc1, up2])
    dec2 = conv_pair(merged2, 64)

    # Output layer: 1x1 convolution producing per-pixel sigmoid scores.
    outputs = Conv2D(out_channels, 1, activation='sigmoid')(dec2)

    return Model(inputs=inputs, outputs=outputs)

# Load data
images, masks = load_data()

# Train-test split (fixed seed so the split is reproducible)
X_train, X_test, y_train, y_test = train_test_split(images, masks, test_size=0.2, random_state=42)

# Model parameters
input_shape = images[0].shape  # taken from the first sample
batch_size = 100
epochs = 50

# Build and compile the model
model = unet_plus_plus(input_shape)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[dice_coefficient])

# Callbacks: keep the best checkpoint and stop once the validation Dice score
# has not improved for 5 epochs.
checkpoint = ModelCheckpoint("unet_plus_plus_model.h5", monitor='val_dice_coefficient', verbose=1, save_best_only=True, mode='max')
early_stopping = EarlyStopping(monitor='val_dice_coefficient', patience=5, verbose=1, mode='max', restore_best_weights=True)

# Convert lists of images to a single tensor
X_train = tf.stack(X_train)
X_test = tf.stack(X_test)

# Convert lists of masks to a single tensor
y_train = tf.stack(y_train)
y_test = tf.stack(y_test)


# Train model
history = model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs,
                    validation_split=0.1, callbacks=[checkpoint, early_stopping])

# Evaluate model on test data.
# The metric value is unpacked into `test_dice`, not `dice_coefficient`:
# reusing that name would rebind the metric function to a float and break any
# later compile() or model reload in the same session.
loss, test_dice = model.evaluate(X_test, y_test)
print("Test Loss:", loss)
print("Test Dice Coefficient:", test_dice)

    
   



# Epoch 1/50
