<a href="https://colab.research.google.com/github/RoboticsLabURJC/2019-phd-sergio-paniego/blob/main/behavior_studio_networks/BS_lstm_tinypilotnet_1_network.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

In [None]:
# Mount Google Drive in the Colab runtime; later cells read the dataset
# archives from, and save the trained model to, "/content/drive/My Drive".
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Sanity-check that Drive is mounted and the archive is present, then extract
# both dataset archives. unzip extracts into the current working directory;
# load_data() later looks for the folders under /content/ (presumably the
# notebook's working directory on Colab -- verify if run elsewhere).
!ls "/content/drive/My Drive"
!ls "/content/drive/My Drive/complete_dataset.zip"
!unzip "/content/drive/My Drive/curves_only.zip"
!unzip "/content/drive/My Drive/complete_dataset.zip"

In [3]:
from keras.models import Sequential
from keras.layers import Flatten,Dense,Conv2D,BatchNormalization,Dropout,ConvLSTM2D,Reshape,Activation,MaxPooling2D, LSTM, Input
from keras.layers.wrappers import TimeDistributed
from keras.optimizers import Adam


# LSTM tinypilotnet
def lstm_tinypilotnet_model(img_shape, type_image):
    """Build and compile the ConvLSTM variant of the tinypilotnet regressor.

    Three valid-padded, stride-2 convolutions feed a single-timestep
    ConvLSTM2D layer; one more convolution is flattened into a 2-unit linear
    output.

    Args:
        img_shape: Input image shape as (height, width, channels). The fixed
            reshapes below only match (60, 160, 3) when ``type_image`` is
            'cropped' and (120, 160, 3) otherwise.
        type_image: 'cropped' selects the 6x19 feature-map layout; any other
            value selects the 14x19 layout.

    Returns:
        A compiled keras ``Sequential`` model (Adam optimizer, MSE loss,
        reporting accuracy, MSE and MAE).
    """
    # Feature-map size left by the three 3x3 / stride-2 convolutions:
    # 60 -> 29 -> 14 -> 6 rows, 120 -> 59 -> 29 -> 14 rows, 160 -> 79 -> 39 -> 19 cols.
    if type_image == 'cropped':
        feature_shape = (6, 19, 32)
    else:
        feature_shape = (14, 19, 32)

    model = Sequential()
    model.add(Conv2D(8, (3, 3), strides=(2, 2), input_shape=img_shape, activation="relu"))
    model.add(Conv2D(16, (3, 3), strides=(2, 2), activation="relu"))
    model.add(Conv2D(32, (3, 3), strides=(2, 2), activation="relu"))

    # Add a length-1 time axis so ConvLSTM2D gets (time, rows, cols, channels).
    model.add(Reshape((1,) + feature_shape))
    model.add(ConvLSTM2D(filters=32, kernel_size=(3, 3), padding="same", return_sequences=True))
    # Drop the time axis again. With "same" padding and 32 filters the layer
    # emits exactly feature_shape, so both branches must reshape to 32
    # channels (the non-cropped branch previously asked for 40 and could not
    # be built).
    model.add(Reshape(feature_shape))

    model.add(Conv2D(1, (3, 3), strides=(2, 2), activation="relu"))
    model.add(Flatten())
    model.add(Dense(2))
    adam = Adam(lr=0.0001)
    model.compile(optimizer=adam, loss="mse", metrics=['accuracy', 'mse', 'mae'])
    return model



In [None]:
import glob
import numpy as np
import cv2
from sklearn.model_selection import train_test_split


def load_data(folder):
    """Collect the image paths and raw annotation text of one dataset folder.

    Args:
        folder: Dataset directory name, resolved under ``/content/``. It must
            contain an ``Images/`` directory whose files are named with an
            integer stem (e.g. ``12.png``) and a ``data.json`` file.

    Returns:
        A tuple ``(images, data)``: the image paths sorted by their numeric
        file name, and the unparsed contents of ``data.json``.

    Raises:
        ValueError: If an image file name does not have an integer stem.
        OSError: If ``data.json`` cannot be opened.
    """
    import os  # local import keeps this cell independent of the import cell

    name_folder = '/content/' + folder + '/Images/'
    list_images = glob.glob(name_folder + '*')
    print(list_images)
    # Sort numerically on the file stem rather than on a fixed '/' position,
    # so the ordering does not depend on how deep `folder` is nested.
    images = sorted(
        list_images,
        key=lambda path: int(os.path.splitext(os.path.basename(path))[0]),
    )
    name_file = '/content/' + folder + '/data.json'
    # The context manager closes the handle even if read() raises.
    with open(name_file, 'r') as json_file:
        data = json_file.read()
    return images, data

