# Project

#### by Tianyang Gu, Zijin Wan, Jiashun He

### Import the required libraries

In [None]:
import os
import tensorflow as tf
import pathlib
import matplotlib.pyplot as plt
import numpy as np

### Confirm that a GPU is available

In [None]:
# Ask TensorFlow which GPU devices it can see (the notebook displays the returned list).
tf.config.experimental.list_physical_devices('GPU')

### Work on the dataset: here we use tf.data to load and preprocess the data

In [None]:
# Label dictionary: maps each class directory name to its integer label.
dic = dict(damage=0, no_damage=1)

In [None]:
# Preprocessing function applied to every (path, label) pair when the data is read.
def preprocess(path, label):
    """Load one JPEG file and return it as a 224x224x3 image scaled by 1/255.

    Args:
        path: Path of the JPEG file to read.
        label: Label belonging to the image; passed through unchanged.

    Returns:
        A ``(image, label)`` tuple.
    """
    raw_bytes = tf.io.read_file(path)
    # Always decode to three channels.
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    # Resize to the 224x224 input size used for ResNet.
    resized = tf.image.resize(decoded, [224, 224])
    # Normalize the pixel values.
    return resized / 255.0, label

In [None]:
# Image augmentation function for the training data.
def image_aug(img, label):
    """Randomly flip an image up/down and then left/right; the label is unchanged.

    Two further augmentations are present but currently disabled:
        img = tf.image.random_contrast(img, lower=0.9, upper=1.8)
        img = tf.image.random_saturation(img, lower=0.8, upper=1.2)

    Args:
        img: Image tensor to augment.
        label: Label belonging to the image; passed through unchanged.

    Returns:
        A ``(augmented_image, label)`` tuple.
    """
    flipped_vertically = tf.image.random_flip_up_down(img)
    flipped = tf.image.random_flip_left_right(flipped_vertically)
    return flipped, label

In [None]:
# Build a dataset of preprocessed (image, label) pairs from a directory tree.
def get_dataset(path_trans):
    """Create a ``tf.data.Dataset`` from a folder laid out as ``<label>/<image>``.

    Every entry two levels below ``path_trans`` (for example ``damage/111.jpeg``
    or ``no_damage/11111.jpeg``) is treated as an image whose label is looked
    up in ``dic`` by the name of its parent directory.

    Args:
        path_trans: Root directory of one data split.

    Returns:
        A dataset of ``(path, label)`` pairs mapped through ``preprocess``.

    Raises:
        KeyError: If a parent directory name is not a key of ``dic``.
    """
    data_root = pathlib.Path(path_trans)
    # Materialise the listing once so paths and labels stay aligned.
    image_files = list(data_root.glob('*/*'))
    # Paths are converted to strings so they can be stored in a tensor.
    image_paths = [str(image_file) for image_file in image_files]
    # damage -> 0, no_damage -> 1 (see `dic`).
    image_labels = [dic[image_file.parent.name] for image_file in image_files]
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, image_labels))
    # Turn each path into the decoded, resized and normalized image.
    return dataset.map(preprocess)

In [None]:
# Validation split, loaded from p_data/validation_another (used as validation_data in model.fit).
ds_validation=get_dataset('p_data/validation_another')

In [None]:
# Training split, loaded from p_data/train_another (not augmented yet).
ds_train=get_dataset('p_data/train_another')

In [None]:
# Apply image augmentation (image_aug) to the training data only.
ds_train_a=ds_train.map(image_aug)

In [None]:
# Test split from p_data/test_another; reported later as the "balanced" test set.
ds_test=get_dataset('p_data/test_another')

In [None]:
# Test split from p_data/test; reported later as the "unbalanced" test set.
ds_test_1=get_dataset('p_data/test')

### Show some samples from the training data

In [None]:
# Iterator over the first four (image, label) pairs of the augmented training set, as NumPy values.
data=ds_train_a.take(4).as_numpy_iterator()

In [None]:
# Drain the iterator into a list so the samples can be indexed when plotting.
datas = list(data)

In [None]:
# Draw the four collected samples on a 2x2 grid.
f, ax = plt.subplots(2, 2)
# Row-major order: sample 0 -> ax[0][0], 1 -> ax[0][1], 2 -> ax[1][0], 3 -> ax[1][1].
for sample_idx in range(4):
    grid_row, grid_col = divmod(sample_idx, 2)
    ax[grid_row][grid_col].imshow(datas[sample_idx][0], cmap='jet')
plt.tight_layout()
plt.show()

### start to build model
#### Use transfer learning: take ResNet50V2 as the base model and add two fully connected layers after it

##### code to build the model

