In [1]:
import cv2
import matplotlib.pyplot as plt
import os
import scipy.io
from shutil import copy2
import numpy as np
import tensorflow as tf
from keras import Sequential
from keras.models import Model
from keras.layers import Dense, Input
from keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D
from keras.layers import Dropout, Flatten
from tensorflow.keras.optimizers import Adam
import keras.backend as K
from keras.layers import *   

In [2]:
main_path=r'../input/malldataset/mall_dataset/frames'
images = os.listdir(main_path)
images=[x for x in images if 'seq' in x]
   
length=len(images)
length

In [51]:
wikiMat = scipy.io.loadmat(r'../input/malldataset/mall_dataset/mall_gt.mat')
y=wikiMat['count'].flatten()
y=y.astype(np.float16)

In [4]:
X=[]
scale_percent=60

for i in range(length):
  img = cv2.imread(os.path.join(main_path,images[i]))
  img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  width = int(img.shape[1] * scale_percent / 100)
  height = int(img.shape[0] * scale_percent / 100)
  dim = (width, height)
  img = cv2.resize(img,dim, interpolation = cv2.INTER_AREA)
  X.append(img)

del images,wikiMat,length

In [5]:
X=np.array(X,dtype=np.float16)
X=X/255
N=X.shape[0]
idx=np.random.choice(N, 400,replace=False)
X_train=X[idx]
y_train=y[idx]

idx1=[]
idx2=[]

for i in range(N):
  if i not in idx:
    n=np.random.rand()
    if n >0.5:
        idx1.append(i)
    else:
        idx2.append(i)

idx1=np.array(idx1)
idx2=np.array(idx2)

X_cal=X[idx1]
y_cal=y[idx1]
X_test=X[idx2]
y_test=y[idx2]


    

In [6]:
def tilted_loss(q,y,f):
    # q: Quantile to be evaluated, e.g., 0.5 for median.
    # y: True value.
    # f: Fitted (predicted) value.
    e = (y-f)
    return K.mean(K.maximum(q*e, (q-1)*e), axis=-1)


def QuantileRegressionModel(dim,qs=[0.1, 0.5, 0.9]):
    
    ipt_layer = Input((dim[1],dim[2],dim[3]))
    layer1 = Conv2D(filters=64, kernel_size=3, padding='same', activation='relu')(ipt_layer)
    layer1_pool=MaxPooling2D(pool_size=2)(layer1)
    layer2 = Conv2D(filters=128, kernel_size=3, padding='same', activation='relu')(layer1_pool)
    layer2_pool=MaxPooling2D(pool_size=2)(layer2)
    layer2_pool_drop=Dropout(0.2)(layer2_pool)
    flat=Flatten()(layer2_pool_drop)
    output=Dense(128,activation='relu')(flat)
    
    out1 = Dense(1, name='out1')(output)
    out2 = Dense(1, name='out2')(output)
    out3 = Dense(1, name='out3')(output)
    
    q1, q2, q3 = qs
    model = Model(inputs=ipt_layer, outputs=[out1, out2, out3])
    model.compile(loss={'out1': lambda y,f: tilted_loss(q1,y,f),
                        'out2': lambda y,f: tilted_loss(q2,y,f),
                        'out3': lambda y,f: tilted_loss(q3,y,f),}, 
                  loss_weights={'out1': 1, 'out2': 1, 'out3': 1},
                 optimizer=Adam(learning_rate=1e-6))
    
    return model

In [7]:
alpha=0.1
model=QuantileRegressionModel(X.shape,qs=[alpha/2, 0.5, 1-alpha/2])
history=model.fit(X_train,y_train, epochs=100, batch_size=32, verbose=2,validation_data=(X_test, y_test))
del X_train,y_train

In [65]:
# plot mae
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['Loss', 'Val_loss'])
plt.xlabel('Epochs')
plt.ylabel('Cumulative quantile loss')
plt.title('Cumulative quantile loss in every epoch')
plt.show()

In [16]:
scores=[]
f_cal=model.predict(X_cal)
upper_bound_cal=f_cal[2].flatten()
lower_bound_cal=f_cal[0].flatten()

for i in range(X_cal.shape[0]):
    scores.append(max(lower_bound_cal[i]-y_cal[i],y_cal[i]-upper_bound_cal[i]))
q_yhat=np.quantile(scores,1-alpha)


In [17]:
f=model.predict(X_test)
upper_bound=f[2].flatten()
lower_bound=f[0].flatten()

In [18]:
def summary_statistics(arr):
    # calculates summary statistics from array
    
    return [np.min(arr),np.max(arr),np.mean(arr), np.std(arr),np.quantile(arr,0.25),np.quantile(arr,0.5),np.quantile(arr,0.75),np.quantile(arr,0.75)-np.quantile(arr,0.25)]

In [67]:
def calculate_coverage(lower_bound,upper_bound,ground_truth):
  N=len(ground_truth)
  correct=0

  for i in range(N):
    if lower_bound[i]<ground_truth[i] and ground_truth[i] < upper_bound[i]:
      correct+=1

  return correct/N,summary_statistics(np.abs((upper_bound-lower_bound)))

calculate_coverage(lower_bound-q_yhat,upper_bound+q_yhat,y_test)  

In [13]:
print(lower_bound[25]-q_yhat,upper_bound[25]+q_yhat,y_test[25])

In [78]:
plt.rcParams.update({'font.size': 15})
i=120
plt.imshow(np.uint8(X_test[i]*255))
plt.axis('off')
plt.title('Ground truth->'+str(y[i])+' PI-> ['+str(round(lower_bound[i]-q_yhat,2))+','+str(round(upper_bound[i]+q_yhat,2))+']')
plt.show()

In [49]:
import seaborn as sns
plt.rcParams["figure.figsize"] = (15,8)
plt.rcParams.update({'font.size': 25})
sns.set_style("darkgrid", {'axes.grid' : True})
plt.hist(y)
plt.xlabel('#people')
plt.ylabel('Absolute frequency')