In [None]:
import tensorflow as tf
# Enumerate the GPUs visible to TensorFlow and report whether any were found.
physical_devices = tf.compat.v1.config.experimental.list_physical_devices('GPU')
print('GPU is available' if len(physical_devices) > 0 else 'Not available')
# Enable on-demand memory growth for the first GPU only when one exists;
# indexing an empty device list would raise IndexError on CPU-only hosts.
if physical_devices:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)

In [None]:
import numpy as np
import tensorflow as tf
import random
# Seed NumPy, Python's `random` module and TensorFlow with the same fixed value (65).
np.random.seed(65)
random.seed(65)
tf.random.set_seed(65)

# Models

In [None]:
def gegenbauer_polynomial(x, degree, alpha):
    """Evaluate the Gegenbauer polynomial C_degree^(alpha) at `x`.

    The value is built with the three-term recurrence

        C_0(x) = 1
        C_1(x) = 2 * alpha * x
        C_n(x) = (2 * (n + alpha - 1) / n) * x * C_{n-1}(x)
                 - ((n + 2 * alpha - 2) / n) * C_{n-2}(x)

    Args:
        x: Evaluation points; cast to float32.
        degree: Polynomial degree. It drives a Python `range` loop, so it is
            normally a plain Python int.
        alpha: Order parameter of the polynomial; cast to float32 and combined
            with `x` through ordinary broadcasting.

    Returns:
        A float32 tensor: ones shaped like `x` for degree 0, otherwise the
        recurrence result (broadcast of `x` and `alpha`).

    Raises:
        ValueError: If `degree` is a negative Python int. Without this check
            the loop below would be skipped and the degree-1 polynomial would
            be returned silently.
    """
    if isinstance(degree, int) and degree < 0:
        raise ValueError(f"degree must be non-negative, got {degree}")

    x = tf.cast(x, tf.float32)
    alpha = tf.cast(alpha, tf.float32)

    C0 = tf.ones_like(x)
    if degree == 0:
        return C0

    C1 = 2.0 * alpha * x
    if degree == 1:
        return C1

    # Roll the recurrence forward, keeping only the last two terms.
    C_prev = C0
    C_curr = C1
    for n in range(2, degree + 1):
        coef1 = 2.0 * (n + alpha - 1.0) / n
        coef2 = (n + 2.0 * alpha - 2.0) / n
        C_next = coef1 * x * C_curr - coef2 * C_prev
        C_prev, C_curr = C_curr, C_next

    return C_curr

In [None]:
class GegenbauerNeuralBlock(tf.keras.layers.Layer):
    """Keras layer that applies a Gegenbauer polynomial element-wise.

    Inputs are first passed through a sigmoid, then evaluated by
    `gegenbauer_polynomial` of the configured `degree`. The polynomial's
    order parameter is a single trainable weight passed through softplus.
    """

    def __init__(self, degree=5, **kwargs):
        """Store the polynomial degree and wrap the evaluator in `tf.function`."""
        super().__init__(**kwargs)
        self.degree = degree
        self.gegenbauer_fn = tf.function(gegenbauer_polynomial)

    def build(self, input_shape):
        """Create the trainable `alpha` weight (shape (1,), initialised to ones)."""
        self.alpha = self.add_weight(
            shape=(1,),
            initializer="ones",
            trainable=True,
            name="alpha",
        )
        super().build(input_shape)

    def call(self, inputs):
        """Return the Gegenbauer polynomial of sigmoid(inputs)."""
        squashed = tf.keras.activations.sigmoid(inputs)
        # softplus keeps the effective order parameter strictly positive
        positive_alpha = tf.keras.activations.softplus(self.alpha)
        return self.gegenbauer_fn(squashed, self.degree, positive_alpha)

    def get_config(self):
        """Return the base layer config extended with `degree`."""
        base_config = super().get_config()
        return {**base_config, "degree": self.degree}


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import (Input, Conv2D, Conv2DTranspose, MaxPooling2D, 
                                     Concatenate, Activation, GlobalMaxPooling2D, Reshape, Multiply, Add)
from tensorflow.keras.models import Model
from tensorflow.keras.utils import register_keras_serializable
import tensorflow.keras.layers as KL
import tensorflow_addons as tfa

In [None]:

