In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will summarise the files under the input directory

import os

# Image datasets can contain thousands of files, so only the first few names of
# each directory are printed, followed by a count of the remaining ones. This
# keeps the cell output readable while still showing where the data lives.
MAX_FILES_LISTED_PER_DIR = 5
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames[:MAX_FILES_LISTED_PER_DIR]:
        print(os.path.join(dirname, filename))
    if len(filenames) > MAX_FILES_LISTED_PER_DIR:
        print(f"... and {len(filenames) - MAX_FILES_LISTED_PER_DIR} more files in {dirname}")

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os
import numpy as np
import pandas as pd
import skimage
from skimage import io
import matplotlib.pyplot as plt
import random
import gc
import tensorflow as tf
from tensorflow import keras
import seaborn as sns
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import confusion_matrix

In [None]:
#Methods for pairing image paths with their labels and for partitioning ("segmenting") the images by class.
def ll(f_path,label_dir_path):
    """Read the leading field of every annotation line in one label file.

    Args:
        f_path: File name of the label file, relative to ``label_dir_path``.
        label_dir_path: Directory that contains the label file.

    Returns:
        A list of strings, one per non-blank line, holding the first
        whitespace-separated token of that line (presumably the class id of
        a YOLO-style annotation -- TODO confirm against the dataset).

    Raises:
        OSError: If the label file cannot be opened.
    """
    comp_path = os.path.join(label_dir_path,f_path)
    labels = []
    with open(comp_path,"r") as file:
        for line in file:
            fields = line.split()
            # Take the whole first token rather than the first character, so
            # multi-digit ids such as "10" are not truncated to "1"; blank
            # lines carry no annotation and are skipped.
            if fields:
                labels.append(fields[0])
    return labels

def extract_from_path(image_path,dir_path):
    """Pair one image with the labels stored in its companion label file.

    The label file is looked up in ``<dir_path>/labels/`` under the image's
    base name with a ``.txt`` extension.

    Args:
        image_path: File name of the image inside ``<dir_path>/images/``.
        dir_path: Directory holding the ``images`` and ``labels`` folders.

    Returns:
        A tuple ``(full_image_path, labels)`` where ``labels`` is the list
        returned by ``ll`` for the matching label file.
    """
    image_dir_path = dir_path + "/images/"
    label_dir_path = dir_path + "/labels/"
    # splitext handles extensions of any length (".jpg", ".jpeg", ...);
    # slicing off the last three characters only worked for 3-letter ones.
    label_path = os.path.splitext(image_path)[0] + ".txt"
    labels = ll(label_path,label_dir_path)
    return (image_dir_path + image_path),labels

def extract_from_dir(dir):
    """Build the (image path, labels) pair for every entry of ``<dir>/images/``.

    Args:
        dir: Directory holding the ``images`` and ``labels`` folders.

    Returns:
        A list with one ``extract_from_path`` result per directory entry, in
        the order reported by ``os.listdir``.
    """
    return [extract_from_path(name, dir) for name in os.listdir(dir + "/images/")]

#Applies a labelling function to every (image, labels) entry.
def segment_images(segment,images_with_labels):
    """Return ``segment(entry)`` for each entry of ``images_with_labels`` as a list."""
    return [segment(entry) for entry in images_with_labels]

def level_classes(image_label_list):
    """Return a shuffled, class-balanced subset of ``(image, label)`` pairs.

    Both classes are cut down to the size of the smaller one, so the result
    always holds as many label-1 entries as label-0 entries. Entries are kept
    in input order before the final shuffle (the first ones of each class
    are retained). Entries whose label is neither 0 nor 1 are dropped.

    Args:
        image_label_list: Iterable-compatible list of ``(image, label)``
            tuples with ``label`` equal to 0 or 1.

    Returns:
        A new list containing the balanced selection in random order.
    """
    in_list = [e for e in image_label_list if e[1] == 1]
    not_in_list = [e for e in image_label_list if e[1] == 0]
    # Trim BOTH classes: trimming only the negatives left the data unbalanced
    # whenever there were more positives than negatives.
    per_class = min(len(in_list), len(not_in_list))
    comp_list = in_list[:per_class] + not_in_list[:per_class]
    random.shuffle(comp_list)
    return comp_list

In [None]:
#Dataset locations and the class ids an image must contain to be labelled 1.
train_dir = "/kaggle/input/traffic-road-object-detection-polish-12k/road_detection/road_detection/train"
valid_dir = "/kaggle/input/traffic-road-object-detection-polish-12k/road_detection/road_detection/valid"
test_dir = "/kaggle/input/traffic-road-object-detection-polish-12k/road_detection/road_detection/test"
in_classes = ["4"]

