<a href="https://colab.research.google.com/github/bfraiche/parkingdirty/blob/master/parking_dirty.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Libraries

In [0]:
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

%matplotlib inline

import os
import random
import gc
import requests
import zipfile
import io
import matplotlib.image as mpimg
import seaborn as sns
from sklearn.model_selection import train_test_split
from keras import layers
from keras import models
from keras import optimizers
from keras.preprocessing.image import ImageDataGenerator
from keras.preprocessing.image import img_to_array, load_img

### ELT

In [0]:
nrows = 150      # target image height in pixels
ncolumns = 150   # target image width in pixels
channels = 3     # colour channels per image (cv2.IMREAD_COLOR loads 3-channel images)

def read_and_process_image(list_of_images):
    """Load, resize and label a list of image files.

    Parameters
    ----------
    list_of_images : list of str
        Paths to image files. The label is taken from the name of the
        directory that directly contains the file: a directory name
        containing 'not' (e.g. 'notblocked') gives label 1, anything else
        gives label 0. A bare file name with no directory part falls back
        to checking the whole string.

    Returns
    -------
    X : list
        One resized image array per input path, in input order.
    y : list of int
        One label per input path (1 = not blocked, 0 = blocked).

    Raises
    ------
    ValueError
        If a file cannot be read as an image.
    """
    X = []
    y = []

    for image in list_of_images:
        pixels = cv2.imread(image, cv2.IMREAD_COLOR)
        if pixels is None:
            # cv2.imread signals a missing or undecodable file by returning None
            raise ValueError('Could not read image file: {}'.format(image))

        # cv2.resize expects dsize as (width, height), i.e. (columns, rows)
        X.append(cv2.resize(pixels, (ncolumns, nrows), interpolation=cv2.INTER_CUBIC))

        # Label from the containing directory only, so a file name that merely
        # contains the letters 'not' cannot flip the class
        parent_dir = os.path.basename(os.path.dirname(image))
        label_source = parent_dir if parent_dir else image
        y.append(1 if 'not' in label_source else 0)

    return X, y

In [0]:
# Zip archive of the labelled training images; the cells below expect it to
# unpack into 'blocked' and 'notblocked' directories in the working directory
pkngdrty = 'http://parkingdirty.com/BlockedBikeLaneTrainingSingleCam.zip'

# Bound the wait and fail on an HTTP error status, so an error page is never
# passed to ZipFile as if it were the archive
rPd = requests.get(pkngdrty, timeout=60)
rPd.raise_for_status()

# Unpack straight from memory; the context manager closes the archive handle
with zipfile.ZipFile(io.BytesIO(rPd.content)) as zPd:
    zPd.extractall()

In [0]:
# separate raw image files into training and test sets
train_blocked = ['blocked/{}'.format(i) for i in os.listdir('blocked')]
train_notblocked = ['notblocked/{}'.format(i) for i in os.listdir('notblocked')]

n_blocked = len(train_blocked)
n_unblocked = len(train_notblocked)

# make the training set have equal images for each class:
# take 80% of the smaller class from each of the two classes
n_train = round(min(n_blocked, n_unblocked) * 0.8)
n_test_ub = n_unblocked - n_train   # number of not-blocked images left for testing
n_test_b = n_blocked - n_train      # number of blocked images left for testing

random.shuffle(train_blocked)
random.shuffle(train_notblocked)

train_imgs = train_blocked[:n_train] + train_notblocked[:n_train]

# The test set is everything after the first n_train images of each class.
# Slicing from n_train (not from the test counts) keeps it disjoint from train_imgs.
test_imgs = train_blocked[n_train:] + train_notblocked[n_train:]

# Guard against train/test leakage and lost images
assert len(test_imgs) == n_test_b + n_test_ub
assert not set(train_imgs) & set(test_imgs)

# format images for model
X, y = read_and_process_image(train_imgs)

In [0]:
# Stack the per-image lists into arrays
X, y = np.array(X), np.array(y)

# Hold out 20% of the training images for validation; the fixed seed keeps
# the split the same on every run
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.20,
    random_state=2,
)

