<a href="https://colab.research.google.com/github/HenningBuhl/DLKTF/blob/master/Aufgabe%202/KI_Praktikum_Aufgabe_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# KI Praktikum - Aufgabe 2

# TODO

In [0]:
'''

TODO:
- Implement Batch Gradient Descent
- Implement Dropout
- Implement Momentum Gradient Descent
- Implement Gradient Constraint

'''

# Imports

In [0]:
# Imports.
import os
import cv2
import itertools

import numpy as np
import matplotlib.pyplot as plt

from keras.datasets import mnist
from keras.utils import to_categorical
from keras.preprocessing.image import ImageDataGenerator

from sklearn.metrics import accuracy_score
from sklearn.preprocessing import MinMaxScaler

!pip install googledrivedownloader
from google_drive_downloader import GoogleDriveDownloader as gdd

# Neural Network Class

In [0]:
class NeuralNetwork():
    '''
    A fully connected feed-forward classifier.

    Hidden layers use the sigmoid activation, the output layer uses softmax
    and training minimises the cross entropy with per-example gradient
    descent (optionally with weight decay and momentum).
    '''

    def __init__(self, layer_sizes=[], epsilon=1e-6):
        '''
        Creates a new instance of the class.

        Args:
            layer_sizes: A sequence containing the number of neurons in each
            layer (input layer first, output layer last).
            epsilon: Value used to provide numerical stability.
        
        Returns:
            A new instance of the class.
        '''
        
        # Copy the sizes so the network never aliases the caller's list (or
        # the shared mutable default argument).
        self.layer_sizes = list(layer_sizes)
        self.input_size = self.layer_sizes[0]
        self.output_size = self.layer_sizes[-1]
        self.num_layers = len(self.layer_sizes)

        # Each layer transition owns one weight per connection plus one bias
        # per target neuron.
        self.num_parameters = sum(
            (self.layer_sizes[i] + 1) * self.layer_sizes[i+1]
            for i in range(self.num_layers - 1))

        # Numerical stability.
        self.epsilon = epsilon

        # Initialize the network parameters.
        self.weights = [np.random.normal( # Xavier-style initialization.
            0, # Mean.
            (1 / (self.layer_sizes[i-1] + self.layer_sizes[i]))**(1 / 2), # Std.
            (self.layer_sizes[i-1], self.layer_sizes[i])) # Shape
            for i in range(1, self.num_layers)]
        self.biases = [np.zeros( # Zero
            (self.layer_sizes[i])) # Shape.
            for i in range(1, self.num_layers)]

        # Momentum buffers: the previously applied update of every parameter.
        self.weight_velocities = [np.zeros_like(w) for w in self.weights]
        self.bias_velocities = [np.zeros_like(b) for b in self.biases]


    def sigmoid(self, x):
        '''
        Returns the sigmoid activation function.

        Args:
            x: The input (scalar or array).
        
        Returns:
            The element-wise sigmoid of x.
        '''

        x = np.asarray(x)

        # exp(-|x|) never exceeds 1, so neither branch can overflow; the naive
        # 1 / (1 + exp(-x)) overflows for large negative x.
        decay = np.exp(-np.abs(x))

        # Return the sigmoid activation function.
        return np.where(x >= 0, 1 / (1 + decay), decay / (1 + decay))


    def sigmoid_prime(self, x):
        '''
        Returns the first derivative of the sigmoid activation function.

        Args:
            x: The input.
        
        Returns:
            The first derivative of the sigmoid activation function.
        '''

        # Calculate the sigmoid activation function.
        sigmoid = self.sigmoid(x)

        # Return sigmoid prime.
        return sigmoid * (1 - sigmoid)


    def softmax(self, x):
        '''
        Return the softmax activation function.

        Args:
            x: The input. The softmax is taken over the last axis, so x may be
            a single vector or a batch with one example per row.
        
        Returns:
            The softmax activation function (same shape as x).
        '''

        x = np.asarray(x)

        # Calculate the exponentials (numerically safe): shifting by the
        # maximum of each vector keeps every exponent <= 0.
        e_x = np.exp(x - np.max(x, axis=-1, keepdims=True))

        # Normalize every vector on its own.
        softmax = e_x / np.maximum(self.epsilon, e_x.sum(axis=-1, keepdims=True))

        # Return the softmax activation function.
        return softmax


    def cross_entropy(self, y, y_hat):
        '''
        Return the cross entropy loss function.

        Args:
            y: The labels.
            y_hat: The predictions.
        
        Returns:
            The cross entropy loss, summed over all elements of the inputs.
        '''

        # Clip y_hat for numerical stability.
        y_hat = np.clip(y_hat, self.epsilon, 1 - self.epsilon)

        # Calculate cross entropy loss.
        ce_loss = -np.sum(y * np.log(y_hat))

        # Return cross entropy loss.
        return ce_loss


    def cross_entropy_prime(self, y, y_hat):
        '''
        Return the first derivative of the cross entropy loss function.

        Args:
            y: The labels.
            y_hat: The predictions.
        
        Returns:
            The derivative of the loss with respect to the pre-activation of
            the output layer.
        '''

        # Mathematical shortcut: with a softmax output layer the derivative of
        # the cross entropy with respect to the output pre-activation
        # collapses to y_hat - y, so no activation derivative is applied.
        return y_hat - y


    def predict(self, x, return_all=False):
        '''
        Return the output of the neural network.

        Args:
            x: The input: a single example or a batch with one example per
            row.
            return_all: If True, return pre- and post-activations and y_hat,
            else only y_hat (default False).

        Returns:
            The output of the neural network.
        '''

        # Initialize pre- and post-activation values.
        z = [x] # Dummy entry so a and z can be indexed with the same index.
        a = [x] # The first layers' activations are x.

        # Calculate the layers output with the output of the previous layer.
        for i in range(1, self.num_layers):
            z.append(a[-1] @ self.weights[i-1] + self.biases[i-1])
            if i == self.num_layers - 1: # Last layer.
                a.append(self.softmax(z[-1]))
            else: # Any other layer.
                a.append(self.sigmoid(z[-1]))
        
        # If return_all=True, return pre- and post-activations.
        if return_all:
            return z, a, a[-1]
        else:
            return a[-1]


    def fit(self, x, y, learning_rate=0.1, weight_decay=0, momentum=0):
        '''
        Perform one gradient descent step on a single example.

        Args:
            x: The input (one example, 1-D).
            y: The label (one-hot vector).
            learning_rate: The learning rate (default 0.1).
            weight_decay: The weight decay coefficient (default 0). It is
            applied to the weights directly, i.e. it is not multiplied by the
            learning rate.
            momentum: Fraction of the previous update that is added to the
            current one (default 0, which is plain gradient descent). The
            previous updates are kept on the instance between calls.
        
        Returns:
            The cost before the update.
        '''

        # Perform a feed forward.
        z, a, y_hat = self.predict(x, return_all=True)

        # Calculate the error for each layer, from the output layer backwards.
        errors = [self.cross_entropy_prime(y, y_hat)]
        for i in reversed(range(1, self.num_layers - 1)):
            errors.append(self.weights[i] @ errors[-1] * self.sigmoid_prime(z[i]))

        # Calculate the deltas and apply them. errors is ordered from the last
        # layer to the first, hence the negative index.
        for i in range(self.num_layers - 1):
            error = errors[-(i+1)]
            weight_step = learning_rate * np.outer(a[i], error) + weight_decay * self.weights[i]
            bias_step = learning_rate * error

            # Blend in the previous update; with momentum=0 this reduces to
            # the plain step.
            self.weight_velocities[i] = momentum * self.weight_velocities[i] + weight_step
            self.bias_velocities[i] = momentum * self.bias_velocities[i] + bias_step

            self.weights[i] -= self.weight_velocities[i]
            self.biases[i] -= self.bias_velocities[i]

        # Return the cost.
        return self.cross_entropy(y, y_hat)


    def evaluate(self, x, y):
        '''
        Return the loss.

        Args:
            x: The inputs (one example per row).
            y: The labels (one one-hot vector per row).
        
        Returns:
            The cross entropy averaged over all examples.
        '''

        # Perform a single batched feed forward on the input.
        y_hat = self.predict(np.asarray(x))

        # Calculate the loss of each example.
        y_hat = np.clip(y_hat, self.epsilon, 1 - self.epsilon)
        losses = -np.sum(np.asarray(y) * np.log(y_hat), axis=-1)

        # Return the average loss over all examples.
        return np.mean(losses)


    def accuracy(self, x, y):
        '''
        Returns the accuracy.

        Args:
            x: The inputs (one example per row).
            y: The labels (one one-hot vector per row).
        
        Returns:
            The accuracy.
        '''

        # Perform a single batched feed forward on the input.
        y_hat = self.predict(np.asarray(x))

        # Calculate the accuracy from the most likely class of each example.
        acc = accuracy_score(np.argmax(y, axis=1), np.argmax(y_hat, axis=1))

        # Return the accuracy.
        return acc

