In [None]:
import matplotlib.pylab as plt
import numpy as np
import tensorflow as tf
import pandas as pd
import cv2
from os import listdir,path

In [None]:
# Auswahl Bilder-Pfad
img_path = './shared_images/'
img_files = listdir(img_path)
len(img_files)

In [None]:
# Ansicht eines Beispielbildes
img_example = cv2.imread(img_path + img_files[10])
plt.imshow(img_example)
print(img_example.shape)

In [None]:
# Einlesen aller Bilder -> Kontrolle der Anzahl
imgs = np.array([cv2.imread(img_path+i) for i in img_files])
imgs.shape

In [None]:
# Mittleres Bilder der Trainingsbilder
img_mean = imgs.mean(axis=0).astype('int')
plt.imshow(img_mean)

In [None]:
# Testweise Bildbearbeitung
# Definition Fenster, Graufärbung und Größenänderung
sp=(0,200)
ep=(320,100)
dim = (320,240)
interpolation = cv2.INTER_AREA
img_example_x = img_example.copy()
img_example_x = cv2.cvtColor(img_example_x,cv2.COLOR_BGR2GRAY)
img_example_x= cv2.resize(img_example_x,dim,interpolation)
plt.imshow(cv2.rectangle(img_example_x,sp,ep,(255,255,0),2),cmap='gray')

In [None]:
# Definition einer Funktion zur Vorverarbeitung eines einzelnen Trainingsbildes
# Dieser Verarbeitungsschritte müssen bei der Anwendungung des neuronlen Netzes
# am RPi ebenfalls in identischer Form durchgeführt werden.
def transform_image(img):
    dim = (320,240)
    interpolation = cv2.INTER_AREA
    img = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY) # Verwendung von Graustufenbildern
    img = cv2.resize(img,dim,interpolation) #  Anpassung der Bildgröße
    img = img[100:200,:] # Ausschneiden eines Teilbildes
    return img

plt.imshow(transform_image(imgs[0]),cmap='gray')

In [None]:
# Alternative mit Verkleinerung der Bilder
# Definition einer Funktion zur Vorverarbeitung der Trainingsbilder
def transform_image_small(img):
    dim = (64,48)
    interpolation = cv2.INTER_AREA
    img = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY) # Verwendung von Graustufenbildern
    img = cv2.resize(img,dim,interpolation) #  Anpassung der Bildgröße
    img = img[20:40,:] # Ausschneiden eines Teilbildes
    return img

plt.imshow(transform_image_small(imgs[0]),cmap='gray')

In [None]:
# Laden und Vorverarbeiten aller Trainingsbilder (groß)
imgs_large = np.array([transform_image(cv2.imread(img_path+i)) for i in img_files])
imgs_large.shape

In [None]:
# Laden und Vorverarbeiten aller Trainingsbilder (klein)
imgs_small = np.array([transform_image_small(cv2.imread(img_path+i)) for i in img_files])
imgs_small.shape

In [None]:
# Position des Lenkwinkels im Filenamen (abhängig von gewählter Vorgehensweise)
x = img_files[0]
print(x)
print('-',x[-7:-4],'-') # Raspberry/Linux
print('-',x[50:-4],'-') # Windows

In [None]:
# Extrahieren der Lenkwinkel aus dem Dateinamen
angles = np.array([int(i[-7:-4]) for i in img_files])

In [None]:
# Betrachtung der Verteilung der Lenkwinkel in den Trainingsdaten
# -> typischerweise nicht symmetrisch
pd.Series(angles, name ='angles').hist(bins = 40)

In [None]:
# Spiegeln und Zusammenfügen der ungespiegelten und der gespiegelten Trainingsbilder
# Große Bilder
imgs_large_mirrored = imgs_large[:,:,::-1]
print(imgs_large.shape)
print(imgs_large_mirrored.shape)
imgs_large_all = np.concatenate((imgs_large,imgs_large_mirrored))
print(imgs_large_all.shape)

In [None]:
# Zusammenfügen der ungespiegelten und der gespiegelten Trainingsbilder
# Kleine Bilder
imgs_small_mirrored = imgs_small[:,:,::-1]
print(imgs_small.shape)
print(imgs_small_mirrored.shape)
imgs_small_all = np.concatenate((imgs_small,imgs_small_mirrored))
print(imgs_small_all.shape)

In [None]:
# Spiegeln der Winkel
angles_mirrored = [-a+180 for a in angles] 
# Kontrolle
plt.plot(angles,angles_mirrored)
# Zusammenführen der ungespiegelten und gespiegelten Winkel
angles_all = np.hstack((angles,angles_mirrored)) 