In [0]:
# # clear memory
# del X, y, train_imgs, train_blocked, train_notblocked, n_train, n_test_ub, n_test_b, n_blocked, n_unblocked, rPd, zPd
# gc.collect()

### Define Model

In [0]:
# Sizes of the training and validation sets
ntrain, nval = len(X_train), len(X_val)

# Mini-batch size; powers of two (4, 8, 16, 32, 64, ...) are the usual choice
batch_size = 32

# Four convolution + max-pooling stages, then dropout and a dense head that
# ends in a single sigmoid unit for the two-class decision
model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(150, 150, 3)),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dropout(0.5),  # regularization
    layers.Dense(512, activation='relu'),
    layers.Dense(1, activation='sigmoid'),
])

model.compile(
    loss='binary_crossentropy',
    optimizer=optimizers.RMSprop(lr=1e-4),
    metrics=['acc'],
)

# Training images are rescaled to [0, 1] and randomly augmented
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
)

# Validation images are only rescaled, never augmented
val_datagen = ImageDataGenerator(rescale=1./255)

# Batch iterators over the in-memory arrays
train_generator = train_datagen.flow(X_train, y_train, batch_size=batch_size)
val_generator = val_datagen.flow(X_val, y_val, batch_size=batch_size)

### Train Model

In [0]:
# One epoch is one full pass over the data: enough batches to cover every
# image once (ceiling division). A fixed count of 100 ignored the dataset
# size and made validation wrap around, weighting some images more than others.
steps_per_epoch = max(1, (ntrain + batch_size - 1) // batch_size)
validation_steps = max(1, (nval + batch_size - 1) // batch_size)

# NOTE(review): newer Keras releases deprecate fit_generator in favour of fit;
# kept here to match the Keras API the rest of this notebook uses.
history = model.fit_generator(train_generator,
                              steps_per_epoch=steps_per_epoch,
                              epochs=64,
                              validation_data=val_generator,
                              validation_steps=validation_steps)

In [0]:
# plot the train and val curve
acc = history.history['acc']
val_acc = history.history['val_acc']
loss = history.history['loss']
val_loss = history.history['val_loss']

# Epoch numbers start at 1 on the x-axis
epochs = range(1, len(acc) + 1)

# Train and validation accuracy (own figure, explicit axes)
fig_acc, ax_acc = plt.subplots()
ax_acc.plot(epochs, acc, 'b', label='Training accuracy')
ax_acc.plot(epochs, val_acc, 'r', label='Validation accuracy')
ax_acc.set_title('Training and Validation accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')
ax_acc.legend()

# Train and validation loss (second figure)
fig_loss, ax_loss = plt.subplots()
ax_loss.plot(epochs, loss, 'b', label='Training loss')
ax_loss.plot(epochs, val_loss, 'r', label='Validation loss')
ax_loss.set_title('Training and Validation loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.legend()

plt.show()

In [0]:
# #Save the model
# model.save_weights('model_weights.h5')
# model.save('model_keras.h5')

# from google.colab import files
# files.download('model_keras.h5')
# files.download('model_weights.h5')

### Load Model

In [0]:
# Colab-only: google.colab is not available outside a Colab runtime.
# Prompts for files to upload; presumably used here to bring in the saved
# weights file that a later cell loads. TODO confirm.
from google.colab import files
files.upload()

In [0]:
batch_size = 32

# Rebuild the same architecture as in the "Define Model" section so that
# saved weights can be loaded into it
model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(150, 150, 3)),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dropout(0.5),  # regularization
    layers.Dense(512, activation='relu'),
    layers.Dense(1, activation='sigmoid'),
])

model.compile(
    loss='binary_crossentropy',
    optimizer=optimizers.RMSprop(lr=1e-4),
    metrics=['acc'],
)

In [0]:
# model = load_model('model_keras.h5') # this doesn't work
# NOTE(review): `load_model` is never imported in this notebook; with the
# existing `from keras import models` it would be `models.load_model(...)`.
# Load the trained weights into the freshly built model instead.
# NOTE(review): the file name is spelled 'wieghts'; it must match the name of
# the file that was actually uploaded. Confirm before renaming.
model.load_weights('model_wieghts.h5')

