In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras import layers
from sklearn.metrics import f1_score,confusion_matrix
import tensorflow as tf
import numpy as np
IMAGE_ORDERING = 'channels_last'
tf.executing_eagerly()
BATCH_SIZE=32

In [None]:
#load data

data_test=np.load("Covariate_grid2_final.npy")
Y_sus=np.load("Label_grid2_final.npy")[:,:,:,2]
Y_sus[Y_sus>0]=1
labels_test=Y_sus

In [None]:
np.unique(labels_test,return_counts=True)

In [None]:
mindata=data_test.min(axis=(0,1,2))
maxdata=data_test.max(axis=(0,1,2))
data_test=(data_test-mindata)/(maxdata-mindata)

In [None]:
Xdata=np.load("Covariate_grid1_final.npy")
Y_sus=np.load("Label_grid1_final.npy")[:,:,:,:2]
# Y_area=np.load("labels_area_density.npy")
#normalizer.adapt(Xdata)
Ydata=np.expand_dims(np.argmax(Y_sus,axis=-1),axis=-1)
mindata=Xdata.min(axis=(0,1,2))
maxdata=Xdata.max(axis=(0,1,2))
Xdata=(Xdata-mindata)/(maxdata-mindata)

In [None]:

def conv_block(input_tensor, kernel_size, filters, stage, block,
               strides=(2, 2)):
    """conv_block is the block that has a conv layer at shortcut
    # Arguments
        input_tensor: input tensor
        kernel_size: defualt 3, the kernel size of middle conv layer at
                     main path
        filters: list of integers, the filterss of 3 conv layer at main path
        stage: integer, current stage label, used for generating layer names
        block: 'a','b'..., current block label, used for generating layer names
    # Returns
        Output tensor for the block.
    Note that from stage 3, the first conv layer at main path is with
    strides=(2,2) and the shortcut should have strides=(2,2) as well
    """
    filters1, filters2, filters3 = filters

    if IMAGE_ORDERING == 'channels_last':
        bn_axis = 3
    else:
        bn_axis = 1

    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'

    x = Conv2D(filters1, (1, 1), data_format=IMAGE_ORDERING, strides=strides,
               name=conv_name_base + '2a')(input_tensor)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2a')(x)
    x = Activation('relu')(x)

    x = Conv2D(filters2, kernel_size, data_format=IMAGE_ORDERING,
               padding='same', name=conv_name_base + '2b')(x)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2b')(x)
    x = Activation('relu')(x)

    x = Conv2D(filters3, (1, 1), data_format=IMAGE_ORDERING,
               name=conv_name_base + '2c')(x)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2c')(x)

    shortcut = Conv2D(filters3, (1, 1), data_format=IMAGE_ORDERING,
                      strides=strides, name=conv_name_base + '1')(input_tensor)
    shortcut = BatchNormalization(
        axis=bn_axis, name=bn_name_base + '1')(shortcut)

    x = layers.add([x, shortcut])
    x = Activation('relu')(x)
    return x
def one_side_pad(x):
    x = ZeroPadding2D((1, 1), data_format=IMAGE_ORDERING)(x)
    if IMAGE_ORDERING == 'channels_first':
        x = Lambda(lambda x: x[:, :, :-1, :-1])(x)
    elif IMAGE_ORDERING == 'channels_last':
        x = Lambda(lambda x: x[:, :-1, :-1, :])(x)
    return x
def identity_block(input_tensor, kernel_size, filters, stage, block):
    """The identity block is the block that has no conv layer at shortcut.
    # Arguments
        input_tensor: input tensor
        kernel_size: defualt 3, the kernel size of middle conv layer at
                     main path
        filters: list of integers, the filterss of 3 conv layer at main path
        stage: integer, current stage label, used for generating layer names
        block: 'a','b'..., current block label, used for generating layer names
    # Returns
        Output tensor for the block.
    """
    
    filters1, filters2, filters3 = filters

    if IMAGE_ORDERING == 'channels_last':
        bn_axis = 3
    else:
        bn_axis = 1

    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'

    x = Conv2D(filters1, (1, 1), data_format=IMAGE_ORDERING,
               name=conv_name_base + '2a')(input_tensor)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2a')(x)
    x = Activation('relu')(x)

    x = Conv2D(filters2, kernel_size, data_format=IMAGE_ORDERING,
               padding='same', name=conv_name_base + '2b')(x)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2b')(x)
    x = Activation('relu')(x)

    x = Conv2D(filters3, (1, 1), data_format=IMAGE_ORDERING,
               name=conv_name_base + '2c')(x)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2c')(x)

    x = layers.add([x, input_tensor])
    x = Activation('relu')(x)
    return x

