In [None]:
import os
import pandas as pd
import numpy as np
import cv2
import math

import matplotlib.pyplot as plt

## Leer csv

In [None]:
def img_manivelas(datadir, df, correccion=0.15):
    """Build parallel arrays of image paths and steering angles from the driving log.

    Each processed row contributes three samples: the centre, left and right
    camera images. The centre image keeps the recorded steering angle, the
    left image gets ``+correccion`` and the right image gets ``-correccion``.

    Parameters
    ----------
    datadir : str
        Directory that is joined in front of every image file name.
    df : pandas.DataFrame
        Driving log. Columns are read by position: 0 = centre image,
        1 = left image, 2 = right image, 3 = steering angle.
    correccion : float, optional
        Steering offset applied to the side-camera samples (default 0.15).

    Returns
    -------
    tuple of numpy.ndarray
        ``(image_paths, manivelas)``, both with three entries per processed row.
    """
    image_path = []
    manivela = []

    # Row 0 is skipped, exactly as before.
    # NOTE(review): presumably row 0 is the CSV header, since the loader passes
    # ``names=`` explicitly -- confirm against the CSV file.
    filas = df.iloc[1:, :4].itertuples(index=False, name=None)
    for centro, izq, der, angulo in filas:
        angulo = float(angulo)
        # Order per row is centre, left, right.
        for nombre, ajuste in ((centro, 0.0), (izq, correccion), (der, -correccion)):
            image_path.append(os.path.join(datadir, nombre.strip()))
            manivela.append(angulo + ajuste)

    image_paths = np.asarray(image_path)
    manivelas = np.asarray(manivela)
    return image_paths, manivelas

# Root folder holding the driving log and the IMG directory. The original
# location stays the default; set SDC_DATA_DIR to run the notebook elsewhere.
directorio = os.environ.get("SDC_DATA_DIR", "C:\\Users\\XPC\\Desktop\\sdc\\")

# First load of the CSV data. Column names are supplied explicitly.
columnas = ["centro","izquierda","derecha","manivela","potencia","reversa","velocidad"]
# os.path.join avoids the doubled separator that string concatenation produced
# (directorio already ends with a separator).
data = pd.read_csv(os.path.join(directorio, "driving_log_limpio.csv"), names = columnas)

# Two arrays: one with every image path (centre/left/right cameras),
# the other with the matching steering angles.
image_paths, valores_manivelas = img_manivelas(os.path.join(directorio, 'IMG'), data)

# Turn both arrays into a single pandas DataFrame.
imagenes = pd.DataFrame({'path': image_paths, 'manivela': valores_manivelas})

print("total datos:", len(imagenes))

imagenes.head()

## Cargar Imagenes y Aug con random flip

In [None]:
def img_preprocess(img):
    """Crop, colour-convert, blur and resize one camera frame.

    Steps, in order:
      1. keep rows 60..134 of the image (all columns, all channels);
      2. convert with ``cv2.COLOR_RGB2YUV``;
      3. apply a 3x3 Gaussian blur (sigma argument 0);
      4. resize with ``cv2.resize(..., (200, 66))``.

    Pixel values are returned as produced by OpenCV; no division by 255 is
    applied.

    NOTE(review): the conversion code treats the input as RGB, while the
    callers in this notebook pass ``cv2.imread`` output, which OpenCV
    documents as BGR -- confirm this matches whatever feeds the model at
    inference time.
    """
    cropped = img[60:135, :, :]
    yuv = cv2.cvtColor(cropped, cv2.COLOR_RGB2YUV)
    blurred = cv2.GaussianBlur(yuv, (3, 3), 0)
    return cv2.resize(blurred, (200, 66))

def _leer_imagen(path):
    """Read one image with OpenCV, failing loudly when the file cannot be decoded."""
    img = cv2.imread(path)
    if img is None:
        # cv2.imread signals a missing/unreadable file by returning None.
        raise FileNotFoundError("No se pudo leer la imagen: {}".format(path))
    return img


def cargar_imagenes(sample_size, df, random_state=None):
    """Load every image into one array and append horizontally flipped copies.

    All rows of ``df`` are loaded and preprocessed with ``img_preprocess``.
    Then a random fraction of the rows is loaded again, mirrored horizontally
    *before* preprocessing, and appended with the sign of its steering angle
    inverted.

    Parameters
    ----------
    sample_size : float
        Fraction of rows to augment; passed to ``DataFrame.sample(frac=...)``.
    df : pandas.DataFrame
        Must provide the columns ``"path"`` and ``"manivela"``. It is not modified.
    random_state : optional
        Forwarded to ``DataFrame.sample`` so the augmentation subset can be
        reproduced. The default ``None`` keeps the previous unseeded behaviour.

    Returns
    -------
    tuple
        ``(X, y_labels)``: ``X`` is ``np.array`` of the preprocessed images
        (originals first, flipped copies after); ``y_labels`` is a plain list
        of angles in the same order.

    Raises
    ------
    FileNotFoundError
        If any path cannot be read by ``cv2.imread``.
    """
    # Rows whose mirrored version is appended as augmentation.
    sample = df.sample(frac=sample_size, random_state=random_state)

    # Original images, in DataFrame order.
    tensor = [img_preprocess(_leer_imagen(path)) for path in df["path"]]

    # Mirrored images: flip code 1 flips around the vertical axis.
    tensor.extend(
        img_preprocess(cv2.flip(_leer_imagen(path), 1)) for path in sample["path"]
    )

    # A mirrored image steers the opposite way, so its angle changes sign.
    y_labels = df["manivela"].to_list() + (-1 * sample["manivela"]).to_list()

    return np.array(tensor), y_labels
    