In [None]:
# Import ResNet50V2 as the base model, without its fully connected top layers.
# No pooling is requested here: the previous `pooling=max` passed the built-in
# function `max` rather than the string 'max', so Keras applied no pooling anyway.
# The head built on top of this model does its own global average pooling.
base_model=tf.keras.applications.ResNet50V2(pooling=None,include_top=False,input_shape=(224, 224, 3))

In [None]:
#base_model.trainable = False

In [None]:
# Show the summary of the base model.
base_model.summary()

In [None]:
# Build the classification head that follows the ResNet base model.
inputs = tf.keras.Input(shape=(224, 224, 3))
x = base_model(inputs)
# Global average pooling. The layer is instantiated right here: the bare
# `GlobalAveragePooling2D(x)` used before depended on a name that is only
# assigned in a later cell, so a top-to-bottom run raised NameError.
x = tf.keras.layers.GlobalAveragePooling2D()(x)
# Dropout before the first dense layer.
x = tf.keras.layers.Dropout(0.5)(x)
# Dense layer with 128 nodes.
x = tf.keras.layers.Dense(128, activation='relu')(x)
# Dropout before the second dense layer.
x = tf.keras.layers.Dropout(0.5)(x)
# Dense layer with 64 nodes.
x = tf.keras.layers.Dense(64, activation='relu')(x)
# Output layer: a single node with a sigmoid activation.
outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)
model = tf.keras.Model(inputs, outputs)

In [None]:

def Conv2d_BN(x, nb_filter, kernel_size, strides=(1, 1), padding='same'):
    """Apply Conv2D, then BatchNormalization, then a ReLU activation to ``x``.

    Args:
        x: Input tensor.
        nb_filter: Number of convolution filters.
        kernel_size: Convolution kernel size.
        strides: Convolution strides.
        padding: Convolution padding mode.

    Returns:
        The output tensor of the ReLU activation.
    """
    convolution = tf.keras.layers.Conv2D(
        nb_filter, kernel_size, padding=padding, strides=strides
    )
    normalized = tf.keras.layers.BatchNormalization()(convolution(x))
    return tf.keras.layers.Activation('relu')(normalized)

In [None]:
# Custom CNN assembled from Conv2d_BN units.
inputs = tf.keras.Input(shape=(224, 224, 3))

# Stem: a 1x1 branch and a double 3x3 branch both read the input and are
# concatenated along axis 3 before the first max-pooling.
x_1 = Conv2d_BN(inputs, 32, 1)
x = Conv2d_BN(inputs, 32, 3)
x = Conv2d_BN(x, 32, 3)
x = tf.keras.layers.Concatenate(axis=3)([x_1, x])
x = tf.keras.layers.MaxPooling2D()(x)

# Three stages of stacked 3x3 units (2x64, 3x128, 3x256 filters), each
# followed by max-pooling.
for stage_filters, stage_depth in ((64, 2), (128, 3), (256, 3)):
    for _unit in range(stage_depth):
        x = Conv2d_BN(x, stage_filters, 3)
    x = tf.keras.layers.MaxPooling2D()(x)

# The pooling layer instance keeps its notebook-global name.
GlobalAveragePooling2D = tf.keras.layers.GlobalAveragePooling2D()
x = GlobalAveragePooling2D(x)

# Two blocks of Dense(256) -> BatchNormalization -> ReLU (no dropout is applied).
for _unit in range(2):
    x = tf.keras.layers.Dense(256)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)

# Output layer: a single node with a sigmoid activation.
outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)
model = tf.keras.Model(inputs, outputs)

In [None]:
# Render the model architecture, including tensor shapes, to mini_resnet.png.
tf.keras.utils.plot_model(model, "mini_resnet.png", show_shapes=True)

In [None]:
# Compile the model: Adam optimizer, binary cross-entropy loss, accuracy metric.
# `learning_rate` is used instead of the deprecated `lr` alias, which newer
# Keras releases no longer accept.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.003),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=['accuracy'])

In [None]:
# Early stopping on val_loss (patience 60) that restores the best weights.
callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=60,
    restore_best_weights=True,
)
# Input pipelines: shuffled mini-batches for training, large batches for validation.
train_batches = ds_train_a.shuffle(2560).batch(16).prefetch(2)
validation_batches = ds_validation.batch(256).prefetch(2)
# The number of epochs should be 600; only 30 are trained here to save time and resources.
history = model.fit(
    train_batches,
    epochs=30,
    validation_data=validation_batches,
    callbacks=[callback],
)

#### Step 1: train only the last 2 dense layers

In [None]:
# Freeze every layer of the ResNet base model.
for resnet_layer in base_model.layers:
    resnet_layer.trainable = False

In [None]:
# Mark the last six layers of the model as trainable.
head_layers = model.layers[-6:]
for head_layer in head_layers:
    head_layer.trainable = True