def regression_block(input_tensor,o1_tensor):
    
    y = LandslideArea(1024)(input_tensor,o1_tensor)
    y = LandslideArea(512)(y,o1_tensor)
    y = LandslideArea(256)(y,o1_tensor)
    y = LandslideArea(128)(y,o1_tensor)
    y = LandslideArea(64)(y,o1_tensor)
    y = LandslideArea(32)(y,o1_tensor)
    y = LandslideArea(16)(y,o1_tensor)
    y = LandslideArea(1)(y,o1_tensor)
    return y
    
def get_resnet50_unet(n_classes=2,input_height=128,  input_width=128, channels=11):

    assert input_height % 32 == 0
    assert input_width % 32 == 0
    
    if IMAGE_ORDERING == 'channels_first':
        img_input = Input(shape=(channels, input_height, input_width))
    elif IMAGE_ORDERING == 'channels_last':
        img_input = Input(shape=(input_height, input_width, channels))

    if IMAGE_ORDERING == 'channels_last':
        bn_axis = 3
    else:
        bn_axis = 1
    if IMAGE_ORDERING == 'channels_first':
        MERGE_AXIS = 1
    elif IMAGE_ORDERING == 'channels_last':
        MERGE_AXIS = -1
    l1_skip_conn=True
    #x = normalizer()
    x = ZeroPadding2D((3, 3), data_format=IMAGE_ORDERING)(img_input)
    x = Conv2D(64, (4, 4), data_format=IMAGE_ORDERING,
               strides=(2, 2), name='conv1')(x)
    x = BatchNormalization(axis=bn_axis, name='bn_conv1')(x)
    x = Activation('relu')(x)
    x = MaxPooling2D((3, 3), data_format=IMAGE_ORDERING, strides=(2, 2))(x)
    x = conv_block(x, 3, [64, 64, 256], stage=2, block='a', strides=(1, 1))
    x = identity_block(x, 3, [64, 64, 256], stage=2, block='b')
    x = identity_block(x, 3, [64, 64, 256], stage=2, block='c')
    x = conv_block(x, 3, [128, 128, 512], stage=3, block='a')
    x = identity_block(x, 3, [128, 128, 512], stage=3, block='b')
    x = identity_block(x, 3, [128, 128, 512], stage=3, block='c')
    x = identity_block(x, 3, [128, 128, 512], stage=3, block='d')
    x = conv_block(x, 3, [256, 256, 1024], stage=4, block='a')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='b')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='c')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='d')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='e')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='f')
    br1=tf.identity(x,name='branch_1')
    br1=AveragePooling2D((2, 2), padding='same')(br1)
    x = conv_block(x, 3, [512, 512, 2048], stage=5, block='a')
    # classification part
    x = identity_block(x, 3, [512, 512, 2048], stage=5, block='b')
    x = identity_block(x, 3, [512, 512, 2048], stage=5, block='c')

    x = (ZeroPadding2D((1, 1), data_format=IMAGE_ORDERING))(x)
    x = (Conv2D(1, (3, 3), padding='valid' , activation='relu' , data_format=IMAGE_ORDERING))(x)
    x = (BatchNormalization())(x)
    o1=Activation(activation="sigmoid",name="sus_output")(x)

    
    model = Model(inputs=[img_input], outputs=[o1])
    return model

In [None]:
model=get_resnet50_unet(n_classes=1,input_height=128,  input_width=128, channels=13)

In [None]:
tf.keras.utils.plot_model(model, to_file='Susceptibilitypart.png', show_shapes=True)

In [None]:
model.load_weights("checkpoint_classification_V2.4") #load initial weights because training is done in multiple stages, you need to load previous state
model.summary()

In [None]:
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=1e-3,decay_steps=20000,decay_rate=0.9)
import keras.backend as K

opt = Adam(learning_rate=lr_schedule)
def recall(targets, inputs):
    #flatten label and prediction tensors
    inputs = K.flatten(inputs)
    targets = K.flatten(targets)
    targets = K.cast(targets, dtype='float32')
    #print(targets)
    #True Positives, False Positives & False Negatives
    TP = K.sum((inputs * targets))
    FP = K.sum(((1-targets) * inputs))
    FN = K.sum((targets * (1-inputs)))
            
    return (TP/(TP+FN+1e-7))