# --- Snake Activation ---
class SnakeActivation(tf.keras.layers.Layer):
    """Snake activation: x + (1 / alpha) * sin(alpha * x) ** 2.

    Args:
        alpha: Fixed (non-trainable) frequency factor; must be non-zero
            because the formula divides by it.
        **kwargs: Standard Keras layer arguments (`name`, `dtype`, ...),
            forwarded to the base class so the layer can be rebuilt from
            its config.

    Raises:
        ValueError: If `alpha` is a numeric zero.
    """

    def __init__(self, alpha=1.0, **kwargs):
        super().__init__(**kwargs)
        if isinstance(alpha, (int, float)) and alpha == 0:
            raise ValueError("alpha must be non-zero for SnakeActivation")
        self.alpha = alpha

    def call(self, x):
        """Apply the snake non-linearity element-wise."""
        return x + (1.0 / self.alpha) * tf.math.sin(self.alpha * x) ** 2

    def get_config(self):
        """Return the base layer config extended with `alpha`."""
        config = super().get_config()
        config.update({"alpha": self.alpha})
        return config

# --- ConvGNRelu: Conv2D -> GroupNorm -> ReLU (placeholder standing in for a deformable DSConv layer) ---
class ConvGNRelu(tf.keras.layers.Layer):
    """Conv2D followed by group normalization and ReLU.

    The layer is a placeholder for a deformable convolution: no offsets are
    computed yet, the input is convolved as-is.

    Args:
        out_ch: Number of output channels of the convolution.
        kernel_size: Kernel size passed to `Conv2D`.
        groups: Number of groups for `GroupNormalization` (channel axis -1).
        **kwargs: Standard Keras layer arguments (`name`, `dtype`, ...),
            forwarded to the base class so the layer can be rebuilt from
            its config.
    """

    def __init__(self, out_ch, kernel_size, groups=16, **kwargs):
        super().__init__(**kwargs)
        self.out_ch = out_ch
        self.kernel_size = kernel_size
        self.groups = groups

        # Layers created once
        self.conv = Conv2D(out_ch, kernel_size, padding='same', use_bias=False)
        self.gn = tfa.layers.GroupNormalization(groups=groups, axis=-1)
        self.relu = tf.keras.layers.ReLU()

    def call(self, x):
        """Run conv -> group norm -> ReLU on `x`."""
        # Placeholder deformable conv: simply apply conv for now
        deformed = x  # You can replace with actual deformable offsets
        x = self.conv(deformed)
        x = self.gn(x)
        x = self.relu(x)
        return x

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "out_ch": self.out_ch,
            "kernel_size": self.kernel_size,
            "groups": self.groups,
        })
        return config


# --- Dynamic Snake Conv2D ---
class DynamicSnakeConv2D(tf.keras.layers.Layer):
    """Conv-GN-ReLU block followed by a snake activation.

    Args:
        filters: Number of output channels.
        kernel_size: Kernel size of the inner convolution.
        groups: Number of groups for the inner group normalization.
        **kwargs: Standard Keras layer arguments. `get_config` emits `name`,
            `trainable` and `dtype` from the base class, so `__init__` has to
            accept them; otherwise `from_config` fails with a TypeError.
    """

    def __init__(self, filters, kernel_size, groups=16, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.groups = groups
        self.dsconv = ConvGNRelu(filters, kernel_size, groups=groups)
        self.snake = SnakeActivation()

    def call(self, x):
        """Apply the conv block, then the snake activation."""
        x = self.dsconv(x)
        x = self.snake(x)
        return x

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "filters": self.filters,
            "kernel_size": self.kernel_size,
            "groups": self.groups,
        })
        return config