### Evaluate Model

Failed to evaluate against the test set with a confusion matrix

In [0]:
import sklearn.metrics as metrics

In [0]:
# Show the path of the first test image
test_imgs[0]

In [0]:
# Load the first test image through the shared helper, so it gets the same
# resize as the training data and its label comes from its directory rather
# than a hard-coded 0. Passing a one-element list keeps a leading batch axis:
# X is (1, height, width, channels) and y is (1,), as the Keras generators
# and model.predict expect.
X, y = read_and_process_image(test_imgs[:1])
X = np.array(X)
y = np.array(y)

In [0]:
# Test images are only rescaled by 1/255, with no augmentation
test_datagen = ImageDataGenerator(rescale=1./255)

In [0]:
# Display the first test image, read directly from disk with matplotlib
# (no resizing is applied here)
img = mpimg.imread(test_imgs[0])
imgplot = plt.imshow(img)
plt.show()

In [0]:
# flow() only accepts a rank-4 (batch, height, width, channels) array, so add
# the batch axis when X holds a single image
single_batch = X if np.ndim(X) == 4 else np.expand_dims(X, axis=0)
zz = test_datagen.flow(single_batch, batch_size=1)
print(zz)
# pred = model.predict(batch)
# pred = model.predict(batch)

In [0]:
# Score the held-out images in chunks of batch_size so only one chunk of
# pixels is in memory at a time, collecting true and predicted labels
y_true_labels = []
y_pred_labels = []
for start in range(0, len(test_imgs), batch_size):
    X_chunk, y_chunk = read_and_process_image(test_imgs[start:start + batch_size])
    # Same 1/255 rescaling that the training and validation generators apply
    probs = model.predict(np.array(X_chunk, dtype='float32') / 255.0)
    y_true_labels.extend(y_chunk)
    # Sigmoid output above 0.5 is class 1 ("not blocked"), otherwise class 0
    y_pred_labels.extend((probs.ravel() > 0.5).astype(int).tolist())

# labels=[0, 1] keeps the matrix 2x2 even if one class is never seen or predicted
confusion_matrix = metrics.confusion_matrix(y_true=y_true_labels, y_pred=y_pred_labels, labels=[0, 1])
confusion_matrix

Predict the first 10 images of the test set

In [0]:
# Load the first 10 test images. read_and_process_image appends one label per
# image, so y_test is not empty: it holds the path-derived labels
# (1 = not blocked, 0 = blocked). It is simply not used by the cell below.
X_test, y_test = read_and_process_image(test_imgs[0:10])
# Stack the image list into a single array for the generator
x = np.array(X_test)
# Test images are only rescaled by 1/255, with no augmentation
test_datagen = ImageDataGenerator(rescale=1./255)

In [0]:
# Predict each image in `x` and show it with its predicted label
n_images = len(x)
text_labels = []
columns = 5
rows = (n_images + columns - 1) // columns  # integer row count; subplot() needs ints
plt.figure(figsize=(30,20))
# shuffle=False keeps the display order the same as the order of `x`
for i, batch in enumerate(test_datagen.flow(x, batch_size=1, shuffle=False)):
    if i >= n_images:
        break  # flow() cycles forever, so stop after one pass over the images
    pred = model.predict(batch)
    # pred has one sigmoid value per image; above 0.5 is class 1 ("not blocked")
    if pred.ravel()[0] > 0.5:
        text_labels.append('not blocked')
    else:
        text_labels.append('blocked')
    plt.subplot(rows, columns, i + 1)
    plt.title('The lane is ' + text_labels[i])
    # Images were loaded with cv2 (BGR channel order); reverse the channel axis
    # so matplotlib, which expects RGB, shows the true colours
    imgplot = plt.imshow(batch[0][..., ::-1])
plt.show()

### Scratch

In [0]:
# Scratch: list the working directory, e.g. to check for the extracted image
# folders and any uploaded weight files
os.listdir('.')

Critical Path:
- Visualize false positives with prediction percentage
- Train and test the model on the full dataset
- Create heatmap or use LIME for interpretation