#Turns an annotated image into a binary example based on in_classes.
def in_func(image_labels):
    """Label an image 1 if it contains every class in ``in_classes``, else 0.

    Args:
        image_labels: Tuple ``(image, labels)`` where ``labels`` is the
            collection of class ids found for the image.

    Returns:
        The tuple ``(image, 1)`` or ``(image, 0)``.
    """
    image_ref, found_classes = image_labels
    has_all_required = set(in_classes) <= set(found_classes)
    return (image_ref, 1 if has_all_required else 0)

#Collect (image path, class ids) pairs from each of the dataset's own folders.
train_images_classes = extract_from_dir(train_dir)
valid_images_classes = extract_from_dir(valid_dir)
test_images_classes = extract_from_dir(test_dir)

#Replace the class ids with a binary label via in_func.
train_image_path_label = segment_images(in_func,train_images_classes)
valid_image_path_label = segment_images(in_func,valid_images_classes)
test_image_path_label = segment_images(in_func,test_images_classes)

#Balance the two classes inside every folder separately.
Xy_train = level_classes(train_image_path_label)
Xy_valid = level_classes(valid_image_path_label)
Xy_test = level_classes(test_image_path_label)

#Pool the balanced folders, shuffle, and re-split roughly 70/15/15.
comp_list = Xy_train + Xy_valid + Xy_test
random.shuffle(comp_list)
total = len(comp_list)
train_count = int(0.7*total)
valid_count = int(0.15*total)
test_count = int(0.15*total)

Xy_train = comp_list[:train_count]
#NOTE(review): the two slices below start one element past the end of the
#previous split, so the items at index train_count and train_count+valid_count
#land in no split and the validation split holds valid_count-1 items.
#Left unchanged here because later cells may rely on the resulting sizes;
#confirm before switching to [train_count:train_count+valid_count] and
#[train_count+valid_count:].
Xy_valid = comp_list[train_count+1:train_count+valid_count]
Xy_test = comp_list[train_count+valid_count+1:]

X_train,y_train = zip(*Xy_train)
X_valid,y_valid = zip(*Xy_valid)
X_test,y_test = zip(*Xy_test)

X_train = list(X_train)
X_valid = list(X_valid)
X_test = list(X_test)
y_train = list(y_train)
y_valid = list(y_valid)
y_test = list(y_test)


#Report the sizes the splits really have (the nominal *_count values differ
#from them because of the slicing above); "Total" is the pooled image count.
size_arr = np.array([len(X_train),len(X_valid),len(X_test),total]).reshape(1,-1)
sf = pd.DataFrame(size_arr)

sf.index = ["Images"]

sf.columns = ["Train","Validation","Test","Total"]

print(sf)

In [None]:
#Methods for preprocessing.    
def normalizeImage(gray_image):
    """Min-max scale an image so its values span the range [0, 1].

    Args:
        gray_image: Numeric array (any shape) of pixel intensities.

    Returns:
        An array of the same shape where the smallest input value maps to 0
        and the largest to 1. A constant image (max == min) has no range to
        stretch and is returned as all zeros instead of NaN.
    """
    lowest = np.min(gray_image)
    value_range = np.max(gray_image) - lowest
    if value_range == 0:
        # Avoid 0/0: the unguarded formula produced an all-NaN image here.
        return np.zeros_like(gray_image, dtype=float)
    return (gray_image - lowest) / value_range

def preprocess_one(image_path):
    """Load one image and return it as a normalised 252x252 grayscale array.

    The file is read, resized to 252x252, converted with ``rgb2gray`` and
    finally passed through ``normalizeImage``.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The array produced by ``normalizeImage`` for the processed image.
    """
    resized = skimage.transform.resize(io.imread(image_path),(252,252))
    return normalizeImage(skimage.color.rgb2gray(resized))

def preprocess(image_paths):
    """Run ``preprocess_one`` on every path and return the results as a list."""
    return [preprocess_one(path) for path in image_paths]

In [None]:
#Visualising data: show the first few test images with their label and path.
image_count = 3

fig,axes = plt.subplots(image_count,1,figsize=(50,50))

im = X_test[:image_count]
im_labels = y_test[:image_count]

for i in range(image_count):
    axes[i].imshow(io.imread(im[i]))
    #The caption must describe the image that is drawn, i.e. the path taken
    #from the test split (it previously showed an unrelated X_train path).
    s = str(im_labels[i]) + " path:\n" + str(im[i])
    axes[i].set_title(s)

plt.tight_layout()
plt.show()


In [None]:
#Release the intermediate lists that are no longer needed: each list is
#emptied and its name removed before a garbage-collection pass is requested.

#Samples used for the preview plot.
im.clear()
del im
im_labels.clear()
del im_labels

