# INTRO

Imports and GDrive mounting

In [None]:
#@title IMPORTS !!!
import os
import random
import json
import keras
import time
from itertools import zip_longest
import tensorflow as tf
from tensorflow import keras
from keras import layers
from keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import StratifiedKFold

import numpy as np

from google.colab import drive

def banner(
    text: str, *,
    length: int = 65,
    frame_char: str = '#'
) -> str:
    """Return ``text`` centred inside a five-row frame drawn with ``frame_char``.

    The rows are: a solid row, a blank framed row, the caption row, a second
    blank framed row and a closing solid row. An empty line follows the frame.

    Args:
        text: Caption for the middle row; it is padded with one space per side
            before being centred.
        length: Number of ``frame_char`` repetitions in the solid rows. The
            area between the side borders is ``length - 4`` characters wide.
        frame_char: String the frame is built from.

    Returns:
        The five rows joined by newlines and terminated by two newlines.
    """
    side = frame_char * 2
    inner_width = length - 4

    solid_row = frame_char * length
    blank_row = side + ' ' * inner_width + side
    caption_row = side + f' {text} '.center(inner_width) + side

    rows = (solid_row, blank_row, caption_row, blank_row, solid_row)
    return '\n'.join(rows) + '\n\n'


In [None]:
#@title !!! Drive mounting
folder_name = "224x224_full_6cat_data" #@param {type:"string"}
drive.mount("/content/drive")
base_path = f'/content/drive/My Drive/Studia/ML/{folder_name}/'
models_path = '/content/drive/My Drive/Studia/ML/models/'
print(base_path)

# Run-wide settings read by the later cells (directory listing, training loop).
# They were referenced but never assigned, which raised NameError.
# NOTE(review): the default values below are the reviewer's choice — adjust as needed.
model_name = "simple_fc_model" #@param {type:"string"}
# When True, the training loop also saves a checkpoint after every training file.
tmp_save = False #@param {type:"boolean"}
# Width used for banners and centred console output (65 is banner()'s own default).
default_line_length = 65 #@param {type:"integer"}
# Target of model.save() in the training cell; checkpoints append a 2-digit file index.
model_path = os.path.join(models_path, model_name)

In [None]:
#@title !!! Dir content loading
# List the dataset folder and pick out, by substring match on the file name,
# the class-map JSON and the (sorted) train / test batch files.
dir_files = os.listdir(base_path)

class_files = [name for name in dir_files if "classes" in name]
if not class_files:
    # Fail with a readable message instead of a bare IndexError.
    raise FileNotFoundError(f'No file containing "classes" found in {base_path}')
class_file = class_files[0]

test_files = sorted(name for name in dir_files if "test" in name)
train_files = sorted(name for name in dir_files if "train" in name)

with open(os.path.join(base_path, class_file)) as fjson:
    classes = json.load(fjson)

# The summary below relies on default_line_length, model_path and tmp_save
# having been defined by an earlier-run cell.
print(banner("INITIAL DATA", length=default_line_length))
print(f"{model_path=}\n")
# Two columns, one row per file: the lists are unpacked so that zip_longest
# pairs individual file names, and the shorter column is padded with "".
for trainf, testf in zip_longest(
    ["TRAIN_FILES", *train_files],
    ["TEST_FILES", *test_files],
    fillvalue="",
):
    print(f'{trainf}\t\t{testf}'.center(default_line_length))

print("CLASSES")
print(f"{classes}".center(default_line_length))
print(f'Checkpoint saves: {"True" if tmp_save else "False"}\n')

# MODEL CREATION

Model building: layers, compilation and summary

`[CHOOSE ONE]`

## > Simple model

First: Train Model

```
[Layers]
1. Flatten
2. FC-128
3. FC-19
 
Input: 28x28 JPEG
 
In sum: 100 epochs per batch
dataset division: 80% training, 20% testing
```

In [None]:
#@title Create model
# Fully connected baseline: flatten the 28x28x3 input, one hidden layer, then
# one raw score (logit) per entry in `classes`.
model = tf.keras.models.Sequential()

# 1st layer
# flatten output data
model.add(layers.Flatten(input_shape=(28, 28, 3)))

# 2nd layer
model.add(layers.Dense(128, activation='relu'))
# 3rd layer: no activation, so the outputs are logits
model.add(layers.Dense(len(classes)))

# Compile model; from_logits=True matches the activation-free output layer
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line.
model.summary()

##  > Convolutional model

```
[Layers]
1. Conv2D_64
2. Maxpool2D, stride 2
3. Conv2D_128
4. Maxpool2D, stride 2
5. Flatten
6. FC-128
7. FC-X

Input: 28x28 JPEG
 
In sum: 100 epochs per batch
dataset division: 80% training, 20% testing
```

