In [None]:
import matplotlib.pyplot as plt 
import tensorflow as tf
import numpy as np
import keras
from tensorflow.python.keras import regularizers
from tensorflow.keras.layers import Dense, Flatten, Conv2D, Dropout, BatchNormalization, MaxPooling2D
from tensorflow.keras import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import pandas as pd
from numpy import asarray
from tensorflow.keras.applications.nasnet import preprocess_input
import random
from keras import layers
from tensorflow.keras.utils import plot_model

In [None]:
# Root folder of the competition data; every CSV / image path below is built
# by string concatenation on top of it, so the trailing slash matters.
path = "machine-learning-in-science-2022/"

# Kept as the LAST expression of the cell so that Jupyter actually displays
# the detected devices (a call followed by an assignment shows nothing).
tf.config.list_physical_devices()

In [None]:
def read_dataset(img_path,label):
    """Load one PNG file as a float32 image tensor and pair it with its label.

    Args:
        img_path: path of the PNG file to read.
        label: label belonging to the image; passed through untouched.

    Returns:
        A tuple ``(image, label)`` where ``image`` is the decoded 3-channel
        picture cast to ``tf.float32`` (pixel values are not rescaled).
    """
    raw_bytes = tf.io.read_file(img_path)
    pixels = tf.image.decode_png(raw_bytes, channels=3)
    return tf.cast(pixels, tf.float32), label

In [None]:
def add_noise(img):
    """Add zero-mean Gaussian noise (stddev 5) to an image.

    Args:
        img: image tensor; the noise tensor is drawn with the same shape.

    Returns:
        The noisy image with every value clipped to the range [0, 255].
    """
    gaussian = tf.random.normal(shape=tf.shape(img), mean=0, stddev=5)
    return tf.clip_by_value(img + gaussian, 0, 255)

# Read the training table, then derive one image file path per row and a
# combined label array whose two columns are (angle, speed).
df = pd.read_csv(path + 'training_norm.csv')
img_list = df["image_id"].values
img_path = [
    path + 'training_data/training_data/' + str(image_id) + '.png'
    for image_id in img_list
]
labels = np.stack((df["angle"].values, df["speed"].values), axis=1)

In [None]:
# Load the training table and split it into three parallel lists:
# image file paths, `angle` targets and `speed` targets.
df = pd.read_csv(path + 'training_norm.csv')

# Column-wise access replaces the former row-by-row `iterrows()` loop, which
# builds a Series per row and upcasts every value to a common dtype (hence
# the int() casts it needed). The casts are kept so ids and speeds are still
# truncated to integers exactly as before.
img_path = [
    path + 'training_data/training_data/' + str(image_id) + '.png'
    for image_id in df["image_id"].astype(int)
]
angle = list(df["angle"].to_numpy())
speed = df["speed"].astype(int).tolist()

In [None]:
def _augment(image, label):
    """Apply the random photometric augmentations to one training image.

    Args:
        image: float32 image tensor as produced by `read_dataset`.
        label: label dict for the image; returned unchanged.

    Returns:
        The tuple ``(augmented_image, label)``.
    """
    image = tf.image.random_contrast(image, lower=0.9, upper=1)
    image = tf.image.random_saturation(image, lower=0.8, upper=1.2)
    # `random_brightness` takes (image, max_delta, seed). The previous call
    # passed 1.5 as a third positional argument, i.e. as the *seed*, not as
    # an upper bound. Only max_delta is passed now.
    # NOTE(review): the delta is added to the pixel values, so 0.5 is a very
    # small shift if pixels are on a 0-255 scale -- confirm the intended
    # strength.
    image = tf.image.random_brightness(image, max_delta=0.5)
    image = add_noise(image)
    return image, label


