In [None]:
%tensorflow_version 1.x

import numpy as np
import tensorflow as tf
import keras
import os
import cv2
import random
from google.colab import drive
from skimage import color
from skimage import io
from sklearn.metrics import classification_report, confusion_matrix
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Dense, Activation, Dropout, Flatten,\
                         Conv2D, MaxPooling2D, AveragePooling2D, GlobalAveragePooling2D,\
                         UpSampling2D
from keras.layers.normalization import BatchNormalization
from keras import regularizers
from keras import optimizers
from keras import callbacks
from keras import applications
from keras.models import Model, Input

# Report the TensorFlow build used by this session.
print("Tensorflow version %s" %tf.__version__)

# Stop the notebook right away unless TensorFlow reports the first GPU device.
device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
    print('Found GPU at: {}'.format(device_name))
else:
    raise SystemError('GPU device not found')

Using TensorFlow backend.


Tensorflow version 1.15.0
Found GPU at: /device:GPU:0


In [None]:
# Mount Google Drive under /content/drive (force_remount=True is passed on every run).
drive.mount('/content/drive', force_remount=True)

# Dataset root on the mounted Drive, followed by the three folders derived from it.
datadir = '/content/drive/My Drive/MWI-public-custom'
trainingset = datadir + '/train/'
testset = datadir + '/test/'
BlindTestSet = datadir + '/WeatherBlindTestSet/'

Mounted at /content/drive


In [None]:
#CREATE TRAINING SET


training_set=[]
PIXELS=200
categories=["HAZE", "RAINY", "SNOWY", "SUNNY"]

def create_training_set():
    """Fill the global `training_set` with `[image, class_index]` pairs.

    For every name in `categories`, each file found in
    `trainingset/<category>/` is read as a 3-channel colour image
    (cv2.IMREAD_COLOR) and resized to PIXELS x PIXELS. The label is the
    position of the category inside `categories`.
    Files that OpenCV cannot decode are reported and skipped.
    """
    for class_number, category in enumerate(categories):
        path = os.path.join(trainingset, category)  # path to HAZE, RAINY, SNOWY or SUNNY
        for img in os.listdir(path):
            img_array = cv2.imread(os.path.join(path, img), cv2.IMREAD_COLOR)
            if img_array is None:
                # cv2.imread signals an unreadable file with None instead of raising,
                # which would otherwise make cv2.resize fail on the next line.
                print("Skipping unreadable image: %s" % os.path.join(path, img))
                continue
            new_array = cv2.resize(img_array, (PIXELS, PIXELS))
            training_set.append([new_array, class_number])

create_training_set()
print(len(training_set))

400


In [None]:
#CREATE AUGMENTED TRAINING SET

import cv2
import numpy as np
from matplotlib import pyplot as plt
from skimage import color
augmented_training_set=[]

def create_augmented_training_set():
    """Fill the global `augmented_training_set` with `[image, class_index]` pairs.

    Every readable image under `trainingset/<category>/` is resized to
    PIXELS x PIXELS and contributes four samples, in this order:
    the resized image, a rainbow colour-mapped version, a Gaussian-blurred
    version and a Canny edge map converted back to three channels.
    All four are divided by 255 before being stored, and all four carry the
    same integer label (the index of the category in `categories`), so the
    labels can be passed to `to_categorical` directly.
    """
    for class_number, category in enumerate(categories):
        path = os.path.join(trainingset, category)  # path to HAZE, RAINY, SNOWY or SUNNY
        for img in os.listdir(path):
            img_array = cv2.imread(os.path.join(path, img), cv2.IMREAD_COLOR)
            if img_array is None:
                # cv2.imread returns None for files it cannot decode.
                print("Skipping unreadable image: %s" % os.path.join(path, img))
                continue
            new_array = cv2.resize(img_array, (PIXELS, PIXELS))

            # Spectral (rainbow colour map) version of the image.
            spectral_image = cv2.applyColorMap(new_array, cv2.COLORMAP_RAINBOW)

            # Blurred version of the image.
            # NOTE(review): the third positional argument of cv2.GaussianBlur is
            # sigmaX, so cv2.BORDER_DEFAULT is used as the blur sigma here. Kept
            # unchanged so the augmentation stays the same -- confirm the intent.
            blurred_image = cv2.GaussianBlur(new_array, (5,5), cv2.BORDER_DEFAULT)

            # Edge map, expanded back to three channels to match the other samples.
            edge_image = color.gray2rgb(cv2.Canny(new_array, 100, 200))

            for variant in (new_array, spectral_image, blurred_image, edge_image):
                augmented_training_set.append([variant/255, class_number])

            # A high-pass-filtered (FFT) variant used to be computed here but was
            # never appended to the dataset; the unused computation was removed.