#(image path, class ids) pairs per dataset folder.
train_images_classes.clear()
del train_images_classes
valid_images_classes.clear()
del valid_images_classes
test_images_classes.clear()
del test_images_classes

#(image path, binary label) pairs per dataset folder.
train_image_path_label.clear()
del train_image_path_label
valid_image_path_label.clear()
del valid_image_path_label
test_image_path_label.clear()
del test_image_path_label

#Combined (path, label) splits; X_* and y_* hold the separated copies.
Xy_train.clear()
del Xy_train
Xy_valid.clear()
del Xy_valid
Xy_test.clear()
del Xy_test
gc.collect()

In [None]:
#Replace each list of image paths with the list of preprocessed images, one
#split at a time, printing a progress message after every split.
#NOTE: the names are rebound in place, so running this cell a second time
#without re-running the cells above would hand the already loaded images
#(not paths) to preprocess().
X_train = preprocess(X_train)
print("Preprosessing X_train done.")
X_valid = preprocess(X_valid)
print("Preprosessing X_valid done.")
X_test = preprocess(X_test)
print("Preprosessing X_test done.")

In [None]:
import tensorflow as tf
from tensorflow import keras

#Binary CNN classifier for 252x252 single-channel images: five
#Conv2D(3x3, relu) + MaxPooling2D(2x2) stages followed by a dense head
#ending in one sigmoid unit.
model = keras.models.Sequential()
input_tensor = keras.Input(shape=(252,252,1))
model.add(input_tensor)

#Filter count of each convolution stage, in the order the stages are stacked.
for conv_filters in (16,32,32,32,32):
    model.add(keras.layers.Conv2D(filters=conv_filters,kernel_size=(3,3),activation="relu"))
    model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))

model.add(keras.layers.Flatten())

#Classification head.
model.add(keras.layers.Dense(252,activation="relu"))
model.add(keras.layers.Dense(1,activation="sigmoid"))

model.compile(optimizer="adam",loss="binary_crossentropy",metrics=["accuracy"])
model.summary()

In [None]:
def train_with_batch(model,X_tr,y_tr,X_val,y_val,epochs=12):
    """Fit ``model`` on the training data with early stopping on validation loss.

    Args:
        model: Compiled model exposing a Keras-style ``fit`` method.
        X_tr: Training inputs.
        y_tr: Training targets.
        X_val: Validation inputs.
        y_val: Validation targets.
        epochs: Maximum number of epochs to train for. Defaults to 12, the
            value that used to be hard-coded.

    Returns:
        Whatever ``model.fit`` returns (its ``history`` attribute is read
        later in this notebook to plot the accuracy curves).
    """
    # NOTE(review): only `monitor` is set, every other EarlyStopping option
    # (e.g. patience) is left at the library default -- confirm that is intended.
    early_stopping = keras.callbacks.EarlyStopping(monitor="val_loss")
    # `callbacks` is documented as a list of callbacks, so pass one explicitly
    # instead of relying on a bare object being accepted.
    return model.fit(X_tr,y_tr,epochs=epochs,validation_data=(X_val,y_val),callbacks=[early_stopping])

performance_recap = train_with_batch(model,np.array(X_train),np.array(y_train),np.array(X_valid),np.array(y_valid))

In [None]:
#Visualize CNN: accuracy per epoch for the training and validation data.
#This is copied from reference 3.
plt.plot(performance_recap.history['accuracy'], label='train_accuracy')
plt.plot(performance_recap.history['val_accuracy'], label = 'val_accuracy')
plt.gca().set(xlabel='Epoch', ylabel='Accuracy', ylim=(0.5, 1))
plt.legend(loc='lower right')

#Predicted outputs for the test images (thresholded in a later cell).
y_test_pred_cnn = model.predict(np.array(X_test))

#Loss and accuracy of the trained model on the test split.
test_loss, test_acc = model.evaluate(np.array(X_test),np.array(y_test),verbose = 1)

print("Test loss for CNN:", test_loss, "Test accuracy for CNN:", test_acc, sep="\n")

In [None]:
def round_probability_to_binary(threshold,prob):
    """Return 1 when ``prob`` is strictly greater than ``threshold``, otherwise 0."""
    return 1 if prob > threshold else 0

#Threshold the predicted probabilities at 0.5 to obtain hard 0/1 predictions.
y_test_pred_cnn = [round_probability_to_binary(0.5,p) for p in y_test_pred_cnn]

#Pin both classes so the matrix is always 2x2 and lines up with the two tick
#labels set below, even if one class is missing from the data.
conf_mat_cnn = confusion_matrix(y_test,y_test_pred_cnn,labels=[0,1])

aax = plt.subplot()

sns.heatmap(conf_mat_cnn, annot=True, fmt='g', ax=aax)