# Helper Functions

In [0]:
def shuffle_data(x, y):
    '''
    Reorder two arrays with one shared random permutation.

    Args:
        x: An array.
        y: An array that is indexed along its first axis like x.

    Returns:
        The pair (x, y), both rearranged by the same permutation so that
        matching rows stay aligned.
    '''

    # Draw a random order over the rows of x (uses the global NumPy RNG).
    order = np.arange(x.shape[0])
    np.random.shuffle(order)

    # Apply the very same order to both arrays.
    return x[order], y[order]

# Hyper Parameters

In [0]:
# Global hyper parameters. Not all of them are used yet.
use_data_augmentation = True  # Training samples come from the augmenting iterator instead of x_train.
iterations = 100_000  # Number of single-example updates; assigned again in the training cell.
batch_size = 1  # Not read by the cells below; the data iterator is created with a literal 1.
learning_rate = 0.001  # Not read by the cells below; training uses a linearly decaying schedule.

# Alien Symbols

## Data

### Fetch Data

In [0]:
# Data path: the archive is downloaded into this folder and the class
# sub-folders are read from it.
base_dir = "./data/"

In [0]:
# Label dictionary: maps the integer class index to the name of the class
# folder. It is filled while the images are read.
label_dict = {}

In [0]:
# Fetch the dataset archive from Google Drive into the data folder and unzip it.
archive_path = base_dir + 'data.zip'
gdd.download_file_from_google_drive(
    file_id='1hidaKlWV_tLnNcabiVQIzfW_8tH54gy3',
    dest_path=archive_path,
    unzip=True)

