<a href="https://colab.research.google.com/github/DataScienceAndEngineering/deep-learning-final-project-project-sidewalk/blob/nicholas/notebooks/data_exploration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Loading all necessary packages
import tensorflow as tf
import numpy as np
import os
import PIL
import matplotlib.pyplot as plt

#Extract the dataset into colab
#!unzip ./drive/MyDrive/tensorflow_datasets/downloads/manual/leftImg8bit_trainvaltest.zip
#!unzip ./drive/MyDrive/tensorflow_datasets/downloads/manual/gtFine_trainvaltest.zip


In [None]:
def obtain_paths(subset='train'):
    """Return paths of the label masks and images that contain sidewalk pixels.

    Walks ``./gtFine/<subset>`` for files ending in ``labelIds.png``, keeps the
    ones whose pixel array contains the value 8 (the value this notebook treats
    as the sidewalk label) and derives the matching ``./leftImg8bit`` image
    path from each kept label path.

    Args:
        subset: Name of the sub-directory of ``./gtFine`` to scan.

    Returns:
        A ``(labels_files, img_files)`` tuple of equally long lists; entry
        ``k`` of both lists refers to the same frame. Both lists are empty
        when the directory does not exist or holds no matching file.
    """
    labels_root = './gtFine'
    images_root = './leftImg8bit'
    label_suffix = 'gtFine_labelIds.png'
    image_suffix = 'leftImg8bit.png'

    labels_files = []
    img_files = []
    for root, _dirs, files in os.walk(f'{labels_root}/{subset}'):
        for filename in files:
            if not filename.endswith('labelIds.png'):
                continue
            f = os.path.join(root, filename)
            # Release the file handle as soon as the pixels have been checked.
            with PIL.Image.open(f) as label_img:
                has_sidewalk = 8 in np.array(label_img)
            if has_sidewalk:
                labels_files.append(f)
                # Swap the root directory and the file-name suffix while
                # keeping the '<subset>/<city>/<frame>_' part in between.
                frame_part = f[len(labels_root):-len(label_suffix)]
                img_files.append(images_root + frame_part + image_suffix)
    return labels_files, img_files

# Collect the sidewalk-containing (mask, image) path lists for each split.
train_masks, train_images = obtain_paths(subset='train')
val_masks, val_images = obtain_paths(subset='val')
test_masks, test_images = obtain_paths(subset='test')

In [None]:
# Report how many frames with a sidewalk were found in each split.
print(
    f'n_train: {len(train_masks)}',
    f'n_val: {len(val_masks)}',
    f'n_test: {len(test_masks)}',
    sep='\n',
)


In [None]:
def dice_coef(y_true, y_pred):
    """Return the smoothed Dice coefficient of two tensors.

    Both inputs go through a Keras ``Flatten`` layer, are multiplied
    element-wise and summed over every element. The result is
    ``(2 * overlap + eps) / (sum(y_pred) + sum(y_true) + eps)`` with
    ``eps = 1e-6``, which keeps the ratio defined when both sums are zero.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Prediction tensor.

    Returns:
        A scalar tensor holding the Dice coefficient.
    """
    eps = 1e-6

    truth_flat = tf.keras.layers.Flatten()(y_true)
    pred_flat = tf.keras.layers.Flatten()(y_pred)

    overlap = tf.reduce_sum(truth_flat * pred_flat)
    pred_total = tf.reduce_sum(pred_flat)
    truth_total = tf.reduce_sum(truth_flat)

    numerator = 2. * overlap + eps
    return numerator / (pred_total + truth_total + eps)

def dice_score(y_true, y_pred, numLabels=2):
    """Dice loss to MINIMIZE: ``numLabels`` minus the summed per-channel Dice.

    The last axis of the rank-4 inputs is indexed from 0 to ``numLabels - 1``
    and ``dice_coef`` is evaluated on each channel. A perfect overlap on every
    channel gives (approximately) 0; no overlap at all gives (approximately)
    ``numLabels``.

    Args:
        y_true: Ground-truth tensor indexed as ``[:, :, :, channel]``.
        y_pred: Prediction tensor indexed the same way.
        numLabels: Number of channels to evaluate.

    Returns:
        The loss value as returned by the arithmetic on ``dice_coef`` results.
    """
    summed_dice = 0
    for channel in range(numLabels):
        summed_dice = summed_dice + dice_coef(y_true[:, :, :, channel], y_pred[:, :, :, channel])
    return numLabels - summed_dice


