In [None]:
# Import libraries
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.metrics import MeanIoU

In [None]:
# Load Stenosis annotation data
stenosis_train = pd.read_parquet("../datasets/stenosis_train.parquet")
stenosis_val = pd.read_parquet("../datasets/stenosis_val.parquet")
stenosis_test = pd.read_parquet("../datasets/stenosis_test.parquet")

# Preview Stenosis annotation data
stenosis_train.head()

In [None]:
# Helper function to load tensors from .tf files
def load_tensor_from_file(file_path):
    serialized_tensor = tf.io.read_file(file_path)
    tensor = tf.io.parse_tensor(serialized_tensor, out_type=tf.float16)
    return tensor

In [None]:
# Load tensors
X_train = load_tensor_from_file("../datasets/X_train_stenosis.tf")
y_train = load_tensor_from_file("../datasets/y_train_stenosis.tf")
X_val = load_tensor_from_file("../datasets/X_val_stenosis.tf")
y_val = load_tensor_from_file("../datasets/y_val_stenosis.tf")
X_test = load_tensor_from_file("../datasets/X_test_stenosis.tf")
y_test = load_tensor_from_file("../datasets/y_test_stenosis.tf")

In [None]:
plt.figure(figsize=(12, 4))
plt.subplot(1, 3, 1)
plt.imshow(tf.squeeze(X_train[0]), cmap='gray')
plt.xticks([])
plt.yticks([])
plt.title('Train Image')
plt.subplot(1, 3, 2)
plt.imshow(tf.squeeze(X_train[1]), cmap='gray')
plt.xticks([])
plt.yticks([])
plt.title('Train Image Enhanced')

In [None]:
#Build dice loss function
def dice_loss(y_true, y_pred):
    smooth = 1e-6
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return 1 - (2. * intersection + smooth) / (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth)

def dice_bce_loss(y_true, y_pred):
    bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    dice = dice_loss(y_true, y_pred)
    return bce + dice

In [None]:
# Build Model Architecture
def dense_block(x, num_layers, growth_rate, dropout_rate=0.2):
    for i in range(num_layers):
        # BatchNorm + ReLU + Conv + Dropout
        x1 = layers.BatchNormalization()(x)
        x1 = layers.Activation('relu')(x1)
        x1 = layers.Conv2D(growth_rate, (3, 3), padding='same')(x1)
        if dropout_rate > 0:
            x1 = layers.Dropout(dropout_rate)(x1)
        x = layers.Concatenate()([x, x1])  # Concatenate input and output
    return x

def transition_down(x, compression_factor, dropout_rate=0.2):
    filters = int(x.shape[-1] * compression_factor)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Conv2D(filters, (1, 1), padding='same')(x)
    if dropout_rate > 0:
        x = layers.Dropout(dropout_rate)(x)
    x = layers.AveragePooling2D((2, 2), strides=(2, 2))(x)
    return x

def transition_up(x, skip_connection, filters):
    x = layers.Conv2D(filters, (3, 3), padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Concatenate()([x, skip_connection])  # Concatenate with skip connection
    return x

def tiramisu(input_shape=(256, 256, 1), num_classes=1, num_blocks=2, initial_features=12, growth_rate=2, compression_factor=0.5, dropout_rate=0.2):
    inputs = layers.Input(shape=input_shape)
    x = layers.Conv2D(initial_features, (3, 3), padding='same')(inputs)

    # Downsampling path
    skip_connections = []
    for i in range(num_blocks):
        x = dense_block(x, num_layers=4, growth_rate=growth_rate, dropout_rate=dropout_rate)
        skip_connections.append(x)
        x = transition_down(x, compression_factor=compression_factor, dropout_rate=dropout_rate)

    # Bottleneck
    x = dense_block(x, num_layers=4, growth_rate=growth_rate, dropout_rate=dropout_rate)

    # Upsampling path
    skip_connections = reversed(skip_connections)
    for skip_connection in skip_connections:
        x = transition_up(x, skip_connection, filters=x.shape[-1])
        x = dense_block(x, num_layers=4, growth_rate=growth_rate, dropout_rate=dropout_rate)

    # Classifier
    x = layers.Conv2D(num_classes, (1, 1), activation='sigmoid')(x)

    model = models.Model(inputs=inputs, outputs=x)
    return model

#Build Tiramisu model
tf.keras.backend.clear_session()
model = tiramisu()
adam_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(optimizer=adam_optimizer, loss=dice_bce_loss, metrics=[MeanIoU(num_classes=2)])

# Model summary
model.summary()

In [None]:
history = model.fit(
    x=X_train,
    y=tf.cast(y_train == 255, tf.float16),
    batch_size=16,
    epochs=10,
    validation_data=(X_val, tf.cast(y_val == 255, tf.float16))
)

In [None]:

clean_results = pd.DataFrame(history.history).reset_index()
clean_results = round(clean_results, 4)
clean_results['index'] = (clean_results['index'] + 1)
clean_results.columns = ['Epoch', 'Loss', 'Mean IoU', 'Validaiton Loss', 'Validation Mean IoU']
plt.figure(figsize=(8,4))
plt.axis('off')
plt.table(cellText=clean_results.values, colLabels=clean_results.columns, cellLoc='center', loc='center', bbox=[0, 0, 1, 1])

In [None]:
y_pred = model.predict(X_test)

In [None]:
predicted_masks = (y_pred > 0.5).astype(np.float16)

iou_metric = MeanIoU(num_classes=2)

# Update states with the ground truth and predictions
iou_metric.update_state(tf.cast(y_test == 255, tf.float16), predicted_masks)

# Get the IoU score
test_iou = iou_metric.result().numpy()
print(f"Test IoU: {test_iou}")

In [None]:
# Example: Visualize the first test image, its true mask, and the predicted mask
plt.figure(figsize=(12, 4))
plt.subplot(1, 3, 1)
plt.imshow(tf.squeeze(X_test[0]), cmap='gray')
plt.xticks([])
plt.yticks([])
plt.title('Test Image')
plt.subplot(1, 3, 2)
plt.imshow(tf.squeeze(y_test[0]), cmap='gray')
plt.xticks([])
plt.yticks([])
plt.title('True Mask')
plt.subplot(1, 3, 3)
plt.imshow(tf.squeeze(predicted_masks[0]), cmap='gray')
plt.xticks([])
plt.yticks([])
plt.title('Predicted Mask')
plt.show()