In [0]:
# Meta data: geometry used to reshape the flat pixel vectors back into images.
x_dim = 25  # First image dimension passed to reshape.
y_dim = 25  # Second image dimension passed to reshape.
channels = 1  # Images are read in grayscale mode, i.e. one channel.

In [0]:
x, y = [], [] # Inputs, labels.

# Every sub-folder of base_dir is one class. The folders are sorted so the
# class indices are the same on every run (os.listdir has no defined order).
dirs = sorted(
    os.path.join(base_dir, name)
    for name in os.listdir(base_dir)
    if os.path.isdir(os.path.join(base_dir, name)))

# Read data from disk.
for target_index, class_dir in enumerate(dirs): # For all folders.
    for file_name in sorted(os.listdir(class_dir)): # For all files.
        path = os.path.join(class_dir, file_name)
        im = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if im is None:
            # cv2.imread signals an unreadable / non-image file with None.
            print(f'Skipping unreadable file: {path}')
            continue
        x.append(im.reshape(-1)) # Flatten image.
        y.append(target_index)
    label_dict[target_index] = os.path.basename(class_dir)

# Convert to numpy array.
x = np.asarray(x)
y = np.asarray(y)

# Labels to categorical. The number of classes is passed explicitly so the
# one-hot width always matches the number of class folders.
y = to_categorical(y, num_classes=len(dirs))