# ------------------- SE-Net style channel attention with Dense layers and Gegenbauer blocks -------------------
def se_net(x, r=8):
    """Squeeze-and-excitation style channel re-weighting of `x`.

    The feature map is squeezed with global max pooling, passed through a
    two-layer Dense bottleneck (each Dense followed by an activation and a
    degree-5 `GegenbauerNeuralBlock`), and the result is multiplied back
    onto `x`.

    Args:
        x: Keras tensor whose last dimension is the (statically known)
            channel count.
        r: Channel reduction ratio of the bottleneck.

    Returns:
        `x` scaled by the computed excitation.
    """
    input_channel = x.shape[-1]
    squeeze = GlobalMaxPooling2D()(x)
    squeeze = Reshape((1, 1, input_channel))(squeeze)

    # Keep at least one bottleneck unit: with fewer than `r` channels the
    # integer division would otherwise request a zero-width Dense layer.
    bottleneck_units = max(1, input_channel // r)
    excitation = KL.Dense(bottleneck_units)(squeeze)

    excitation = Activation('relu')(excitation)
    excitation = GegenbauerNeuralBlock(5)(excitation)

    excitation = KL.Dense(input_channel)(excitation)

    excitation = Activation('sigmoid')(excitation)
    excitation = GegenbauerNeuralBlock(5)(excitation)

    scaled = Multiply()([x, excitation])
    return scaled

# ------------------- Feature Selective Fusion Block -------------------
def fsf_block(low_x, high_x):
    """Fuse a skip feature map with an upsampled one through a channel gate.

    `high_x` and `low_x` are concatenated, re-weighted by `se_net`, reduced
    to half the concatenated channel count with a 1x1 convolution, and
    collapsed by global max pooling + sigmoid into a per-channel gate. The
    gate scales `low_x`, and the gated result is added to `high_x`.
    NOTE(review): the Multiply/Add steps appear to require `low_x` and
    `high_x` to have the same channel count -- confirm against callers.
    """
    stacked = Concatenate()([high_x, low_x])
    attended = se_net(stacked)

    # 1x1 projection back to half of the concatenated channels
    half_channels = int(attended.shape[-1] / 2)
    fused = Conv2D(half_channels, kernel_size=(1, 1), strides=1, padding='same')(attended)

    # Per-channel gate in (0, 1)
    pooled = GlobalMaxPooling2D()(fused)
    reshaped = Reshape((1, 1, pooled.shape[-1]))(pooled)
    gate = Activation('sigmoid')(reshaped)

    gated_low = Multiply()([low_x, gate])
    return Add()([gated_low, high_x])

def encoder_block(x, filters):
    """One encoder stage: 3x3 ReLU conv, DynamicSnakeConv2D, then 2x2 max pooling.

    Returns:
        (features, pooled): the pre-pooling feature map (used as the skip
        connection) and its 2x2 max-pooled version.
    """
    features = Conv2D(filters, (3, 3), activation='relu', padding='same')(x)
    features = DynamicSnakeConv2D(filters, 3)(features)
    # A second DynamicSnakeConv2D stage is currently disabled here.

    pooled = MaxPooling2D((2, 2))(features)
    return features, pooled


def decoder_block(x, skip, filters):
    """One decoder stage: 2x transposed-conv upsampling, fusion with `skip`, two convs.

    The upsampled tensor and the skip connection are merged by `fsf_block`,
    then refined by a 3x3 ReLU convolution and a DynamicSnakeConv2D.
    """
    upsampled = Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding='same')(x)
    fused = fsf_block(skip, upsampled)
    # An extra DynamicSnakeConv2D directly after the fusion is currently disabled.
    refined = Conv2D(filters, (3, 3), activation='relu', padding='same')(fused)
    refined = DynamicSnakeConv2D(filters, 3)(refined)
    return refined

# ------------------- Full U-Net Model -------------------
def build_snake_gegenbauer_groupconv_unet(input_shape=(512, 512, 3), num_classes=1, filters=32):
    """Assemble the four-level U-Net from the encoder/decoder blocks above.

    Args:
        input_shape: Shape of one input image (without the batch axis).
        num_classes: Number of output channels; 1 selects a sigmoid head,
            anything else a softmax head.
        filters: Channel count of the first encoder stage; deeper stages use
            2x, 4x and 8x this value, the bottleneck 16x.

    Returns:
        An uncompiled Keras `Model`.
    """
    inputs = Input(shape=input_shape)

    # Encoder: remember each stage's pre-pooling output for the skip connections
    skips = []
    x = inputs
    for multiplier in (1, 2, 4, 8):
        skip, x = encoder_block(x, filters * multiplier)
        skips.append(skip)

    # Bottleneck: two plain 3x3 ReLU convolutions
    for _ in range(2):
        x = Conv2D(filters * 16, (3, 3), activation='relu', padding='same')(x)

    # Decoder: walk the skips from deepest to shallowest
    for multiplier, skip in zip((8, 4, 2, 1), reversed(skips)):
        x = decoder_block(x, skip, filters * multiplier)

    # Output head
    if num_classes == 1:
        activation = 'sigmoid'
    else:
        activation = 'softmax'
    outputs = Conv2D(num_classes, (1, 1), padding='same', activation=activation)(x)

    return Model(inputs, outputs)

