In [115]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import cv2
import os
import sys
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers, models, metrics

In [116]:
class DataPreprocessor():
    """
    Class that preprocesses the training video 
    """
    
    train_video = "./data/train.mp4"
    test_video = "./data/test.mp4"
    
    train_images_location = "./data/train/"
    test_images_location = "./data/test/"
    
    
    def prepareData(self):
        """
        Prepare the images for the training and test video
        """
        
        # Create training images
        print("Creating Training images")
        self.createOpticalFlowImages(self.train_video,
                                     self.train_images_location)
        
        # Create Test images
        print("Creating Test images")
        self.createOpticalFlowImages(self.test_video,
                                    self.test_images_location)
        
        
    def createOpticalFlowImages(self,video_file,save_location):
        """
        Read the video frame by frame and create optical flow
        images for the entire video and store the 
        """
        
        video = cv2.VideoCapture(video_file)
        ret, first = video.read()

        i = 1
        while True:
            ret, second = video.read()
            if not ret:
                break

            opt_flow_img = self.opticalFlow(first,second)
            cv2.imwrite(save_location+str(i)+".jpg", opt_flow_img)

            first = second
            i+=1
            
    
    def opticalFlow(self,first,second):
        """
        Create an optical flow image from the two images
        """
        
        first = first[150:400]
        second = second[150:400]

        opt_flow_img = np.zeros_like(first)
        opt_flow_img[...,1] = 255

        first = cv2.cvtColor(first,cv2.COLOR_BGR2GRAY)
        second = cv2.cvtColor(second,cv2.COLOR_BGR2GRAY)

        pyr_scale = 0.5
        levels = 3
        winsize = 15
        iterations = 3
        poly_n = 7
        poly_sigma = 1.5

        flow = cv2.calcOpticalFlowFarneback(first,second,None,pyr_scale,
                                           levels,winsize,iterations,
                                           poly_n,poly_sigma,0)

        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        opt_flow_img[:, :, 0] = ang * (180 / np.pi / 2)
        opt_flow_img[:, :, 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
        # opt_flow_img = cv2.cvtColor(opt_flow_img,cv2.COLOR_HSV2BGR)

        # plt.figure()
        # plt.imshow(opt_flow_img)
        return opt_flow_img
    

In [117]:
class CNN():
    """
    Class that contains the CNN and predicts and 
    evaluates the speed of the car
    """
    
    train_outputs = "./data/train.txt"
    train_images_location = "./data/train/"
    train_images_array = "./data/training_images_array.npy"
    
    test_outputs = "./data/test.txt"
    test_images_location = "./data/test/"
    test_images_array = "./data/test_images_array.npy"
    
    outputs = None
    train_images = None
    test_images = None
    
    image_size = (100,100,2)
    nTrainImages = 20399
    nTestImages = 10797
    
    model = None
    best_weights = None
    
    
    def load_data(self):
        """
        Read all the images and the outputs and 
        prepare them for training
        """
        
        print("Loading images and outputs")

        self.outputs = np.loadtxt(self.train_outputs)[1:]
        
        
        print("Loading training images")
        self.train_images = self.load_images(self.train_images_location,self.nTrainImages,
                                       self.train_images_array)
        
        print("Loading test images")
        self.test_images = self.load_images(self.test_images_location,self.nTestImages,
                                       self.test_images_array)

    
    def load_images(self,images_location,nImages, array_file):
        """
        Loads all the images in the given location 
        """
        
        if os.path.exists(array_file):
            images = np.load(array_file)

        else:
            images = np.empty([nImages,self.image_size[0],self.image_size[1],
                               self.image_size[2]],dtype='float32')

            for i in range(nImages):
                img = cv2.imread(images_location+str(i+1)+".jpg")
                img = cv2.resize(img,(100,100))
                images[i] = img[:,:,[0,2]]

            np.save(array_file, images)
            
        return images
    
    
    def create_model(self):
        """
        Create a CNN
        """
        
        print("Creating model")
        self.model = models.Sequential()
        self.model.add(layers.Conv2D(64, (7, 7), activation='relu', input_shape=self.image_size))
        self.model.add(layers.Conv2D(32, (5, 5), activation='relu'))
        self.model.add(layers.Conv2D(16, (5, 5), activation='relu'))
        self.model.add(layers.Conv2D(16, (3, 3), activation='relu'))
        self.model.add(layers.Flatten())
        self.model.add(layers.Dense(256, activation='relu'))
        self.model.add(layers.Dropout(0.3))
        self.model.add(layers.Dense(128, activation='relu'))
        self.model.add(layers.Dropout(0.3))
        self.model.add(layers.Dense(64, activation='relu'))
        self.model.add(layers.Dropout(0.3))
        self.model.add(layers.Dense(32, activation='relu'))
        self.model.add(layers.Dropout(0.3))
        self.model.add(layers.Dense(1))

        self.model.compile(optimizer='adam', loss='mse', metrics=[metrics.MeanSquaredError()])

    
    
    def train_model(self):
        """
        Train model and save weights
        """
        
        print("Training model")
        filepath ='.\data\weights.{epoch:02d}-{val_loss:.2f}.hdf5'
        log_dir = ".\data\\logs"

        # Create callbacks for training
        tensorboard_callback = keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

        early_stopping_callback = keras.callbacks.EarlyStopping(monitor='loss', min_delta=0.1, 
                                                                patience=5, verbose=1, mode='min', 
                                                                restore_best_weights=True)

        model_checkpoint_callback = keras.callbacks.ModelCheckpoint(filepath, monitor='loss', 
                                                                    verbose=1, save_best_only=True,
                                                                    save_weights_only=True, 
                                                                    mode='min', save_freq='epoch')

        # Shuffle data
        indices = np.arange(len(self.outputs))
        np.random.shuffle(indices)
        x_train = self.train_images[indices]
        y_train = self.outputs[indices]

        self.model.fit(x=x_train, y=y_train, epochs=100, validation_split=0.2, 
                  callbacks=[tensorboard_callback,early_stopping_callback,model_checkpoint_callback])
        
        
    def evaluate_model(self):
        """
        Evaluate the model on the whole training and validation data
        """
        
        print("Evaluating model")
        if self.best_weights!=None:
            self.model.load_weights(self.best_weights)
            
        results = self.model.evaluate(self.train_images, self.outputs, batch_size=32)
        print('Training loss and mse:', results)
        
        predictions = self.model.predict(self.train_images)
        error = self.outputs - predictions[:,0]
        plt.plot(self.outputs,label="Ground Truth")
        plt.plot(predictions,label="Predictions")
        
        plt.figure()
        plt.plot(error)
        
        
    def predict_test_speed(self):
        """
        Predict the test speeds and generate the test.txt file
        """
        
        print("Predicting test speeds")
        test_speeds = self.model.predict(self.test_images)
        test_txt = np.insert(test_speeds,0,test_speeds[0,0],axis=0)
        np.savetxt("./data/test.txt",test_txt)

In [None]:
if __name__=="__main__":
    
    gen_flow_imgs = False
    if gen_flow_imgs:
        print("Pre-processing the data")

        prep = DataPreprocessor()
        prep.prepareData()
    
    cnn = CNN()
    cnn.load_data()
    cnn.create_model()
    # cnn.train_model()
    cnn.best_weights = ".\data\weights.57-3.40.hdf5"
    cnn.evaluate_model()
    # cnn.predict_test_speed()
    
    

Loading images and outputs
Loading training images
Loading test images
Creating model
Evaluating model
Training loss and mse: [1.6238697871746248, 1.6238707]