In [0]:
# Shuffle the samples: they were read class by class, and the split further
# down slices the arrays by position.
x, y = shuffle_data(x, y)

In [0]:
# Value range of the raw pixel data.
print(f'x: [min: {x.min()}, max: {x.max()}]')

In [0]:
# Shapes of the inputs and of the one-hot labels, one per line.
print(x.shape, y.shape, sep='\n')

### Preprocess Data

In [0]:
scaler = MinMaxScaler()

# Normalize data horizontally: the scaler is fitted on the transposed matrix
# (one image per column) and the result is transposed back. Presumably this
# rescales every image to [0, 1] on its own, since MinMaxScaler works per
# column - confirm with the min/max printout that follows.
x = scaler.fit_transform(x.T).T

# Alternative (disabled): normalize data vertically, i.e. per pixel position
# across all images.
#x = scaler.fit_transform(x)

In [0]:
# Value range after normalization.
print(f'x: [min: {x.min()}, max: {x.max()}]')

In [0]:
# Sanity check: the double transpose in the normalization must leave the shape unchanged.
print(x.shape)

In [0]:
# Split ratios: train_split is the share of all samples kept for training and
# validation together; validation_split is the share of that part which is
# set aside for validation.
train_split = 0.8
validation_split = 0.2

# Positions at which the (already shuffled) arrays are cut.
train_split_idx = int(x.shape[0] * train_split)
validation_split_idx = int(train_split_idx * validation_split)

# Layout along the first axis: [ validation | training | test ].
x_val, y_val = x[:validation_split_idx], y[:validation_split_idx]
x_train = x[validation_split_idx:train_split_idx]
y_train = y[validation_split_idx:train_split_idx]
x_test, y_test = x[train_split_idx:], y[train_split_idx:]

In [0]:
# Value range of every split.
print(f'Train: [min: {x_train.min()}, max: {x_train.max()}]')
print(f'Val:   [min: {x_val.min()}, max: {x_val.max()}]')
print(f'Test:  [min: {x_test.min()}, max: {x_test.max()}]')

In [0]:
# Shapes of the training, test and validation inputs, one per line.
print(x_train.shape, x_test.shape, x_val.shape, sep='\n')

In [0]:
# Shuffle every split on its own (inputs and labels stay aligned).
(x_train, y_train), (x_test, y_test), (x_val, y_val) = (
    shuffle_data(x_train, y_train),
    shuffle_data(x_test, y_test),
    shuffle_data(x_val, y_val),
)

### Visualize Data

In [0]:
# Show one randomly chosen training image together with its class name.
index = np.random.randint(x_train.shape[0])
sample_image = x_train[index].reshape(x_dim, y_dim)
sample_label = label_dict[np.argmax(y_train[index])]

plt.imshow(sample_image, cmap='gray', clim=(0, 1))
plt.title(f'Class: {sample_label}')
plt.show()

### Data Augmentation

In [0]:
# Flat pixel vectors reshaped to (samples, x_dim, y_dim, channels) for the generator.
train_images = x_train.reshape(-1, x_dim, y_dim, channels)

train_datagen = ImageDataGenerator(
    # Augmentations that are switched on.
    rotation_range=20,
    width_shift_range=0.05,
    height_shift_range=0.05,
    shear_range=0.05,
    zoom_range=0.05,
    channel_shift_range=0.05,
    # Options spelled out explicitly at the values noted as defaults.
    featurewise_center=False,
    samplewise_center=False,
    featurewise_std_normalization=False,
    samplewise_std_normalization=False,
    zca_whitening=False,
    zca_epsilon=1e-06,
    brightness_range=None,
    fill_mode='nearest',
    cval=0.0,
    horizontal_flip=False,
    vertical_flip=False,
    rescale=None,
    preprocessing_function=None,
    data_format=None,
    validation_split=0.0,
    dtype=None,
)

# Fit the generator on the training images (no augmentation, one round).
train_datagen.fit(x=train_images, augment=False, rounds=1, seed=None)

