# Environment setup

In [None]:
import tensorflow as tf
import os

# Resolve the TPU cluster, connect and initialise it, build a TPUStrategy and
# print the result of a profiler monitor call against the TPU worker.
try:
  tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
  print('Running on TPU ', tpu.cluster_spec().as_dict()['worker'])
  tf.config.experimental_connect_to_cluster(tpu)
  tf.tpu.experimental.initialize_tpu_system(tpu)
  tpu_strategy = tf.distribute.TPUStrategy(tpu)
  # The monitor address is the Colab TPU address with '8470' replaced by '8466'
  # (presumably the profiler service port - TODO confirm).
  # NOTE(review): a missing COLAB_TPU_ADDR raises KeyError, which the
  # `except ValueError` below does not catch.
  tpu_worker = os.environ['COLAB_TPU_ADDR'].replace('8470', '8466') 
  print(tf.profiler.experimental.client.monitor(tpu_worker,1))
except ValueError:
  # Best effort: report the problem and let the notebook continue.
  print('ERROR: Not connected to a TPU runtime; please see the previous cell in this notebook for instructions!')
  #raise BaseException('ERROR: Not connected to a TPU runtime; please see the previous cell in this notebook for instructions!')


In [None]:
!apt install --allow-change-held-packages libcudnn8=8.1.0.77-1+cuda11.2
!pip install opencv-python==4.5.5.64
!pip install opencv-contrib-python
!pip install tensorflow_addons
!pip install vit-keras
!pip3 install tf-models-official
!pip install -q -U keras-tuner
!pip install focal-loss

!pip uninstall tensorflow --yes
!pip install tensorflow==2.8.1

In [None]:
from google.colab import auth
auth.authenticate_user()

project_id = 'stiarnet-v2'

!gcloud config set project {project_id}

In [None]:
!gsutil ls -al gs://

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Imports

In [None]:
import os
import gc
import re
import ast
import time
import math
import numpy as np
from tqdm import tqdm
import seaborn as sns
from tqdm import trange
import multiprocessing
import tensorflow as tf
import keras_tuner as kt
import typing_extensions as tx
import matplotlib.pyplot as plt
from vit_keras import vit, utils
from sklearn.metrics import accuracy_score
from focal_loss import SparseCategoricalFocalLoss

from official.vision.configs import video_classification
from official.projects.movinet.configs import movinet as movinet_configs
from official.projects.movinet.modeling import movinet
from official.projects.movinet.modeling import movinet_layers
from official.projects.movinet.modeling import movinet_model
from official.projects.movinet.tools import export_saved_model


from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score

In [None]:
tf.__version__, np.__version__

In [None]:
AUTO = tf.data.experimental.AUTOTUNE

In [None]:
# NEW on TPU in TensorFlow 2.4: shorter cross-compatible TPU/GPU/multi-GPU/cluster-GPU detection code
# Defines the global `strategy` that the dataset helpers and batch sizes below rely on.

try: # detect TPUs
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect() # TPU detection
    strategy = tf.distribute.TPUStrategy(tpu)
except ValueError: # no TPU resolved: fall back to the default strategy
    #strategy = tf.distribute.MirroredStrategy() # for GPU or multi-GPU machines
    strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU
    #strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() # for clusters of multi-GPU machines

print("Number of accelerators: ", strategy.num_replicas_in_sync)

# FLAGS

In [None]:
# Fix TensorFlow's global random seed for this run.
tf.random.set_seed(42)

# Class name -> integer label id.
CLASS_MAP = {'IS': 0 , 'ISLG': 1, 'LG': 2, 'LGIS': 3}       

# GCS prefix under which the TFRecord shards of every split are globbed.
GCS_PATH = 'gs://stairnet-v2/StairNet_Video_splits/'
print(GCS_PATH)


TRAIN_FILENAMES = tf.io.gfile.glob(GCS_PATH + 'train_*.tfrecord')
print(TRAIN_FILENAMES)
VAL_FILENAMES = tf.io.gfile.glob(GCS_PATH + 'val_*.tfrecord')
print(VAL_FILENAMES)
TEST_FILENAMES = tf.io.gfile.glob(GCS_PATH + 'test_*.tfrecord') # TFRecord shards of the test split
print(TEST_FILENAMES)


# set the number of epochs for the run
EPOCHS = 10

# set the initial learning rate
BASE_LR = 0.00001

# Global batch sizes scale with the replica count: with 8 TPU cores, 16 per replica gives 128
BATCH_SIZE = 16 * strategy.num_replicas_in_sync
BATCH_SIZE_VAL = 32 * strategy.num_replicas_in_sync

# To study the effect of transfer training
# NOTE(review): the meaning of -1 is defined by code outside this section - confirm at the use site.
FINE_TUNE_BUFFER = -1

# Shuffle buffer size used by the train/validation dataset builders
BUFFER_SIZE = 100

# Dropout rate
DROPOUT_VALUE = 0.2

# Shuffle seed: for hyperparameter optimization set to 144,
# for the final run set to None (the Python value, not the string 'None')
SEED_NUMBER = 144 

# Shape of one stored sample: SEQ_LEN frames of INIT_IMAGE_SIZE x INIT_IMAGE_SIZE x NUM_CHANNELS,
# cropped to IMAGE_CROP_SIZE by decode_image_seq.
SEQ_LEN = 5
INIT_IMAGE_SIZE = 256
IMAGE_CROP_SIZE = 256
NUM_CHANNELS = 3

ENCODER_NAME = 'efficient_b0' # ['movinet', 'mobilevit', 'vit', 'vgg', 'mobilenetv2', 'efficient_b0']
TEMPORAL_MODEL = 'lstm' # ['transformer', 'lstm']
MANY2ONE = True

OPTIMIZER = 'adam'

# Switches for optional diagnostics; consumed outside this section.
EVALUATE_INFERENCE_SPEED = False
CHECK_UPSAMPLE_DISTRBUTION = False

In [None]:
len(TRAIN_FILENAMES), len(VAL_FILENAMES), len(TEST_FILENAMES)

# Dataset

In [None]:
def decode_image_seq(image_data):
    ''' Decode a raw byte string into a float32 frame sequence scaled by 1/255.

    The bytes are read as float64 values, reshaped to
    (SEQ_LEN, INIT_IMAGE_SIZE, INIT_IMAGE_SIZE, NUM_CHANNELS) and then randomly
    cropped to IMAGE_CROP_SIZE along both spatial axes.
    '''
    stored_shape = [SEQ_LEN, INIT_IMAGE_SIZE, INIT_IMAGE_SIZE, NUM_CHANNELS]
    cropped_shape = (SEQ_LEN, IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS)

    frames = tf.io.decode_raw(image_data, 'float64')
    # explicit size needed for TPU
    frames = tf.reshape(frames, stored_shape)
    frames = tf.cast(frames, tf.float32) / 255.
    return tf.image.random_crop(value=frames, size=cropped_shape)

def convert_str_label(tf_id):
    ''' Turn a serialized collection of class names into the int32 id of its last entry.

    `tf_id` must be an eager tensor (``.numpy()`` is called on it) holding the
    UTF-8 text of a Python literal; every entry is mapped through CLASS_MAP,
    so an unknown name raises KeyError even when it is not the last one.
    '''
    class_names = ast.literal_eval(tf_id.numpy().decode('utf-8'))
    class_ids = list(map(CLASS_MAP.__getitem__, class_names))
    return tf.cast(class_ids[-1], tf.int32)

def read_labeled_tfrecord(example):
    ''' Parse one serialized example into an (image sequence, label) pair.

    Both features are stored as scalar byte strings. The label bytes are
    decoded as int32 values and the second-to-last value is returned.
    '''
    feature_spec = {
        "image": tf.io.FixedLenFeature([], tf.string), # tf.string means bytestring
        "label": tf.io.FixedLenFeature([], tf.string),  # shape [] means single element
    }
    parsed = tf.io.parse_single_example(example, feature_spec)
    # Decode the frames first, then the label (same op order as before).
    frames = decode_image_seq(parsed['image'])
    label_values = tf.io.decode_raw(parsed['label'], 'int32')
    return frames, label_values[-2]

In [None]:
def load_dataset(filenames, ordered=False):
    """Build a dataset of (image sequence, label) pairs from TFRecord files.

    Files are read in parallel. Unless `ordered` is true, deterministic
    ordering is switched off so records are used as soon as they stream in.
    """
    read_options = tf.data.Options()
    if not ordered:
        # disable order, increase speed
        read_options.experimental_deterministic = False

    # num_parallel_reads interleaves reads from multiple files
    records = tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTO).with_options(read_options)
    return records.map(read_labeled_tfrecord, num_parallel_calls=AUTO)

