In [None]:
!pip install nilearn==0.9.2

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import os
import numpy as np
from pathlib import Path
from PIL import Image
from tqdm import tqdm

from tensorflow.keras.layers.experimental.preprocessing import Resizing

from google.colab import drive
import h5py
import os
import random
import tensorflow as tf
import time


import datetime

%load_ext tensorboard

# Load the data

In [None]:
# Mount Google Drive (Colab only) so the data set below becomes readable.
drive.mount('/content/drive', force_remount=True)
# Absolute path on purpose: the relative "drive/MyDrive" only resolves from the
# directory that contains `drive`, so running this cell a second time (after
# the working directory had already been changed) raised FileNotFoundError.
os.chdir("/content/drive/MyDrive")

# HDF5 file with the images and labels used throughout the notebook.
filename = "/content/drive/MyDrive/miniecoset_100obj_64px.h5"

Mounted at /content/drive


In [None]:

class HDF5Sequence(tf.keras.utils.Sequence):
    """Batch-wise reader for the 'train' and 'test' groups of an HDF5 file.

    Only the datasets ``<split>/data`` and ``<split>/labels`` are read. The
    file is re-opened for every batch, so no handle stays open between calls.

    Args:
        filename: path of the HDF5 file.
        batch_size: number of samples per batch; must be positive.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """

    def __init__(self, filename, batch_size):
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        self.batch_size = batch_size
        self.file = filename
        # Only the number of training samples is recorded; it drives __len__.
        with h5py.File(filename, 'r') as f:
            self.length = f['train']['labels'].shape[0]

    def __len__(self):
        """Number of batches in the 'train' split (the last one may be short)."""
        # Ceiling division in plain Python; no TensorFlow op is needed for this.
        return int(-(-self.length // self.batch_size))

    def __getitem__(self, idx):
        """Return training batch ``idx``.

        ``tf.keras.utils.Sequence`` requires this method; without it the object
        cannot be indexed or iterated by Keras.
        """
        return self.__gettrain__(idx)

    def _read_batch(self, split, idx):
        """Read batch ``idx`` of ``split`` and return ``(images, labels)``."""
        start_idx = idx * self.batch_size
        end_idx = start_idx + self.batch_size
        with h5py.File(self.file, 'r') as f:
            group = f[split]
            images = group['data'][start_idx:end_idx]
            labels = group['labels'][start_idx:end_idx]
        return images, labels

    def __gettrain__(self, idx):
        """Return ``(images, labels)`` for batch ``idx`` of the 'train' group."""
        return self._read_batch('train', idx)

    def __gettest__(self, idx):
        """Return ``(images, labels)`` for batch ``idx`` of the 'test' group."""
        return self._read_batch('test', idx)


In [None]:
# Path to the HDF5 file and the number of samples per batch.
filename = "/content/drive/MyDrive/miniecoset_100obj_64px.h5"
batch_size = 32

# Sequence object that reads batches from the file.
hdf5_sequence = HDF5Sequence(filename, batch_size)

# Per-sample image shape, taken from the file itself. The previously
# hard-coded (32, 32, 1) had a single channel although the models below are
# declared with three, and from_generator rejects any batch whose shape does
# not fit the signature.
# NOTE(review): assumes the 'test' images have the same per-sample shape as
# the 'train' images - confirm against the file.
with h5py.File(filename, 'r') as f:
    image_shape = tuple(f['train']['data'].shape[1:])

# Output signature of the generator: one batch of images, one batch of labels.
# The leading None leaves the batch dimension open (the last batch may be short).
output_signature = (
    tf.TensorSpec(shape=(None, *image_shape), dtype=tf.float32),
    tf.TensorSpec(shape=(None,), dtype=tf.int32)
)

# Data augmentation: random flips and small random rotations produce slightly
# transformed copies of the images already in the data set.
augmentation_model = tf.keras.Sequential([
  tf.keras.layers.RandomFlip("horizontal_and_vertical"),
  tf.keras.layers.RandomRotation(0.1),
])


In [None]:
def _train_batches():
    """Yield every (images, labels) batch of the 'train' group once, in order."""
    idx = 0
    while True:
        images, labels = hdf5_sequence.__gettrain__(idx)
        if len(labels) == 0:  # slicing past the end of the data gives an empty batch
            return
        yield images, labels
        idx += 1


# Create a tf dataset from the generator.
# from_generator iterates over whatever the callable returns. Passing
# __gettrain__ with args=(0,) therefore produced the image array and then the
# label array of batch 0 as two separate "elements", which matches neither the
# two-component signature nor the intent of reading the whole split.
train_dataset = tf.data.Dataset.from_generator(
    generator=_train_batches,
    output_signature=output_signature,
)


In [None]:

def _test_batches():
    """Yield every (images, labels) batch of the 'test' group once, in order."""
    idx = 0
    while True:
        images, labels = hdf5_sequence.__gettest__(idx)
        if len(labels) == 0:  # slicing past the end of the data gives an empty batch
            return
        yield images, labels
        idx += 1


# Create a tf dataset from the generator.
# The loop stops on the first empty batch rather than relying on
# len(hdf5_sequence), which counts the batches of the 'train' group only.
# Passing __gettest__ with args=(0,) made from_generator iterate over the
# returned (images, labels) tuple of batch 0 instead of over batches.
test_dataset = tf.data.Dataset.from_generator(
    generator=_test_batches,
    output_signature=output_signature,
)

In [None]:
def preprocess(data, augmentation_model, augment=False):
    """Normalise images, one-hot encode labels and set up caching/shuffling.

    No batching happens here: elements are passed through as they arrive.
    # NOTE(review): the generators in this notebook already yield whole
    # batches, so shuffle() reorders batches, not single samples - confirm
    # this is intended.

    Args:
        data: tf.data.Dataset of (images, integer labels).
        augmentation_model: Keras model/layer applied to the images when
            ``augment`` is True; ignored otherwise (may be None).
        augment: opt-in switch for random augmentation. Defaults to False, so
            existing calls keep producing un-augmented data. Enable it for
            training data only.

    Returns:
        tf.data.Dataset of (float32 images scaled by img / 128 - 1,
        one-hot labels with depth 100).
    """

    def _normalise(img, target):
        # Plain cast instead of tf.image.convert_image_dtype: that function
        # rescales integer images to [0, 1], which the [0, 255] -> [-1, 1]
        # normalisation below would then squash to roughly -1. For float32
        # input both are the identity, so results are unchanged there.
        img = tf.cast(img, tf.float32)
        # sloppy input normalization: bring values from [0, 255] to about [-1, 1]
        return (img / 128.) - 1., tf.one_hot(target, depth=100)

    def _augment(img, target):
        # training=True is required, otherwise the random layers are inactive.
        return augmentation_model(img, training=True), target

    data = data.map(_normalise)

    # cache the deterministic part of the pipeline in memory
    data = data.cache()
    data = data.shuffle(1000)

    # Augment after the cache so that each epoch sees new random transforms.
    if augment and augmentation_model is not None:
        data = data.map(_augment)

    data = data.prefetch(20)

    return data


In [None]:

# Run both splits through the same preprocessing pipeline (train first, then test).
train_data, test_data = (
    preprocess(split, augmentation_model=augmentation_model)
    for split in (train_dataset, test_dataset)
)

Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089


## CNN Model

In [None]:
######## NEW MODEL ##########

#from tensorflow.python.framework.importer import import_graph_def_for_function

class BasicConv(tf.keras.Model):
    """Small convolutional classifier with its own train/test steps and metrics.

    The network is three Conv2D + MaxPooling2D stages followed by Flatten and
    three Dense layers (256, 128 and a 100-way softmax output). With
    ``batch_norm=True`` a BatchNormalization layer follows every layer except
    the output layer.
    """

    def __init__(self, L2_reg=0, dropout_rate=0, batch_norm=False):
        """
        Build the layers, metrics, loss function and optimizer.
        Args:
            L2_reg(float): factor of the L2 penalty put on the kernels of all
                Conv2D and Dense layers; 0 disables regularization
            dropout_rate(float): fraction of units dropped after every layer
                except the output layer; 0 disables dropout
            batch_norm(bool): insert tf.keras.layers.BatchNormalization()
                after every layer except the output layer
        """
        super(BasicConv, self).__init__()

        # The regularizer used to be created but never handed to any layer, so
        # L2_reg had no effect and self.losses stayed empty.
        kernel_regularizer = tf.keras.regularizers.L2(L2_reg) if L2_reg else None

        self.dropout_rate = dropout_rate
        if self.dropout_rate:
            # one Dropout layer shared by all positions (it holds no weights)
            self.dropout_layer = tf.keras.layers.Dropout(dropout_rate)

        self.batch_norm = batch_norm

        self.layer_list = self._build_layers(kernel_regularizer, batch_norm)

        # loss function, optimizer and metrics used by train_step/test_step
        self.metrics_list = [tf.keras.metrics.Mean(name="loss"),
                             tf.keras.metrics.CategoricalAccuracy(name="acc")]

        self.loss_function = tf.keras.losses.CategoricalCrossentropy()
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

    @staticmethod
    def _build_layers(kernel_regularizer, batch_norm):
        """Return the ordered layer list, optionally interleaved with BatchNorm."""

        def conv(filters, **kwargs):
            return tf.keras.layers.Conv2D(filters=filters, kernel_size=(2, 2),
                                          strides=(2, 2), padding='same',
                                          kernel_regularizer=kernel_regularizer,
                                          **kwargs)

        def pool():
            return tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1),
                                                padding='same')

        def dense(units, activation):
            return tf.keras.layers.Dense(units, activation=activation,
                                         kernel_regularizer=kernel_regularizer)

        # NOTE(review): input_shape is kept from the original definition; it
        # presumably acts only as a hint in a subclassed model - confirm.
        hidden_layers = [conv(32, input_shape=(32, 32, 3)), pool(),
                         conv(64), pool(),
                         conv(128), pool(),
                         tf.keras.layers.Flatten(),
                         dense(256, 'relu'),
                         dense(128, 'relu')]

        layers = []
        for layer in hidden_layers:
            layers.append(layer)
            if batch_norm:
                layers.append(tf.keras.layers.BatchNormalization())
        layers.append(dense(100, 'softmax'))
        return layers

    def call(self, x, training=False):
        """
        Forward pass.
        Args:
            x: batch of input images
            training(bool): enables dropout and batch statistics in BatchNorm
        Returns:
            output of the final softmax layer
        """
        for layer in self.layer_list[:-1]:
            # BatchNorm layers are recognised by type, not by their position
            # in the list, so the layer list can change without breaking this.
            if isinstance(layer, tf.keras.layers.BatchNormalization):
                x = layer(x, training=training)
            else:
                x = layer(x)

            # dropout follows every layer except the output layer
            if self.dropout_rate:
                x = self.dropout_layer(x, training=training)

        return self.layer_list[-1](x)

    # 3. metrics property
    @property
    def metrics(self):
        """Return a list with all metrics in the model."""
        return self.metrics_list

    # 4. reset all metrics objects
    def reset_metrics(self):
        """
        reset the state of every metric in the model
        """
        for metric in self.metrics:
            metric.reset_states()

    # added to get frobenius graph
    def compute_frobenius(self):
        """compute and return the sum of the euclidean norms of all trainable variables
        Returns:
            (tensor): frobenius norm, shape (1,)
        """
        frobenius_norm = tf.zeros((1,))
        for var in self.trainable_variables:
            frobenius_norm += tf.norm(var, ord="euclidean")
        return frobenius_norm

    # 5. train step method
    def train_step(self, data):
        """
        training the network for once
        Args:
            data: input data (image with target)
        Returns:
            Return a dictionary mapping metric names to current value
        """
        img, label = data

        with tf.GradientTape() as tape:
            output = self(img, training=True)
            # self.losses holds the regularization penalties (empty without L2_reg)
            loss = self.loss_function(label, output) + tf.reduce_sum(self.losses)

        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        # update the state of the metrics according to loss and accuracy
        self.metrics[0].update_state(loss)
        self.metrics[1].update_state(label, output)

        # return a dictionary with metric names as keys and metric results as values
        return {m.name: m.result() for m in self.metrics}

    # 6. test_step method
    def test_step(self, data):
        """
        testing the network for once
        Args:
            data: input data (image with target)
        Returns:
            Return a dictionary mapping metric names to current value
        """
        img, label = data
        output = self(img, training=False)
        # same loss definition as in train_step, including the penalties
        loss = self.loss_function(label, output) + tf.reduce_sum(self.losses)

        # update the state of the metrics according to loss and accuracy
        self.metrics[0].update_state(loss)
        self.metrics[1].update_state(label, output)

        # return a dictionary with metric names as keys and metric results as values
        return {m.name: m.result() for m in self.metrics}