In [None]:
# Show the summary of the model to check which parameters are trainable.
model.summary()

In [None]:
# Compile the model with a very small learning rate for fine-tuning.
# `learning_rate` is used instead of the deprecated `lr` alias, which newer
# Keras releases no longer accept.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.000003),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=['accuracy'])

In [None]:
# Prepare early stopping: monitor val_loss with a patience of 60 epochs and restore the best weights.
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss',patience=60,restore_best_weights=True)

##### Because we restarted the kernel and ran all cells, training for 600 epochs would take too long, so here we only train for 30 epochs. The fully trained model that we showed in the presentation has been saved; we will load it and show it later.

In [None]:
# Fit the model.
# A shuffle buffer of 4 barely reorders the stream, and get_dataset presumably
# yields the files one class directory after another, so batches would be close
# to single-class. Use the same 2560-element buffer as the first training run.
history = model.fit(ds_train_a.shuffle(2560).batch(32).prefetch(2),
                    epochs=30,  # the number of epochs should be 600; only 30 are trained here to save time and resources
                    validation_data=ds_validation.batch(256).prefetch(2),
                    callbacks=[callback])

In [None]:
# Plot the accuracy and loss curves recorded by the last model.fit call.
metrics = history.history
acc, val_acc = metrics['accuracy'], metrics['val_accuracy']
loss, val_loss = metrics['loss'], metrics['val_loss']

plt.figure(figsize=(8, 8))

# Upper panel: accuracy.
plt.subplot(2, 1, 1)
for curve, curve_label in ((acc, 'Training Accuracy'), (val_acc, 'Validation Accuracy')):
    plt.plot(curve, label=curve_label)
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
# Keep the automatically chosen lower bound, but cap the axis at 1.
lower_bound = min(plt.ylim())
plt.ylim([lower_bound, 1])
plt.title('Training and Validation Accuracy')

# Lower panel: loss.
plt.subplot(2, 1, 2)
for curve, curve_label in ((loss, 'Training Loss'), (val_loss, 'Validation Loss')):
    plt.plot(curve, label=curve_label)
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
plt.ylim([0, 1.0])
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()

#### Step 2: train the last 2 conv layers in ResNet

In [None]:
# Unfreeze the last seven layers of the ResNet base model
# (according to the original note, these hold its last 2 conv layers).
resnet_tail = base_model.layers[-7:]
for resnet_layer in resnet_tail:
    resnet_layer.trainable = True

In [None]:
# Freeze the last six layers of the model (the dense head).
head_layers = model.layers[-6:]
for head_layer in head_layers:
    head_layer.trainable = False

In [None]:
# Show the base model summary to confirm which layers are trainable.
base_model.summary()

In [None]:
# Show the full model summary to confirm the trainable parameter counts.
model.summary()

In [None]:
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss',patience=20,restore_best_weights=True)
# Keras only takes changes to `layer.trainable` into account when the model is
# compiled, so recompile (same optimizer settings, loss and metric as step 1)
# before training; otherwise the freeze/unfreeze done for this step is ignored.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.000003),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=['accuracy'])
# A shuffle buffer of 4 barely reorders the stream, and get_dataset presumably
# yields the files one class directory after another; use the same
# 2560-element buffer as the first training run.
history = model.fit(ds_train_a.shuffle(2560).batch(32).prefetch(2),
                    epochs=10,  # the number of epochs should be 100 here; it is set to 10 to save time
                    validation_data=ds_validation.batch(256).prefetch(2),
                    callbacks=[callback])

#### Step 3: train the first layer in ResNet

In [None]:
# Re-freeze the last seven layers of the ResNet base model, then unfreeze its
# first three layers.
for resnet_layers, is_trainable in (
    (base_model.layers[-7:], False),
    (base_model.layers[:3], True),
):
    for resnet_layer in resnet_layers:
        resnet_layer.trainable = is_trainable

In [None]:
# Keep the last six layers of the model (the dense head) frozen.
head_layers = model.layers[-6:]
for head_layer in head_layers:
    head_layer.trainable = False

In [None]:
# Show the base model summary to confirm which layers are trainable.
base_model.summary()

In [None]:
# Show the full model summary to confirm the trainable parameter counts.
model.summary()

In [None]:
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss',patience=20,restore_best_weights=True)
# Keras only takes changes to `layer.trainable` into account when the model is
# compiled, so recompile (same optimizer settings, loss and metric as step 1)
# before training; otherwise the freeze/unfreeze done for this step is ignored.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.000003),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=['accuracy'])
# A shuffle buffer of 4 barely reorders the stream, and get_dataset presumably
# yields the files one class directory after another; use the same
# 2560-element buffer as the first training run.
history = model.fit(ds_train_a.shuffle(2560).batch(32).prefetch(2),
                    epochs=10,  # the number of epochs should be 100 here; it is set to 10 to save time
                    validation_data=ds_validation.batch(256).prefetch(2),
                    callbacks=[callback])

