In [None]:

!pip install tensorflow==2.10.0
!pip install keras==2.10.0

In [None]:
!pip install GPUtil

In [None]:
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!unzip /content/drive/MyDrive/iot2/20 -d /content/img/
!unzip /content/drive/MyDrive/iot2/26 -d /content/img/
!unzip /content/drive/MyDrive/iot2/102_No -d /content/img/
!unzip /content/drive/MyDrive/iot2/104_No -d /content/img/

In [None]:
import os
import random
import fnmatch
import pickle
import GPUtil
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.layers import Dropout

# data processing
import numpy as np
import pandas as pd


# tensorflow
import tensorflow as tf
import tensorflow.keras

#V2 is tensorflow.keras.xxxx, V1 is keras.xxx
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D, Dropout, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from PIL import Image

## TPU 및 GPU 설정

In [None]:
try:
  tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
except ValueError:
  tpu = None
  gpus = tf.config.experimental.list_logical_devices("GPU")


if tpu:
  tf.config.experimental_connect_to_cluster(tpu)
  strategy = tf.distribute.TPUStrategy(tpu)
  print('Running on TPU ', tpu.cluster_spec().as_dict()['worker'])
elif len(gpus) > 0:
  strategy = tf.distribute.MirroredStrategy(gpus) #
  print('Running on ', len(gpus), ' GPU(s) ')
else:
  strategy = tf.distribute.get_strategy()
  print('Running on CPU')

print("Number of accelerators: ", strategy.num_replicas_in_sync)

## 전처리및 모델 정의

In [None]:
class GpuUsageCallback(Callback):
      def on_epoch_end(self, epoch, logs=None):
        GPUs = GPUtil.getGPUs()
        gpu = GPUs[0]
        print("\nGPU RAM Free: {0:.0f}MB | Used: {1:.0f}MB | Util {2:3.0f}% | Total {3:.0f}MB".format(gpu.memoryFree, gpu.memoryUsed, gpu.memoryUtil*100, gpu.memoryTotal))



gpu_usage_callback = GpuUsageCallback()
def my_imread(image_path):
    image = cv2.imread(image_path)
    return image

def img_preprocess(image):
    image = image / 255

    return image

def image_data_generator(image_paths, steering_angles, batch_size):
    while True:
        batch_images = []
        batch_steering_angles = []

        for i in range(batch_size):
            random_index = random.randint(0, len(image_paths) - 1)
            image_path = image_paths[random_index]
            image = my_imread(image_paths[random_index])
            steering_angle = steering_angles[random_index]

            image = img_preprocess(image)
            batch_images.append(image)
            batch_steering_angles.append(steering_angle)

        yield(np.asarray(batch_images), np.asarray(batch_steering_angles))

def nvidia_model():
    model = Sequential(name='Nvidia_Model')

    model.add(Conv2D(24, (5, 5), strides=(2, 2), input_shape=(80, 220,3), activation='relu'))
    model.add(Conv2D(36, (5, 5), strides=(2, 2), activation='relu'))
    model.add(Conv2D(48, (5, 5), strides=(2, 2), activation='relu'))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(Dropout(0.2))
    model.add(Conv2D(64, (3, 3), activation='relu'))

    model.add(Flatten())
    model.add(Dropout(0.2))
    model.add(Dense(100, activation='relu'))
    model.add(Dense(50, activation='relu'))
    model.add(Dense(10, activation='relu'))

    model.add(Dense(1))

    optimizer = Adam(learning_rate=1e-3)
    model.compile(loss='mse', optimizer=optimizer)

    return model

model = nvidia_model()

## 모델 학습

In [None]:
image_paths = []


data_dir = '/content/img'

for root, dirs, files in os.walk(data_dir):
    for file in files:
        if file.endswith('.png'):
            image_paths.append(os.path.join(root, file))
file_list = image_paths
image_paths = [] #이미지 경로를 저장하는 변수
steering_angles = [] #이미지 각도를 저장하는 변수
pattern = "*.png"
print(len(file_list))
for filename in file_list:
    if fnmatch.fnmatch(filename, pattern):
        image_paths.append(os.path.join(data_dir,filename))
        angle = int(filename[-7:-4]) #

        steering_angles.append(angle)

df = pd.DataFrame()
df['ImagePath'] = image_paths
df['Angle'] = steering_angles


model_output_dir = "/content/"

early_stop = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=3)

checkpoint_callback = tensorflow.keras.callbacks.ModelCheckpoint(filepath=os.path.join(model_output_dir,'lane_navigation_check.h5'), verbose=1, save_best_only=True)
X_train, X_valid, y_train, y_valid = train_test_split( image_paths,steering_angles, test_size=0.2)
history = model.fit(image_data_generator(X_train, y_train, batch_size=128),
                              steps_per_epoch=400,
                              epochs=12,
                              validation_data = image_data_generator( X_valid, y_valid, batch_size=128),
                              validation_steps=200,
                              verbose=1,
                              shuffle=10,
                              callbacks=[checkpoint_callback,early_stop],batch_size=128)

model.save(os.path.join(model_output_dir,'1_lane_1.h5'))

## 모델 경량화

In [None]:

# 핵심 TFLite로 변환
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()


with open('1_lane_1.tflite', 'wb') as f:
    f.write(tflite_model)



history_path = os.path.join(model_output_dir,'history.pickle')
with open(history_path, 'wb') as f:
    pickle.dump(history.history, f, pickle.HIGHEST_PROTOCOL)