In [None]:
def get_validation_dataset(repeated=False, ordered=False, distributed=True):
    """Build the batched validation dataset.

    Args:
        repeated: when true, repeat the data EPOCHS times, shuffle it with the
            fixed SEED_NUMBER and drop the final partial batch.
        ordered: when true, read the records deterministically. This flag is
            now forwarded to `load_dataset`; it used to be silently ignored.
        distributed: when true, wrap the dataset with
            `strategy.experimental_distribute_dataset`.

    Returns:
        A `tf.data.Dataset`, or its distributed counterpart when `distributed` is true.
    """
    dataset = load_dataset(VAL_FILENAMES, ordered=ordered)
    if repeated:
        dataset = dataset.repeat(EPOCHS)
        # For hyperparameter testing, a fixed seed shuffles the data the same way on
        # every run, which eliminates this variable from the results.
        dataset = dataset.shuffle(buffer_size=BUFFER_SIZE, seed=SEED_NUMBER, reshuffle_each_iteration=None)
    dataset = dataset.batch(BATCH_SIZE_VAL, drop_remainder=repeated)
    #dataset = dataset.cache()
    dataset = dataset.prefetch(AUTO) # prefetch the next batch (autotuned prefetch buffer size)
    if distributed:
        dataset = strategy.experimental_distribute_dataset(dataset)
    return dataset

def get_train_dataset():
    """Build the infinite, shuffled, batched and distributed training dataset.

    The shuffle uses the fixed SEED_NUMBER so hyperparameter runs see the data
    in the same order; partial batches are dropped.
    """
    batches = (
        load_dataset(TRAIN_FILENAMES)
        .repeat()
        .shuffle(buffer_size=BUFFER_SIZE, seed=SEED_NUMBER, reshuffle_each_iteration=None)
        .batch(BATCH_SIZE, drop_remainder=True)
        # prefetch next batch while training (autotune prefetch buffer size)
        .prefetch(AUTO)
    )
    return strategy.experimental_distribute_dataset(batches)

def get_test_dataset(ordered=False):
    """Build the batched (non-distributed) test dataset.

    Args:
        ordered: when true, read the records deterministically so that several
            passes over the dataset (e.g. one for predictions and one for
            labels) see the examples in the same order. Defaults to False,
            which keeps the previous behaviour.

    Returns:
        A `tf.data.Dataset` of batches of size BATCH_SIZE_VAL; the last batch
        may be smaller because no remainder is dropped.
    """
    dataset = load_dataset(TEST_FILENAMES, ordered=ordered)
    dataset = dataset.batch(BATCH_SIZE_VAL)
    dataset = dataset.prefetch(AUTO) # prefetch the next batch (autotuned prefetch buffer size)
    return dataset

In [None]:
def count_data_items(filenames):
    """Sum the item counts embedded in the given file names.

    The count of a file is the run of digits between the first '-' that is
    followed by digits-then-'.' and that '.', e.g. "shard-120.tfrecord" -> 120.
    """
    count_pattern = re.compile(r"-([0-9]*)\.")
    per_file_counts = []
    for filename in filenames:
        per_file_counts.append(int(count_pattern.search(filename).group(1)))
    return np.sum(per_file_counts)