In [None]:
#@title Create model
# Small CNN for 28x28x3 input: two Convolution2D -> MaxPooling2D stages
# followed by a dense head that emits one raw score (logit) per class.
model = tf.keras.models.Sequential()

# 1st layer
# Convolution2D output data
model.add(layers.Convolution2D(
    64,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding='valid',
    input_shape=(28, 28, 3),
    activation='relu',
))

# 2nd layer
# MaxPooling2D output data
model.add(layers.MaxPooling2D(pool_size=(2, 2)))

# 3rd layer
# Convolution2D output data
model.add(layers.Convolution2D(
    128,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding='valid',
    activation='relu',
))

# 4th layer
# MaxPooling2D output data
model.add(layers.MaxPooling2D(pool_size=(2, 2)))

# 5th layer
# flatten output data
model.add(layers.Flatten())

# 6th layer
model.add(layers.Dense(128, activation='relu'))

# 7th layer: no activation, so the outputs are logits
model.add(layers.Dense(len(classes)))

# Compile model; from_logits=True matches the activation-free output layer
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line.
model.summary()

## > Convolutional Model ++

```
[Layers]
1. Conv2D_64
2. Maxpool2D, stride 2
3. Conv2D_128
4. Maxpool2D, stride 2
5. Conv2D_256
6. Maxpool2D, stride 2
7. Conv2D_512
8. Maxpool2D, stride 2
9. Conv2D_1024
10. Maxpool2D, stride 2
11. Flatten
12. FC-4096
13. FC-128
14. FC-X
15. Softmax
 
Input: 224x224 RGB JPEG
 
In sum: 100 epochs per batch
dataset division: 80% training, 20% testing
```

In [None]:
#@title Create model
# Deeper CNN for 224x224 RGB input: five Convolution2D -> MaxPooling2D stages,
# a three-layer dense head and a final Softmax, so the model outputs
# class probabilities.
model = tf.keras.models.Sequential(name="MODEL_BIQA_NASA")

# Layers 1-10: convolution + pooling stages
conv_filters = (64, 128, 256, 512, 1024)
for stage, filters in enumerate(conv_filters, start=1):
    # Only the first layer declares the input shape. It is given WITHOUT the
    # batch axis: Keras adds that axis itself, so (None, 224, 224, 3) described
    # a 5-D input.
    first_layer_kwargs = {"input_shape": (224, 224, 3)} if stage == 1 else {}
    model.add(layers.Convolution2D(
        filters,
        kernel_size=(3, 3),
        strides=(1, 1),
        padding='valid',
        activation='relu',
        name=f"Conv2D_{filters}",
        **first_layer_kwargs,
    ))
    # Sequential rejects duplicate layer names, so every pooling layer gets
    # its stage number as a suffix.
    model.add(layers.MaxPooling2D(pool_size=(2, 2), name=f"MaxPool2D_{stage}"))

# 11th layer
# flatten output data
model.add(layers.Flatten(name="Flatten"))

# 12th layer
model.add(layers.Dense(4096, activation='relu', name="FC_4096"))

# 13th layer
model.add(layers.Dense(128, activation='relu', name="FC_128"))

# 14th layer: one unit per entry in `classes`
model.add(layers.Dense(len(classes), name=f"FC_{len(classes)}"))

# 15th layer: turns the scores into probabilities
model.add(layers.Softmax(name="Softmax"))

# Compile model. The network already ends in Softmax, so the loss must treat
# its output as probabilities (from_logits=False); from_logits=True would
# apply softmax a second time inside the loss.
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=['accuracy'],
)
# summary() prints the table itself and returns None, so it is not wrapped in print().
model.summary()

# MODEL TRAINING AND EVALUATION

Load the data in batches, then fit and evaluate the model on each batch.

## GUIDED WALKTHROUGH

=== BATCH LOOP (in iter form) ===

In [None]:
#@title Create batch-iterator
# Walk through the training batch files collected by the directory-listing
# cell. The name used here before, `numpy_batches`, is not assigned in any
# visible cell; `train_files` is the list the training loop reads the same way.
itbatch = iter(train_files)

In [None]:
#@title Take next file
# Advance the batch iterator by one entry; re-run this cell to move on to the
# following file. next() raises StopIteration once the iterator is exhausted.
batch = next(itbatch)

In [None]:
#@title Load data to variables from file
# Read both arrays of the current batch archive while it is still open, then
# report what was loaded.
with np.load(os.path.join(base_path, batch)) as data_batch:
    x, y = data_batch['dataset'], data_batch['labels']
print(f"FILE: {batch} => dataset: {x.shape}, labels: {y.shape}, classes: {classes}")

Feed the batch into a shuffling stratified K-fold splitter to separate it into training and testing subsets.

