In [None]:
import pandas as pd
import numpy as np
np.random.seed(123)
import glob
import os
import re
import cv2
import gc
from sklearn.decomposition import PCA
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import concatenate, Dense, Flatten, GlobalAveragePooling2D, Dropout, Conv2D, MaxPool2D, \
    BatchNormalization, Reshape, Input, Lambda
from tensorflow.keras.backend import tile
from sklearn.preprocessing import StandardScaler
from tensorflow.keras import optimizers
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras import backend as K

In [None]:
# Extract the dataset archive stored on Google Drive.
# NOTE(review): /content/drive/... only exists once Drive is mounted, but
# drive.mount() is called in a later cell of this notebook - on a fresh
# runtime, mount Drive before running this cell.
!unzip /content/drive/MyDrive/data.zip

unzip:  cannot find or open /content/drive/MyDrive/data.zip, /content/drive/MyDrive/data.zip.zip or /content/drive/MyDrive/data.zip.ZIP.


In [None]:
# Input height and width (in pixels) of the image: every image is resized to
# this size before being fed to the network, and the constant-valued property
# channels are created with the same size.
HEIGHT = 64
WIDTH = 64

# weighted MSE: output column j (1-based) is weighted by 1/j, so the first
# column contributes the most to the loss
def weighted_mse(yTrue, yPred):
    """Mean of the squared errors, each scaled by 1 / (1-based column index).

    Args:
        yTrue: ground-truth tensor. Its first row (``yTrue[0, :]``) is only
            used to derive the number of columns, so it must have at least
            two dimensions.
        yPred: predicted tensor (assumed to have the same shape as ``yTrue``).

    Returns:
        Tensor holding the mean, over all elements, of the weighted squared error.
    """
    # cumulative sum over a vector of ones -> 1, 2, ..., n_columns
    column_rank = K.cumsum(K.ones_like(yTrue[0, :]))
    inverse_rank = 1 / column_rank
    squared_error = K.square(yTrue - yPred)
    return K.mean(inverse_rank * squared_error)

# rotate image by a given angle, resizing the canvas to the rotated bounding box
def rotateImage(image, angle):
    """Rotate ``image`` about its centre and return it on a resized canvas.

    Args:
        image: array whose first two dimensions are (height, width).
        angle: rotation angle handed to ``cv2.getRotationMatrix2D``.

    Returns:
        The warped image; its size is the bounding box of the rotated input.
    """
    height, width = image.shape[:2]
    centre_x, centre_y = width // 2, height // 2

    rotation = cv2.getRotationMatrix2D((centre_x, centre_y), angle, 1.0)
    abs_cos = np.abs(rotation[0, 0])
    abs_sin = np.abs(rotation[0, 1])

    # width / height of the bounding box that encloses the rotated image
    out_width = int((height * abs_sin) + (width * abs_cos))
    out_height = int((height * abs_cos) + (width * abs_sin))

    # adjust the translation so the rotated content is centred in the new canvas
    rotation[0, 2] += (out_width / 2) - centre_x
    rotation[1, 2] += (out_height / 2) - centre_y

    return cv2.warpAffine(image, rotation, (out_width, out_height))


In [None]:
# for each type (sub-folder of data/) collect the images that have a usable csv
# label file and assign each of them at random to the test / validation /
# training list (probabilities 0.02 / 0.10 / 0.88).
# NOTE: the assignment is drawn independently per file, so every folder is
# spread over the 3 sets in the same proportions only in expectation - this is
# not an exact stratified split.

train_files = []
validation_files = []
test_files = []
skipped_files = 0  # images whose csv is missing, unreadable or has an unexpected length

# glob / os.listdir return entries in arbitrary order; sorting makes the split
# reproducible together with the fixed numpy seed
folders = sorted(glob.glob("data/*"))
train_pca = []
print(folders)
for folder in folders:
    if not os.path.isdir(folder):
        # stray files directly under data/ are not sample folders
        continue
    image_names = sorted(x for x in os.listdir(folder) if x.endswith(".jpg"))
    for image_name in image_names:
        image_path = os.path.join(folder, image_name)
        csv_path = os.path.join(folder, image_name.replace('.jpg', '.csv'))
        try:
            n_rows = pd.read_csv(csv_path).shape[0]
        except (OSError, ValueError):
            # missing, empty or malformed csv (pandas' EmptyDataError and
            # ParserError are ValueError subclasses): skip this sample
            skipped_files += 1
            continue
        # only samples whose csv has exactly 205 rows are used
        if n_rows != 205:
            skipped_files += 1
            continue
        prob = np.random.random()
        if prob < 0.02:
            test_files.append(image_path)
        elif prob < 0.12:
            validation_files.append(image_path)
        else:
            train_files.append(image_path)

print(len(train_files))
print(len(validation_files))
print(len(test_files))
print(f"skipped: {skipped_files}")


[]
0
0
0