In [None]:
# Hard-coded example counts per split.
NUM_TRAINING_IMAGES = 426177 
NUM_VALIDATION_IMAGES = 32487 
NUM_TEST_IMAGES = 56729
STEPS_PER_EPOCH = NUM_TRAINING_IMAGES // BATCH_SIZE
# The validation and test pipelines batch with BATCH_SIZE_VAL, so their step
# counts must be derived from it (they used BATCH_SIZE before, which asked for
# more batches than one pass over the data provides).
# Validation rounds down: the repeated validation dataset drops partial batches.
VALIDATION_STEPS = NUM_VALIDATION_IMAGES // BATCH_SIZE_VAL
# The test dataset keeps its last partial batch, so round up with the "-(-//)" trick.
TEST_STEPS = -(-NUM_TEST_IMAGES // BATCH_SIZE_VAL)
print('Dataset: {} training images, {} validation images, {} test images'.format(
    NUM_TRAINING_IMAGES, NUM_VALIDATION_IMAGES, NUM_TEST_IMAGES))
print('Dataset: {} training steps, {} validation steps, {} test steps'.format(
    STEPS_PER_EPOCH, VALIDATION_STEPS, TEST_STEPS))

In [None]:
# Get labels and their counts by iterating once over the whole training set.
# NOTE(review): this decodes every training example, so the cell is slow.
from collections import Counter

def get_training_dataset_raw():
    """Return the unbatched, unshuffled training dataset of (image sequence, label) pairs."""
    dataset = load_dataset(TRAIN_FILENAMES, ordered=False)
    return dataset

raw_training_dataset = get_training_dataset_raw() # default dataset 

# Maps each label value to the number of training examples carrying it.
label_counter = Counter()
for images, labels in raw_training_dataset:
    label_counter.update([labels.numpy()])

del raw_training_dataset    
    
# (label, count) pairs, most frequent first.
label_counting_sorted = label_counter.most_common()

# NOTE(review): this overwrites the hard-coded NUM_TRAINING_IMAGES defined earlier;
# STEPS_PER_EPOCH was computed from the old value and is not recomputed here.
NUM_TRAINING_IMAGES = sum([x[1] for x in label_counting_sorted])
print("number of examples in the original training dataset: {}".format(NUM_TRAINING_IMAGES))

print("labels in the original training dataset, sorted by occurrence")
print(label_counting_sorted)

In [None]:
# We want each class to occur at least (approximately) `TARGET_MIN_COUNTING` times

TARGET_MIN_COUNTING = 300000

def get_num_of_repetition_for_class(class_id):
    """Return how many times examples of `class_id` should be repeated.

    The factor is TARGET_MIN_COUNTING divided by the class count from
    `label_counter`. It is 1.0 for classes that already reach the target and
    for classes that do not occur at all (a zero count would otherwise raise
    ZeroDivisionError, and there is nothing to repeat anyway).
    """
    counting = label_counter[class_id]
    if counting == 0 or counting >= TARGET_MIN_COUNTING:
        return 1.0
    return TARGET_MIN_COUNTING / counting

# One factor per class id defined in CLASS_MAP (ids 0..3).
numbers_of_repetition_for_classes = {
    class_id: get_num_of_repetition_for_class(class_id)
    for class_id in sorted(CLASS_MAP.values())
}

print("number of repetitions for each class (if > 1)")
{k: v for k, v in sorted(numbers_of_repetition_for_classes.items(), key=lambda item: item[1], reverse=True) if v > 1}

In [None]:
# Static lookup table: class id -> (fractional) repetition factor.
# NOTE(review): labels missing from the table map to the default -1, which ends
# up as repeat(-1), i.e. an endless repetition of that example - confirm that
# every label in the data is a key of the table.
keys_tensor = tf.constant([k for k in numbers_of_repetition_for_classes])
vals_tensor = tf.constant([numbers_of_repetition_for_classes[k] for k in numbers_of_repetition_for_classes])
table = tf.lookup.StaticHashTable(tf.lookup.KeyValueTensorInitializer(keys_tensor, vals_tensor), -1)

def get_num_of_repetition_for_example(train_sample):
  ''' Draw the integer number of times one (sequence, label) example is repeated.

  The fractional factor of the example's class is split into its integral part
  and a residue; the residue is used as the probability of one extra
  repetition, so the expected repetition count equals the factor.
  Returns a scalar int64 tensor.
  '''
  _, label = train_sample
  num_to_repeat = table.lookup(label)
  # Explicit TF op instead of Python int(): int() on a tensor only works inside
  # AutoGraph-converted code. floor matches the old truncation for every table
  # value (all factors are >= 1, the default is the whole number -1).
  num_to_repeat_integral = tf.floor(num_to_repeat)
  residue = num_to_repeat - num_to_repeat_integral
  extra_repeat = tf.cast(tf.random.uniform(shape=()) <= residue, tf.float32)
  return tf.cast(num_to_repeat_integral + extra_repeat, tf.int64)

def get_train_dataset_with_oversample(oversample=False, shuffle_buffer_size=20000):
  ''' Build the training dataset, optionally oversampling classes.

  Args:
    oversample: when true, every example is repeated the number of times drawn
      by `get_num_of_repetition_for_example` for its label.
    shuffle_buffer_size: number of examples held in the shuffle buffer. The
      default keeps the previous value of 20000; the buffer stores whole decoded
      sequences, so lower it if memory is tight.

  Returns:
    An infinite, shuffled, batched dataset distributed over `strategy`.
  '''
  dataset = load_dataset(TRAIN_FILENAMES)

  if oversample:
    dataset = dataset.flat_map(
        lambda sequence, label: tf.data.Dataset.from_tensors((sequence, label)).repeat(
            get_num_of_repetition_for_example((sequence, label))
        )
    )
  dataset = dataset.repeat()
  dataset = dataset.shuffle(shuffle_buffer_size)
  # The dataset is infinite, so drop_remainder never discards data; it only gives
  # the batch dimension a static size, as in get_train_dataset.
  dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
  dataset = dataset.prefetch(AUTO) # prefetch next batch while training (autotune prefetch buffer size)
  dataset = strategy.experimental_distribute_dataset(dataset)
  return dataset

# Model (Encoders)

## Visual Transformer model

In [None]:
# Base URL from which load_pretrained downloads the ViT weight files.
BASE_URL = "https://github.com/faustomorales/vit-keras/releases/download/dl"
# Known pretrained weight sets mapped to the class count that
# validate_pretrained_top returns for them.
WEIGHTS = {"imagenet21k": 21_843, "imagenet21k+imagenet2012": 1_000}

# Typed schema of a ViT hyperparameter dict.
ConfigDict = tx.TypedDict(
    "ConfigDict",
    {
        "dropout": float,
        "mlp_dim": int,
        "num_heads": int,
        "num_layers": int,
        "hidden_size": int,
    },
)

# Hyperparameters that vit_b16 unpacks into build_model.
CONFIG_B: ConfigDict = {
    "dropout": 0.1,
    "mlp_dim": 3072,
    "num_heads": 12,
    "num_layers": 12,
    "hidden_size": 768,
}

class AddPositionEmbs(tf.keras.layers.Layer):
    """Adds a trainable positional embedding to a rank-3 input.

    The embedding variable has shape (1, input_shape[1], input_shape[2]) and is
    added to the inputs with broadcasting over the first axis.
    """

    def build(self, input_shape):
        """Create the `pos_embedding` variable from the last two input dimensions."""
        assert (
            len(input_shape) == 3
        ), f"Number of dimensions should be 3, got {len(input_shape)}"
        # Initial values are drawn from a normal distribution with stddev 0.06.
        self.pe = tf.Variable(
            name="pos_embedding",
            initial_value=tf.random_normal_initializer(stddev=0.06)(
                shape=(1, input_shape[1], input_shape[2])
            ),
            dtype="float32",
            trainable=True,
        )

    def compute_output_shape(self, input_shape):
        """The output has the same shape as the input."""
        return (input_shape[0], input_shape[1], input_shape[2])

    def call(self, inputs):
        """Return `inputs` plus the embedding, cast to the inputs' dtype."""
        return inputs + tf.cast(self.pe, dtype=inputs.dtype)

    def get_config(self):
        """Return the base layer config; this layer adds no constructor arguments."""
        config = super().get_config()
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config produced by `get_config`."""
        return cls(**config)

class MultiHeadSelfAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are all projected from the same input. `call`
    returns both the attended output and the attention weights.
    """

    def __init__(self, *args, num_heads, **kwargs):
        """Store `num_heads`; all other arguments go to the base `Layer`."""
        super().__init__(*args, **kwargs)
        self.num_heads = num_heads

    def build(self, input_shape):
        """Create the query/key/value/output projections for the input's last dimension.

        Raises:
            ValueError: if the last input dimension is not divisible by `num_heads`.
        """
        hidden_size = input_shape[-1]
        num_heads = self.num_heads
        if hidden_size % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {hidden_size} should be divisible by number of heads = {num_heads}"
            )
        self.hidden_size = hidden_size
        # Width of each individual head.
        self.projection_dim = hidden_size // num_heads
        self.query_dense = tf.keras.layers.Dense(hidden_size, name="query")
        self.key_dense = tf.keras.layers.Dense(hidden_size, name="key")
        self.value_dense = tf.keras.layers.Dense(hidden_size, name="value")
        self.combine_heads = tf.keras.layers.Dense(hidden_size, name="out")

    # pylint: disable=no-self-use
    def attention(self, query, key, value):
        """Return (softmax(Q K^T / sqrt(d_key)) V, attention weights)."""
        score = tf.matmul(query, key, transpose_b=True)
        dim_key = tf.cast(tf.shape(key)[-1], score.dtype)
        scaled_score = score / tf.math.sqrt(dim_key)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        """Reshape to (batch, -1, heads, projection_dim) and move the head axis to position 1."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply self-attention to `inputs`; returns (output, attention weights)."""
        batch_size = tf.shape(inputs)[0]
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)
        query = self.separate_heads(query, batch_size)
        key = self.separate_heads(key, batch_size)
        value = self.separate_heads(value, batch_size)

        attention, weights = self.attention(query, key, value)
        # Undo the head split: move the head axis back and merge it into the last axis.
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(attention, (batch_size, -1, self.hidden_size))
        output = self.combine_heads(concat_attention)
        return output, weights

    def get_config(self):
        """Return the base config extended with `num_heads`."""
        config = super().get_config()
        config.update({"num_heads": self.num_heads})
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config produced by `get_config`."""
        return cls(**config)


class TransformerBlock(tf.keras.layers.Layer):
    """Implements a Transformer block.

    Layer normalisation is applied before the self-attention and before the
    MLP; each of the two sub-blocks is wrapped in a residual connection.
    `call` returns the block output together with the attention weights.
    """

    def __init__(self, *args, num_heads, mlp_dim, dropout, **kwargs):
        """Store the hyperparameters; all other arguments go to the base `Layer`."""
        super().__init__(*args, **kwargs)
        self.num_heads = num_heads
        self.mlp_dim = mlp_dim
        self.dropout = dropout

    def build(self, input_shape):
        """Create the attention, MLP, normalisation and dropout sub-layers."""
        self.att = MultiHeadSelfAttention(
            num_heads=self.num_heads,
            name="MultiHeadDotProductAttention_1",
        )
        # MLP: Dense(mlp_dim) -> exact GELU -> dropout -> Dense(input width) -> dropout.
        self.mlpblock = tf.keras.Sequential(
            [
                tf.keras.layers.Dense(
                    self.mlp_dim,
                    activation="linear",
                    name=f"{self.name}/Dense_0",
                ),
                # Use tf.keras.activations.gelu when it exists, otherwise fall back
                # to tf.activations.gelu.
                tf.keras.layers.Lambda(
                    lambda x: tf.keras.activations.gelu(x, approximate=False)
                )
                if hasattr(tf.keras.activations, "gelu")
                else tf.keras.layers.Lambda(
                    lambda x: tf.activations.gelu(x, approximate=False)
                ),
                tf.keras.layers.Dropout(self.dropout),
                tf.keras.layers.Dense(input_shape[-1], name=f"{self.name}/Dense_1"),
                tf.keras.layers.Dropout(self.dropout),
            ],
            name="MlpBlock_3",
        )
        self.layernorm1 = tf.keras.layers.LayerNormalization(
            epsilon=1e-6, name="LayerNorm_0"
        )
        self.layernorm2 = tf.keras.layers.LayerNormalization(
            epsilon=1e-6, name="LayerNorm_2"
        )
        self.dropout_layer = tf.keras.layers.Dropout(self.dropout)

    def call(self, inputs, training):
        """Run the block; returns (output, attention weights)."""
        x = self.layernorm1(inputs)
        x, weights = self.att(x)
        x = self.dropout_layer(x, training=training)
        x = x + inputs  # first residual connection
        y = self.layernorm2(x)
        y = self.mlpblock(y)
        return x + y, weights  # second residual connection

    def compute_output_shape(self, input_shape):
        """The output has the same shape as the input."""
        return (input_shape[0], input_shape[1], input_shape[2])

    def get_config(self):
        """Return the base config extended with the block hyperparameters."""
        config = super().get_config()
        config.update(
            {
                "num_heads": self.num_heads,
                "mlp_dim": self.mlp_dim,
                "dropout": self.dropout,
            }
        )
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config produced by `get_config`."""
        return cls(**config)

