In [None]:
import json
import os
import cv2
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import random as rnd
import itertools
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Dense, Conv2D, Dropout, Flatten, MaxPooling2D, Input, Activation, BatchNormalization

from sklearn.model_selection import train_test_split
from tensorflow.keras.optimizers import Adam
import math
import pickle

from numpy.random import seed
from tensorflow import random

from sklearn.model_selection import KFold

In [None]:
DATA_DIR = './data8/'
ids = os.listdir(DATA_DIR)
pvm_angles = []
json_fps = [os.path.join(DATA_DIR, image_id) for image_id in ids if image_id.startswith('catalog') and image_id.endswith('catalog')]
for file in json_fps:
    with open(file, 'r') as f:
        for line in f:            
            if (line.startswith('{')):
                jsonObj = json.loads(line)   
                imgFile = jsonObj['cam/image_array']
                path = os.path.join(DATA_DIR, 'images', imgFile)
                throttle = jsonObj['user/throttle']
                if throttle != 0.0  and os.path.exists(path):
                    image = cv2.imread(path)
                    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB )
                    image = cv2.resize(image, None, fx=0.50, fy= 0.50)
                    jsonObj['image'] = image
                    pvm_angles.append(jsonObj)


In [None]:
import albumentations as A

transform = A.Compose([
    A.RandomBrightnessContrast(brightness_limit=0.4, contrast_limit=0.5, brightness_by_max=False, p=1),    
     A.OneOf(
        [
            #A.IAASharpen(p=1),
            A.Sharpen(p=1),
            A.Blur(blur_limit=3, p=1),
            A.MotionBlur(blur_limit=3, p=1),
        ],
        p=0.9,
    ),
])

# Prepare

In [None]:
#prepare and split data


def extract(pvm_angles):
    df = pd.DataFrame(pvm_angles)

    images = np.array(list(df['image']), dtype='f')    
    images = images / 255.0
    pwms = np.array(df['user/angle'])
    
    images = np.array(images) 
    pwms = np.array(pwms)
    
    return(images, pwms)

In [None]:
def apply_augmentation(images,pwms):
    #implment augmentation 
    augmentedTrainImages = []
    for i in range(len(images)):
        orig_image = images[i]
        augmentedTrainImages.append(orig_image)
        for j in range(9):
            augmentedTrainImages.append(transform(image=orig_image)['image'])
    return (np.array(augmentedTrainImages), np.array(pwms).repeat(10))

# Usage

In [None]:
def calc_ackerman(pwms):
    #calculates on left wheel
    L = 178 #wheelbase
    b = 165 #
    pwm_max = 14.0 #in degree
    #suppose pwms is between -1.0 and 1.0
    theta = pwms*pwm_max
    R = abs(L/(np.tan(np.radians(theta))+0.0000001))
    tg_alpha = L/(R+np.sign(theta)*(b/2))
    alpha = np.sign(theta) * np.degrees(np.arctan(tg_alpha))

    return alpha/pwm_max

def normalize_ackerman_pwms(pwms):
    [minpwm, maxpwm] = calc_ackerman(np.array([-1.0,1.0]))
    return (pwms+abs(minpwm))/(maxpwm - minpwm)

def denormalize_ackerman_pwms(pwms):
    [minpwm, maxpwm] = calc_ackerman(np.array([-1.0,1.0]))
    return (pwms*(maxpwm - minpwm))-abs(minpwm)


In [None]:
def preproc_pwms(pwms):
    pwms = calc_ackerman(pwms)
    pwms = normalize_ackerman_pwms(pwms)
    
    return pwms

In [None]:
seed_value= 0
np.random.seed(seed_value)
tf.compat.v1.set_random_seed(seed_value)


loss_per_fold = []

num_folds = 4

(inputs, targets) = extract(pvm_angles)

input_shape = inputs[0].shape

kfold = KFold(n_splits=num_folds, shuffle=True)

fold_no = 1
for train, test in kfold.split(inputs, targets):
    (augmentedTrainImages, augmentedPwms) = apply_augmentation(inputs[train], targets[train])
    augmentedPwms = preproc_pwms(augmentedPwms)
    testPwms = preproc_pwms(targets[test])
        
    model = tf.keras.applications.VGG16(
    weights='imagenet',
    input_shape=input_shape,
    include_top=False
    )

    for layer in model.layers:
          layer.trainable = False

    x = model.output
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(128, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    x = tf.keras.layers.Dense(128, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.3)(x)
    x = tf.keras.layers.Dense(1, activation="sigmoid")(x)

    model = tf.keras.Model(inputs=model.input, outputs=x)

    model.compile(optimizer='adam', loss='mean_squared_error')

    
     # Generate a print
    print('------------------------------------------------------------------------')
    print(f'Training for fold {fold_no} ...')

    history = model.fit(x=augmentedTrainImages,y=augmentedPwms, epochs=40, batch_size=8)

    # Generate generalization metrics
    score = model.evaluate(inputs[test], testPwms, verbose=0)
    
    del model
    
    print(f'Score for fold {fold_no}: loss of {score}; ')
    loss_per_fold.append(score)

    # Increase fold number
    fold_no = fold_no + 1
    
# == Provide average scores ==
print('------------------------------------------------------------------------')
print('Score per fold')
for i in range(0, len(loss_per_fold)):
    print('------------------------------------------------------------------------')
    print(f'> Fold {i+1} - Loss: {loss_per_fold[i]} ')
print('------------------------------------------------------------------------')
print('Average scores for all folds:')
#print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
print(f'> Loss: {np.mean(loss_per_fold)}')
print('------------------------------------------------------------------------')