In [None]:
# Betrachtung der Verteilung der Lenkwinkel in den Trainingsdaten mit gespiegelten Bildern
# -> sollte symmetrisch sein, hier nicht wegen ungerader Anzahl Bins
pd.Series(angles_all, name ='angles').hist(bins = 40)
pd.Series(angles, name ='angles').hist(bins = 40)

In [None]:
# Import notwendiger Klassen und Funktionen
#from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D,Dense,Flatten,Dropout
from tensorflow.keras.optimizers import Adam

In [None]:
# Die Trainingsdaten sollen eine Shape erhalten, welche die Farbkanäle berücksichtigen...
# ... auch wenn in diesem Beispiele nur ein Farbkanal vorhanden ist.
n,h,w = imgs_large_all.shape
imgs_large_all_tf = imgs_large_all.reshape((n,h,w,1))
n,h,w,f = imgs_large_all_tf.shape
input_shape = (h,w,f)
print(input_shape)

In [None]:
# Erstellen des Netzes für die großen Bilder
# Keras Sequntial API
model = tf.keras.Sequential(name='dnn') 

# Convolution Layers 
# elu: Expenential Linear Unit, similar to leaky Relu 
model.add(Conv2D(24, (5, 5), strides=(2, 2), input_shape=input_shape, activation='elu')) 
model.add(Conv2D(36, (5, 5), strides=(2, 2), activation='elu'))
model.add(Conv2D(48, (5, 5), strides=(2, 2), activation='elu'))
model.add(Conv2D(64, (3, 3), activation='elu')) 
model.add(Dropout(0.2)) # more robustness 
model.add(Conv2D(64, (3, 3), activation='elu')) 

# Fully Connected Layers 
model.add(Flatten()) 
model.add(Dropout(0.2)) # more robustness 
model.add(Dense(100, activation='elu')) 
model.add(Dense(50, activation='elu')) 
model.add(Dense(10, activation='elu'))

# Output Layer: turning angle
model.add(Dense(1)) 

In [None]:
# Überblick über das Modell
model.summary()

In [None]:
# Die Trainingsdaten sollen eine Shape erhalten, welche die Farbkanäle berücksichtigen...
# ... auch wenn in diesem Beispiele nur ein Farbkanal vorhanden ist.
# Alternative für kleines Bild
n,h,w = imgs_small_all.shape
imgs_small_all_tf = imgs_small_all.reshape((n,h,w,1))
n,h,w,f = imgs_small_all_tf.shape
input_shape_small = (h,w,f)
print(input_shape_small)

In [None]:
# Alternative für kleinens Bild und normales NN
model_small = tf.keras.Sequential(name='dnn_dense') 

# Convolution Layers 
# elu: Expenential Linear Unit, similar to leaky Relu 
model_small.add(Conv2D(10, (2, 6), strides=(2, 3), input_shape=input_shape_small, activation='elu')) 
model_small.add(Conv2D(10, (2, 5), strides=(2, 2), activation='elu'))
model_small.add(Conv2D(10, (2, 4), strides=(2, 2), activation='elu'))
model_small.add(Conv2D(20, (2, 2), strides=(2, 2), activation='elu'))
# Fully Connected Layers 
model_small.add(Flatten()) 
#model_small.add(Dropout(0.2)) # more robustness 
#model_small.add(Dense(20, activation='elu')) 
model_small.add(Dense(10, activation='elu')) 
model_small.add(Dense(10, activation='elu')) 

# Output Layer: turning angle
model_small.add(Dense(1))

In [None]:
# Kontrolle der Architektur
model_small.summary()

In [None]:
# Erzeugen eines flachen Datensatzes aus den kleinen Bildern
n,h,w = imgs_small_all.shape
print(imgs_small_all.shape)
imgs_small_all_tf_flat = imgs_small_all.reshape((n,h*w))
imgs_small_all_tf_flat.shape

In [None]:
# Alternative für kleines Bild
# Erstellen des Netzes für das stark verkleinert Bild
# Die Größte des Netzes muss angepasst werden. (Anzahl der Schichten, Kernelsizes, Strides)
model_small_flat = tf.keras.Sequential(name='dnn_small') 
model_small_flat.add(Dense(100, input_shape=(imgs_small_all_tf_flat.shape), activation='elu')) 
model_small_flat.add(Dense(20, activation='elu')) 

# Output Layer: turning angle
model_small_flat.add(Dense(1))

In [None]:
# Kontrolle der Architektur
# Das kleine Modell hat bedeuteten weniger Parameter
model_small_flat.summary()