class ClassToken(tf.keras.layers.Layer):
    """Prepend a trainable class token to a token sequence.

    The token is placed at index 0 of axis 1, so the sequence grows by one.
    """

    def build(self, input_shape):
        """Create the zero-initialised `cls` variable of shape (1, 1, hidden_size)."""
        cls_init = tf.zeros_initializer()
        self.hidden_size = input_shape[-1]
        self.cls = tf.Variable(
            name="cls",
            initial_value=cls_init(shape=(1, 1, self.hidden_size), dtype="float32"),
            trainable=True,
        )

    def compute_output_shape(self, input_shape):
        """Axis 1 is one element longer than in the input."""
        return (input_shape[0], input_shape[1] + 1, input_shape[2])

    def call(self, inputs):
        """Return `inputs` with the class token concatenated in front along axis 1."""
        batch_size = tf.shape(inputs)[0]
        # Repeat the single token for every example and match the inputs' dtype.
        cls_broadcasted = tf.cast(
            tf.broadcast_to(self.cls, [batch_size, 1, self.hidden_size]),
            dtype=inputs.dtype,
        )
        return tf.concat([cls_broadcasted, inputs], 1)

    def get_config(self):
        """Return the base layer config; this layer adds no constructor arguments."""
        config = super().get_config()
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config produced by `get_config`."""
        return cls(**config)

class SliceClassToken(tf.keras.layers.Layer):
    """Remove the class token from a token sequence.

    `ClassToken` concatenates the class token in front of the patch tokens
    (index 0 of axis 1), so this layer drops the first position. The previous
    slice `[:, :-1, :]` removed the last patch token and kept the class token.
    """

    def compute_output_shape(self, input_shape):
        """Axis 1 is one element shorter than in the input."""
        return (input_shape[0], input_shape[1] - 1, input_shape[2])

    def call(self, inputs):
        """Return every token except the one at index 0 of axis 1."""
        return inputs[:, 1:, :]

    def get_config(self):
        """Return the base layer config; this layer adds no constructor arguments."""
        config = super().get_config()
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config produced by `get_config`."""
        return cls(**config)

def interpret_image_size(image_size_arg):
    """Normalise `image_size_arg` to a pair of ints.

    A single int is duplicated into a pair; a tuple of exactly two ints is
    returned unchanged.

    Raises:
        ValueError: for any other value.
    """
    if isinstance(image_size_arg, int):
        return (image_size_arg, image_size_arg)

    is_int_pair = (
        isinstance(image_size_arg, tuple)
        and len(image_size_arg) == 2
        and all(isinstance(side, int) for side in image_size_arg)
    )
    if not is_int_pair:
        raise ValueError(
            f"The image_size argument must be a tuple of 2 integers or a single integer. Received: {image_size_arg}"
        )
    return image_size_arg

def build_model(
    image_size,
    patch_size: int,
    num_layers: int,
    hidden_size: int,
    num_heads: int,
    name: str,
    mlp_dim: int,
    classes: int,
    dropout=0.1,
    activation="linear",
    include_top=True,
    representation_size=None,
):
    """Build a ViT model.
    Args:
        image_size: The size of input images (an int or a tuple of two ints).
        patch_size: The size of each patch (must fit evenly in image_size)
        num_layers: The number of transformer layers to use.
        hidden_size: The number of filters to use
        num_heads: The number of transformer heads
        name: The name given to the returned Keras model.
        mlp_dim: The number of dimensions for the MLP output in the transformers.
        classes: The number of output units of the final classification layer;
            only used if `include_top` is True.
        dropout: fraction of the units to drop in the transformer blocks.
        activation: The activation to use for the final layer.
        include_top: Whether to include the final classification layer. If not,
            the output will have dimensions (batch_size, hidden_size).
        representation_size: The size of the representation prior to the
            classification layer. If None, no Dense layer is inserted.
    Returns:
        A `tf.keras.models.Model` mapping 3-channel images to the selected output.
    """
    image_size_tuple = interpret_image_size(image_size)
    assert (image_size_tuple[0] % patch_size == 0) and (
        image_size_tuple[1] % patch_size == 0
    ), "image_size must be a multiple of patch_size"
    x = tf.keras.layers.Input(shape=(image_size_tuple[0], image_size_tuple[1], 3))
    # Patch embedding: a strided convolution whose kernel and stride equal the patch size.
    y = tf.keras.layers.Conv2D(
        filters=hidden_size,
        kernel_size=patch_size,
        strides=patch_size,
        padding="valid",
        name="embedding",
    )(x)
    # Flatten the patch grid into a sequence of tokens.
    y = tf.keras.layers.Reshape((y.shape[1] * y.shape[2], hidden_size))(y)
    y = ClassToken(name="class_token")(y)
    y = AddPositionEmbs(name="Transformer/posembed_input")(y)
    for n in range(num_layers):
        # The attention weights returned by each block are discarded.
        y, _ = TransformerBlock(
            num_heads=num_heads,
            mlp_dim=mlp_dim,
            dropout=dropout,
            name=f"Transformer/encoderblock_{n}",
        )(y)
    y = tf.keras.layers.LayerNormalization(
        epsilon=1e-6, name="Transformer/encoder_norm"
    )(y)
    # Keep only the token at index 0, i.e. the class token.
    y = tf.keras.layers.Lambda(lambda v: v[:, 0], name="ExtractToken")(y)
    if representation_size is not None:
        y = tf.keras.layers.Dense(
            representation_size, name="pre_logits", activation="tanh"
        )(y)
    if include_top:
        y = tf.keras.layers.Dense(classes, name="head", activation=activation)(y)
    return tf.keras.models.Model(inputs=x, outputs=y, name=name)


def validate_pretrained_top(
    include_top: bool, pretrained: bool, classes: int, weights: str
):
    """Check a pretrained-top configuration and return its class count.

    Asserts that `weights` is a known weight set and that both `include_top`
    and `pretrained` are set, then returns the number of classes registered
    for `weights` in WEIGHTS. `classes` is accepted but not inspected.
    """
    assert weights in WEIGHTS, f"Unexpected weights: {weights}."
    assert include_top, "Can only use pretrained_top with include_top."
    assert pretrained, "Can only use pretrained_top with pretrained."
    return WEIGHTS[weights]


def load_pretrained(
    size: str,
    weights: str,
    pretrained_top: bool,
    model: tf.keras.models.Model,
    image_size,
    patch_size: int,
):
    """Download the weight file for a known configuration and load it into `model`.

    The file "ViT-{size}_{weights}.npz" is fetched from BASE_URL (cached under
    the "weights" cache sub-directory) and applied with
    `utils.load_weights_numpy`, together with the patch-grid size derived from
    `image_size` and `patch_size`.
    """
    height, width = interpret_image_size(image_size)
    fname = f"ViT-{size}_{weights}.npz"
    local_filepath = tf.keras.utils.get_file(
        fname, f"{BASE_URL}/{fname}", cache_subdir="weights"
    )
    utils.load_weights_numpy(
        model=model,
        params_path=local_filepath,
        pretrained_top=pretrained_top,
        num_x_patches=width // patch_size,
        num_y_patches=height // patch_size,
    )


def vit_b16(
    image_size = (224, 224),
    classes=1000,
    activation="linear",
    include_top=True,
    pretrained=True,
    pretrained_top=True,
    weights="imagenet21k+imagenet2012",
):
    """Build ViT-B16 from CONFIG_B with a patch size of 16.

    With `pretrained_top`, `classes` is replaced by the class count of the
    chosen weight set. With `pretrained`, the matching weights are downloaded
    and loaded. The "imagenet21k" weight set adds a 768-unit representation layer.
    """
    if pretrained_top:
        classes = validate_pretrained_top(
            include_top=include_top,
            pretrained=pretrained,
            classes=classes,
            weights=weights,
        )

    representation_size = 768 if weights == "imagenet21k" else None
    model = build_model(
        **CONFIG_B,
        name="vit-b16",
        patch_size=16,
        image_size=image_size,
        classes=classes,
        activation=activation,
        include_top=include_top,
        representation_size=representation_size,
    )

    if not pretrained:
        return model

    load_pretrained(
        size="B_16",
        weights=weights,
        model=model,
        pretrained_top=pretrained_top,
        image_size=image_size,
        patch_size=16,
    )
    return model


In [None]:
# Vision Transformer model check

#image_size = 224
#base_model = vit_b16(
#    image_size=image_size,
#    activation='sigmoid',
#    pretrained=True,
#    include_top=False,
#    pretrained_top=False,
#)
#x = base_model.layers[-2].output
#x = SliceClassToken()(x) 
#patch_size = int(math.sqrt(x.shape[-2]))
#embedding_dim = x.shape[-1]
#embedding = tf.keras.layers.Reshape((
#    patch_size, patch_size, 
#    embedding_dim))(x)
#base_model = tf.keras.Model(inputs=base_model.input, outputs=embedding)
#base_model.summary()