# Endless iterator over shuffled, augmented training samples, one per batch.
train_iterator = train_datagen.flow(
    x=train_images,
    y=y_train,
    batch_size=1,
    shuffle=True,
    sample_weight=None,
    seed=None,
    save_to_dir=None,
    save_prefix='',
    save_format='png',
    subset=None,
)

In [0]:
# Draw one augmented sample. The built-in next() is used because the
# iterator's .next() method is not available on newer Keras versions.
x_batch, y_batch = next(train_iterator)

# Show the (single) image of the batch together with its class name.
plt.imshow(x_batch[0].reshape(x_dim, y_dim), cmap='gray', clim=(0, 1))
plt.title(f'Class: {label_dict[np.argmax(y_batch[0])]}')
plt.show()

## Model

In [0]:
# One input neuron per pixel of a flattened image.
input_neurons = x_dim * y_dim
# One output neuron per class found on disk.
output_neurons = len(label_dict)
# No hidden layers: the inputs are connected directly to the outputs.
layer_sizes = [input_neurons, output_neurons]

In [0]:
# Build the network from layer_sizes; add hidden layer sizes there to switch
# from a perceptron to an MLP.
nn = NeuralNetwork(layer_sizes) # Adjust for perceptron vs MLP.
print(f'Number of parameters: {nn.num_parameters}')

## Training

In [0]:
# Dictionary to log metrics.
history = {'train_loss' : [],
           'train_acc' : [],
           'val_loss' : [],
           'val_acc' : []}

# Training parameters. The learning rate decays linearly while the weight
# decay grows linearly over the run.
iterations = 100_000
report_iterations = 500
learning_rates = np.linspace(0.01, 0.0001, iterations)
weight_decays = np.linspace(0.000001, 0.00001, iterations)

# Field width of the iteration counter in the report: number of digits of the
# largest index (string based, so it cannot be off by one due to float
# rounding in a logarithm).
iteration_width = len(str(iterations - 1))

# Running losses list.
losses = []

# Training loop.
for i in range(iterations):
    # Get one training example.
    if use_data_augmentation:
        # next() works with every Keras version; the .next() method of the
        # iterator is not available on newer ones.
        x_batch, y_batch = next(train_iterator)
        # The iterator yields batches of a single example: flatten the image
        # and the label. reshape(-1) adapts to the number of classes.
        x_batch = x_batch.reshape(x_dim * y_dim)
        y_batch = y_batch.reshape(-1)
    else:
        index = np.random.randint(x_train.shape[0]) # Random sample.
        x_batch, y_batch = x_train[index], y_train[index]

    # Train on the example.
    loss = nn.fit(x_batch,
                  y_batch,
                  learning_rate=learning_rates[i],
                  weight_decay=weight_decays[i])
    losses.append(loss)

    # Report every n iterations.
    if not i % report_iterations:
        # Calculate metrics.
        train_loss = nn.evaluate(x_train, y_train)
        train_acc = nn.accuracy(x_train, y_train)
        val_loss = nn.evaluate(x_val, y_val)
        val_acc = nn.accuracy(x_val, y_val)

        # Debug metrics.
        print(f'Iternation: {i:{iteration_width}d} ' \
              f'[Running loss: {np.mean(losses):8.5f}] ' \
              f'[Train loss: {train_loss:8.5f}, acc: {train_acc * 100:6.2f}%] ' \
              f'[Val loss: {val_loss:8.5f} acc: {val_acc * 100:6.2f}%]')

        # Log metrics.
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        # Reset running losses.
        losses = []

In [0]:
# Loss curves of the training and validation set, one point per report.
for key, label in (('train_loss', 'Train Loss'), ('val_loss', 'Val Loss')):
    plt.plot(history[key], label=label)
plt.title('Train and Val Loss')
plt.legend(loc='best')
plt.show()

In [0]:
# Accuracy curves of the training and validation set, one point per report.
for key, label in (('train_acc', 'Train Accuracy'), ('val_acc', 'Val Accuracy')):
    plt.plot(history[key], label=label)
plt.title('Train and Val Accuracy')
plt.legend(loc='best')
plt.show()