# Load every image plus flipped copies of a random 20% of the rows.
X, y = cargar_imagenes(sample_size=0.2, df=imagenes)

# Per the original note, images end up with size (66, 200, 3).
print(X.shape)

print("nuevo total datos:", X.shape[0])

In [None]:
# Round every steering angle to two decimals (result stays a plain list).
y = list(map(lambda angulo: round(angulo, 2), y))

In [None]:
# Distribution of the (rounded) steering angles over 30 bins.
plt.hist(y, bins=30)

In [None]:
# Side-by-side preview of one sample: raw frame (left) and the frame after
# img_preprocess (right), each titled with its steering angle.
plt.figure(figsize=(20,20))

# Read the file once and reuse it for both panels.
muestra_bgr = cv2.imread(imagenes["path"][13])

plt.subplot(1,2,1)
# cv2.imread yields BGR while matplotlib expects RGB; convert for display only
# so red and blue are not swapped.
plt.imshow(cv2.cvtColor(muestra_bgr, cv2.COLOR_BGR2RGB))
plt.title(imagenes["manivela"][13])

plt.subplot(1,2,2)
# The preprocessed frame is shown exactly as it is fed to the model (3-channel
# YUV values rendered as if they were RGB). No cmap is passed: matplotlib
# ignores cmap for 3-channel input.
plt.imshow(img_preprocess(muestra_bgr))
plt.title(imagenes["manivela"][13])

## Train - Test Split

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# Hold out 20% of the samples for testing; the fixed seed keeps the split stable.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=34, test_size=0.20
)
print("Tamano train", X_train.shape[0])
print("Tamano test", X_test.shape[0])

# No reshape to a single channel is applied: the images keep their 3 channels.

print(X_train.shape)

## Modelo ML - Convolucional

In [None]:
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.optimizers import SGD

In [None]:
# Shared hyper-parameters for every hidden layer.
activation = 'relu'
reg = 'l2'

# The network is assembled layer by layer, in the same order as before, so the
# auto-generated layer names do not change.
model = tf.keras.models.Sequential()

# --- Convolutional stack: three strided 5x5 convolutions, then two 3x3 ones ---
model.add(tf.keras.layers.Conv2D(24, (5,5), strides=(2,2), activation=activation,
                                 kernel_regularizer=reg, input_shape=(66, 200, 3)))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.Conv2D(36, (5,5), strides=(2,2), activation=activation,
                                 kernel_regularizer=reg))
model.add(tf.keras.layers.Conv2D(48, (5,5), strides=(2,2), activation=activation,
                                 kernel_regularizer=reg))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.Conv2D(64, (3,3), strides=(1,1), activation=activation,
                                 kernel_regularizer=reg))
model.add(tf.keras.layers.Conv2D(64, (3,3), strides=(1,1), activation=activation,
                                 kernel_regularizer=reg))
model.add(tf.keras.layers.Dropout(0.5))

# --- Fully connected head: 100 -> 50 -> 10 -> 1 ---
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(100, activation=activation, kernel_regularizer=reg))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.Dense(50, activation=activation, kernel_regularizer=reg))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.Dense(10, activation=activation, kernel_regularizer=reg))
# Single linear output unit (no activation) for the steering-angle regression.
model.add(tf.keras.layers.Dense(1))

model.summary()

In [None]:
# Start training.
BATCH_SIZE = 64
# Kept for reference and for any later cell that reads them; they no longer
# drive the number of steps (see the note on model.fit below).
TRAINING_SIZE = len(X_train)
VALIDATION_SIZE = len(X_test)

optim = Adam(
    learning_rate=1e-3,
    beta_1=0.9,
    beta_2=0.999,
    epsilon=1e-07,
    amsgrad=False
)

# Attach the optimizer; mean squared error is the regression loss.
model.compile(optimizer=optim, loss='mse')

# Labels as column vectors of shape (n, 1), matching the single output unit.
y_train_ = np.array(y_train).reshape(-1,1)
y_test_ =  np.array(y_test).reshape(-1,1)

# steps_per_epoch / validation_steps are intentionally not passed: with
# in-memory arrays and batch_size, Keras derives the step counts itself and
# covers every sample. The former floor-divided values cut the final partial
# batch from each training and validation pass.
history = model.fit(
    X_train,
    y_train_,
    epochs=50,
    batch_size=BATCH_SIZE,
    shuffle=True,
    validation_data=(X_test, y_test_),
    verbose=1)

In [None]:
# Learning curves: training loss first, then the loss on the validation data
# passed to fit (labelled 'test' in the legend).
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['train', 'test'], loc='upper left')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('model loss')
plt.show()

In [None]:
# Persist the trained model to 'model.h5' in the current working directory.
# NOTE(review): the .h5 extension presumably selects Keras' HDF5 format, and
# whatever loads the model later presumably expects this file name -- confirm
# before changing either.
model.save('model.h5')