aax.set_xlabel('Predicted labels',fontsize=15)
aax.set_ylabel('True labels',fontsize=15)
aax.set_title('Confusion Matrix',fontsize=15)
aax.xaxis.set_ticklabels(['no pedestrian', 'pedestrians'],fontsize=15)
aax.yaxis.set_ticklabels(['no pedestrian', 'pedestrians'],fontsize=15)


In [None]:
#Search space and running best result for the k-nearest-neighbours classifier.
weights = ["uniform","distance"]
bestScore = 0.0
bestK = 1
bestWeight = "uniform"

#Per weighting scheme: the k values tried (X) and the score reached (Y),
#collected for the k-versus-score plots.
uniformX = []
uniformY = []
distanceX = []
distanceY = []

#Flatten every image into one feature row. The row count is taken from the
#data itself instead of being hard-coded, so the cell keeps working when the
#number of images in a split changes.
X_train_KNC = np.array(X_train)
print(X_train_KNC.shape)
X_train_KNC = np.reshape(X_train_KNC, (X_train_KNC.shape[0], -1))
print(X_train_KNC.shape)

y_train_KNC = np.array(y_train)

X_valid_KNC = np.array(X_valid)
print(X_valid_KNC.shape)
X_valid_KNC = np.reshape(X_valid_KNC, (X_valid_KNC.shape[0], -1))
print(X_valid_KNC.shape)

y_valid_KNC = np.array(y_valid)



#Try every k in 1..99 for both weighting schemes, score each classifier on
#the validation data and remember the best combination.
for weight in weights:
    print(f"starting with {weight}")
    for k in range(1,100):
        KNC = KNeighborsClassifier(n_neighbors = k, weights = weight).fit(X_train_KNC, y_train_KNC)
        score = KNC.score(X_valid_KNC, y_valid_KNC)
        if weight == "uniform":
            uniformX.append(k)
            uniformY.append(score)
        else:
            distanceX.append(k)
            distanceY.append(score)

        #Strict ">" keeps the first (smallest k) of equally good settings.
        if score > bestScore:
            bestScore = score
            bestK = k
            bestWeight = weight
        if k%10 == 0:
            print(f"{k} done")
    #Announce completion once per weighting scheme, after all k values were
    #tried (this used to sit inside the k loop and printed on every iteration).
    print(f"{weight} done")
    
     
#Validation score as a function of k, one figure per weighting scheme.
#Titles and axis labels are set so each figure can be read on its own.
print("Uniform")
plt.plot(uniformX,uniformY)
plt.title("KNN validation accuracy (weights: uniform)")
plt.xlabel("k (number of neighbours)")
plt.ylabel("Validation accuracy")
plt.show()

print("Distance")
plt.plot(distanceX,distanceY)
plt.title("KNN validation accuracy (weights: distance)")
plt.xlabel("k (number of neighbours)")
plt.ylabel("Validation accuracy")
plt.show()


print(f"The best classifier was found with parameters K: {bestK}, weight: {bestWeight} and accuracy score of: {bestScore}")


#Refit the best configuration found by the search and evaluate it.
#NOTE(review): the figures below are computed on the validation data that was
#also used to pick bestK/bestWeight; the test split is not used for this
#classifier -- confirm whether a test-set evaluation is wanted as well.
bestKNC = KNeighborsClassifier(n_neighbors = bestK, weights = bestWeight).fit(X_train_KNC, y_train_KNC)
y_valid_pred = bestKNC.predict(X_valid_KNC)
#Pin both classes so the matrix is always 2x2: the [1,0]/[1,1] indexing and
#the two tick labels below depend on that shape.
conf_matrix = confusion_matrix(y_valid_KNC, y_valid_pred, labels=[0,1])
#Recall = true positives / all actual positives (second row of the matrix).
recall_score = conf_matrix[1,1]/(conf_matrix[1,0]+conf_matrix[1,1])

y_train_acc = bestKNC.score(X_train_KNC, y_train_KNC)
y_valid_acc = bestKNC.score(X_valid_KNC, y_valid_KNC)

print(f"The training accuracy for the best KNC with k: {bestK} was {y_train_acc}")
print(f"The validation accuracy for the best KNC with k: {bestK} was {y_valid_acc}")
print(f"the recall score for the validation set is {recall_score}")



ax = plt.subplot()

sns.heatmap(conf_matrix, annot=True, fmt='g', ax=ax)

ax.set_xlabel('Predicted labels',fontsize=15)
ax.set_ylabel('True labels',fontsize=15)
ax.set_title('Confusion Matrix',fontsize=15)
ax.xaxis.set_ticklabels(['no pedestrian', 'pedestrians'],fontsize=15)
ax.yaxis.set_ticklabels(['no pedestrian', 'pedestrians'],fontsize=15)