In [None]:
# Label matrix for the training set: one row per training file, 201 values each
data = np.zeros((len(train_files), 201))
for row, image_path in enumerate(train_files):
    csv_path = image_path.replace('.jpg', '.csv')
    # the first 5 lines of every csv are skipped; the rest is read as two columns
    response = pd.read_csv(csv_path, skiprows=[0, 1, 2, 3, 4], names=['freq', 'values'])
    data[row, :] = response['values'].astype(float)

# using the training files only, fit a 20-component pca on the label values
pca = PCA(n_components=20)
pca.fit(data)

# fraction of the total variance captured by the retained components
print(sum(pca.explained_variance_ratio_))

In [None]:
# batch generator to generate batches
def batch_generator(X, batch_size=64):
    """Endlessly yield randomly sampled ``(batch_x, batch_y)`` batches.

    Samples are drawn uniformly at random, with replacement, from ``X``.

    Args:
        X: sequence of ``.jpg`` image paths. Each image needs a ``.csv`` label
            file with the same name next to it, and the name of its parent
            folder must contain at least two integers (e.g. ``0d65h``); the
            first two are used as the sample's properties.
        batch_size: number of samples per yielded batch.

    Yields:
        Tuple ``(batch_x, batch_y)``:
            batch_x: array of shape (batch_size, HEIGHT, WIDTH, 3): the
                grayscale image divided by 255 plus two constant channels
                holding the folder properties divided by 60 and 75.
            batch_y: array with one row per sample holding the label values
                transformed by the module-level ``pca``.

    Raises:
        ValueError: if ``X`` is empty (raised when the first batch is requested).
        FileNotFoundError: if OpenCV cannot read one of the images.
    """
    if len(X) == 0:
        raise ValueError("batch_generator received an empty list of files")

    while True:
        # Select files (indices into X) for the batch
        batch_indices = np.random.randint(low=0, high=len(X), size=batch_size)

        images = []
        batch_label = []

        # Read in each input, perform preprocessing and get labels
        for input_path_index in batch_indices:
            file = X[input_path_index]
            img = cv2.imread(file, 0)  # flag 0 -> single-channel grayscale
            if img is None:
                # cv2.imread reports failure by returning None instead of raising
                raise FileNotFoundError(f"could not read image: {file}")

            folder = os.path.split(os.path.split(file)[0])[1]
            # samples of these two folders are augmented with a random rotation
            if folder == '0d65h' or folder == '0d75h':
                angle = np.random.randint(0, 360)
                img = rotateImage(img, angle)
            # cv2.resize expects dsize as (width, height)
            img = cv2.resize(img, (WIDTH, HEIGHT))

            img = img / 255
            img = img.reshape(img.shape[0], img.shape[1], 1)

            # the integers found in the folder name are the sample's properties
            prop = [int(x) for x in re.findall(r"\d+", folder)]
            prop1 = np.full((HEIGHT, WIDTH, 1), prop[0] / 60)
            prop2 = np.full((HEIGHT, WIDTH, 1), prop[1] / 75)
            # combine properties to images as additional channels
            img = np.concatenate((img, prop1, prop2), axis=2)
            images.append(img)

            label_name = file.replace('.jpg', '.csv')
            label_file = pd.read_csv(label_name, skiprows=[0, 1, 2, 3, 4], names=['freq', 'values'])
            label = label_file['values'].astype(float).values
            # project the label values onto the fitted PCA components
            label = pca.transform(label.reshape(1, -1)).reshape(-1,)
            batch_label.append(label)

        batch_x = np.array(images)
        batch_y = np.array(batch_label)

        yield (batch_x, batch_y)


In [None]:
# using the segregated files define train and validation generators
# NOTE: the batch sizes used here (32 and 8) are the same numbers the model.fit
# call divides by to compute steps_per_epoch / validation_steps - keep them in sync.
train_gen = batch_generator(train_files, batch_size=32)
valid_gen = batch_generator(validation_files, batch_size=8)

# define model architecture
def cnn():
    """Build the (uncompiled) convolutional regression network.

    Three 3x3 'same'-padded ReLU convolutions with 16, 32 and 64 filters, each
    followed by 2x2 max-pooling; then batch normalisation, flattening, a
    512-unit ReLU dense layer with 50% dropout and a 20-unit linear output.

    Returns:
        A Keras ``Sequential`` model taking inputs of shape (HEIGHT, WIDTH, 3).
    """
    pool = (2, 2)
    layer_stack = [
        # convolutional feature extractor
        Conv2D(16, kernel_size=3, activation='relu', input_shape=(HEIGHT, WIDTH, 3), padding='same', name="conv_1"),
        MaxPool2D(pool_size=pool),
        Conv2D(32, kernel_size=3, activation='relu', padding='same', name="conv_2"),
        MaxPool2D(pool_size=pool),
        Conv2D(64, kernel_size=3, activation='relu', padding='same', name="conv_3"),
        MaxPool2D(pool_size=pool),
        BatchNormalization(),
        # regression head
        Flatten(),
        Dense(512, activation='relu'),
        Dropout(0.5),
        Dense(20, activation="linear"),
    ]
    return Sequential(layer_stack)


