In [None]:
import tifffile as tiff
import cv2
import numpy as np
import pandas as pd
from numpy import nan
#from patchify import patchify
#from DataGenerater import batch_generator
from matplotlib import pyplot as plt
import os
os.environ["SM_FRAMEWORK"] = "tf.keras"

from tensorflow import keras
import segmentation_models as sm
from keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
from sklearn.impute import KNNImputer

# Attention 3D U-net

In [None]:
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv3D, MaxPooling3D, UpSampling3D, concatenate, BatchNormalization, Activation,add,Add,Conv3DTranspose,multiply,Reshape,Conv2D,Dropout


def attention_block(x, g, inter_channel):
    theta_x = Conv3D(inter_channel, [1, 1, 1], strides=[1, 1, 1])(x)

    phi_g = Conv3D(inter_channel, [1, 1, 1], strides=[1, 1, 1])(g)

    f = Activation('relu')(add([theta_x, phi_g]))

    psi_f = Conv3D(1, [1, 1, 1], strides=[1, 1, 1])(f)

    rate = Activation('sigmoid')(psi_f)

    att_x = multiply([x, rate])

    return att_x

def conv_block(tensor, nfilters, size=3, padding='same', initializer="he_normal"):
    x = Conv3D(filters=nfilters, kernel_size=(size, size, size), padding=padding, kernel_initializer=initializer)(tensor)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    x = Conv3D(filters=nfilters*2, kernel_size=(size, size, size), padding=padding, kernel_initializer=initializer)(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    return x

def conv_block2(tensor, nfilters, size=3, padding='same', initializer="he_normal"):
    x = Conv3D(filters=nfilters/2, kernel_size=(size, size, size), padding=padding, kernel_initializer=initializer)(tensor)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    x = Conv3D(filters=nfilters/2, kernel_size=(size, size, size), padding=padding, kernel_initializer=initializer)(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    return x

def deconv_block(tensor, residual, nfilters, size=2, strides=2, padding='same', initializer="he_normal"):
    #y = UpSampling3D(size=(size, size, size))(tensor)
    y = Conv3DTranspose(filters= nfilters, kernel_size= (size, size,1), strides=(strides, strides,1), padding=padding)(tensor)
    #y = Conv3D(filters=nfilters, kernel_size=(size, size, size), strides=(strides, strides, strides), padding=padding, kernel_initializer=initializer)(y)
    y = attention_block(y, residual, nfilters)
    y = Dropout(0.5)(y)
    y = concatenate([y, residual])
    y = conv_block2(y, nfilters)
    return y

def AttentionUnet_3D(input_shape1,input_shape2,input_shape3, nfilters=16, initializer="he_normal"):
    # Base
    inputs = Input(input_shape1)
    reshaped_inputs = Reshape((input_shape1[0], input_shape1[1], 4, 3))(inputs)

    # Downsampling
    conv1 = conv_block(reshaped_inputs, nfilters)
    pool1 = MaxPooling3D(pool_size=(2, 2, 1),strides=(2,2,1))(conv1)

    conv2 = conv_block(pool1, nfilters*2)
    pool2 = MaxPooling3D(pool_size=(2, 2, 1),strides=(2,2,1))(conv2)

    conv3 = conv_block(pool2, nfilters*4) #### 128
    pool3 = MaxPooling3D(pool_size=(2, 2, 1),strides=(2,2,1))(conv3)

    conv4 = conv_block(pool3, nfilters*8) ###  256
    print(conv4.shape)
    print(conv3.shape)

    # Upsampling
    deconv3 = deconv_block(conv4, conv3, nfilters*16)

    deconv2 = deconv_block(deconv3, conv2, nfilters*8)

    deconv1 = deconv_block(deconv2, conv1, nfilters*4)


    ##################################################################################################

    inputs2 = Input(input_shape2)
    reshaped_inputs2 = Reshape((input_shape2[0], input_shape2[1], 10, 2))(inputs2)

    # Downsampling
    conv11 = conv_block(reshaped_inputs2, nfilters)
    pool11 = MaxPooling3D(pool_size=(2, 2, 1),strides=(2,2,1))(conv11)

    conv22 = conv_block(pool11, nfilters*2)
    pool22 = MaxPooling3D(pool_size=(2, 2, 1),strides=(2,2,1))(conv22)

    conv33 = conv_block(pool22, nfilters*4)
    pool33 = MaxPooling3D(pool_size=(2, 2, 1),strides=(2,2,1))(conv33)

    conv44 = conv_block(pool33, nfilters*8)
    print(conv44.shape)
    print(conv33.shape)

    # Upsampling
    deconv33 = deconv_block(conv44, conv33, nfilters*16)

    deconv22 = deconv_block(deconv33, conv22, nfilters*8)

    deconv11 = deconv_block(deconv22, conv11, nfilters*4)

    #deconv33 = MaxPooling3D(pool_size=(1, 1, 2),strides=(1,1,2))(deconv33)
    #deconv22 = MaxPooling3D(pool_size=(1, 1, 2),strides=(1,1,2))(deconv22)
    #deconv11 = MaxPooling3D(pool_size=(1, 1, 2),strides=(1,1,2))(deconv11)

  ###############################################################################
    inputs3 = Input(input_shape3)
    reshaped_inputs3 = Reshape((input_shape2[0], input_shape2[1], 10, 2))(inputs3)

    # Downsampling
    conv111 = conv_block(reshaped_inputs3, nfilters)
    pool111 = MaxPooling3D(pool_size=(2, 2, 1),strides=(2,2,1))(conv111)

    conv222 = conv_block(pool111, nfilters*2)
    pool222 = MaxPooling3D(pool_size=(2, 2, 1),strides=(2,2,1))(conv222)

    conv333 = conv_block(pool222, nfilters*4)
    pool333 = MaxPooling3D(pool_size=(2, 2, 1),strides=(2,2,1))(conv333)

    conv444 = conv_block(pool333, nfilters*8)
    print(conv444.shape)
    print(conv333.shape)

    # Upsampling
    deconv333 = deconv_block(conv444, conv333, nfilters*16)

    deconv222 = deconv_block(deconv333, conv222, nfilters*8)

    deconv111 = deconv_block(deconv222, conv111, nfilters*4)

    #deconv33 = MaxPooling3D(pool_size=(1, 1, 5),strides=(1,1,5))(deconv33)
    #deconv22 = MaxPooling3D(pool_size=(1, 1, 5),strides=(1,1,5))(deconv22)
    #deconv11 = MaxPooling3D(pool_size=(1, 1, 5),strides=(1,1,5))(deconv11)





    ##########################################################################

    feature_map3 = concatenate([deconv3, deconv33,deconv333],axis=3)
    print(feature_map3.shape)
    up3 = UpSampling3D(size=(2, 2, 1))(feature_map3)
    print(up3.shape)
    up3 = Conv3D(64, (1, 1,1))(up3)
    print(up3.shape)

    feature_map2 = concatenate([deconv2, deconv22,deconv222],axis=3)
    print(feature_map2.shape)

    add_feature_map = Add()([up3,feature_map2])


    feature_map1 = concatenate([deconv1, deconv11,deconv111],axis=3)
    print(feature_map1.shape)

    up2 = UpSampling3D(size=(2, 2, 1))(add_feature_map)
    up2 = Conv3D(32, (1, 1,1))(up2)

    add_feature_map = Add()([up2,feature_map1])
    print(add_feature_map.shape)


    # Output
    output1 = Conv3D(filters=32, kernel_size=(1, 1, 3), padding='valid',activation='relu')(add_feature_map)
    output1 = MaxPooling3D(pool_size=(1, 1, 2),strides=(1,1,2))(output1)
    output1 = Conv3D(filters=16, kernel_size=(1, 1, 3), padding='valid',activation='relu')(output1)
    output1 = MaxPooling3D(pool_size=(1, 1, 2),strides=(1,1,2))(output1)

    x= Reshape((128, 128, 64))(output1)
    output2 = Conv2D(5, (1,1), activation='softmax')(x)

    model = Model([inputs,inputs2,inputs3], output2)

    return model

input_shape1 = (128, 128, 12)# Adjust based on your input
input_shape2 = (128, 128, 20)
input_shape3 = (128, 128, 20)

model = AttentionUnet_3D(input_shape1,input_shape2,input_shape3)
model.summary()

# Transformer based on three sources

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Layer

class PositionalEncoding(Layer):
    def __init__(self, d_model, pe_tau, max_seq_len=5000, **kwargs):
        super(PositionalEncoding, self).__init__(**kwargs)
        self.d_model = d_model
        self.pe_tau = pe_tau
        self.max_seq_len = max_seq_len

        # Compute the positional encoding matrix
        pe = np.zeros((max_seq_len, d_model))
        position = np.arange(0, max_seq_len)[:, np.newaxis].astype(np.float32)
        divisor = -np.exp(
            np.arange(0, d_model, 2).astype(np.float32)
            * np.log(pe_tau) / d_model
        )
        pe[:, 0::2] = np.sin(position * divisor)
        pe[:, 1::2] = np.cos(position * divisor)

        # Expand dimensions for batch size compatibility
        self.pe = tf.constant(pe[:, np.newaxis, :], dtype=tf.float32)

    def call(self, x):
        batch_size = tf.shape(x)[0]
        seq_len = tf.shape(x)[1]
        return x + self.pe[:batch_size, :seq_len, :]


In [None]:
from tensorflow.keras.layers import MultiHeadAttention, LayerNormalization, Dropout, Activation,GlobalAveragePooling1D,Flatten,Reshape,Input,Dense
from tensorflow.keras import Model


def TransformerClassifier(input_shape, d_model, nhead, dim_feedforward, dropout, num_layers, num_classes, pe_tau):
    inputs = Input(shape=input_shape)
    x = Dense(d_model)(inputs)
    x = PositionalEncoding(d_model, pe_tau)(x)
    print(x.shape)

    # Transformer blocks
    for _ in range(num_layers):
        # Multi-head attention and normalization
        query = x
        key = x
        value = x
        attn_output = MultiHeadAttention(num_heads=nhead, key_dim=d_model)(query, value, key)
        attn_output = Dropout(dropout)(attn_output)
        out1 = LayerNormalization(epsilon=1e-6)(attn_output + x)

        # Feed-forward neural network and normalization
        ffn_output = Dense(dim_feedforward, activation="relu")(out1)
        ffn_output = Dense(d_model)(ffn_output)
        ffn_output = Dropout(dropout)(ffn_output)
        x = LayerNormalization(epsilon=1e-6)(ffn_output + out1)

    x = GlobalAveragePooling1D(data_format='channels_last')(x)
    #outputs = x

    outputs = Dense(d_model, activation='relu')(x)
    return Model(inputs=inputs, outputs=outputs)

In [None]:
# three sources
from tensorflow.keras import Model

def MultiInputModel(input_shapes, d_model, nhead, dim_feedforward, dropout, num_layers, num_classes, pe_tau):
    inputs1 = Input(shape=input_shapes[0])
    inputs2 = Input(shape=input_shapes[1])
    inputs3 = Input(shape=input_shapes[2])

    model1 = TransformerClassifier(input_shapes[0], d_model, nhead, dim_feedforward, dropout, num_layers, num_classes, pe_tau)
    model2 = TransformerClassifier(input_shapes[1], d_model, nhead, dim_feedforward, dropout, num_layers, num_classes, pe_tau)
    model3 = TransformerClassifier(input_shapes[2], d_model, nhead, dim_feedforward, dropout, num_layers, num_classes, pe_tau)

    out1 = model1(inputs1)
    out2 = model2(inputs2)
    out3 = model3(inputs3)

    concatenated = tf.keras.layers.concatenate([out1, out2,out3], axis=1)


    dense_layer1 = Dense(units=512, activation='relu')(concatenated)
    dense_layer2 = Dropout(0.5)(dense_layer1)
    outputs = Dense(units=num_classes, activation='softmax')(concatenated)

    return Model(inputs=[inputs1, inputs2,inputs3], outputs=outputs)

input_shapes = [(10, 2),(10,2), (4, 3)]
model2 = MultiInputModel(input_shapes, d_model=128, nhead=4, dim_feedforward=256, dropout=0.5, num_layers=2, num_classes=5, pe_tau=10000)
model2.summary()

# Transformer-AtLSTM (Backbone)




In [None]:
from tensorflow.keras import Model

def MultiInputModelWithConcatOutput(input_shapes, d_model, nhead, dim_feedforward, dropout, num_layers, num_classes, pe_tau):
    inputs1 = Input(shape=input_shapes[0])
    inputs2 = Input(shape=input_shapes[1])
    inputs3 = Input(shape=input_shapes[2])

    model1 = TransformerClassifier(input_shapes[0], d_model, nhead, dim_feedforward, dropout, num_layers, num_classes, pe_tau)
    model2 = TransformerClassifier(input_shapes[1], d_model, nhead, dim_feedforward, dropout, num_layers, num_classes, pe_tau)
    model3 = TransformerClassifier(input_shapes[2], d_model, nhead, dim_feedforward, dropout, num_layers, num_classes, pe_tau)

    out1 = model1(inputs1)
    out2 = model2(inputs2)
    out3 = model3(inputs3)

    #concatenated = tf.keras.layers.concatenate([out1, out2 ], axis=1)
    return Model(inputs=[inputs1, inputs2, inputs3], outputs=[out1,out2,out3])

In [None]:
# Saved the weights of the pretrained model
pretrained_weights_path = 'gdrive/My Drive/Chapter 5/transformer_tracks_2017.h5'

backbone_model = MultiInputModelWithConcatOutput(input_shapes, d_model=64, nhead=4, dim_feedforward=256, dropout=0.5, num_layers=2, num_classes=5, pe_tau=10000)
backbone_model.load_weights(pretrained_weights_path, by_name=True)

In [None]:
from tensorflow.keras.layers import Lambda, dot, concatenate, Dense, Dropout, Activation, Input, Bidirectional, LSTM
import tensorflow as tf

def attention_2d_block(hidden_states, name_suffix=''):
    print("Shape of hidden_states:", hidden_states.shape)  # Debug statement

    hidden_size = int(hidden_states.shape[1])
    score_first_part = Dense(hidden_size, use_bias=False, name='attention_score_vec' + name_suffix)(hidden_states)

    print("Shape of score_first_part:", score_first_part.shape)  # Debug statement

    h_t = Lambda(lambda x: x[:, -1, :], output_shape=(hidden_size,), name='last_hidden_state' + name_suffix)(hidden_states)

    print("Shape of h_t:", h_t.shape)  # Debug statement

    score = dot([score_first_part, h_t], [1, 1], name='attention_score' + name_suffix)
    attention_weights = Activation('softmax', name='attention_weight' + name_suffix)(score)
    context_vector = dot([hidden_states, attention_weights], [1, 1], name='context_vector' + name_suffix)
    pre_activation = concatenate([context_vector, h_t], name='attention_output' + name_suffix)
    attention_vector = Dense(256, use_bias=False, activation='tanh', name='attention_vector' + name_suffix)(pre_activation)
    return attention_vector

def model1(init, name_suffix=''):
    x = init
    # Expand dimensions
    x = Lambda(lambda x: tf.expand_dims(x, axis=2), output_shape=lambda s: (s[0], s[1], 1))(x)
    x = Bidirectional(LSTM(32, return_sequences=True))(x)
    x = attention_2d_block(x, name_suffix)  # Pass the name_suffix here
    out = Dense(512, activation="relu")(x)
    return out

In [None]:
# for atlstm use transformer as backbone
inputs1 = Input(shape=input_shapes[0])
inputs2 = Input(shape=input_shapes[1])
inputs3 = Input(shape=input_shapes[2])

out1,out2,out3 = backbone_model([inputs1, inputs2, inputs3])

out11 = model1(out1, '_1')
out22 = model1(out2, '_2')
out33 = model1(out3, '_3')
concat_output2 = tf.keras.layers.concatenate([out11, out22, out33], axis=1)


#concatenated2 = tf.keras.layers.concatenate([concat_output, concat_output2], axis=1)
dense_layer1 = Dense(units=512, activation='relu')(concat_output2)
dense_layer2 = Dropout(0.5)(dense_layer1)
outputs = Dense(units=5, activation='softmax')(dense_layer2)



new_model = Model(inputs=[inputs1, inputs2, inputs3], outputs=outputs)
new_model.summary()

# Transformer-AtLSTM (Ensemble)

In [None]:
#attention LSTM

def attention_3d_block(hidden_states, name_suffix=''):
    # hidden_states.shape = (batch_size, time_steps, hidden_size)
    hidden_size = int(hidden_states.shape[2])
    # Inside dense layer
    #              hidden_states            dot               W            =>           score_first_part
    # (batch_size, time_steps, hidden_size) dot (hidden_size, hidden_size) => (batch_size, time_steps, hidden_size)
    # W is the trainable weight matrix of attention
    # Luong's multiplicative style score
    # Modify the layer names to include the name_suffix
    score_first_part = Dense(hidden_size, use_bias=False, name='attention_score_vec' + name_suffix)(hidden_states)
    h_t = Lambda(lambda x: x[:, -1, :], output_shape=(hidden_size,), name='last_hidden_state' + name_suffix)(
        hidden_states)
    score = dot([score_first_part, h_t], [2, 1], name='attention_score' + name_suffix)
    attention_weights = Activation('softmax', name='attention_weight' + name_suffix)(score)
    context_vector = dot([hidden_states, attention_weights], [1, 1], name='context_vector' + name_suffix)
    pre_activation = concatenate([context_vector, h_t], name='attention_output' + name_suffix)
    attention_vector = Dense(256, use_bias=False, activation='softmax',
                             name='attention_vector' + name_suffix)(
        pre_activation)
    return attention_vector

In [None]:
from keras.layers import Lambda,dot,concatenate

def model1(init, name_suffix=''):
    x = init
    x = Bidirectional(LSTM(32, return_sequences=True))(x)
    x = attention_3d_block(x, name_suffix)  # Pass the name_suffix here
    out = Dense(512, activation="relu")(x)
    return out

In [None]:
# Saved the weights of the pretrained model
pretrained_weights_path = 'gdrive/My Drive/Chapter 5/transformer_tracks_2017.h5'

backbone_model = MultiInputModelWithConcatOutput(input_shapes, d_model=64, nhead=4, dim_feedforward=256, dropout=0.5, num_layers=2, num_classes=5, pe_tau=10000)
backbone_model.load_weights(pretrained_weights_path, by_name=True)

In [None]:
# Transformer and Atlstm ensemble learning
inputs1 = Input(shape=input_shapes[0])
inputs2 = Input(shape=input_shapes[1])
inputs3 = Input(shape=input_shapes[2])

concat_output = backbone_model([inputs1, inputs2, inputs3])

out1 = model1(inputs1, '_1')
out2 = model1(inputs2, '_2')
out3 = model1(inputs3, '_3')
concat_output2 = tf.keras.layers.concatenate([out1, out2, out3], axis=1)


concatenated2 = tf.keras.layers.concatenate([concat_output, concat_output2], axis=1)
dense_layer1 = Dense(units=512, activation='relu')(concatenated2)
dense_layer2 = Dropout(0.5)(dense_layer1)
outputs = Dense(units=5, activation='softmax')(dense_layer2)



new_model2 = Model(inputs=[inputs1, inputs2], outputs=outputs)
new_model2.summary()