## MobileViT model

In [None]:
class InvertedRes(tf.keras.layers.Layer):
  """Inverted Residual Block.

  Applies a 1x1 expansion convolution, a 3x3 depthwise convolution (carrying the
  stride) and a 1x1 pointwise projection. The input is added back when the
  stride is 1 and the input already has `output_channels` channels.

  Args:
    expand_channels: number of filters of the expansion convolution.
    output_channels: number of filters of the pointwise projection.
    strides: stride of the depthwise convolution.
    **kwargs: standard `Layer` arguments (name, trainable, dtype, ...). They are
      accepted so that a layer can be rebuilt from `get_config()`, which
      contains these keys.
  """
  
  def __init__(self, expand_channels, output_channels, strides=1, **kwargs):
    super().__init__(**kwargs)
    self.output_channels = output_channels
    self.strides = strides
    self.expand_channels = expand_channels
    self.expand = tf.keras.models.Sequential([
        tf.keras.layers.Conv2D(expand_channels, 1, padding="same", use_bias=False),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Activation('swish')
                                        ], name="expand")
    self.dw_conv = tf.keras.models.Sequential([
        tf.keras.layers.DepthwiseConv2D(3, strides=strides, padding="same", use_bias=False),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Activation('swish')
                                        ], name="depthwise")
    # No activation after the projection.
    self.pw_conv = tf.keras.models.Sequential([
        tf.keras.layers.Conv2D(output_channels, 1, padding="same", use_bias=False),
        tf.keras.layers.BatchNormalization(),
                                        ], name='pointwise')
  
  def compute_output_shape(self, input_shape):
    """Spatial dimensions are divided by the stride; channels become `output_channels`."""
    return (
      input_shape[0], 
      input_shape[1] // self.strides, 
      input_shape[2]  // self.strides, 
      self.output_channels
    )

  def get_config(self):
    """Return the base config extended with the constructor arguments."""
    config = super().get_config().copy()
    config.update({
            'expand_channels': self.expand_channels,
            'output_channels': self.output_channels,
            'strides': self.strides,
    })
    return config

  def call(self, x):
    """Run expand -> depthwise -> pointwise, plus the residual when shapes allow it."""
    o = self.expand(x)
    o = self.dw_conv(o)
    o = self.pw_conv(o)
    # The residual needs the INPUT to have `output_channels` channels; the
    # projected tensor `o` always has that many, so checking `o` was a no-op.
    if self.strides == 1 and x.shape[-1] == self.output_channels:
      return o + x
    return o

class FullyConnected(tf.keras.layers.Layer):
  """Fully Connected Block.

  A stack of swish-activated Dense layers, one per entry of `hidden_units`,
  each followed by a Dropout layer with rate `dropout_rate`.
  """

  def __init__(self, hidden_units, dropout_rate):
    super().__init__()
    stacked_layers = []
    for width in hidden_units:
      # Dense first, then its Dropout.
      stacked_layers.extend((
          tf.keras.layers.Dense(width, activation=tf.nn.swish),
          tf.keras.layers.Dropout(dropout_rate),
      ))
    self.mlp = tf.keras.models.Sequential(stacked_layers)

  def compute_output_shape(self, input_shape):
    # Not implemented: returns None.
    return None

  def call(self, x):
    """Apply the Dense/Dropout stack to `x`."""
    return self.mlp(x)

class Transformer(tf.keras.layers.Layer):
  """Transformer Block.

  Layer normalisation followed by multi-head self-attention and a residual
  connection, then layer normalisation followed by a FullyConnected MLP and a
  second residual connection.
  """
  def __init__(self, projection_dim, heads=2):
    super().__init__()
    self.norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
    self.attention = tf.keras.layers.MultiHeadAttention(
        num_heads=heads, key_dim=projection_dim, dropout=0.1)
    self.norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
  
  def build(self, input_shape):
    # The MLP widens to twice the input width and projects back, so its output
    # can be added to its input in `call`.
    self.mlp = FullyConnected(
        [input_shape[-1] * 2, input_shape[-1]], dropout_rate=0.1)

  def compute_output_shape(self, input_shape):
    # NOTE(review): not implemented - returns None.
    pass

  def call(self, x):
    """Return the block output; it has the same last dimension as `x`."""
    x1 = self.norm1(x)
    att = self.attention(x1, x1)  # self-attention: query and value are both x1
    x2 = x + att
    x3 = self.norm2(x2)
    x3 = self.mlp(x3)
    return x3 + x2

class MobileVitBlock(tf.keras.layers.Layer):
  """MobileViT Block.

  Local features are computed with convolutions, unfolded into patches,
  processed by a stack of Transformer blocks, folded back into a feature map,
  concatenated with the block input and fused by a final 3x3 convolution.

  Args:
    num_blocks: number of Transformer blocks applied to the unfolded patches.
    projection_dim: number of channels of every convolution in the block.
    strides: stride passed to every convolution.
      NOTE(review): the fold/unfold shapes are derived from the block's input
      shape, so only strides=1 looks consistent - confirm before using others.
    patch_size: size of the first unfolded axis.
    **kwargs: standard `Layer` arguments (name, trainable, dtype, ...). They are
      accepted so that a layer can be rebuilt from `get_config()`, which
      contains these keys.
  """

  def __init__(self, num_blocks, projection_dim, strides=1, patch_size=4, **kwargs):
    super().__init__(**kwargs)
    self.projection_dim = projection_dim
    self.num_blocks = num_blocks
    self.patch_size = patch_size
    self.strides = strides
    self.conv_local = tf.keras.models.Sequential([
        tf.keras.layers.Conv2D(projection_dim, 3, padding="same", strides=strides, activation=tf.nn.swish),
        tf.keras.layers.Conv2D(projection_dim, 1, padding="same", strides=strides, activation=tf.nn.swish),
                                           ])
    self.transformers = tf.keras.models.Sequential([Transformer(projection_dim, heads=2) for i in range(num_blocks)])
    self.conv_folded = tf.keras.layers.Conv2D(projection_dim, 1, padding="same", strides=strides, activation=tf.nn.swish)
    self.conv_local_global = tf.keras.layers.Conv2D(projection_dim, 3, padding="same", strides=strides, activation=tf.nn.swish)

  def build(self, input_shape):
    """Create the reshape layers that unfold the feature map into patches and fold it back."""
    # Integer division keeps the patch count an exact int.
    num_patches = (input_shape[1] * input_shape[2]) // self.patch_size
    self.unfold = tf.keras.layers.Reshape((self.patch_size, num_patches, self.projection_dim))
    self.fold = tf.keras.layers.Reshape((input_shape[1], input_shape[2], self.projection_dim))

  def get_config(self):
    """Return the base config extended with the constructor arguments."""
    config = super().get_config().copy()
    config.update({
            'num_blocks': self.num_blocks,
            'projection_dim': self.projection_dim,
            'strides': self.strides,
            'patch_size' : self.patch_size,
    })
    return config

  def compute_output_shape(self, input_shape):
    """Spatial dimensions are kept; channels become `projection_dim`."""
    return (input_shape[0], input_shape[1], input_shape[2], self.projection_dim)

  def call(self, x):
    """Fuse local (convolutional) and global (transformer) features of `x`."""
    local_features = self.conv_local(x)
    patches = self.unfold(local_features)
    global_features = self.transformers(patches)
    folded_features = self.fold(global_features)
    folded_features = self.conv_folded(folded_features)
    # Concatenate the original input with the globally processed features.
    local_global_features = tf.concat([x, folded_features], axis=-1)
    local_global_features = self.conv_local_global(local_global_features)
    return local_global_features