create_augmented_training_set()
print(len(augmented_training_set))

1600


In [None]:
#CREATE TEST SET

test_set=[]
PIXELS=200
categories=["HAZE", "RAINY", "SNOWY", "SUNNY"]

def create_test_set():
    """Fill the global `test_set` with `[image, class_index]` pairs.

    For every name in `categories`, each file found in
    `testset/<category>/` is read as a 3-channel colour image and resized
    to PIXELS x PIXELS. The label is the integer position of the category
    in `categories`, which is the form expected by `to_categorical` and
    comparable with `np.argmax` of the model output.
    Files that OpenCV cannot decode are reported and skipped.
    """
    for class_number, category in enumerate(categories):
        path = os.path.join(testset, category)  # path to HAZE, RAINY, SNOWY or SUNNY
        for img in os.listdir(path):
            img_array = cv2.imread(os.path.join(path, img), cv2.IMREAD_COLOR)
            if img_array is None:
                # cv2.imread returns None for files it cannot decode.
                print("Skipping unreadable image: %s" % os.path.join(path, img))
                continue
            new_array = cv2.resize(img_array, (PIXELS, PIXELS))
            test_set.append([new_array, class_number])

create_test_set()
print(len(test_set))

400


In [None]:
#SHUFFLE THE DATA

# Seed Python's RNG first so that a fresh "Restart & Run All" always produces
# the same sample order.
random.seed(42)

# Shuffle each dataset in place; features and labels stay paired because they
# are stored together in each list element.
random.shuffle(training_set)
random.shuffle(test_set)
random.shuffle(augmented_training_set)

In [None]:
#SEPARATE FEATURES AND LABELS

def _split_features_labels(samples):
    """Turn a list of [features, label] pairs into two parallel lists."""
    features = [sample[0] for sample in samples]
    labels = [sample[1] for sample in samples]
    return features, labels


x_train, y_train = _split_features_labels(training_set)
x_test, y_test = _split_features_labels(test_set)
x_train_aug, y_train_aug = _split_features_labels(augmented_training_set)

# Stack the images into (n_samples, PIXELS, PIXELS, 3) arrays.
image_shape = (-1, PIXELS, PIXELS, 3)
x_train = np.array(x_train).reshape(image_shape)
x_test = np.array(x_test).reshape(image_shape)
x_train_aug = np.array(x_train_aug).reshape(image_shape)

# Training labels become arrays; y_test is left as a plain list.
y_train = np.array(y_train)
y_train_aug = np.array(y_train_aug)

# Scale the plain training and test images; the augmented images were already
# divided by 255 when they were created.
x_test = x_test / 255
x_train = x_train / 255

print(y_train[1])

SNOWY


In [None]:
#TRANSFER LEARNING

from keras import applications
from keras.models import Model, Input

#Download the structure of the pre-trained model we want to make use of

def load_backbone_net(input_shape):
    """Build a VGG16 feature extractor with ImageNet weights.

    Args:
        input_shape: shape passed to the Keras `Input` layer (the call site in
            this notebook passes `x_train.shape[1:]`).

    Returns:
        A compiled Keras `Model` mapping the new input tensor to the output of
        VGG16 without its fully connected top.
    """
    # define input tensor
    input0 = Input(shape=input_shape)
    print(input0)

    # load a model pretrained on imagenet, without the final dense layers
    vgg16 = applications.vgg16.VGG16(include_top=False, weights='imagenet', input_tensor=input0)

    # Keras 2 spells these arguments `inputs`/`outputs`; the singular forms are
    # deprecated Keras 1 names.
    feature_extractor = Model(inputs=input0, outputs=vgg16.output)
    feature_extractor.compile(loss=keras.losses.categorical_crossentropy, optimizer='adam', metrics=['accuracy'])

    return feature_extractor