# ------------------- Example -------------------
# Build the network for 512x512 RGB inputs with a single output channel
# (base filter count left at its default) and print the layer summary.
model = build_snake_gegenbauer_groupconv_unet(input_shape=(512, 512, 3), num_classes=1)
model.summary()

# without augmentation

In [None]:
import os
import numpy as np
import cv2
from matplotlib import pyplot as plt
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import tensorflow as tf

# Define image dimensions
desired_width = 512  # Replace with your desired width
desired_height = 512  # Replace with your desired height
batch_size = 32  # Images per batch; also used below to derive step counts
seed = 24  # Passed to both the image and the mask flow in create_generator

# Create an ImageDataGenerator for images and masks without augmentation:
# images are only rescaled from [0, 255] to [0, 1].
img_data_gen_args = dict(rescale=1/255.)

# Masks get the same 1/255 rescale only; no thresholding/binarization is applied here.
mask_data_gen_args = dict(rescale=1/255., )

# Generators for training, validation, and test data
image_data_generator = ImageDataGenerator(**img_data_gen_args)
mask_data_generator = ImageDataGenerator(**mask_data_gen_args)

In [None]:
def create_generator(image_dir, mask_dir, batch_size, target_size=(desired_height, desired_width)):
    """Return an iterator of (image_batch, mask_batch) pairs.

    Images and masks are read from two directory trees with the module-level
    `image_data_generator` / `mask_data_generator`. Both flows receive the
    same module-level `seed`, batch size and target size.

    Args:
        image_dir: Root directory of the RGB images (as expected by
            `flow_from_directory`).
        mask_dir: Root directory of the grayscale masks.
        batch_size: Number of samples per batch.
        target_size: `(height, width)` every image is resized to. Keras
            expects height first, so the default is built in that order.

    Returns:
        `zip` of the image flow and the mask flow.

    Raises:
        ValueError: If the two directories contain a different number of
            files, in which case images and masks could not be paired.
    """
    # Settings that must be identical for both flows so the batches line up
    shared_flow_args = dict(
        target_size=target_size,
        class_mode=None,
        batch_size=batch_size,
        seed=seed,
    )

    image_generator = image_data_generator.flow_from_directory(
        image_dir, color_mode='rgb', **shared_flow_args)

    mask_generator = mask_data_generator.flow_from_directory(
        mask_dir, color_mode='grayscale', **shared_flow_args)

    if image_generator.samples != mask_generator.samples:
        raise ValueError(
            f"Found {image_generator.samples} images in {image_dir!r} but "
            f"{mask_generator.samples} masks in {mask_dir!r}; counts must match"
        )

    return zip(image_generator, mask_generator)

# Paired image/mask iterators for the train, validation and test splits
train_generator = create_generator(
    image_dir='/home/jayakumar/road-extraction-main/data3/train_images/',
    mask_dir='/home/jayakumar/road-extraction-main/data3/train_masks/',
    batch_size=batch_size,
)
val_generator = create_generator(
    image_dir='/home/jayakumar/road-extraction-main/data3/val_images/',
    mask_dir='/home/jayakumar/road-extraction-main/data3/val_masks/',
    batch_size=batch_size,
)
test_generator = create_generator(
    image_dir='/home/jayakumar/road-extraction-main/data3/test_images/',
    mask_dir='/home/jayakumar/road-extraction-main/data3/test_masks/',
    batch_size=batch_size,
)

# Display Images to confirm

In [None]:
# import matplotlib.pyplot as plt
# def display_images(generator, num_images=3):
#     for i in range(num_images):
#         image, mask = next(generator)
        
#         # Display image
#         plt.figure(figsize=(12, 6))
#         plt.subplot(1, 2, 1)
#         plt.imshow(image[0])
#         plt.title("Image")
        
#         # Display mask
#         plt.subplot(1, 2, 2)
#         plt.imshow(mask[0].squeeze(), cmap='gray')
#         plt.title("Mask")
        
#         plt.show()

In [None]:
# display_images(train_generator, num_images=3)

In [None]:
# image, mask = next(train_generator)
# print("Image datatype:", image[0].dtype)
# print("Mask datatype:", mask[0].dtype)

In [None]:
# image, mask = next(train_generator)
# print(type(image[0][0][0]))
# print(mask[0][0][0])     #In other code, print(train_masks[0][0][0]) is giving 0.0 and type is <class 'numpy.float64'>, Here it is [0.] and type is <class 'numpy.ndarray'>