def MobileViT(input_shape=None, include_top=True, classes=1000, expansion_ratio = 2.0):
    """Build a MobileViT Keras model.

    Args:
        input_shape: shape of one input image, without the batch axis. The
            inline shape comments below assume (256, 256, 3).
        include_top: when true, append a 1x1 convolution with 320 filters,
            global average pooling and a Dense layer with `classes` sigmoid units.
        classes: number of units of the final Dense layer (only with `include_top`).
        expansion_ratio: multiplier applied to the channel count to obtain the
            expansion width of every InvertedRes block.

    Returns:
        A `tf.keras.models.Model` from the image input to the last layer built.
    """

    img_input = tf.keras.layers.Input(shape=input_shape) # (None, 256, 256, 3)
    x = tf.keras.layers.Conv2D(
        16, 3, padding="same", strides=(2, 2), activation=tf.nn.swish
      )(img_input)  # (None, 128, 128, 16) 
    x = InvertedRes(16 * expansion_ratio, 16, strides=1)(x) # (None, 128, 128, 16)
    x = InvertedRes(16 * expansion_ratio, 24, strides=2)(x) # (None, 64, 64, 24) 
    x = InvertedRes(24 * expansion_ratio, 24, strides=1)(x) # (None, 64, 64, 24)
    x = InvertedRes(24 * expansion_ratio, 24, strides=1)(x) # (None, 64, 64, 24)
    x = InvertedRes(24 * expansion_ratio, 48, strides=2)(x) # (None, 32, 32, 48)
    x = MobileVitBlock(2, 64, strides=1)(x)                 # (None, 32, 32, 64)
    x = InvertedRes(64 * expansion_ratio, 64, strides=2)(x) # (None, 16, 16, 64)
    x = MobileVitBlock(4, 80, strides=1)(x)                 # (None, 16, 16, 80)
    x = InvertedRes(80 * expansion_ratio, 80, strides=2)(x) # (None, 8, 8, 80)
    x = MobileVitBlock(3, 96, strides=1)(x)                 # (None, 8, 8, 96)

    if include_top:
        # Classification head.
        x = tf.keras.layers.Conv2D(320, 1, padding="same", strides=(1, 1), activation=tf.nn.swish)(x)
        x = tf.keras.layers.GlobalAvgPool2D()(x)
        x = tf.keras.layers.Dense(classes, activation="sigmoid")(x)
        
    # Create model.
    model = tf.keras.models.Model(img_input, x)#, name='MobileViT')

    return model

In [None]:
# MobileViT model check

#model = MobileViT((256, 256, 3), include_top=True, classes=1)
#model.build((None, 256, 256, 3))
#model.summary()

# Model (Temporal)

## Transformer model