In [None]:
#code to save model
#model_s.save('v5')

#### Because it takes too long to restart the kernel and run everything, here we load a model that we trained before, which got the best performance on the test set. This model was trained with the same 3 steps as above.

In [None]:
# Load the previously saved model from the 'v4' directory.
model_s=tf.keras.models.load_model('v4')

In [None]:
# Show the summary of the loaded model.
model_s.summary()

In [None]:
# Show the summary of the nested model stored at layers[-7]
# (presumably the ResNet50V2 backbone - confirm against the summary above).
model_s.layers[-7].summary()

In [None]:
# Evaluate on the unbalanced test set; evaluate returns the loss and the accuracy metric.
l_1,a_1=model_s.evaluate(ds_test_1.batch(512))
print(f'The accuracy on unbalanced test is {a_1}')

In [None]:
# Evaluate on the balanced test set; evaluate returns the loss and the accuracy metric.
l,a=model_s.evaluate(ds_test.batch(512))
print(f'The accuracy on balanced test is {a}')

### Error analysis of the trained model

In [None]:
import cv2

#### Find the images that were wrongly predicted

In [None]:
# i counts every test sample seen, c counts the wrongly predicted ones.
i = 0
c = 0
# Every (image, label) pair that the model gets wrong is kept here.
wrong_data = []
for data in ds_test:
    i += 1
    image, label = data[0], data[1]
    # The model expects a batch, so add a leading batch axis of size 1.
    datas = np.expand_dims(image, axis=0)
    # Threshold the model output at 0.5: below -> class 0, otherwise class 1.
    predicted = np.where(model_s(datas) < 0.5, 0, 1)
    if label != predicted:
        c += 1
        wrong_data.append(data)

#### Draw the Grad-CAM heatmap for analysis

In [None]:
    # Grad-CAM style heatmap for one wrongly predicted test image (entry 129 of wrong_data).
    img=wrong_data[129][0].numpy()
    x = np.expand_dims(img, axis=0)
    # Helper model over the nested model at model_s.layers[-7]: it returns that
    # nested model's final output together with the output of its layer [-4].
    get_maps = tf.keras.models.Model(inputs = [model_s.layers[-7].inputs], outputs = [model_s.layers[-7].output,model_s.layers[-7].layers[-4].output])

    with tf.GradientTape() as tape:

        model_out, last_conv_layer = get_maps(x)

        # NOTE(review): the score being differentiated is the maximum of the
        # nested model's output, not the sigmoid prediction of model_s itself -
        # confirm this is the intended Grad-CAM target.
        class_out = tf.reduce_max(model_out)

    grads = tape.gradient(class_out,last_conv_layer)
    # Combine all the gradients for each feature map.
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    # Multiply the pooled gradients with each feature map and average across
    # all the feature maps to make the heatmap.
    heatmap = tf.reduce_mean(tf.multiply(pooled_grads, last_conv_layer), axis=-1)
    # ReLU: keep only the non-negative contributions.
    heatmap = np.maximum(heatmap.numpy(), 0)
    # Min-max normalize. A constant map (for example all zeros after the ReLU)
    # has no range, so skip the division instead of producing NaNs.
    heatmap = heatmap - heatmap.min()
    peak = heatmap.max()
    if peak > 0:
        heatmap = heatmap / peak
    heatmap = heatmap.reshape((7, 7))

    # Plot the (7,7) heatmap.
    plt.imshow(heatmap,cmap='jet')
    plt.show()

In [None]:
# Upscale the heatmap to the image size and convert it to 8-bit values.
resized_heatmap = np.uint8(cv2.resize(heatmap,(224,224))*255)
# Invert the 8-bit map. It must be 255 - x: with 256 - x a value of 0 became 256,
# which wraps back to 0 in uint8 on older NumPy and raises OverflowError on NumPy 2.
val = np.uint8(255-resized_heatmap)
heatmap_final = cv2.applyColorMap(val, cv2.COLORMAP_JET)
# Draw the image and lay the colored heatmap over it at 30% opacity.
fig, ax = plt.subplots(1,1, figsize=(6,6))
ax.imshow(img, cmap='jet')
ax.imshow(heatmap_final, cmap='jet',alpha=0.3)
ax.axis('off')
# The title shows the label of the inspected sample.
fig.suptitle(f'label: {wrong_data[129][1].numpy()}',y=0.92,fontsize=14)
plt.show()