In [None]:
# display_images(val_generator, num_images=3)

In [None]:
# image, mask = next(val_generator)
# print("Image datatype:", image[0].dtype)
# print("Mask datatype:", mask[0].dtype)

In [None]:
# print(type(image[0][0][0]))
# print(mask[0][0][0]) 

In [None]:
# display_images(test_generator, num_images=3)

In [None]:
# image, mask = next(test_generator)
# print("Image datatype:", image[0].dtype)
# print("Mask datatype:", mask[0].dtype)

In [None]:
# print(type(image[0][0][0]))
# print(type(mask[0][0][0])) 

# Model Related

In [None]:
import tensorflow as tf
# Write the model to disk only when the monitored quantity improves
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    'patchmodels/fkanunetadam32d24f6f6rsmitpatchba.h5',
    save_best_only=True,
    verbose=1,
)
# Stop after 10 epochs without validation-loss improvement and roll back to the best weights
early_stopping = tf.keras.callbacks.EarlyStopping(
    patience=10,
    restore_best_weights=True,
    monitor='val_loss',
)

In [None]:
from keras.callbacks import LearningRateScheduler
def lr_scheduler(epoch, lr, decay_rate=1e-6, min_lr=0.0):
    """Linearly decay the learning rate by a fixed amount per call.

    Args:
        epoch: Index of the epoch about to start (unused; present because the
            schedule is called with `(epoch, lr)`).
        lr: Current learning rate.
        decay_rate: Amount subtracted from `lr` on every call.
        min_lr: Lower bound for the returned value, so repeated decay can
            never produce a negative learning rate.

    Returns:
        `lr - decay_rate`, but not less than `min_lr`.
    """
    return max(lr - decay_rate, min_lr)
# Wrap the schedule function in a Keras callback so it can be passed to `model.fit`.
lr_callback=LearningRateScheduler(lr_scheduler)

In [None]:
# Query the visible GPUs again and report whether at least one was found.
physical_devices=tf.compat.v1.config.experimental.list_physical_devices('GPU')
print('GPU is available' if len(physical_devices) > 0 else 'Not available')

In [None]:
import os
import segmentation_models as sm
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Precision, Recall
# SGD with momentum, Dice loss, and IoU / F1 / precision / recall as reported metrics
model.compile(
    optimizer=tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
    loss=sm.losses.dice_loss,
    metrics=[
        sm.metrics.iou_score,
        sm.metrics.f1_score,
        sm.metrics.precision,
        sm.metrics.recall,
    ],
)

In [None]:
# Count the entries of the sub-folders the generators read from.
# NOTE: os.listdir counts every directory entry, not only image files.
num_train_imgs = len(os.listdir('/home/jayakumar/road-extraction-main/data3/train_images/train/'))
num_val_imgs = len(os.listdir('/home/jayakumar/road-extraction-main/data3/val_images/val/'))