# clear memory / Keras global state left over from earlier runs of this cell.
# This is done *before* the model is built so that the reset cannot affect the
# freshly created model.
tf.keras.backend.clear_session()
gc.collect()

model = cnn()

# compile model
model.compile(optimizer=optimizers.Adam(learning_rate=0.0001), loss=weighted_mse, metrics=['mae'])
# summary() prints the table itself and returns None, so it is not wrapped in print()
model.summary()
callbacks = [
    # keep only the weights of the epoch with the lowest validation loss
    tf.keras.callbacks.ModelCheckpoint('best_model_v2.h5', monitor='val_loss', save_weights_only=True,
                                       save_best_only=True, mode='min'),
    # multiply the learning rate by 0.8 (down to 1e-5) after 100 epochs without
    # improvement; `min_delta` is the current name of the deprecated `epsilon` argument
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.8, patience=100, min_lr=1e-5,
                                         min_delta=0.000001, verbose=2, mode='min'),
]

In [None]:
# train the model
# model.load_weights('/content/drive/MyDrive/YOLOv4_weight/best_model_v2.h5')

# Integer division gives 0 steps when a set holds fewer files than one batch
# (32 for training, 8 for validation - the batch sizes given to the generators);
# max(1, ...) guarantees at least one step per epoch in that case.
hist = model.fit(
    train_gen,
    epochs=500,
    verbose=2,
    steps_per_epoch=max(1, len(train_files) // 32),
    validation_data=valid_gen,
    validation_steps=max(1, len(validation_files) // 8),
    callbacks=callbacks
    )


In [None]:
# Plot - loss and mae during training: one figure per metric, each showing the
# training curve and the matching validation curve
for metric in ('loss', 'mae'):
    plt.figure()
    plt.plot(hist.history[metric])
    plt.plot(hist.history['val_' + metric])
    plt.title('model ' + metric)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='best')
    plt.show()

In [None]:
# Mount Google Drive under /content/drive.
# NOTE(review): an earlier cell already reads /content/drive/MyDrive/data.zip,
# so on a fresh runtime this cell has to be run before it - consider moving it
# to the top of the notebook.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# evaluate on test/ unseen data (one sample per step)
test_gen = batch_generator(test_files, 1)

# restore the checkpoint with the lowest validation loss written during training
model.load_weights('best_model_v2.h5')
# Model.evaluate accepts generators directly; evaluate_generator is deprecated
# and no longer available in recent Keras releases. verbose=0 keeps the silent
# behaviour evaluate_generator had by default.
print(model.evaluate(test_gen, steps=len(test_files), verbose=0))

In [None]:
# see prediction plots for 10 test samples (entries 10-19 of test_files)
for file in test_files[10:20]:
    img = cv2.imread(file, 0)  # flag 0 -> single-channel grayscale
    if img is None:
        # cv2.imread reports failure by returning None instead of raising
        raise FileNotFoundError(f"could not read image: {file}")
    folder = os.path.split(os.path.split(file)[0])[1]
    # same random rotation as applied to these two folders in batch_generator
    if folder == '0d65h' or folder == '0d75h':
        angle = np.random.randint(0, 360)
        img = rotateImage(img, angle)
    # cv2.resize expects dsize as (width, height)
    img = cv2.resize(img, (WIDTH, HEIGHT))
    img = img.reshape(img.shape[0], img.shape[1], 1)
    img = img / 255
    # the integers found in the folder name are the sample's properties
    prop = [int(x) for x in re.findall(r"\d+", folder)]
    prop1 = np.full((HEIGHT, WIDTH, 1), prop[0] / 60)
    prop2 = np.full((HEIGHT, WIDTH, 1), prop[1] / 75)
    # combine properties to the image as additional channels
    img = np.concatenate((img, prop1, prop2), axis=2)
    # add the leading batch dimension expected by model.predict
    img = img.reshape(-1, img.shape[0], img.shape[1], img.shape[2])
    label_name = file.replace('.jpg', '.csv')

    label_file = pd.read_csv(label_name, skiprows=[0, 1, 2, 3, 4], names=['freq', 'values'])
    label_file['values'] = label_file['values'].astype(float)
    label = label_file['values'].values
    pred = model.predict(img)
    # map the predicted PCA components back to the original label space
    predictions = pca.inverse_transform(pred[0])
    label_file['pred_values'] = predictions
    plt.plot(predictions)
    plt.plot(label)
    plt.legend(['Predictions', 'Actual_Values'])
    plt.show()

In [None]:
# Move the best checkpoint from the Colab VM to Google Drive (Drive must be mounted).
mv /content/best_model_v2.h5 /content/drive/MyDrive/images_s

mv: cannot stat '/content/best_model_v2.h5': No such file or directory