In [None]:
def mask_processing(mask_path):
    """Load a label-id PNG and reduce it to a binary sidewalk mask.

    Args:
        mask_path: Path to the label PNG file.

    Returns:
        A float32 tensor with a single channel: 1.0 where the decoded label
        equals 8, 0.0 everywhere else.
    """
    # NOTE(review): the result has one channel, while dice_score (default
    # numLabels=2) indexes channel 1 of y_true -- confirm how they are paired.
    raw_png = tf.io.read_file(mask_path)
    label_ids = tf.io.decode_png(raw_png, channels=1)
    is_sidewalk = tf.math.equal(label_ids, 8)
    return tf.cast(is_sidewalk, tf.float32)

def input_function(image_paths, mask_paths, batch_size):
    """Build a shuffled, batched ``tf.data.Dataset`` of ``(image, mask)`` pairs.

    Each image is read and PNG-decoded; each mask is loaded through
    ``mask_processing``. The two streams are zipped as a tuple so that every
    element has the ``(inputs, targets)`` structure Keras expects from a
    dataset passed to ``model.fit``.

    Args:
        image_paths: Sequence of PNG image file paths.
        mask_paths: Sequence of label PNG file paths, aligned index-for-index
            with ``image_paths``.
        batch_size: Number of pairs per batch.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, masks)`` batches.

    Raises:
        ValueError: If the path sequences are empty or differ in length.
    """
    n_samples = len(image_paths)
    if n_samples != len(mask_paths):
        raise ValueError(
            f'image_paths and mask_paths differ in length: '
            f'{n_samples} != {len(mask_paths)}'
        )
    if n_samples == 0:
        raise ValueError('image_paths and mask_paths must not be empty')

    images = (
        tf.data.Dataset.from_tensor_slices(image_paths)
        .map(tf.io.read_file)
        # Force three channels so the decoded images match the 3-channel
        # input shape gen_model uses by default.
        .map(lambda raw_png: tf.image.decode_png(raw_png, channels=3))
    )
    masks = tf.data.Dataset.from_tensor_slices(mask_paths).map(mask_processing)

    # Zip as a tuple, not a dict: Keras reads dataset elements as
    # (inputs, targets); a dict element would be treated as inputs only.
    dataset = tf.data.Dataset.zip((images, masks))

    # Shuffle after zipping so every image stays paired with its own mask.
    return dataset.shuffle(n_samples).batch(batch_size)

# Zero-argument factory for the training input with a batch size of 32.
input_fn = lambda: input_function(
    image_paths=train_images,
    mask_paths=train_masks,
    batch_size=32,
)


In [None]:
# Model architecture

# Glorot-uniform kernel initializer created with a fixed seed (0). This single
# instance is passed as kernel_initializer to the layers built by conv_block,
# conv_up and gen_model below.
initial=tf.keras.initializers.glorot_uniform(seed=0)

def conv_block(input_layer, n_filters, kernel=(3, 3), padding='same', strides=(1, 1), L2=0):
    """Apply Conv2D -> BatchNormalization -> ReLU to ``input_layer``.

    Args:
        input_layer: Tensor the block is applied to.
        n_filters: Number of convolution filters.
        kernel: Convolution kernel size.
        padding: Padding mode forwarded to ``Conv2D``.
        strides: Convolution strides.
        L2: Factor of the L2 kernel regularizer on the convolution.

    Returns:
        The output tensor of the ReLU activation.
    """
    convolution = tf.keras.layers.Conv2D(
        n_filters,
        kernel,
        padding=padding,
        strides=strides,
        kernel_initializer=initial,
        kernel_regularizer=tf.keras.regularizers.l2(L2),
    )
    # Normalization without the learnable offset (center) and scale terms.
    normalization = tf.keras.layers.BatchNormalization(center=False, scale=False)
    relu = tf.keras.layers.Activation('relu')

    return relu(normalization(convolution(input_layer)))

def conv_up(n_filters, pool_size=(2,2), kernel_size=(2,2), strides=(2, 2), L2=0):
    """Create the transposed-convolution layer used for up-sampling.

    Args:
        n_filters: Number of output filters.
        pool_size: Accepted for call-site compatibility; not used here.
        kernel_size: Kernel size of the transposed convolution. It used to be
            ignored in favour of a hard-coded ``(2, 2)``; it is now honoured
            and the default keeps the previous behaviour.
        strides: Strides of the transposed convolution (the up-sampling factor).
        L2: Factor of the L2 kernel regularizer.

    Returns:
        An un-called ``tf.keras.layers.Conv2DTranspose`` layer.
    """
    return tf.keras.layers.Conv2DTranspose(
        filters=n_filters,
        kernel_size=kernel_size,
        kernel_initializer=initial,
        strides=strides,
        kernel_regularizer=tf.keras.regularizers.l2(L2),
    )

    