def precision(targets, inputs):
    #flatten label and prediction tensors
    inputs = K.flatten(inputs)
    targets = K.flatten(targets)
    targets = K.cast(targets, dtype='float32')
    
    #True Positives, False Positives & False Negatives
    TP = K.sum((inputs * targets))
    FP = K.sum(((1-targets) * inputs))
    FN = K.sum((targets * (1-inputs)))
    return (TP/(TP+FP+1e-7))

def f1_score(targets, inputs):
    p = precision(targets, inputs)
    r = recall(targets, inputs)
    return 2*((p*r)/(p+r+1e-6))

ALPHA = 0.30
BETA = 0.70
GAMMA = 1
def FocalTverskyLoss(targets, inputs, alpha=ALPHA, beta=BETA, gamma=GAMMA, smooth=1e-6):
    
        #flatten label and prediction tensors
        inputs = K.flatten(inputs)
        targets = K.flatten(targets)
        targets = K.cast(targets, dtype='float32')
        #True Positives, False Positives & False Negatives
        TP = K.sum((inputs * targets))
        FP = K.sum(((1-targets) * inputs))
        FN = K.sum((targets * (1-inputs)))       
        Tversky = (TP + smooth) / (TP + alpha*FP + beta*FN + smooth)  
        FocalTversky = K.pow((1 - Tversky), gamma)
        
        return FocalTversky   
def regressionloss(y_true, y_pred):
    y_true_flat=K.flatten(y_true)
    y_pred_flat=K.flatten(y_pred)
    mask=tf.where(y_true_flat==0.,False,True)
    y=tf.boolean_mask(y_true_flat, mask)
    ypred=tf.boolean_mask(y_pred_flat, mask)
    #print(y,ypred)
    pixel_sq_loss=FocalTverskyLoss    
    loss_val=pixel_sq_loss(y,ypred)
    return tf.reduce_mean(loss_val)

model.compile(optimizer=opt, 
              loss=tf.keras.losses.BinaryCrossentropy(), 
              metrics=[ f1_score,tf.keras.metrics.BinaryAccuracy(),tf.keras.metrics.AUC(),tf.keras.metrics.FalseNegatives(),tf.keras.metrics.FalsePositives()])

In [None]:
NUMBER_EPOCHS = 100
filepath="checkpoint_classification_V2.4"
model_checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
    filepath,
    monitor="f1_score",
    verbose=0,
    save_best_only=True,
    save_weights_only=False,
    mode="max",
    save_freq="epoch",
    options=None
)
import os,datetime
logdir = os.path.join("logs_classification", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)


def train(model, xdata,ydata):
    hist = model.fit(x=xdata,
                     y=ydata,
                     epochs=NUMBER_EPOCHS,
                     validation_split=0.0,
                     batch_size=4,#auto validate using 30% of random samples at each epoch
                     verbose=1, callbacks=[model_checkpoint_callback,tensorboard_callback] ,sample_weight=zeros
                    )
    return hist

In [None]:
from sklearn.model_selection import train_test_split
# data_train, data_test, labels_train, labels_test = train_test_split(Xdata[10680:], Ydata[10680:], test_size=0.30, random_state=42)
# labels_test=np.rot90(labels_test,axes=(1,2))
# data_test=np.rot90(data_test,axes=(1,2))
# labels_train=np.rot90(labels_train,axes=(1,2))
# data_train=np.rot90(data_train,axes=(1,2))
zeros=np.zeros(labels_test.shape)
zeros[labels_test==[0.]]=1
zeros[labels_test==[1.]]=5

In [None]:
history=train(model,Xdata,np.expand_dims(Ydata,axis=-1))

In [None]:
Ydata_pred_s=model.predict(data_test,batch_size=16)
f1_score(labels_test.flatten(),np.rint(Ydata_pred_s.flatten()))

In [None]:
confusion_matrix(labels_test.flatten(),np.rint(Ydata_pred_s.flatten()))

In [None]:
import sklearn.metrics as metrics
import matplotlib.pyplot as plt
# calculate the fpr and tpr for all thresholds of the classification
from matplotlib.pyplot import figure

figure(figsize=(6, 6), dpi=500)
preds = Ydata_pred_s.flatten()
fpr, tpr, threshold = metrics.roc_curve(labels_test.flatten(),preds)
roc_auc = metrics.auc(fpr, tpr)

# method I: plt

plt.title('ROC Susceptibility')
plt.plot(fpr, tpr, 'b', label = 'AUC = %0.2f' % roc_auc)
plt.legend(loc = 'lower right')
plt.plot([0, 1], [0, 1],'r--')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.savefig('ROC.svg')
plt.show()