# Whole batches per epoch. Clamp to at least one step so a split holding fewer
# files than `batch_size` does not yield zero training/validation steps.
steps_per_epoch = max(1, num_train_imgs // batch_size)
validation_steps = max(1, num_val_imgs // batch_size)
epochs = 100

with tf.device("/GPU:0"):
    #history = model.fit(train_images, train_masks, batch_size=4, epochs=epochs, validation_data=(val_images, val_masks), callbacks=[checkpoint,early_stopping,lr_callback])
    history = model.fit(
        train_generator,
        validation_data=val_generator,
        steps_per_epoch=steps_per_epoch,
        validation_steps=validation_steps,
        epochs=epochs,
        callbacks=[checkpoint, early_stopping, lr_callback],
    )

In [None]:
import os
import pickle
import matplotlib.pyplot as plt

# Where the pickled training history lives
folder_path = 'patchhistory'
filename = 'fkanunetadam32d24f6f6rsmitpatchba.pkl'  #Change filename everytime

# Create the folder on first use, then derive the full file path
os.makedirs(folder_path, exist_ok=True)
file_path = os.path.join(folder_path, filename)

# Persist the per-epoch metrics recorded by `model.fit`
with open(file_path, 'wb') as history_file:
    pickle.dump(history.history, history_file)

# Read the history back from disk so the plot is drawn from the stored copy
with open(file_path, 'rb') as history_file:
    loaded_history = pickle.load(history_file)

# Plot training and validation loss on the same axes
for history_key, curve_label in (('loss', 'Training Loss'), ('val_loss', 'Validation Loss')):
    plt.plot(loaded_history[history_key], label=curve_label)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()


# Test

In [None]:
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras.models import load_model
import segmentation_models as sm
from tensorflow.keras.metrics import Precision, Recall
# Restore the trained network after a prior interruption.
# The checkpoint contains DynamicSnakeConv2D layers, which `load_model` cannot
# deserialize when only GegenbauerNeuralBlock is listed in `custom_objects`.
# Re-creating the architecture from code and reading just the weights from the
# HDF5 file avoids config deserialization altogether. Keep the arguments below
# identical to the ones used when the checkpointed model was built.
model = build_snake_gegenbauer_groupconv_unet(input_shape=(512, 512, 3), num_classes=1)
model.load_weights('patchmodels/fkanunetadam32d24f6f6rsmitpatchba.h5')
model.compile(
    optimizer=tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
    loss=sm.losses.dice_loss,
    metrics=[sm.metrics.iou_score, sm.metrics.f1_score, sm.metrics.precision, sm.metrics.recall],
)

In [None]:
# display_images(test_generator, num_images=1)

In [None]:
import os

# Calculate the number of test images
num_test_images = len(os.listdir('/home/jayakumar/road-extraction-main/data3/test_images/test/'))  # Update 'test' with the actual folder name inside 'test_images'

# Number of batches needed to see every test image once: ceiling division.
# (floor + 1 would run one batch too many whenever the image count is an exact
# multiple of batch_size, evaluating some images a second time.)
steps = (num_test_images + batch_size - 1) // batch_size

# Evaluate the model using the test generator
# NOTE: the name `eval` shadows the Python builtin; kept for compatibility with existing cells.
eval = model.evaluate(test_generator, steps=steps)

# Print the IoU score: index 0 is the loss, index 1 the first compiled metric (iou_score)
print('Test IoU score: {:.2f}'.format(eval[1]))


In [None]:
import random
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os

# Get a batch of test images and masks using the test generator.
# No directory listing is needed here: the batch itself tells how many samples are available.
test_images, test_masks = next(test_generator)
print(len(test_images))

# Select up to 8 random images for visualization; a batch may hold fewer than 8 images,
# and random.sample raises ValueError when asked for more items than exist.
num_samples = min(8, len(test_images))
random_indices = random.sample(range(len(test_images)), num_samples)
test_sample = test_images[random_indices]
ground_truth_sample = test_masks[random_indices]

# Predict masks for the randomly selected images and threshold them at 0.5
predictions = model.predict(test_sample)
predictions = (predictions > 0.5).astype(np.uint8)

# One row per sample, three columns; squeeze=False keeps `axes` 2-D even for a single row
fig, axes = plt.subplots(num_samples, 3, figsize=(8, 3 * num_samples), squeeze=False)

# Iterate over the random samples and display them
for i in range(len(test_sample)):

    image = (test_sample[i] * 255).astype(np.uint8)  # Rescale image to 0-255
    mask = predictions[i]  # Predicted binary mask
    ground_truth = ground_truth_sample[i]  # Ground truth mask

    # Prepare overlay for predicted mask
    overlay = image.copy()
    mask = np.repeat(mask, 3, axis=2)  # Convert single-channel mask to 3 channels
    inverted_mask = 1 - mask
    yellow_mask = np.array([255, 255, 0]) * mask  # Use yellow color for mask

    # Paint predicted pixels yellow, then blend 20% original / 80% painted image
    result = image * inverted_mask + yellow_mask
    alpha = 0.2
    predicted_overlay = cv2.addWeighted(overlay, alpha, result.astype(overlay.dtype), 1 - alpha, 0)

    # Plot the image, ground truth, and predicted mask
    axes[i, 0].imshow(image)
    axes[i, 0].set_title('Original')
    axes[i, 0].axis('off')

    axes[i, 1].imshow(ground_truth[:, :, 0], cmap='gray')
    axes[i, 1].set_title('Ground Truth')
    axes[i, 1].axis('off')

    axes[i, 2].imshow(predicted_overlay)
    axes[i, 2].set_title('Predicted')
    axes[i, 2].axis('off')

# Adjust the spacing between subplots
plt.tight_layout()
#plt.savefig('result.png', bbox_inches='tight')  # Save the figure as a PNG image

# Show the plot
plt.show()