In [None]:
# Auswahl der großen oder kleinen Bilder und des zugehörigen Modells
#imgs_all_tf= imgs_small_all_tf_flat
#model = model_small_flat
imgs_all_tf= imgs_small_all_tf
model = model_small

In [None]:
# Kontrolle und Reshapen der Trainingsdaten
print(angles_all.shape)
print(imgs_all_tf.shape)

In [None]:
# Split Trainings- und Testdaten
from sklearn.model_selection import train_test_split
X_train, X_validate, y_train, y_validate = train_test_split(imgs_all_tf, angles_all, test_size = 0.2, random_state = 1)

In [None]:
# Kontrolle
print(X_train.dtype)
print(X_train.shape)
print(y_train.shape)
print(X_validate.dtype)
print(X_validate.shape)
print(X_validate.shape)

In [None]:
# Kompilieren des Modells und Wahl von Fehlerfunktion und Optimizer
model.compile(loss ='mse', optimizer=Adam(lr=0.002))
val_loss = []
loss = []

In [None]:
# Training
history = model.fit(
    X_train,
    y_train,
    batch_size=32,
    epochs= 50,
    verbose=1,
    validation_data=(X_validate,y_validate)
)
loss.extend(history.history['loss'])
val_loss.extend(history.history['val_loss'])

In [None]:
# Darstellung Trainingswerte über Epochen
skip=3
plt.figure(figsize=(10, 10))
plt.subplot(2, 2, 1)
plt.plot(loss[skip:], label='Loss')
plt.plot(val_loss[skip:], label='Validation Loss')
plt.xlabel('epochs')
plt.ylabel('loss / val_loss')
plt.legend()
plt.title('Training - Loss Function')

In [None]:
# Trainings- und Testfehler
mse_train = model.evaluate(X_train,y_train)
mse_validate = model.evaluate(X_validate,y_validate)

print('MSE train/val: {:.4f} / {:.4f}'.format(mse_train,mse_validate))
print('RMSE train/val: {:.4f} / {:.4f}'.format(np.sqrt(mse_train),np.sqrt(mse_validate)))

In [None]:
# Visualisierung der Fehler
y_train_p = model.predict(X_train)
y_validate_p = model.predict(X_validate)

plt.figure(figsize=(14,7))
plt.subplot(121)
plt.plot(y_train,y_train_p,'rx',ms=2)
plt.plot([45,135],[45,135],'k-')
plt.xlabel('Winkel real')
plt.ylabel('Winkel geschätzt')
plt.title('Trainingsset')

plt.subplot(122)
plt.plot(y_validate,y_validate_p,'bx',ms=2)
plt.plot([45,135],[45,135],'k-')
plt.xlabel('Winkel real')
plt.ylabel('Winkel geschätzt')
plt.title('Testset')

In [None]:
# Speichern des trainierten Modells
path_to_model_file = './model/MODEL.h5'# Speichert im H5-Format
# path_to_model_file = './model/DEMO_MODEL' # Speichert im SavedModel-Format
model.save(path_to_model_file)

In [None]:
# Laden eines Modells
path_to_model_file = './model/MODEL.h5'
model_loaded = tf.keras.models.load_model(path_to_model_file)

In [None]:
# Shape des Input-Layers
model_loaded.layers[0].input_shape

In [None]:
# Trainings- und Testfehler von geladenem Modell
mse_train = model_loaded.evaluate(X_train,y_train)
mse_validate = model_loaded.evaluate(X_validate,y_validate)

print('MSE train/val: {:.4f} / {:.4f}'.format(mse_train,mse_validate))
print('RMSE train/val: {:.4f} / {:.4f}'.format(np.sqrt(mse_train),np.sqrt(mse_validate)))

In [None]:
# Visualisierung der Fehler
y_train_p = model_loaded.predict(X_train)
y_validate_p = model_loaded.predict(X_validate)
plt.figure(figsize=(14,7))
plt.subplot(121)
plt.plot(y_train,y_train_p,'rx',ms=2)
plt.plot([45,135],[45,135],'k-')
plt.xlabel('Winkel real')
plt.ylabel('Winkel geschätzt')
plt.title('Trainingsset')

plt.subplot(122)
plt.plot(y_validate,y_validate_p,'bx',ms=2)
plt.plot([45,135],[45,135],'k-')
plt.xlabel('Winkel real')
plt.ylabel('Winkel geschätzt')
plt.title('Testset')

In [None]:
# Wahl eines Beispielbildes!
xe = np.array( [X_train[20]] )
print("Nr Bild: ", y_train[20])
xe.shape

In [None]:
# Mittels der Methode predict
model_loaded.predict(xe)