def transferNet(feature_extractor, num_classes, output_layer_name, trainable_layers):
    """Build and compile a classifier on top of a pretrained feature extractor.

    Args:
        feature_extractor: Keras model whose layers are reused as the backbone.
        num_classes: number of units of the final softmax layer.
        output_layer_name: name of the backbone layer whose output feeds the
            new classification head.
        trainable_layers: collection of backbone layer names that stay
            trainable; every other backbone layer is frozen.

    Returns:
        A Keras model named "transferNet", compiled with categorical
        cross-entropy, the "adam" optimizer and the accuracy metric.
    """
    # get the original input layer tensor
    input_t = feature_extractor.get_layer(index=0).input

    # freeze every feature-extractor layer except the ones listed as trainable
    for layer in feature_extractor.layers:
        layer.trainable = layer.name in trainable_layers

    # get the output tensor from the chosen layer of the feature extractor
    output_extractor = feature_extractor.get_layer(name=output_layer_name).output

    # flatten the convolutional output and normalize it
    flatten = Flatten()(output_extractor)
    dense = BatchNormalization()(flatten)

    # three Dropout -> Dense(relu, L2) -> BatchNormalization stages of decreasing width
    for units in (4096, 2048, 1024):
        dense = Dropout(0.4)(dense)
        dense = Dense(units, activation='relu', kernel_regularizer=keras.regularizers.l2(0.01))(dense)
        dense = BatchNormalization()(dense)

    # add the final output layer
    # NOTE(review): this is a second BatchNormalization directly after the one
    # that closes the last stage above. It is kept so the architecture (and any
    # saved weights) stay unchanged -- confirm whether it is intentional.
    dense = BatchNormalization()(dense)
    dense = Dense(num_classes, activation='softmax')(dense)

    # Keras 2 spells these arguments `inputs`/`outputs`; the singular forms are
    # deprecated Keras 1 names.
    model = Model(inputs=input_t, outputs=dense, name="transferNet")

    model.compile(loss=keras.losses.categorical_crossentropy, optimizer="adam", metrics=['accuracy'])

    return model

# Instantiate the pretrained backbone for the shape of one training image.
feature_extractor = load_backbone_net(x_train.shape[1:])
feature_extractor.summary()


# Backbone layer that feeds the new head, and the backbone layers left trainable.
name_output_extractor = "block5_pool"
trainable_layers = ["block5_conv3"]

# Assemble the transfer model for the four weather classes.
transfer_model = transferNet(
    feature_extractor,
    num_classes=4,
    output_layer_name=name_output_extractor,
    trainable_layers=trainable_layers
)
transfer_model.summary()

In [None]:
#TRAINING SETUP

nepochs = 20       # maximum number of epochs (early stopping may end training sooner)
batch_size = 16    # batch_size

model = transfer_model

# One-hot encode the integer class labels for the categorical cross-entropy loss.
y_train_aug1 = keras.utils.to_categorical(y_train_aug, 4)
y_train1 = keras.utils.to_categorical(y_train, 4)
y_test1 = keras.utils.to_categorical(y_test, 4)

# Stop when the validation accuracy has not improved for 4 epochs.
stopping = callbacks.EarlyStopping(monitor='val_acc', patience=4) # early stopping condition

# `validation_split` was removed: Keras ignores it whenever `validation_data`
# is given, so the effective behaviour (validating on the test set) is unchanged.
# NOTE(review): the test set therefore also drives early stopping, so the test
# metrics reported later are not from fully held-out data.
h = model.fit(
    x_train_aug, y_train_aug1,
    batch_size=batch_size,
    epochs=nepochs,
    validation_data=(x_test, y_test1),
    callbacks=[stopping]
)

In [None]:
#PERFORMANCE METRICS PLOT

num_classes=4
class_indices = list(range(num_classes))

# Predicted class = index of the largest softmax output.
preds = model.predict(x_test)
y_pred = np.argmax(preds, axis=1)

# Work on a separate array of integer labels and leave `y_test` untouched, so
# this cell can be re-run (the previous version overwrote `y_test` with its
# one-hot encoding, which broke a second execution).
y_true = np.asarray(y_test)

cm = confusion_matrix(y_true, y_pred, labels=class_indices)
print(cm)

# Per-class precision/recall/F1 computed on class indices, with readable names.
print(classification_report(y_true, y_pred, labels=class_indices,
                            target_names=categories, digits=3))

In [None]:
#CONFUSION MATRIX PLOT