In [0]:
# Loss and accuracy of the trained network on each of the three splits.
train_loss, train_acc = nn.evaluate(x_train, y_train), nn.accuracy(x_train, y_train)
val_loss, val_acc = nn.evaluate(x_val, y_val), nn.accuracy(x_val, y_val)
test_loss, test_acc = nn.evaluate(x_test, y_test), nn.accuracy(x_test, y_test)

# Report them, accuracy as a percentage.
print(f'Train loss: {train_loss:8.5f}, acc: {train_acc * 100:6.2f}%')
print(f'Val loss:   {val_loss:8.5f}, acc: {val_acc * 100:6.2f}%')
print(f'Test loss:  {test_loss:8.5f}, acc: {test_acc * 100:6.2f}%')

# Playground

# Old Code

# Testing

## Data

In [0]:
# Dummy data: one input with 5 features and a one-hot label over 9 classes.
# Note that this re-assigns the global names x and y.
x = np.array([0.24, 0, 0.84, 0.46, 0.72])
y = np.array([1, 0, 0, 0, 0, 0, 0, 0, 0])

## Model

In [0]:
# Create neural network: 5 inputs, three hidden layers (30, 50, 70 neurons)
# and 9 outputs, matching the dummy data. Re-assigns the global name nn.
nn = NeuralNetwork([5, 30, 50, 70, 9])
print(f'Number of parameters: {nn.num_parameters}')

## Activations

In [0]:
# Forward pass that also returns the pre-activations (z) and activations (a) of every layer.
z, a, y_hat = nn.predict(x, return_all=True)

In [0]:
# Shape of the pre-activation vector of every layer.
for pre_activation in z:
    print(pre_activation.shape)

In [0]:
# Shape of the activation vector of every layer.
for activation in a:
    print(activation.shape)

In [0]:
# Shape of the network output; it should have one entry per output neuron.
print(y_hat.shape)

## Training

In [0]:
# Fit the same data point ten times in a row and print the loss of each step.
for epoch in range(10):
    loss = nn.fit(x, y, learning_rate=0.1)
    print(f'Epoch: {epoch:6d}, Loss: {loss:14.12f}')

# MNIST

## Data

In [0]:
# Load MNIST and flatten every image into a vector of 28 * 28 values.
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = (images.reshape(-1, 28 * 28) for images in (x_train, x_test))

In [0]:
# Divide the pixel values by 255 (expected to map them into [0, 1]).
x_train, x_test = x_train / 255, x_test / 255

# One-hot encode the digit labels.
y_train, y_test = to_categorical(y_train), to_categorical(y_test)

In [0]:
# Shapes of all four MNIST arrays, one per line.
print(f'x_train shape: {x_train.shape}',
      f'y_train shape: {y_train.shape}',
      f'x_test shape: {x_test.shape}',
      f'y_test shape: {y_test.shape}',
      sep='\n')

In [0]:
# Value range of the scaled MNIST inputs.
print(f'x_train, min: {x_train.min()}, max: {x_train.max()}')
print(f'x_test, min: {x_test.min()}, max: {x_test.max()}')

## Model

In [0]:
# MLP for MNIST: one input per pixel, one hidden layer with 300 neurons and
# one output per digit class. Re-assigns the global name nn.
nn = NeuralNetwork([28 * 28, 300, 10])
print(f'Number of parameters: {nn.num_parameters}')

## Training

In [0]:
# Train on randomly drawn examples of the TRAINING set and report the accuracy
# on the held-out test set, so the reported number measures generalization
# instead of performance on data the network was fitted to.
losses = []
for i in range(100_000):
    index = np.random.randint(x_train.shape[0]) # Random training sample.
    loss = nn.fit(x_train[index], y_train[index], learning_rate=0.01)
    losses.append(loss)
    
    # Report the running training loss and the test accuracy every 500 steps.
    if not i % 500:
        acc = nn.accuracy(x_test, y_test)
        print(f'Loss: {np.mean(losses):6.4f}, Accuracy: {acc * 100:5.2f}%')
        losses = []