In [None]:
class TransformerEncoder(tf.keras.layers.Layer):
    """One Transformer encoder block: self-attention then a two-layer MLP.

    Each sub-block is wrapped in a residual connection, and layer
    normalisation is applied after the residual addition.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        """Create the sub-layers.

        Args:
            embed_dim: feature size of the inputs; also used as the attention
                `key_dim` and as the output width of the MLP.
            dense_dim: hidden width of the MLP.
            num_heads: number of attention heads.
            **kwargs: forwarded to `tf.keras.layers.Layer` (e.g. `name`).
        """
        super().__init__(**kwargs)
        # Kept as attributes so get_config() can serialise them.
        self.embed_dim, self.dense_dim, self.num_heads = embed_dim, dense_dim, num_heads

        self.attention = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.3)

        feed_forward_layers = [
            tf.keras.layers.Dense(dense_dim, activation=tf.nn.gelu),
            tf.keras.layers.Dense(embed_dim),
        ]
        self.dense_proj = tf.keras.Sequential(feed_forward_layers)

        self.layernorm_1 = tf.keras.layers.LayerNormalization()
        self.layernorm_2 = tf.keras.layers.LayerNormalization()

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            'embed_dim': self.embed_dim,
            'dense_dim': self.dense_dim,
            'num_heads': self.num_heads,
        }

    def call(self, inputs, mask=None):
        """Apply self-attention and the MLP, each followed by add-and-normalise.

        Args:
            inputs: tensor attended over itself (used as both query and value).
            mask: optional mask; when given, an axis is inserted at position 1
                before it is passed as `attention_mask`.

        Returns:
            Output of the second layer normalisation.
        """
        attention_mask = None if mask is None else mask[:, tf.newaxis, :]

        attended = self.attention(inputs, inputs, attention_mask=attention_mask)
        normed = self.layernorm_1(inputs + attended)
        return self.layernorm_2(normed + self.dense_proj(normed))

In [None]:
def TransformerModel(embed_dim, dense_dim=512, num_heads=4, num_classes=4, seq_len=512):
  """Build a stand-alone sequence classifier around one TransformerEncoder block.

  Args:
    embed_dim: feature size of every sequence element.
    dense_dim: hidden width of the encoder's feed-forward network.
    num_heads: number of attention heads in the encoder.
    num_classes: number of softmax outputs.
    seq_len: number of sequence elements the model accepts. Defaults to 512,
      the value that used to be hard-coded, so existing calls are unaffected.

  Returns:
    An uncompiled `tf.keras.Model` mapping (batch, seq_len, embed_dim) inputs
    to (batch, num_classes) class probabilities.
  """
  inputs = tf.keras.Input(shape=(seq_len, embed_dim))
  x = TransformerEncoder(embed_dim, dense_dim, num_heads, name="transformer_layer")(inputs)
  # Collapse the sequence axis by keeping the per-feature maximum.
  x = tf.keras.layers.GlobalMaxPooling1D()(x)
  x = tf.keras.layers.Dropout(0.5)(x)
  outputs = tf.keras.layers.Dense(num_classes, activation="softmax")(x)
  model = tf.keras.Model(inputs, outputs)
  return model

In [None]:
# Transformer Encoder model check

#temporal_model = TransformerModel(embed_dim=512)
#temporal_model.build((None, 512, 512))
#temporal_model.summary()

# Model

In [None]:
def create_model(encoder_name, temporal_model_name='lstm', many2one=True):

  temporal_input = tf.keras.layers.Input(shape=(SEQ_LEN, IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS))
  
  # Encoder model
  if encoder_name == 'mobilenetv2':   # MobileNetV2 model
    base_model = tf.keras.applications.MobileNetV2(
      input_tensor=tf.keras.layers.Input(shape=(IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS)),
      input_shape=(IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS),
      include_top=False,
      weights='imagenet'
    )
    encoder_output_shape = base_model.output_shape

  elif encoder_name == 'vgg':   # VGG 19 model
    base_model = tf.keras.applications.vgg19.VGG19(
        input_shape=(IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS),
        include_top=False,
        weights='imagenet'
    )
    encoder_output_shape = base_model.output_shape

  elif encoder_name == 'efficient_b0':    # EfficientNet b0 model
    base_model = tf.keras.applications.EfficientNetB0(
        input_shape=(IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS),
        include_top=False, 
        weights='imagenet'
    )
    encoder_output_shape = base_model.output_shape

  elif encoder_name == 'mobilevit':     # MobileViT model
    base_model = MobileViT(
      input_shape=(IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS),
      include_top=False
    )
    encoder_output_shape = base_model.output_shape

  elif encoder_name == 'vit':     # Visual Transformer base 16 model
    # https://github.com/faustomorales/vit-keras/blob/28815edc5c24492612af726d1b2ca78295128d84/vit_keras/vit.py
    base_model = vit_b16(
      image_size=IMAGE_CROP_SIZE,
      activation='sigmoid',
      pretrained=True,
      include_top=False,
      pretrained_top=False,
    )
    x = base_model.layers[-2].output
    x = SliceClassToken()(x) 
    patch_size = int(math.sqrt(x.shape[-2]))  
    embedding_dim = x.shape[-1]
    embedding = tf.keras.layers.Reshape((
      patch_size, patch_size, 
      embedding_dim))(x)
    base_model = tf.keras.Model(inputs=base_model.input, outputs=embedding)
    encoder_output_shape = base_model.output_shape

  elif encoder_name == 'movinet': # MoViNet model 
    # downloading pretrained model checkpoints
    !wget https://storage.googleapis.com/tf_model_garden/vision/movinet/movinet_a3_base.tar.gz -O movinet_a3_base.tar.gz -q
    !tar -xvf movinet_a3_base.tar.gz

    checkpoint_dir = 'movinet_a3_base'
    checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
    checkpoint = tf.train.Checkpoint(model=model)
    status = checkpoint.restore(checkpoint_path)
    status.assert_existing_objects_matched()

    # creating backbone model
    model_id = 'a3'
    backbone = movinet.Movinet(model_id=model_id)
    if many2one:
      model = movinet_model.MovinetClassifier(
        backbone=backbone,
        num_classes=4)
      return model

    else:
      model_encoder = movinet_model.MovinetClassifier(
        backbone=backbone,
        num_classes= 4 * SEQ_LEN)
    
      temporal_reshape = tf.keras.layers.Reshape(
        (SEQ_LEN, 4)
      )
      model = tf.keras.Sequential([
          model_encoder, 
          temporal_reshape, 
      ])
      return model
      
  else:
    raise NotImplementedError



  # Number of layers to freeze
  fine_tune_at = FINE_TUNE_BUFFER
  for layer in base_model.layers[:fine_tune_at]:
    layer.trainable = False


  # Intermidiary layers to match shapes of encoder outputs and temporal model inputs
  base_model = tf.keras.layers.TimeDistributed(
      base_model, 
      input_shape=(SEQ_LEN, IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS))

  global_avg_pool = tf.keras.layers.TimeDistributed(
      tf.keras.layers.GlobalAveragePooling2D(),
      input_shape=(8, 8, 96))
  flatten = tf.keras.layers.TimeDistributed(tf.keras.layers.Flatten())
  dropout = tf.keras.layers.Dropout(0.2)


  # temporal model
  if temporal_model_name == 'lstm':   # LSTM temporal model
    temporal_layer = tf.keras.layers.LSTM(
        16, return_sequences= not many2one, dropout=0.2, use_bias=False)

  elif temporal_model_name == 'transformer':  # Transformer Encoder temporal model
    temporal_layer = TransformerEncoder(
        encoder_output_shape[-1], dense_dim=512, num_heads=5, 
        name="transformer_layer"
    )
    if many2one == True:   # many 2 one classification head of Transformer model
      temporal_reshape = tf.keras.layers.Reshape(
        (SEQ_LEN * encoder_output_shape[-1], )
      )
      linear = tf.keras.layers.Dense(128, activation='relu')
      temporal_layer = tf.keras.Sequential([
          temporal_layer, 
          temporal_reshape, 
          linear
      ])

  else:
    raise NotImplementedError

  # output layer
  dense = tf.keras.layers.Dense(4, activation='softmax')


  model = tf.keras.Sequential([
      temporal_input,
      base_model,
      global_avg_pool,
      flatten,
      dropout,
      temporal_layer,
      dense
  ])
  return model

In [None]:
import tensorflow as tf
from tensorflow.python.framework.convert_to_constants import  convert_variables_to_constants_v2_as_graph

def get_flops(model, write_path=None):
    """Count the floating-point operations of one forward pass at batch size 1.

    Args:
        model: Keras model; its `inputs` provide the per-example input shapes.
        write_path: optional file path. When given, the profiler report is
            written to that file.

    Returns:
        The profiler's `total_float_ops` for the frozen forward graph.
    """
    forward = tf.function(lambda inputs: model(inputs))
    # Same shapes as the model inputs, with the batch axis fixed to 1.
    input_specs = [tf.TensorSpec([1, *tensor.shape[1:]]) for tensor in model.inputs]
    concrete_func = forward.get_concrete_function(input_specs)

    # Only the frozen GraphDef is needed; the frozen function itself is unused.
    _, frozen_graph_def = convert_variables_to_constants_v2_as_graph(concrete_func)

    profile_options = tf.compat.v1.profiler.ProfileOptionBuilder.float_operation()
    if write_path:
        profile_options['output'] = 'file:outfile={}'.format(write_path)  # suppress output

    with tf.Graph().as_default() as profiling_graph:
        tf.graph_util.import_graph_def(frozen_graph_def, name='')
        profile = tf.compat.v1.profiler.profile(
            graph=profiling_graph,
            run_meta=tf.compat.v1.RunMetadata(),
            cmd="op",
            options=profile_options)
    return profile.total_float_ops

In [None]:
# Learning-rate schedule: cosine decay starting at BASE_LR, spread over
# STEPS_PER_EPOCH * EPOCHS optimizer steps.
lr = tf.keras.experimental.CosineDecay(BASE_LR, STEPS_PER_EPOCH * EPOCHS) # cosine learning decay

# Model, optimizer and compile() are all created inside the distribution
# strategy scope.
with strategy.scope():
  print(ENCODER_NAME, TEMPORAL_MODEL)
  model = create_model(ENCODER_NAME, TEMPORAL_MODEL, MANY2ONE)
  if OPTIMIZER == 'adam':
    optimizer = tf.keras.optimizers.Adam(lr) # adam optimizer
  else:
    # Only 'adam' is supported.
    raise NotImplementedError
      
  model.compile(
    optimizer=optimizer,
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False), #'sparse_categorical_crossentropy',
    # predict True positives/total images
    metrics=['accuracy'],
    # NEW on TPU in TensorFlow 2.4: sending multiple batches to the TPU at once saves communications
    # overheads and allows the XLA compiler to unroll the loop on TPU and optimize hardware utilization.
    steps_per_execution=16
  )

# Build with an unspecified batch size so summary() and count_params() work.
model.build((None, SEQ_LEN, IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS))
model.summary()
parameters = model.count_params()

print('Number of parameters: ', parameters)
# NOTE(review): eager execution is presumably already active under TF2 --
# confirm whether this call is still needed before profiling.
tf.compat.v1.enable_eager_execution()
print('FLOPS :', get_flops(model))

# Frames per second measures

In [None]:
# Frames Per Second 
# Times 100 consecutive forward passes on a single all-ones clip of shape
# (1, SEQ_LEN, IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS).
# NOTE(review): the printed value is model calls per second, where each call
# sees one SEQ_LEN-frame clip, and the very first call is included in the
# timing -- confirm this is the intended definition of "FPS".
if EVALUATE_INFERENCE_SPEED:
  x = tf.ones((1, SEQ_LEN, IMAGE_CROP_SIZE, IMAGE_CROP_SIZE, NUM_CHANNELS))
  print(x.shape)
  start_time = time.time()
  for i in trange(100):
    y = model(x)
  elapsed = time.time() - start_time
  print('FPS: ', 100 / elapsed)

# Training

In [None]:
print("Loading... Please wait for data to shuffle")
start_time = time.time()
# Train for EPOCHS epochs: the cosine learning-rate schedule is sized for
# STEPS_PER_EPOCH * EPOCHS steps, so a different hard-coded epoch count would
# either stop before the decay finishes or keep training after the learning
# rate has reached its final value.
history = model.fit(
    get_train_dataset_with_oversample(True), # get_train_dataset(), 
    steps_per_epoch=STEPS_PER_EPOCH, 
    epochs=EPOCHS,
    validation_data=get_validation_dataset(repeated=True), 
    validation_steps=VALIDATION_STEPS,
    use_multiprocessing=True
)
print('Elasped time: ', time.time() - start_time)

In [None]:
# Create the Google Drive folders that later cells write plots and CSV logs
# into; exist_ok=True makes the cell safe to re-run.
os.makedirs('/content/drive/MyDrive/CV_Research/StairNet/Plots/', exist_ok=True)
os.makedirs('/content/drive/MyDrive/CV_Research/StairNet/CSV/', exist_ok=True)

## Saving a model

In [None]:
# Destination on Google Drive. The MoViNet name has no temporal-model part;
# every other encoder's name includes TEMPORAL_MODEL.
if ENCODER_NAME == 'movinet':
  model_location = f"/content/drive/MyDrive/CV_Research/StairNet/Models/{ENCODER_NAME}_Many2one:{MANY2ONE}"
else:
  model_location = f"/content/drive/MyDrive/CV_Research/StairNet/Models/{ENCODER_NAME}_{TEMPORAL_MODEL}_Many2one:{MANY2ONE}"

# Route the SavedModel file I/O through '/job:localhost'.
# NOTE(review): presumably required because the variables live on a remote
# TPU worker while the Drive mount is local -- confirm.
save_locally = tf.saved_model.SaveOptions(experimental_io_device='/job:localhost')
model.save(model_location, options=save_locally)
# Also store the weights alone as an HDF5 file next to the SavedModel directory.
model.save_weights(model_location + '_weights.h5')
#model.save(model_location, save_format='h5')

In [None]:
import pandas as pd
import matplotlib as mpl
mpl.rcParams['figure.figsize'] = (10, 5)
data = pd.DataFrame(history.history)
metrics = ['loss', 'accuracy']

metrics_array = []

# Draw every metric on its own axis of ONE figure. Calling DataFrame.plot()
# without `ax` opens a new figure per metric, so a single savefig afterwards
# would only store the last metric and silently drop the loss curves.
fig, axes = plt.subplots(len(metrics), 1, figsize=(10, 5 * len(metrics)), squeeze=False)
for ax, metric in zip(axes[:, 0], metrics):
    # Training curve and validation curve of the same metric share one axis.
    data[[f'{metric}',f'val_{metric}']].plot(ax=ax, title=metric)
    ax.set_xlabel('epoch')
fig.tight_layout()

plot_location = f"/content/drive/MyDrive/CV_Research/StairNet/Plots/{ENCODER_NAME}_{TEMPORAL_MODEL}_Many2one:{MANY2ONE}.jpg"
fig.savefig(plot_location)

csv_location = f"/content/drive/MyDrive/CV_Research/StairNet/CSV/{ENCODER_NAME}_{TEMPORAL_MODEL}_Many2one:{MANY2ONE}.csv"

# Persist the raw per-epoch history next to the plot.
results_df = pd.DataFrame(history.history)
print(results_df)
results_df.to_csv(csv_location)

## Model evaluation

In [None]:
# Non-distributed validation dataset, used both for the ground-truth labels
# here and for model.predict later on.
cmdataset = get_validation_dataset(distributed=False) # since we are splitting the dataset and iterating separately on images and labels, order matters.
# Keep only the label of every (image, label) pair and undo the batching.
labels_ds = cmdataset.map(lambda image, label: label).unbatch()
# Re-batch into a single batch of up to NUM_TEST_IMAGES labels and convert it
# to a NumPy array; from here on `labels_ds` is an array, not a dataset.
labels_ds = next(iter(labels_ds.batch(NUM_TEST_IMAGES))).numpy() # get everything as one batch

In [None]:
# Display the ground-truth label array together with its shape.
labels_ds, labels_ds.shape

In [None]:
# Same naming scheme as the cell that saves the model: the MoViNet name has no
# temporal-model part, every other encoder's name includes TEMPORAL_MODEL.
if ENCODER_NAME == 'movinet':
  model_location = f"/content/drive/MyDrive/CV_Research/StairNet/Models/{ENCODER_NAME}_Many2one:{MANY2ONE}"
else:
  model_location = f"/content/drive/MyDrive/CV_Research/StairNet/Models/{ENCODER_NAME}_{TEMPORAL_MODEL}_Many2one:{MANY2ONE}"

load_locally = tf.saved_model.LoadOptions(experimental_io_device='/job:localhost')
# Load from the location computed above (where the trained model is saved)
# rather than from an unrelated hard-coded './model_total' directory, so the
# evaluation always runs on the model selected by the notebook configuration.
model = tf.keras.models.load_model(model_location, options=load_locally)

#model = tf.keras.models.load_model(model_location, custom_objects={'TransformerEncoder': TransformerEncoder})

In [None]:
class TQDMPredictCallback(tf.keras.callbacks.Callback):
    """Keras callback that shows a tqdm progress bar during `model.predict`.

    Args:
        custom_tqdm_instance: an existing progress bar to drive. When given,
            the callback neither creates nor closes a bar of its own.
        tqdm_cls: progress-bar class to instantiate when no instance is given.
        **tqdm_params: extra keyword arguments forwarded to `tqdm_cls`.
    """

    def __init__(self, custom_tqdm_instance=None, tqdm_cls=tqdm, **tqdm_params):
        super().__init__()
        self.tqdm_cls = tqdm_cls
        self.tqdm_progress = None
        # Number of batches already reported to the progress bar.
        self.prev_predict_batch = None
        self.custom_tqdm_instance = custom_tqdm_instance
        self.tqdm_params = tqdm_params

    def on_predict_batch_begin(self, batch, logs=None):
        pass

    def on_predict_batch_end(self, batch, logs=None):
        # `batch` is a zero-based index, so batch + 1 batches are finished.
        # Counting from the index itself never reported the first batch.
        completed = batch + 1
        self.tqdm_progress.update(completed - self.prev_predict_batch)
        self.prev_predict_batch = completed

    def on_predict_begin(self, logs=None):
        self.prev_predict_batch = 0
        # Identity check on purpose: the truth value of a tqdm bar depends on
        # its `total` and may even raise when the total is unknown.
        if self.custom_tqdm_instance is not None:
            self.tqdm_progress = self.custom_tqdm_instance
            return

        # May be None when the number of steps is unknown; tqdm then shows a
        # plain counter without a total.
        total = self.params.get('steps')

        self.tqdm_progress = self.tqdm_cls(total=total, **self.tqdm_params)

    def on_predict_end(self, logs=None):
        # Only close a bar this callback created itself.
        if self.tqdm_progress is not None and self.custom_tqdm_instance is None:
            self.tqdm_progress.close()

In [None]:
tqdm_callback = TQDMPredictCallback()

# Run VALIDATION_STEPS prediction steps over the validation dataset, with a
# progress bar, then reduce the model output to class indices by taking the
# arg-max over the last axis.
preds = model.predict(cmdataset, callbacks=[tqdm_callback], steps=VALIDATION_STEPS)
preds = tf.math.argmax(preds, -1).numpy()

print("Correct   labels: ", labels_ds.shape, labels_ds)
print("Predicted labels: ", preds.shape, preds)
# Predictions and ground truth must line up element for element before any
# metric is computed.
assert labels_ds.shape == preds.shape

In [None]:
def display_confusion_matrix(cmat, accuracy, score, precision, recall, filename):
    """Draw a confusion-matrix heatmap, save it to `filename` and show it.

    Args:
        cmat: square matrix to display, one row and column per CLASS_MAP entry.
        accuracy: accuracy to print in the title, or None to omit it.
        score: F1 score to print in the title, or None to omit it.
        precision: precision to print in the title, or None to omit it.
        recall: recall to print in the title, or None to omit it.
        filename: path the figure is written to.
    """
    plt.figure(figsize=(8,6))
    ax = plt.gca()
    sns.heatmap(cmat, annot=True, fmt='.2g', ax=ax)

    # A seaborn heatmap cell i spans [i, i + 1], so its centre is at i + 0.5.
    # Ticks at the integer positions would sit on the cell borders and shift
    # every class label by half a cell.
    # NOTE(review): iterating CLASS_MAP is assumed to yield the class names in
    # label-index order -- confirm.
    tick_positions = [index + 0.5 for index in range(len(CLASS_MAP))]
    ax.set_xticks(tick_positions)
    ax.set_xticklabels(CLASS_MAP, fontdict={'fontsize': 7})
    plt.setp(ax.get_xticklabels(), rotation=45, ha="left", rotation_mode="anchor")
    ax.set_yticks(tick_positions)
    ax.set_yticklabels(CLASS_MAP, fontdict={'fontsize': 7})
    plt.setp(ax.get_yticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    # The title lists only the metrics that were actually provided.
    titlestring = ""
    if score is not None:
        titlestring += 'F1 = {:.3f} '.format(score)
    if accuracy is not None:
        titlestring += 'Accuracy = {:.3f} '.format(accuracy)
    if precision is not None:
        titlestring += '\nPrecision = {:.3f} '.format(precision)
    if recall is not None:
        titlestring += 'Recall = {:.3f} '.format(recall)
    if len(titlestring) > 0:
        plt.title(titlestring)

    # Save before show(): the figure may be released once it has been shown.
    plt.savefig(filename)
    plt.show()

In [None]:
# Output path of the confusion-matrix plot. The MoViNet name has no
# temporal-model part; every other encoder's name includes TEMPORAL_MODEL.
if ENCODER_NAME == 'movinet':
  filename = f"/content/drive/MyDrive/CV_Research/StairNet/Plots/{ENCODER_NAME}_Many2one:{MANY2ONE}_confusion_matrix.png"
else:
  filename = f"/content/drive/MyDrive/CV_Research/StairNet/Plots/{ENCODER_NAME}_{TEMPORAL_MODEL}_Many2one:{MANY2ONE}_confusion_matrix.png"

# NOTE(review): confusion_matrix, f1_score, precision_score and recall_score
# are expected to be imported elsewhere in the notebook -- confirm.
cmat = confusion_matrix(labels_ds, preds, labels=range(len(CLASS_MAP)))
# F1, precision and recall are averaged over classes weighted by class support.
score = f1_score(labels_ds, preds, labels=range(len(CLASS_MAP)), average='weighted')
precision = precision_score(labels_ds, preds, labels=range(len(CLASS_MAP)), average='weighted')
recall = recall_score(labels_ds, preds, labels=range(len(CLASS_MAP)), average='weighted')

# Normalise every row by the number of true samples of that class. A class
# with no samples keeps an all-zero row instead of turning into NaN through a
# division by zero.
row_totals = cmat.sum(axis=1, keepdims=True)
cmat = np.divide(cmat, row_totals, out=np.zeros(cmat.shape, dtype=float), where=row_totals != 0)

accuracy = accuracy_score(labels_ds, preds)
display_confusion_matrix(cmat, accuracy, score, precision, recall, filename)
print('accuracy: {:.5f}, f1 score: {:.5f}, precision: {:.5f}, recall: {:.5f}'.format(accuracy, score, precision, recall))