def gen_model(input_shape =  (1024, 2048, 3), pool_size=(2, 2),initial_learning_rate=1e-5,
                      depth=4, n_base_filters=16, activation_name="softmax", L2=0, n_classes=2):
    """Build a U-Net style segmentation model.

    The contracting path stacks two ``conv_block`` calls per level, with
    ``n_base_filters * (level + 1)`` filters, and max-pools between levels.
    The expanding path up-samples with ``conv_up``, concatenates the second
    convolution output of the matching contracting level and applies two more
    ``conv_block`` calls. A final 1x1 convolution maps to ``n_classes``
    channels, followed by ``activation_name``.

    Args:
        input_shape: Shape of one input sample, without the batch axis.
        pool_size: Max-pooling window, also used as kernel size and stride of
            the up-sampling transposed convolutions so both paths stay
            aligned. It used to be ignored in favour of a hard-coded
            ``(2, 2)``; the default keeps the previous behaviour.
        initial_learning_rate: Accepted for compatibility; not used when
            building the model.
        depth: Number of resolution levels.
        n_base_filters: Filter count of the first level.
        activation_name: Name of the activation applied to the output.
        L2: L2 regularization factor forwarded to the conv blocks.
        n_classes: Number of output channels.

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    # NOTE(review): the spatial input size presumably has to be divisible by
    # pool_size ** (depth - 1) for the skip concatenations to line up -- confirm.
    inputs = tf.keras.layers.Input(input_shape)  # Declare input shape
    # Initial 1x1 convolution lifting the input to n_base_filters channels.
    current_layer = tf.keras.layers.Conv2D(n_base_filters, (1, 1), kernel_initializer=initial)(inputs)

    # Down-slope (contracting) portion of the U-Net.
    skips = []  # second conv output of every level, reused as skip connection
    for layer_depth in range(depth):
        n_filters = n_base_filters * (layer_depth + 1)
        layer1 = conv_block(input_layer=current_layer, kernel=(3, 3), n_filters=n_filters, padding='same', L2=L2)
        layer2 = conv_block(input_layer=layer1, kernel=(3, 3), n_filters=n_filters, padding='same', L2=L2)
        skips.append(layer2)
        if layer_depth < depth - 1:
            # Every level except the deepest one is followed by pooling.
            current_layer = tf.keras.layers.MaxPooling2D(pool_size=pool_size)(layer2)
        else:
            current_layer = layer2

    # Up-slope (expanding) portion of the U-Net, from the deepest level back to 0.
    for layer_depth in range(depth - 2, -1, -1):
        n_filters = n_base_filters * (layer_depth + 1)
        up_convolution = conv_up(
            pool_size=pool_size,
            kernel_size=pool_size,
            strides=pool_size,
            n_filters=n_base_filters * (layer_depth + 2),
            L2=L2,
        )(current_layer)

        concat = tf.keras.layers.concatenate([up_convolution, skips[layer_depth]], axis=-1)
        current_layer = conv_block(n_filters=n_filters, kernel=(3, 3), input_layer=concat, padding='same', L2=L2)
        current_layer = conv_block(n_filters=n_filters, kernel=(3, 3), input_layer=current_layer, padding='same', L2=L2)

    final_convolution = tf.keras.layers.Conv2D(n_classes, (1, 1), kernel_initializer=initial)(current_layer)
    act = tf.keras.layers.Activation(activation_name)(final_convolution)
    model = tf.keras.Model(inputs=[inputs], outputs=act)
    return model

model = gen_model()
# A learning rate of 0 would leave every weight unchanged during fit(), so use
# the 1e-5 that gen_model declares as its default initial_learning_rate.
model.compile(loss=dice_score, optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), metrics=['acc'])

In [None]:
# Train on the training split, 32 samples per batch.
model.fit(
    x=input_function(image_paths=train_images, mask_paths=train_masks, batch_size=32),
)

In [None]:
# Sanity check: confirm that label value 8 marks the sidewalk segmentation.

# Other cities this check can be looped over:
#cities = ['berlin', 'bielefeld', 'bonn', 'leverkusen', 'mainz', 'munich']
#for i in cities:
i = 'cologne'
tImage = PIL.Image.open(f'./leftImg8bit/train/{i}/{i}_000100_000019_leftImg8bit.png')
tMask = PIL.Image.open(f'./gtFine/train/{i}/{i}_000100_000019_gtFine_labelIds.png')
Mask, Image = np.array(tMask), np.array(tImage)

# Show the mask and then the image, each in its own figure, and print the
# mask value at one pixel to compare against the sidewalk label.
for _panel in (Mask, Image):
    plt.figure()
    plt.imshow(_panel)
print(Mask[800,2000])