# Training pipeline: (path, labels) -> shuffled -> decoded -> augmented -> batched.
# The label dict keys must match the names of the model's output layers.
ds_train = tf.data.Dataset.from_tensor_slices(
    (img_path, {'angle_prediction': angle, 'speed_prediction': speed})
)
ds_train = (
    ds_train
    # Shuffle the cheap path strings rather than decoded images: this gives a
    # uniform shuffle over the whole set without buffering 1000 full float32
    # images in memory.
    .shuffle(len(img_path))
    .map(read_dataset, num_parallel_calls=tf.data.AUTOTUNE)
    .map(_augment, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(128)
    # Four passes over the data per Keras "epoch"; each pass is re-shuffled
    # and re-augmented.
    .repeat(4)
    # Overlap input preprocessing with training.
    .prefetch(tf.data.AUTOTUNE)
)
                      

In [None]:
# Pull the first batch out of the training pipeline as a sanity check;
# `x` is the (images, labels) tuple produced by the dataset.
x = next(iter(ds_train))

In [None]:
# Inspect the label part of the batch: a dict keyed by
# 'angle_prediction' and 'speed_prediction'.
x[1]

In [None]:
# Small custom CNN with two prediction heads (angle regression and binary
# speed classification) on top of a shared convolutional trunk.
# Every layer comes from `tf.keras`; mixing objects from the standalone
# `keras` package with `tensorflow.keras` ones in the same graph breaks when
# the two installed packages are not the same implementation.
base_model = tf.keras.Sequential(
    [
        tf.keras.Input(shape=(240, 320, 3)),
        # x / 127.5 - 1 maps raw 0-255 pixel values to [-1, 1].
        tf.keras.layers.Rescaling(scale=1./127.5, offset=-1),
        Conv2D(filters=16, kernel_size=5, strides=(2, 2), padding='same', activation="relu"),
        Dropout(0.1),

        Conv2D(filters=8, kernel_size=3, strides=(1, 1), padding='same', activation="relu"),
        tf.keras.layers.AveragePooling2D((2, 2), strides=2),

        Flatten(),
        Dense(128, activation="relu"),
        Dropout(0.2),
        BatchNormalization(),
    ]
)

# Angle head.
# NOTE(review): a ReLU on a regression output can get stuck at 0 (zero
# gradient once the pre-activation goes negative); consider 'linear' or
# 'sigmoid' if the angle targets are normalised to [0, 1] -- confirm.
dense1 = Dense(64, activation='relu')(base_model.output)
angle_pred = Dense(1, activation='relu', name="angle_prediction")(dense1)

# Speed head: sigmoid probability trained with binary cross-entropy.
denseA = Dense(64, activation='relu')(base_model.output)
speed_pred = Dense(1, activation='sigmoid', name="speed_prediction")(denseA)

# Output layer names double as the keys of the loss dict and of the label
# dict fed by the training dataset.
model = Model(inputs=base_model.input, outputs=[angle_pred, speed_pred])
model.compile(optimizer="adam", loss={"angle_prediction": 'mse', "speed_prediction": 'binary_crossentropy'})
model.summary()
plot_model(model, show_shapes=True)

In [None]:
# Train the current model for 10 Keras epochs on the training pipeline.
model.fit(x=ds_train, epochs=10)

In [None]:
# Persist the trained model to disk under "own_net-10epochs".
model.save(filepath="own_net-10epochs")

In [None]:
# NASNetMobile backbone with two independent fully connected heads:
# `angle_prediction` (regression, MSE) and `speed_prediction` (binary,
# cross-entropy). Both heads branch off the same normalised feature vector.
base_model = tf.keras.applications.NASNetMobile(
    # Randomly initialised, trained from scratch -- no ImageNet weights are
    # loaded (per the Keras docs those require a 224x224x3 input).
    weights=None,
    input_shape=(240, 320, 3),
    include_top=False,
    pooling='avg')
# NOTE(review): unlike the small CNN, this model has no rescaling layer, so
# it sees whatever pixel scale the input pipeline delivers -- confirm that is
# intended.
flat = Flatten()(base_model.output)
bn1 = BatchNormalization()(flat)

# --- Angle head: three Dense(1056) + BatchNorm stages ----------------------
dense1 = Dense(1056, activation='relu')(bn1)
bn2 = BatchNormalization()(dense1)
dense2 = Dense(1056, activation='relu')(bn2)
bn3 = BatchNormalization()(dense2)
dense3 = Dense(1056, activation='relu')(bn3)
bn4 = BatchNormalization()(dense3)
# NOTE(review): a ReLU on a regression output can get stuck at 0; consider
# 'linear' or 'sigmoid' depending on the target range -- confirm.
angle_pred = Dense(1, activation='relu', name="angle_prediction")(bn4)

# --- Speed head: same layout, fully separate from the angle head -----------
denseA = Dense(1056, activation='relu')(bn1)
bnA = BatchNormalization()(denseA)
# Fixed wiring: this stage used to read `bn2` (the angle head's first
# stage), which left denseA/bnA disconnected from the model and made the
# speed head depend on the angle head's first layer.
denseB = Dense(1056, activation='relu')(bnA)
bnB = BatchNormalization()(denseB)
denseC = Dense(1056, activation='relu')(bnB)
bnC = BatchNormalization()(denseC)
speed_pred = Dense(1, activation='sigmoid', name="speed_prediction")(bnC)

# Output layer names double as the keys of the loss dict and of the label
# dict fed by the training dataset.
model = Model(inputs=base_model.inputs, outputs=[angle_pred, speed_pred])
model.compile(optimizer="adam", loss={"angle_prediction": 'mse', "speed_prediction": 'binary_crossentropy'})
model.summary()

plot_model(model, show_shapes=True)

In [None]:
# Train the current model for 5 Keras epochs on the training pipeline.
model.fit(x=ds_train, epochs=5)

In [None]:
def read_dataset_test(img_path):
    """Load one PNG file as a single-image float32 batch for inference.

    Args:
        img_path: path of the PNG file to read.

    Returns:
        The decoded 3-channel image cast to ``tf.float32`` and reshaped to
        ``[1, 240, 320, 3]`` (the reshape fails for any other image size).
    """
    raw_bytes = tf.io.read_file(img_path)
    pixels = tf.cast(tf.image.decode_png(raw_bytes, channels=3), tf.float32)
    return tf.reshape(pixels, [1, 240, 320, 3])

In [None]:
# Smoke test: run the current model on training image 7 and display the
# raw [angle, speed] outputs.
model.predict(
    x=read_dataset_test(path + 'training_data/training_data/' + '7' + '.png')
)

In [None]:
# Replace the in-memory model with a previously saved one.
# Loaded through `tf.keras` so the same Keras implementation that builds the
# models in this notebook is also the one that deserialises them.
# NOTE(review): no visible cell saves a model named "nasnet_working"; it must
# already exist on disk for a fresh "Run All" to succeed -- confirm.
model = tf.keras.models.load_model("nasnet_working")

In [None]:
# Predict on the test images (files 1.png .. 1020.png) in ONE batched
# `model.predict` call instead of 1020 single-image calls; calling `predict`
# inside a Python loop pays its set-up overhead on every iteration.
NUM_TEST_IMAGES = 1020
test_paths = [
    path + 'test_data/test_data/' + str(i) + '.png'
    for i in range(1, NUM_TEST_IMAGES + 1)
]
ds_test = (
    tf.data.Dataset.from_tensor_slices(test_paths)
    .map(read_dataset_test, num_parallel_calls=tf.data.AUTOTUNE)
    .unbatch()   # drop the leading 1 added by read_dataset_test
    .batch(32)
)
angle_out, speed_out = model.predict(ds_test)

# Keep the structure the following cells rely on: one entry per image, each
# a list [angle_array, speed_array] whose arrays have shape (1, 1).
predictions = [
    [angle_out[k:k + 1], speed_out[k:k + 1]]
    for k in range(len(test_paths))
]

In [None]:
# Collapse every raw model output into a plain [angle, speed] row:
#   * angle -> the scalar regression output
#   * speed -> 1 if the sigmoid output is closer to 1 than to 0, else 0
#
# Changes compared with the previous loop:
#   * the global name `speed` is no longer overwritten (it holds the
#     training label list used to build `ds_train`);
#   * `np.ravel` accepts both the raw (1, 1) arrays and already-processed
#     scalars, so re-running this cell is harmless instead of raising.
SPEED_THRESHOLD = 0.5  # same decision as |1 - p| < |p - 0|
for i, (angle_raw, speed_raw) in enumerate(predictions):
    angle_value = np.ravel(angle_raw)[0]
    speed_prob = np.ravel(speed_raw)[0]
    predictions[i] = [angle_value, 1 if speed_prob > SPEED_THRESHOLD else 0]

In [None]:
# Show only the first few rows; dumping the whole list floods the notebook
# output without adding information.
predictions[:5]

In [None]:
# Tabulate the processed predictions: one row per test image, with columns
# 'angle' and 'speed'.
df1 = pd.DataFrame(data=predictions, columns=['angle', 'speed'])

In [None]:
# Display the prediction table (pandas truncates long frames on display).
df1

In [None]:
# Write the prediction table to an Excel workbook.
# NOTE(review): the DataFrame index (starting at 0) is written as the first
# column, while the test images are numbered from 1 -- confirm that whoever
# consumes this file accounts for the offset.
df1.to_excel(excel_writer="output.xlsx")