def get_images(list_images, type_image, array_imgs):
    """Read, optionally crop, and downscale images, appending them to a list.

    Args:
        list_images: Iterable of image file paths.
        type_image: When 'cropped', only rows 240-479 and columns 0-639 of
            each image are kept before resizing.
        array_imgs: List that receives the processed images; it is modified
            in place.

    Returns:
        The same ``array_imgs`` list, extended with one image per path, each
        resized to a quarter of its (possibly cropped) width and height.

    Raises:
        IOError: If an image cannot be read.
    """
    for name in list_images:
        img = cv2.imread(name)
        # cv2.imread signals a missing/undecodable file by returning None
        # instead of raising; fail here with the offending path.
        if img is None:
            raise IOError('Could not read image: ' + str(name))
        if type_image == 'cropped':
            img = img[240:480, 0:640]
        # cv2.resize takes the target size as (width, height).
        img = cv2.resize(img, (img.shape[1] // 4, img.shape[0] // 4))
        array_imgs.append(img)

    return array_imgs

def parse_json(data, array):
    """Extract (v, w) pairs from the raw annotation text.

    The text is cut at every '}'; the piece after the last '}' is discarded.
    Each remaining piece must contain '"w": <number>, "v": <number>' in that
    order, with the v value running to the end of the piece.

    Args:
        data: Raw contents of a data.json file.
        array: List that receives the parsed tuples; modified in place.

    Returns:
        The same ``array`` list, extended with one ``(float(v), float(w))``
        tuple per record.
    """
    for record in data.split('}')[:-1]:
        # v is whatever follows the '"v": ' marker.
        v_text = record.split('"v": ')[1]
        # w sits between the '"w": ' marker and the ', "v":' separator.
        w_text = record.split(', "v":')[0].split('"w": ')[1]
        array.append((float(v_text), float(w_text)))

    return array

def preprocess_data(array, imgs):
    """Augment the dataset with mirrored copies of every sample.

    For each annotation index, the matching image is flipped with
    ``cv2.flip(img, 1)`` and the second annotation component is negated while
    the first is kept.

    Args:
        array: List of annotation tuples.
        imgs: List of images, indexed in step with ``array``.

    Returns:
        A tuple ``(annotations, images)`` holding new lists: the originals
        followed by their mirrored counterparts. The inputs are not modified.
    """
    # Build each mirrored (image, annotation) pair in one pass per index.
    mirrored = [
        (cv2.flip(imgs[idx], 1), (array[idx][0], -array[idx][1]))
        for idx in range(len(array))
    ]
    mirrored_imgs = [pair[0] for pair in mirrored]
    mirrored_annotations = [pair[1] for pair in mirrored]
    return array + mirrored_annotations, imgs + mirrored_imgs

def add_extreme_data(array, imgs):
    """Oversample samples with a large second component or a small first one.

    Only the samples present when the function is called are examined. A
    sample is duplicated 5 times when ``abs(annotation[1]) >= 1`` (10 times
    when it is ``>= 2``), and once more when ``annotation[0] <= 2``.

    Args:
        array: List of annotation tuples; extended in place.
        imgs: List of images indexed in step with ``array``; extended in place.

    Returns:
        The same ``(array, imgs)`` lists, with the duplicates appended.
    """
    # The range is fixed up front, so appended duplicates are never revisited.
    for idx in range(len(array)):
        sample = array[idx]
        magnitude = abs(sample[1])
        if magnitude >= 1:
            copies = 10 if magnitude >= 2 else 5
            for _ in range(copies):
                array.append(sample)
                imgs.append(imgs[idx])
        if float(sample[0]) <= 2:
            array.append(sample)
            imgs.append(imgs[idx])
    return array, imgs


# Load data
images, data = load_data('complete_dataset')
images_curve, data_curve = load_data('curves_only')

# CHANGE type_image
type_image = 'cropped'
#type_image='normal'

# Preprocess images (both datasets are accumulated into the same list)
array_imgs = []
array_imgs = get_images(images, type_image, array_imgs)
array_imgs = get_images(images_curve, type_image, array_imgs)
# Preprocess json (same order as the images so indices stay aligned)
array_annotations = []
array_annotations = parse_json(data, array_annotations)
array_annotations = parse_json(data_curve, array_annotations)


# Shape of the images produced by get_images: the 'cropped' variant keeps
# rows 240-479 (240 rows) and columns 0-639, then everything is downscaled
# by 4, giving 60 x 160 -- not 65 rows.
if type_image == 'cropped':
    img_shape = (60, 160, 3)
else:
    img_shape = (120, 160, 3)


# Augment with mirrored samples, then oversample the extreme ones.
array_annotations, array_imgs = preprocess_data(array_annotations, array_imgs)
array_annotations, array_imgs = add_extreme_data(array_annotations, array_imgs)
# NOTE(review): the oversampling above duplicates samples before the split,
# so identical samples can end up in both the training and validation sets.
images_train, images_validation, annotations_train, annotations_validation = train_test_split(array_imgs, array_annotations, test_size=0.20, random_state=42)


# Convert the Python lists into batched numpy arrays for keras
images_train = np.stack(images_train, axis=0)
annotations_train = np.stack(annotations_train, axis=0)
images_validation = np.stack(images_validation, axis=0)
annotations_validation = np.stack(annotations_validation, axis=0)



In [8]:
print(images_train[0].shape)
print(annotations_train[0])
print(img_shape)
# Build the network for the shape the training images really have, instead of
# a hard-coded override that is only valid for the 'cropped' variant.
img_shape = tuple(images_train[0].shape)
print(img_shape)

# Use the same type_image that was used to preprocess the images, so the
# model layout and the saved file name below always agree with the data.
model = lstm_tinypilotnet_model(img_shape, type_image)
batch_size = 64  # 16
nb_epoch = 25  # 223


if type_image == 'cropped':
    model_file = '/content/drive/My Drive/model_lstm_tinypilotnet_cropped_25.h5'
else:
    model_file = '/content/drive/My Drive/model_lstm_tinypilotnet_25.h5'

# Print layers (summary() prints by itself and returns None)
model.summary()

# Train, validating on the held-out split rather than on the training data,
# so the val_* history values measure generalization.
model_history = model.fit(images_train, annotations_train, epochs=nb_epoch, batch_size=batch_size, verbose=2, validation_data=(images_validation, annotations_validation), callbacks=[])

# Save the model
model.save(model_file)


# Evaluate the model; score follows the compile() order:
# [loss, accuracy, mse, mae]
score = model.evaluate(images_validation, annotations_validation, verbose=0)
print('Evaluating')
print('Test loss: ', score[0])
print('Test accuracy: ', score[1])
print('Test mean squared error: ', score[2])
print('Test mean absolute error: ', score[3])




In [None]:
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

%matplotlib inline

# Loss and accuracy curves: one figure per metric, training in red and
# validation in blue, both read from the keras training history.
curve_specs = (
    # (history key, training legend, validation legend, y label, title)
    ('loss', 'Training loss', 'Validation Loss', 'Loss', 'Loss Curves'),
    ('accuracy', 'Training Accuracy', 'Validation Accuracy', 'Accuracy', 'Accuracy Curves'),
)
history = model_history.history
for key, train_label, val_label, y_label, title in curve_specs:
    plt.figure(figsize=[8, 6])
    plt.plot(history[key], 'r', linewidth=3.0)
    # keras stores the validation series under the 'val_' prefixed key.
    plt.plot(history['val_' + key], 'b', linewidth=3.0)
    plt.legend([train_label, val_label], fontsize=18)
    plt.xlabel('Epochs ', fontsize=16)
    plt.ylabel(y_label, fontsize=16)
    plt.title(title, fontsize=16)
    plt.show()
