In [None]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
from scipy.io import loadmat
from sklearn.model_selection import train_test_split
import matplotlib.cm as cm
import time
import skimage.measure
from keras.initializers import RandomNormal
from keras.callbacks import ModelCheckpoint
from keras.models import *
from keras.layers import *
from keras.optimizers import SGD, Adam
from keras.models import model_from_json
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input

from keras.applications.vgg19 import VGG19
from keras.applications.vgg19 import preprocess_input

# Save into array Train images, Ground Truth and Labels

In [None]:
Train_imgs_B = []
Train_density_B = []
Train_labels_B = []

path_images = ".../train_data/images/"
path_heads = ".../train_data/ground_truth/"
path_maps = ".../train_data/maps/"



for img_name, den_name, heads_name in zip(sorted(os.listdir(path = path_images)), sorted(os.listdir(path = path_maps)), sorted(os.listdir(path = path_heads))):

    img = preprocess_input(cv2.cvtColor(cv2.imread(path_images + img_name), cv2.COLOR_BGR2RGB))
    Train_imgs_B.append(img)

    img = np.load(path_maps + den_name)
    Train_density_B.append(img)

    Train_labels_B.append(len(loadmat(path_heads + heads_name)['image_info'][0, 0][0, 0][0]))


Train_imgs_B = np.asarray(Train_imgs_B)
Train_density_B = np.asarray(Train_density_B)

# Save into array Test images, Ground Truth and Labels

In [None]:
Test_imgs_B = []
Test_density_B = []
Test_labels_B = []

path_images = ".../test_data/images/"
path_heads = ".../test_data/ground_truth/"
path_maps = ".../test_data/maps/"


for img_name, den_name, heads_name in zip(sorted(os.listdir(path = path_images)), sorted(os.listdir(path = path_maps)), sorted(os.listdir(path = path_heads))):

    Test_imgs_B.append(preprocess_input(cv2.cvtColor(cv2.imread(path_images + img_name), cv2.COLOR_BGR2RGB)))
    Test_density_B.append(np.load(path_maps + den_name))
    Test_labels_B.append(len(loadmat(path_heads + heads_name)['image_info'][0, 0][0, 0][0]))

# Model Definition

In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Conv2D, LeakyReLU, Input
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.applications import VGG16
import numpy as np
import os


weights_path = ".../vgg19_weights_tf_dim_ordering_tf_kernels_notop.h5"
#adam = Adam(learning_rate=1e-7)

def build_CSRnet(input_shape):
    VGG_19 = VGG19(weights=weights_path, include_top=False, input_shape=input_shape)


    VGG_19 = Model(inputs=VGG_19.input, outputs=VGG_19.get_layer('block4_conv4').output)

    Map_estimator_net = Sequential([
        Conv2D(512, (3, 3), dilation_rate=2, padding='same', kernel_initializer=RandomNormal(stddev=0.01)),
        LeakyReLU(negative_slope=0.01),
        Conv2D(512, (3, 3), dilation_rate=2, padding='same', kernel_initializer=RandomNormal(stddev=0.01)),
        LeakyReLU(negative_slope=0.01),
        Conv2D(512, (3, 3), dilation_rate=2, padding='same', kernel_initializer=RandomNormal(stddev=0.01)),
        LeakyReLU(negative_slope=0.01),
        Conv2D(256, (3, 3), dilation_rate=2, padding='same', kernel_initializer=RandomNormal(stddev=0.01)),
        LeakyReLU(negative_slope=0.01),
        Conv2D(128, (3, 3), dilation_rate=2, padding='same', kernel_initializer=RandomNormal(stddev=0.01)),
        LeakyReLU(negative_slope=0.01),
        Conv2D(64, (3, 3), dilation_rate=2, padding='same', kernel_initializer=RandomNormal(stddev=0.01)),
        LeakyReLU(negative_slope=0.01),
        Conv2D(1, (1, 1), dilation_rate=1, padding='same', kernel_initializer=RandomNormal(stddev=0.01)),
        LeakyReLU(negative_slope=0.01)
    ])(VGG_19.output)

    CSRnet = Model(inputs=VGG_19.input, outputs=Map_estimator_net)
    CSRnet.compile(optimizer=SGD(learning_rate=1e-2, decay=5e-4, momentum=0.9), loss="mse", metrics=["mae"])


    return CSRnet


# Train and Test functions definition

In [None]:

from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.models import Sequential