In [None]:
#@title Make StratifiedKFold data divider and create iterator for folds
# 5 shuffled, stratified folds; the fixed random_state makes the split repeatable.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=1)
# split() returns a generator of (train_indices, test_indices) pairs; the next
# cell consumes it one fold at a time.
skfiter = skf.split(x, y)

In [None]:
#@title Take next fold and split data into particular sets [training, testing]
# Pull the index arrays of the next fold and slice samples and labels with them.
train_indices, test_indices = next(skfiter)

x_train, y_train = x[train_indices], y[train_indices]
x_test, y_test = x[test_indices], y[test_indices]
print(f"TRAIN: {train_indices[:10]} & TEST: {test_indices[:10]}")

In [None]:
#@title Train model
# Fit on the training part of the current fold for 20 epochs.
# The same `model` object is reused on every run of this cell, so training
# continues from the current weights rather than starting from scratch.
model.fit(x=x_train, y=y_train, epochs=20, verbose=2)

In [None]:
#@title Evaluate model after training cycle
# Evaluate on the held-out part of the current fold.
# `score` presumably holds [loss, accuracy], given the metrics=['accuracy']
# compile setting in the model cells — confirm for a loaded model.
score = model.evaluate(x_test, y_test, verbose=1)

## ONE BLOCK TRAINING/VALIDATING LOOP

In [None]:
#@title Start training
# Trains `model` on every file in `train_files` using 5-fold stratified splits,
# saves the model and the per-fold statistics, then stores the predictions for
# the files in `test_files`.
# Relies on base_path, train_files, test_files, model, banner,
# default_line_length, tmp_save, model_path and model_name having been defined
# by earlier-run cells.


def _elapsed(since):
    """Format the wall-clock time passed since `since` (a time.time() value).

    The hour field comes from time.gmtime(), so it wraps around after 24 hours.
    """
    return time.strftime("%H h %M min %S sec", time.gmtime(round(time.time() - since)))


score = []

print(banner("TRAINING", length=default_line_length))

start = time.time()
epoch_num = 16
for bnum, batch in enumerate(train_files):
    batch_time = time.time()
    # Loading from *.npz
    with np.load(os.path.join(base_path, batch)) as data_batch:
        x = data_batch['dataset']
        y = data_batch['labels']
    print(f"┏━ FILE: {batch} :\n┃ train_set:\t{x.shape} » train_labels:\t{y.shape}")
    print("┃ "+"_"*63)
    # Splitting for training and evaluation
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=1)

    # NOTE(review): the same model keeps training across folds and files, so
    # from the second fold on the evaluation samples were already part of an
    # earlier fold's training split — the eval metrics are optimistic.
    for n, (train_indices, test_indices) in enumerate(skf.split(x, y), start=1):
        x_train, x_test = x[train_indices], x[test_indices]
        y_train, y_test = y[train_indices], y[test_indices]
        # Training..
        training = time.time()
        # fit() returns a History object, not a (loss, accuracy) pair; the
        # per-epoch values live in history.history, keyed by metric name.
        history = model.fit(x=x_train, y=y_train, batch_size=96, epochs=epoch_num, verbose=0)
        # Keep the last epoch's values as plain floats so json.dump accepts them.
        fit_loss = float(history.history['loss'][-1])
        fit_accuracy = float(history.history['accuracy'][-1])
        print(f"┃ ⤷Fold_{n} training ({epoch_num} epochs): {round(time.time() - training)} seconds..")
        print(f"┃ \t↻Fold_{n} training: {fit_loss=}, {fit_accuracy=}")
        # Evaluating..
        eval_loss, eval_accuracy = model.evaluate(x_test, y_test, verbose=0)
        print(f"┃ \t↺Fold_{n} evaluation: {eval_loss=:.2f}, {eval_accuracy=:.2f}")
        score.append({
            "fit_loss": fit_loss,
            "fit_accuracy": fit_accuracy,
            "eval_loss": eval_loss,
            "eval_accuracy": eval_accuracy
        })
    # Ending..
    print(f'┗━ FILE: {batch} time: {_elapsed(batch_time)}')
    if tmp_save:
        # Per-file checkpoint: model_path followed by the 2-digit file index
        checkpoint = f"{model_path}{bnum:02}"
        model.save(checkpoint)
        print(f"🗘 Model saved as: {checkpoint}".center(default_line_length))
    print("="*default_line_length)
# Summary time spent training and evaluating
print(f'⇶ Full training took: {_elapsed(start)}')

# ~~~~ SAVE MODEL ~~~~
model.save(model_path)
print(f"⮔ Model saved as: {model_path}\n".center(default_line_length))

# Save score for plotting
with open(os.path.join(base_path, f'{model_name}_train_stats.json'), 'w') as fjson:
    json.dump(score, fjson)

# ~~~~ TEST MODEL'S ACCURACY ~~~~
print(banner("TESTING", length=default_line_length))

