# 모방학습 기반 자율주행 (실습)

## Data 다운로드

In [None]:
from google.colab import files
import os
if not 'data' in os.listdir():
    files.upload() #upload kaggle.json
    !pip install -q kaggle
    !mkdir -p ~/.kaggle
    !cp kaggle.json ~/.kaggle/
    !ls ~/.kaggle
    !chmod 600 /root/.kaggle/kaggle.json
    !kaggle datasets download -d hyunkunkookminuniv/behavioralcloning
    !unzip -q behavioralcloning.zip -d ./data

## 필수 라이브러리 임포트

In [None]:
import glob
import shutil
import os
import random
from PIL import Image

import pandas as pd
import numpy as np
from tqdm import notebook

import matplotlib.pyplot as plt

In [None]:
# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras 

from tensorflow.keras.utils import Sequence
from tensorflow.keras.layers import Dense, Input, Activation, Flatten
from tensorflow.keras.layers import BatchNormalization,Add,Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import LeakyReLU, ReLU, Conv2D, MaxPooling2D, BatchNormalization, Conv2DTranspose, UpSampling2D, concatenate
from tensorflow.keras import callbacks
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator


# 실습 데이터

In [None]:
df = pd.read_csv('/content/data/data1/log.csv')
print(len(df))

In [None]:
df.head()

In [None]:
plt.plot(df.steer)

In [None]:
log_df1 = pd.read_csv('/content/data/data1/log.csv')
log_df2 = pd.read_csv('/content/data/data2/log.csv')
log_df3 = pd.read_csv('/content/data/data3/log.csv')
log_df4 = pd.read_csv('/content/data/data4/log.csv')

logs = [log_df1, log_df2, log_df3, log_df4]

for i in range(len(logs)):
    logs[i] = 10*logs[i]['steer'].values
    
    logs[i][logs[i] > 1] = 1.0
    logs[i][logs[i] < -1] = -1.0
    logs[i] += 1
    logs[i] /= 2


In [None]:
plt.plot(logs[0])
plt.xlabel("time (s)")
plt.ylabel("control value")
plt.title("steering control")
plt.show()

# 인공 신경망 생성

In [None]:
class Autonomous_Model(keras.Model):
    def __init__(self):
        super().__init__()
        self.conv1 = Conv2D(24, 5, padding='valid')
        self.mp1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='valid')
        
        self.conv2 = Conv2D(36, 5, padding='valid')
        self.mp2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='valid')
        
        self.conv3 = Conv2D(48, 5, padding='valid')
        self.mp3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='valid')

        self.conv4 = Conv2D(64, 3, padding='valid')
        self.mp4 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='valid')

        self.conv5 = Conv2D(64, 3, padding='valid')
        self.mp5 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='valid')

        self.flatten = tf.keras.layers.Flatten()

        self.d1 = Dense(1164)

        self.d2 = Dense(100)

        self.d3 = Dense(50)

        self.d4 = Dense(10)

        self.d5 = Dense(1)


    def call(self, input_tensor, *args, **kwargs):
        x = self.conv1(input_tensor)
        x = tf.nn.relu(x)
        x = self.mp1(x)

        return tf.keras.activations.sigmoid(x)

In [None]:
Steer_model = Autonomous_Model()

Steer_model.compile(optimizer='Adam', loss = 'BCE',  metrics=['mae'])
Steer_model.build(input_shape=(None, 256,512,3))
Steer_model.summary()

# 데이터 불러오기

In [None]:
class steer_loader(Sequence):
    def __init__(self, img_path, labels, batch_size = 16, shuffle = False, flip=False, bias=0, shift=False, rotate=False):
        self.batch_size = batch_size
        self.img_path = img_path
        self.img_list_c = sorted(os.listdir(self.img_path+"center/rgb/"))
        self.img_list_l = sorted(os.listdir(self.img_path+"left/rgb/"))
        self.img_list_r = sorted(os.listdir(self.img_path+"right/rgb/"))
        self.labels = labels
        self.dataset_size = len(self.img_list_c)
        self.shuffle = shuffle
        self.flip = flip
        self.on_epoch_end()
        self.bias = bias
        self.shift = shift
        self.rotate = rotate
        
    def __len__(self):

        return int(np.floor(self.dataset_size) / self.batch_size)
        
    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(self.dataset_size)
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __getitem__(self, idx):

        indexes = self.indexes[idx*self.batch_size:(idx+1)*self.batch_size]
        cur_batch_name_list = [self.img_list_c[k] for k in indexes]
        label_list = np.array([self.labels[k] for k in indexes])
        
        data_list = []
        for i, data in enumerate(cur_batch_name_list):
            if self.shift:
                img_from = np.random.choice(["center/rgb/", "left/rgb/", "right/rgb/"])
            else:
                img_from = "center/rgb/"
            if img_from == "left/rgb/":
                label_list[i] += self.bias
                data = self.img_list_l[indexes[i]]
            if img_from == "right/rgb/":
                label_list[i] -= self.bias
                data = self.img_list_r[indexes[i]]

            img_data = np.array(image.img_to_array(Image.open(self.img_path + img_from + data)))/255
            img_data = img_data[:,:,:3]

            
            if self.flip and np.random.uniform() > 0.5:
                img_data = img_data[:,::-1,:]
                label_list[i] = 1 - label_list[i]


            data_list.append(img_data)
        data_list = np.array(data_list)
        data_list = tf.convert_to_tensor(data_list, dtype=tf.float32)
        return data_list , label_list

In [None]:
train_loader1 =  steer_loader('/content/data/data1/', logs[0],flip=True, shift=True, bias=0.05, rotate=False)
train_loader2 =  steer_loader('/content/data/data2/', logs[1],flip=True, shift=True, bias=0.05, rotate=False)
train_loader3 =  steer_loader('/content/data/data3/', logs[2],flip=False, bias=0.05, rotate=False)

val_loader = steer_loader('/content/data/data3/', logs[2],flip=True)

test_loader = steer_loader('/content/data/data4/', logs[3], shuffle = False, flip=True)

In [None]:
imgs, labels = train_loader1.__getitem__(2)

i = 10
plt.imshow(imgs[i])
plt.title(f"steering: {labels[i]:.2f}")
plt.show()

plt.imshow(imgs[i][:,::-1,:])
plt.title(f"steering: {1 - labels[i]:.2f}")
plt.show()

In [None]:
i = 10
plt.imshow(imgs[i])
plt.title(f"steering: {labels[i]:.2f}")
plt.show()
plt.imshow(imgs[i][:,::-1,:])
plt.title(f"steering: {1 - labels[i]:.2f}")
plt.show()

# 학습 및 평가

In [None]:
for i in range(3):
    Steer_model.fit(train_loader1, validation_data = val_loader, epochs=1)

    Steer_model.fit(train_loader2, validation_data = val_loader, epochs=1)

    Steer_model.fit(train_loader3, validation_data = val_loader, epochs=1)



In [None]:
test_loader = steer_loader('/content/data/data3/', logs[2], shuffle = False, flip=False)
target = []
pred = []
a = 0
for data, label in notebook.tqdm(test_loader):
    y_pred = Steer_model(data)
    for i, j in zip(label, y_pred):
        target.append(i)
        pred.append(j)

    a += 1
# before 1 epoch
time_idx = np.arange(len(target))
plt.plot(time_idx, target, time_idx, pred)