Training

In [None]:
# CNN with a BatchNormalization layer after every hidden layer.
mymodel = BasicConv(batch_norm=True)

# Adam with learning rate 0.001, categorical cross-entropy loss, accuracy metric.
mymodel.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)


In [None]:
 # Train the model
mymodel.fit(x=train_data, epochs=3)

# Evaluate on the held-out data; the result unpacks into loss and accuracy.
test_loss, test_acc = mymodel.evaluate(x=test_data)
print("test accuracy: ", test_acc)

Epoch 1/10


InvalidArgumentError: ignored

In [None]:

############## ResNet50 MODEL  #############

# Take the per-sample input shape from the pipeline instead of hard-coding
# (32, 32, 3), so the network always fits the batches it is fed.
resnet_input_shape = tuple(train_data.element_spec[0].shape.as_list()[1:])

resnet_model = tf.keras.applications.resnet.ResNet50(
    input_shape=resnet_input_shape,
    include_top=True,
    weights=None,   # train from scratch
    classes=100)

resnet_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['categorical_accuracy'])

# train_cifar / test_cifar were never defined in this notebook; the data sets
# built above are used instead. No batch_size is passed because Keras rejects
# that argument for tf.data.Dataset inputs.
# NOTE(review): assumes the elements of train_data are already whole batches.
resnet_model.fit(train_data, epochs=1)

# Test Step
test_loss, test_acc = resnet_model.evaluate(test_data)
print("test accuracy: ", test_acc)


To load an hdf5 file into tensorflow as a tf.data.Dataset object, you should **define a tf.keras.utils.Sequence object** that reads the images and their labels. You can then treat this Sequence object **as a generator** and turn it into a tf dataset with **tf.data.Dataset.from_generator**(...) by specifying the output signature (the dtypes and shapes of the values returned by the `__getitem__` method that you need to define in the Sequence object). All this sounds more complicated than it is, and you can look at examples for this online.