import seaborn as sn
import pandas as pd
import matplotlib.pyplot as plt
# Single colormap choice, actually passed to the heatmap below (the previous
# chain of reassignments was ignored because the call hard-coded 'YlOrRd').
cmap = 'YlOrRd'

# Hard-coded counts -- presumably copied from the confusion matrix printed by
# an earlier run; TODO(review): consider plotting the freshly computed matrix.
array = [[73,  0, 21,  6],
         [12, 37, 46,  5],
         [11,  3, 75, 11],
         [ 8,  1,  7, 84]]
df_cm = pd.DataFrame(array, index=list("1234"), columns=list("1234"))
plt.figure(figsize = (10,7))
# fmt='d' keeps integer counts readable (the default format switches to
# scientific notation for values of 100 and above).
sn.heatmap(df_cm, cmap=cmap, annot=True, fmt='d')

In [None]:
#LOSS AND ACCURACY PLOTS

# One figure per metric: training curve in red, validation curve in blue.
# Each entry is (training key, validation key, y-axis label, figure title).
curves = (
    ('loss', 'val_loss', 'loss', 'Model loss comparison'),
    ('acc', 'val_acc', 'accuracy', 'Model accuracy comparison'),
)
for train_key, val_key, y_label, title in curves:
    plt.plot(h.history[train_key], 'r')
    plt.plot(h.history[val_key], 'b')
    plt.ylabel(y_label)
    plt.xlabel('epoch')
    plt.suptitle(title, fontsize=14)
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

In [None]:
#SAVE THE MODEL

# Folder under the dataset root (on the mounted Drive) that savemodel() writes into.
models_dir = datadir + '/models/'

def savemodel(model, problem):
    """Save `model` as '<problem>.h5' inside the global `models_dir`.

    Args:
        model: object exposing a `save(path)` method (a Keras model here).
        problem: file name to use, without the '.h5' extension.

    The target directory is created when missing, so the first save on a
    fresh Drive folder does not fail. The path written is printed afterwards.
    """
    # Make sure the destination folder exists before asking the model to write to it.
    os.makedirs(models_dir, exist_ok=True)
    filename = os.path.join(models_dir, '%s.h5' %problem)
    model.save(filename)
    print("\nModel saved successfully on file %s\n" %filename)

# Save the trained model; the second argument becomes the file name
# (savemodel appends the '.h5' extension).
savemodel(model,'VGG-16_FineTune_20epochs_augmented data')

In [None]:
#PREDICTIONS ON BLIND TEST SET

import csv
predictions=[]

def make_predictions():
    """Classify every image in `BlindTestSet` and write the results to 'predictions.csv'.

    Images are processed in sorted file-name order. Each readable image is
    resized to PIXELS x PIXELS and then divided by 255 (the same order of
    operations used when building the training data) before being passed to
    `model.predict`. The raw model output for each image is appended to the
    global `predictions` list.

    The CSV gets a header row followed by one row per image: file name,
    predicted category name, and one probability column per entry of
    `categories`. Files that OpenCV cannot decode are reported and skipped.
    """
    rows = []
    # os.listdir returns names in arbitrary order; sort so rows are reproducible.
    for img in sorted(os.listdir(BlindTestSet)):
        img_array = cv2.imread(os.path.join(BlindTestSet, img), cv2.IMREAD_COLOR)
        if img_array is None:
            # cv2.imread returns None for files it cannot decode.
            print("Skipping unreadable image: %s" % img)
            continue
        new_array = cv2.resize(img_array, (PIXELS, PIXELS)) / 255
        new_array = np.array(new_array).reshape(-1, PIXELS, PIXELS, 3)
        ynew = model.predict(new_array)
        predictions.append(ynew)

        # ynew holds a single row of class probabilities for this image.
        probabilities = ynew[0]
        predicted_category = categories[int(np.argmax(probabilities))]
        rows.append([img, predicted_category] + [float(p) for p in probabilities])

    # Write those predictions on a csv file. newline='' is what the csv module
    # expects for files it writes; the `with` block closes the file.
    with open('predictions.csv', 'w', newline='') as csvFile:
        writer = csv.writer(csvFile)
        writer.writerow(['image', 'prediction'] + list(categories))
        writer.writerows(rows)

make_predictions()
print(len(predictions))
if len(predictions) > 1:
    print(predictions[1])