def train_CSRnet(CSRnet, Train_imgs_B, Train_density_B, checkpoint_filepath):
    if not os.path.exists('./checkpoints'):
        os.makedirs('./checkpoints')

    model_checkpoint_callback = ModelCheckpoint(
        filepath=checkpoint_filepath,
        save_weights_only=True,
        monitor='val_mae',
        mode='min',
        save_best_only=True,
        verbose=1)

    early_stopping_callback = EarlyStopping(
        monitor='val_mae',
        patience=15,
        mode='min',
        verbose=1,
        restore_best_weights=True)

    history = CSRnet.fit(Train_imgs_B, Train_density_B,
                         batch_size=1, epochs=100,
                         validation_split=0.1, steps_per_epoch=100,
                         callbacks=[model_checkpoint_callback])

    return history


def test_CSRnet(CSRnet, Test_imgs, Test_labels, file_parameters_path):
    CSRnet.load_weights(file_parameters_path)

    mae_error = 0
    mse_error = 0
    mape_error = 0


    N = len(Test_labels)
    for i, (x, y) in enumerate(zip(Test_imgs, Test_labels)):

        print(i)

        mae_error += np.abs(np.sum(CSRnet.predict(np.reshape(np.asarray(x),(1, x.shape[0], x.shape[1], x.shape[2])))) - y)
        mse_error += np.square(np.sum(CSRnet.predict(np.reshape(np.asarray(x),(1, x.shape[0], x.shape[1], x.shape[2])))) - y)
        mape_error += np.abs((np.sum(CSRnet.predict(np.reshape(np.asarray(x),(1, x.shape[0], x.shape[1], x.shape[2])))) - y) / y)* 100

    return mae_error/N, mse_error/N, mape_error/N

checkpoint_filepath = './checkpoints/checkpoint.weights.h5'

input_shape = (None, None, 3)

CSRnet = build_CSRnet(input_shape)
CSRnet.summary()


history = train_CSRnet(CSRnet, Train_imgs_B, Train_density_B, checkpoint_filepath)

if os.path.isfile(checkpoint_filepath):
    print(f"The best weights have been saved in: {checkpoint_filepath}")
else:
    print(f"The weights were not saved correctly. Check the path: {checkpoint_filepath}")




## Test values

In [None]:
mae, mse, mape  = test_CSRnet(CSRnet, Test_imgs_B, Test_labels_B, checkpoint_filepath)
print(f"Mean Absolute Error: {mae}, Mean Square Error: {mse}, Mean Absolute Percentage Error:{mape}")

# Print some images and train curve

In [None]:
to_print = 15
plt.figure(figsize=(9, 3 * to_print))
path_images = ".../test_data/images/"
path_heads = ".../test_data/ground_truth/"

for i, (img, dens, raw, head) in enumerate(zip(Test_imgs_B, Test_density_B, sorted(os.listdir(path = path_images)),sorted(os.listdir(path = path_heads)))):
    if i == to_print: break

    estimated_dens = CSRnet.predict(np.reshape(np.asarray(img),(1,img.shape[0],img.shape[1],img.shape[2])))

    heads = loadmat(path_heads + head)
    true_number=len(heads['image_info'][0, 0][0, 0][0])
    predicted_number=int(np.sum(estimated_dens))

    plt.subplot(to_print, 3, i * 3 + 1)
    plt.axis("off")
    plt.title("Test image:")
    plt.imshow(cv2.cvtColor(cv2.imread(path_images + raw), cv2.COLOR_BGR2RGB))

    plt.subplot(to_print, 3, i * 3 + 2)
    plt.axis("off")
    plt.title("GT density map:")
    plt.title(f"GT density map\nGT count: {true_number:.2f}")

    plt.imshow(dens , cmap = cm.jet)


    plt.subplot(to_print, 3, i * 3 + 3)
    plt.axis("off")
    plt.title("Estimated density map:")
    plt.title(f"Estimated density map\nEstimated count: { predicted_number:.2f}")

    plt.imshow(estimated_dens.reshape((estimated_dens.shape[1], estimated_dens.shape[2], 1)) , cmap = cm.jet)

In [None]:
def plot_learning_curves(history):
    plt.figure(figsize=(8, 5))
    plt.plot(history.epoch,history.history['loss'], label='train loss')
    plt.plot(history.epoch,history.history['val_loss'], label='valid loss')
    plt.legend()
    plt.title('loss')
    plt.grid(True)
    plt.show()
plot_learning_curves(history)