nasa_predictions = []
nasa_labels = []
nature_predictions = []
nature_labels = []

for test_file in test_files:
    # Loading from *.npz
    with np.load(os.path.join(base_path, test_file)) as test_batch:
        nasa_dataset = test_batch["nsdtest"]
        nature_dataset = test_batch["ntdtest"]
        # Storing real labels
        nasa_labels.extend(test_batch["nsltest"])
        nature_labels.extend(test_batch["ntltest"])
        # Predicting labels and storing
        nasa_predictions.extend(model.predict(nasa_dataset))
        nature_predictions.extend(model.predict(nature_dataset))

# Save data for plotting
np.savez(
    os.path.join(base_path, f'{model_name}_stats.npz'),
    nasa_predictions=nasa_predictions,
    nasa_labels=nasa_labels,
    nature_predictions=nature_predictions,
    nature_labels=nature_labels,
)

# SAVE / LOAD MODEL

In [None]:
#@title SAVE MODEL
# Name under which the current `model` is stored inside models_path.
model_name = "simple_fc_model" #@param {type:"string"}
model.save(os.path.join(models_path, model_name))

In [None]:
#@title LOAD MODEL
# Name of a previously saved model inside models_path; loading it replaces
# the in-memory `model`.
model_name = "simple_fc_model" #@param {type:"string"}
model = keras.models.load_model(os.path.join(models_path, model_name))

# MODEL TESTING

We now have a model! Let's test its accuracy.

In [None]:
#@title Load necessary functions to test model
### Matplotlib functions to plot image
import matplotlib.pyplot as plt
# Class names in the key order of `classes`; the plotting helpers index this
# list with integer labels.
# NOTE(review): assumes the key order matches the label indices — confirm.
classes_reversed = list(classes)

def plot_image(i, predictions_array, true_label, img):
    """Draw sample ``i`` and caption it with the predicted and the true class.

    The caption reads ``<predicted> <top score as %> (<true>)`` and is blue
    when the prediction matches the true label, red otherwise.

    Args:
        i: Index of the sample inside ``true_label`` and ``img``.
        predictions_array: Scores of this one sample; the position of the
            largest score is taken as the predicted label.
        true_label: Indexable collection of labels; only entry ``i`` is read.
        img: Indexable collection of images; only entry ``i`` is drawn.
    """
    expected = true_label[i]
    picture = img[i]

    # Bare image: no grid, no axis ticks
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])

    plt.imshow(picture, cmap=plt.cm.binary)

    guessed = np.argmax(predictions_array)
    caption_color = 'blue' if guessed == expected else 'red'

    caption = "{} {:2.0f}% ({})".format(
        classes_reversed[guessed],
        100*np.max(predictions_array),
        classes_reversed[expected],
    )
    plt.xlabel(caption, color=caption_color)
  
def plot_value_array(i, predictions_array, true_label):
    """Draw the scores of one sample as a bar chart.

    Bars are grey, then the predicted class is painted red and afterwards the
    true class blue, so a correct prediction ends up as one blue bar.

    Args:
        i: Index of the sample inside ``true_label``.
        predictions_array: Scores of this one sample, one entry per class.
            The number of bars follows its length (it was fixed at 6 before).
        true_label: Indexable collection of labels; only entry ``i`` is read.
    """
    true_label = true_label[i]
    class_positions = range(len(predictions_array))

    plt.grid(False)
    plt.xticks(class_positions)
    plt.yticks([])
    bars = plt.bar(class_positions, predictions_array, color="#777777")
    # Fixed 0..1 y-axis
    plt.ylim([0, 1])
    predicted_label = np.argmax(predictions_array)

    # Order matters: blue is applied last and wins when both labels coincide.
    bars[predicted_label].set_color('red')
    bars[true_label].set_color('blue')

In [None]:
# `x` and `y` are the arrays left in memory by the most recently run
# data-loading cell.

# The plots need class probabilities. A model that already ends in a Softmax
# layer is used as is; any other model is assumed to emit logits and gets a
# Softmax appended. (Wrapping unconditionally would apply softmax twice to the
# "Convolutional Model ++" network.)
if isinstance(model.layers[-1], (layers.Softmax, tf.keras.layers.Softmax)):
    prob_model = model
else:
    prob_model = tf.keras.Sequential([model, layers.Softmax()])

predictions = prob_model.predict(x)

### see results
# Pick one random sample and show the image next to its score bar chart
i = random.randrange(len(x))
plt.figure(figsize=(6,3))
plt.subplot(1,2,1)
plot_image(i, predictions[i], y, x)
plt.subplot(1,2,2)
plot_value_array(i, predictions[i],  y)
# Replace the numeric x ticks of the bar chart with the class names
_ = plt.xticks(range(len(classes)), classes_reversed, rotation=90)
plt.show()