In [None]:
#@title Balance Dataset from folder
import os
import shutil
import numpy as np
from PIL import Image
from imblearn.over_sampling import RandomOverSampler
from multiprocessing import Pool
from tqdm import tqdm  # For progress bar

def balance_load_images_and_labels(dataset_path):
    """Collect image file paths and their class names from a dataset folder.

    Every immediate sub-directory of ``dataset_path`` is treated as one class
    and its directory name is used as the label. Only files located directly
    inside a class directory are considered. No image data is read here.

    Args:
        dataset_path: Root folder containing one sub-directory per class.

    Returns:
        A tuple ``(image_paths, labels)`` of two equally long lists; the i-th
        label is the class directory name of the i-th path.
    """
    # Case-insensitive match so "IMG_1.JPG" / "scan.JPEG" are not silently skipped.
    valid_extensions = (".jpg", ".jpeg", ".png")
    image_paths = []
    labels = []

    # Sorted so the result does not depend on the order os.listdir happens to return.
    class_dirs = sorted(
        d for d in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, d))
    )
    for class_dir in class_dirs:
        class_path = os.path.join(dataset_path, class_dir)
        for filename in sorted(os.listdir(class_path)):
            if filename.lower().endswith(valid_extensions):
                image_paths.append(os.path.join(class_path, filename))
                labels.append(class_dir)

    return image_paths, labels

def balance_load_image_and_convert(path, resize):
    """Read one image as RGB and return it as ``(pixel_array, path)``.

    Args:
        path: Location of the image file.
        resize: Size passed to ``Image.resize``; any falsy value keeps the
            image at its original size.

    Returns:
        Tuple of the NumPy array built from the image and the unchanged ``path``.
    """
    rgb_image = Image.open(path).convert('RGB')
    final_image = rgb_image.resize(resize) if resize else rgb_image
    return np.array(final_image), path

def balance_save_image(img_array, dest_path):
    """Write a NumPy pixel array to ``dest_path`` as an image file.

    The array is wrapped with ``Image.fromarray`` and saved; the output format
    is whatever ``Image.save`` derives from ``dest_path``.
    """
    Image.fromarray(img_array).save(dest_path)

def balance_process_image(path_resize):
    """Single-argument adapter so ``Pool.imap`` can feed ``(path, resize)`` pairs.

    Args:
        path_resize: Two-element sequence ``(path, resize)``.

    Returns:
        Whatever ``balance_load_image_and_convert`` returns for that pair.
    """
    image_path, target_size = path_resize
    return balance_load_image_and_convert(path=image_path, resize=target_size)

def balance_dataset(dataset_path, resize=None):
    """Create a class-balanced copy of an image dataset next to the original.

    All images are loaded (optionally resized to ``resize`` x ``resize``),
    minority classes are oversampled with ``RandomOverSampler`` and every
    resulting image is written as ``resampled_<n>.jpg`` into
    ``<dataset_path>-balanced/<class>/``.

    Args:
        dataset_path: Folder with one sub-directory per class.
        resize: Edge length in pixels of the square output images. ``None``
            (or 0) keeps the original size, which requires that all images
            already share the same dimensions.

    Raises:
        ValueError: If no images are found, or if the loaded images do not all
            have the same shape (they could not be stacked for resampling).
    """
    # normpath drops a trailing separator; otherwise os.path.split would return
    # an empty dataset name and the output folder would be called "-balanced".
    parent_dir, dataset_name = os.path.split(os.path.normpath(dataset_path))
    new_dataset_path = os.path.join(parent_dir, dataset_name + "-balanced")
    os.makedirs(new_dataset_path, exist_ok=True)

    # Load all images and labels
    print("Loading images and labels...")
    image_paths, labels = balance_load_images_and_labels(dataset_path)
    if not image_paths:
        raise ValueError(f"No images found under {dataset_path}")

    # Only build a size tuple when resizing was requested: (None, None) is
    # truthy and would be handed to Image.resize by the loader.
    target_size = (resize, resize) if resize else None
    path_resize_tuples = [(path, target_size) for path in image_paths]

    # Load and resize images using multiprocessing with a progress bar
    print(f"Loading and resizing {len(image_paths)} images...")
    with Pool() as pool:
        images_with_paths = list(tqdm(pool.imap(balance_process_image, path_resize_tuples), total=len(image_paths)))

    arrays = [item[0] for item in images_with_paths]
    if len({array.shape for array in arrays}) != 1:
        raise ValueError(
            "Images have different dimensions; pass `resize` so they can be stacked."
        )
    images = np.array(arrays)

    n_samples, height, width, channels = images.shape
    flat_images = images.reshape(n_samples, -1)  # Flatten images for oversampling

    # Apply RandomOverSampler across all classes
    print("Applying RandomOverSampler to balance the dataset...")
    ros = RandomOverSampler()
    flat_images_resampled, labels_resampled = ros.fit_resample(flat_images, np.array(labels))

    # Reshape images back to original format
    images_resampled = flat_images_resampled.reshape(-1, height, width, channels)

    # Save resampled images into appropriate class folders in the new dataset
    print("Saving resampled images...")
    # Next free file index per class. Seeded once from the directory content
    # instead of re-listing the directory for every single image.
    next_index = {}
    for img_array, label in tqdm(zip(images_resampled, labels_resampled), total=len(images_resampled)):
        class_dir = os.path.join(new_dataset_path, label)
        if label not in next_index:
            os.makedirs(class_dir, exist_ok=True)
            next_index[label] = len(os.listdir(class_dir))

        dest_path = os.path.join(class_dir, f"resampled_{next_index[label]}.jpg")
        next_index[label] += 1
        balance_save_image(img_array, dest_path)

    print(f"Balanced dataset created at {new_dataset_path}")

# Run the balancing step when this cell executes; every image is resized to
# 32x32 (balance_dataset turns the second argument into a (32, 32) size).
# NOTE(review): the path below is a machine-specific absolute path and has to
# be adapted before running the cell elsewhere.
dataset_path = '/mnt/e/Universitate/Doctorat/2024/Articol-Jurnal/skin-cancer/skins'
balance_dataset(dataset_path, 32)

In [None]:
#@title Dataset from folder

import os
# Index of the GPU to expose to this process via CUDA_VISIBLE_DEVICES.
gpu_device_number = 0
# Set before TensorFlow is imported below — presumably so that TensorFlow only
# ever sees the selected device; keep this ordering.
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_device_number)
import tensorflow as tf
# Report the GPUs TensorFlow found and the ones currently visible to it.
gpus = tf.config.experimental.list_physical_devices('GPU')
print("Available GPUs:", gpus)
current_gpu = tf.config.experimental.get_visible_devices('GPU')
print("Current GPU:", current_gpu)

if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)  # Prevents TensorFlow from consuming all GPU memory at once
    except RuntimeError as e:
        # Best effort: a RuntimeError from set_memory_growth is only reported,
        # the cell continues with the default memory behaviour.
        print(e)

import numpy as np
import shutil
import time as ti
from multiprocessing import Pool
from functools import partial
from tqdm import tqdm
from itertools import chain
from PIL import Image
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import random
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from imblearn.over_sampling import RandomOverSampler

def save_image(idx, path, path_export, labels, classes, img, copy_only = True):
    """Store one image in the export folder of its class.

    The class is looked up from the one-hot row ``labels[idx]``; the target
    directory ``<path_export>/<class>`` must already exist.

    Args:
        idx: Position of the image's label row in ``labels``.
        path: Path of the original image file on disk.
        path_export: Root of the export folder.
        labels: Sequence of one-hot label rows.
        classes: Class names, indexed like the one-hot columns.
        img: Image object; only its ``mode`` attribute and ``save`` method
            are used.
        copy_only: When exactly ``False`` the in-memory ``img`` is written
            under a fresh ``<idx>-<n>-augment.png`` name. Otherwise the
            original file is copied under its own name, unless a file with
            that name already exists.
    """
    current_class = classes[np.where(labels[idx] == 1)[0][0]]
    class_dir = os.path.join(path_export, current_class)
    channels = img.mode

    if copy_only == False:
      # The separator keeps idx and suffix apart: plain concatenation made
      # e.g. (idx=1, r=23) and (idx=12, r=3) both produce "123-augment.png".
      suffix = random.randrange(1, 1000)
      im_path = os.path.join(class_dir, f"{idx}-{suffix}-augment.png")
      # On a name clash move on to the next free suffix instead of silently
      # dropping the image.
      while os.path.exists(im_path):
        suffix += 1
        im_path = os.path.join(class_dir, f"{idx}-{suffix}-augment.png")
      if channels != "RGB":
        print(im_path)  # report images that are not plain RGB
      img.save(im_path)
    else:
      im_path = os.path.join(class_dir, os.path.basename(path))
      if channels != "RGB":
        print(im_path)  # report images that are not plain RGB
      if not os.path.exists(im_path):
        shutil.copy(path, im_path)

def load_image(idx, file_path, target_size, labels, classes, augment, path_export = None):
    """Load one image, optionally resize, export and augment it.

    Args:
        idx: Index of the image within its split; forwarded to ``save_image``
            and returned unchanged.
        file_path: Path of the image file.
        target_size: ``(width, height)`` to resize to. A non-positive entry
            disables resizing.
        labels: One-hot label rows of the split (only used for exporting).
        classes: Class names (only used for exporting).
        augment: When greater than 0, ``augment - 1`` additional images are
            generated with random rotation, horizontal flip and brightness.
        path_export: If given, the (possibly resized) image and every
            generated image are also written below this folder.

    Returns:
        Tuple ``(idx, images)`` where ``images`` is a list of NumPy arrays:
        the loaded image followed by the generated ones.
    """
    images = []

    im = Image.open(file_path)
    width, height = im.size
    # width=0 is the default of dataset(); resizing to (0, 0) is rejected by
    # PIL, so treat a non-positive size as "keep the original size".
    resize_requested = target_size[0] > 0 and target_size[1] > 0
    copy_only = True
    if resize_requested and (width != target_size[0] or height != target_size[1]):
      im = im.resize(target_size, Image.LANCZOS)
      copy_only = False  # pixels changed, so the original file can no longer be copied
    images.append(np.array(im))

    if path_export is not None:
      save_image(idx, file_path, path_export, labels, classes, im, copy_only)

    if augment > 0:
      datagen_augment = ImageDataGenerator(rotation_range=60, horizontal_flip=True, brightness_range=[1,1.2])
      # flow() expects a batch, so add a leading batch axis of size 1.
      np_image = np.asarray(im)
      np_image = np.reshape(np_image, (1,) + np_image.shape)
      augment_generator = datagen_augment.flow(np_image, batch_size=1)
      for i in range(augment-1):
        gen_img = next(augment_generator)[0].astype(np.uint8)
        images.append(gen_img.squeeze())
        if path_export is not None:
          generated = Image.fromarray(gen_img.squeeze())
          save_image(idx, file_path, path_export, labels, classes, generated, False)

    return idx, images

def load_image_with_progress(idx, image, target_size, labels, classes, augment,  path_export = None):
    """Forward all arguments unchanged to ``load_image`` and return its result."""
    forwarded = dict(
        target_size=target_size,
        labels=labels,
        classes=classes,
        augment=augment,
        path_export=path_export,
    )
    return load_image(idx, image, **forwarded)

def load_images_with_progress(images, target_size, labels, classes, augment, path_export=None):
    """Load a list of image files in parallel, showing a progress bar.

    Args:
        images: Sequence of image file paths.
        target_size: ``(width, height)`` forwarded to the per-image loader.
        labels: One-hot label rows aligned with ``images``.
        classes: Class names.
        augment: Augmentation factor forwarded to the per-image loader.
        path_export: Optional export folder forwarded to the per-image loader.

    Returns:
        A flat list with the arrays of all images, in input order; the arrays
        produced for one input file are adjacent.
    """
    # zip(*results) on an empty result cannot be unpacked, and there is
    # nothing to start a worker pool for.
    if len(images) == 0:
        return []

    partial_func = partial(load_image_with_progress_wrapper, target_size=target_size, labels=labels, classes=classes, augment=augment, path_export=path_export)
    with Pool(processes=6) as pool:
        # imap preserves input order, so results line up with `images`.
        results = list(tqdm(pool.imap(partial_func, enumerate(images)), total=len(images)))

    # Each result is (idx, [arrays...]); only the arrays are needed.
    return list(chain.from_iterable(image_group for _, image_group in results))

def load_image_with_progress_wrapper(args, target_size, labels, classes, augment, path_export):
    """Unpack an ``(index, path)`` pair from ``enumerate`` and load that image.

    Exists so that ``Pool.imap``, which passes a single item per call, can
    drive ``load_image_with_progress``.
    """
    position, file_path = args
    return load_image_with_progress(
        position,
        file_path,
        target_size=target_size,
        labels=labels,
        classes=classes,
        augment=augment,
        path_export=path_export,
    )

def normalize(image, label):
  """Scale ``image`` by 1/255 with a Keras ``Rescaling`` layer; ``label`` is passed through."""
  rescale = tf.keras.layers.Rescaling(1./255)
  scaled_image = rescale(image)
  return scaled_image, label

def dataset(path, reload=False, width=0, augment=0, batch_size = 20, load_from_dir = False):
    """Build train/validation/test data from a folder with one sub-directory per class.

    The files are split 80/10/10 into train/validation/test. Depending on
    ``load_from_dir`` the images are either loaded into NumPy arrays (and the
    test split is exported to disk), or all three splits are exported to disk
    and returned as ``tf.data`` datasets read back from those folders.

    Args:
        path: Dataset root; assumed to contain only class sub-directories,
            each holding image files directly (no deeper nesting).
        reload: Delete an existing export folder before exporting again.
        width: Edge length of the square target size.
        augment: Augmentation factor forwarded to the image loader; values
            above 1 yield ``augment`` images per source file.
        batch_size: Batch size of the returned ``tf.data`` datasets
            (``load_from_dir=True`` only).
        load_from_dir: Select the ``tf.data`` mode described above.

    Returns:
        ``load_from_dir=False``:
            ``(x_train, y_train), (x_valid, y_valid), (x_test, y_test), width, width, folder, num_classes``
            with float32 arrays, images scaled to [0, 1] and one-hot labels.
        ``load_from_dir=True``:
            ``train_ds, valid_ds, test_ds, width, width, folder, num_classes``
            with normalized, prefetched datasets.
    """
    t_start = ti.time()

    target_size = (width, width)
    folder = os.path.basename(path)

    if augment > 0:
        if augment > 2:
          folder += "-" + str(augment) + "-augmented"
        else:
          folder += "-augmented"

    if load_from_dir == False:
      folder += "-test"

    images = []
    original_labels = []
    total_class_images = []
    classes = []
    num_classes = 0
    total_images = 0
    idx = -1

    for root, dirs, files in os.walk(path):
        # Sorting in place fixes the order in which os.walk visits the class
        # folders, so class indices are reproducible and follow the same
        # alphabetical order image_dataset_from_directory uses.
        dirs.sort()
        if num_classes == 0 and dirs:
            # First iteration: the dataset root, whose sub-directories are the classes.
            num_classes = len(dirs)
            classes = list(dirs)
        else:
            # Every further iteration is taken to be the next class folder.
            idx += 1
            total_class_images.append(0)

        empty_label = np.zeros(num_classes)
        empty_label[idx] = 1

        for img in files:
            _, file_extension = os.path.splitext(img)
            if file_extension.lower() in (".png", ".jpg", ".jpeg"):
                total_images += 1
                total_class_images[idx] += 1
                file_path = os.path.join(root, img)
                images.append(file_path)
                original_labels.append(empty_label)
                if augment > 0:
                  # Count the images that augmentation will add for this file.
                  total_images += augment - 1
                  total_class_images[idx] += augment - 1

    if width > 0:
        folder += f"-{width}x{width}"

    path_export = os.path.join(os.path.dirname(path), folder)
    if reload and os.path.exists(path_export):
        delete_all(path_export)

    print("Dataset path:", path_export)
    print("Total images:", total_images)
    print("Target Width:", width)
    print("Target height:", width)

    x_train, x_test, y_train, y_test = train_test_split(images, original_labels, test_size=0.2)
    x_valid, x_test, y_valid, y_test = train_test_split(x_test, y_test, test_size=0.5)

    def make_class_dirs(base_dir):
        # exist_ok must be passed by keyword: the second positional parameter
        # of os.makedirs is `mode`, so makedirs(dir, True) requested mode 0o001.
        for current_class in classes:
            os.makedirs(os.path.join(base_dir, current_class), exist_ok=True)

    # Multiprocessing to load images
    if load_from_dir == False:
      x_train = load_images_with_progress(x_train, target_size=target_size, labels=y_train, classes=classes, augment=augment)
      x_valid = load_images_with_progress(x_valid, target_size=target_size, labels=y_valid, classes=classes, augment=augment)
      make_class_dirs(path_export)
      x_test = load_images_with_progress(x_test, target_size=target_size, labels=y_test, classes=classes, augment=augment, path_export=path_export)

      # Convert to numpy arrays
      x_train = np.array(x_train).astype('float32') / 255
      x_valid = np.array(x_valid).astype('float32') / 255
      x_test = np.array(x_test).astype('float32') / 255

      # The loader returns max(augment, 1) consecutive images per source
      # file, so each label row is repeated to keep x and y the same length.
      copies = max(augment, 1)
      y_train = np.repeat(np.array(y_train).astype('float32'), copies, axis=0)
      y_valid = np.repeat(np.array(y_valid).astype('float32'), copies, axis=0)
      y_test = np.repeat(np.array(y_test).astype('float32'), copies, axis=0)

    else:
      train_dir = os.path.join(path_export, 'train')
      valid_dir = os.path.join(path_export, 'valid')
      test_dir = os.path.join(path_export, 'test')

      # Export only once; an existing train folder is taken as "already exported".
      if not os.path.exists(train_dir):
        make_class_dirs(train_dir)
        load_images_with_progress(x_train, target_size=target_size, labels=y_train, classes=classes, augment=augment, path_export=train_dir)
        make_class_dirs(valid_dir)
        load_images_with_progress(x_valid, target_size=target_size, labels=y_valid, classes=classes, augment=augment, path_export=valid_dir)
        make_class_dirs(test_dir)
        load_images_with_progress(x_test, target_size=target_size, labels=y_test, classes=classes, augment=augment, path_export=test_dir)

      train_ds = tf.keras.utils.image_dataset_from_directory(train_dir, label_mode="categorical", image_size=target_size, batch_size=batch_size)
      normalized_train_ds = train_ds.map(normalize)
      normalized_train_ds = normalized_train_ds.prefetch(tf.data.experimental.AUTOTUNE)

      valid_ds = tf.keras.utils.image_dataset_from_directory(valid_dir, label_mode="categorical", image_size=target_size, batch_size=batch_size)
      normalized_valid_ds = valid_ds.map(normalize)
      normalized_valid_ds = normalized_valid_ds.prefetch(tf.data.experimental.AUTOTUNE)

      test_ds = tf.keras.utils.image_dataset_from_directory(test_dir, label_mode="categorical", image_size=target_size, batch_size=batch_size)
      normalized_test_ds = test_ds.map(normalize)
      normalized_test_ds = normalized_test_ds.prefetch(tf.data.experimental.AUTOTUNE)

    t_stop = ti.time()
    print('Dataset loading time:', round((t_stop - t_start), 2), 'seconds')

    if load_from_dir == True:
      return normalized_train_ds, normalized_valid_ds, normalized_test_ds, width, width, folder, num_classes
    else:
      return (x_train, y_train), (x_valid, y_valid), (x_test, y_test), width, width, folder, num_classes

def delete_all(folder):
    """Remove ``folder`` and everything in it, reporting failures instead of raising.

    The direct children are removed one by one first (files and symlinks are
    unlinked, directories are removed recursively), then the folder itself is
    removed. A failure on any single item is printed and does not stop the
    remaining deletions. A missing ``folder`` still raises from ``os.listdir``.
    """
    def report_failure(target, error):
        print("Failed to delete %s. Reason: %s" % (target, error))

    for entry_name in os.listdir(folder):
        entry_path = os.path.join(folder, entry_name)
        try:
            unlinkable = os.path.isfile(entry_path) or os.path.islink(entry_path)
            if unlinkable:
                os.unlink(entry_path)
            elif os.path.isdir(entry_path):
                shutil.rmtree(entry_path)
        except Exception as error:
            report_failure(entry_path, error)

    try:
        shutil.rmtree(folder)
    except Exception as error:
        report_failure(folder, error)

# Example usage:
# dataset_path = '/path/to/your/dataset'
# (x_train, y_train), (x_valid, y_valid), (x_test, y_test), target_width, target_height, folder, num_classes = dataset(dataset_path, reload=True, width=224, augment=2)


In [None]:
#@title Dataset from Tensorflow API

import os
# Index of the GPU to expose to this process via CUDA_VISIBLE_DEVICES.
gpu_device_number = 0
# Set before TensorFlow is imported below — presumably so that TensorFlow only
# ever sees the selected device; keep this ordering.
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_device_number)
import tensorflow as tf
# Report the GPUs TensorFlow found and the ones currently visible to it.
gpus = tf.config.experimental.list_physical_devices('GPU')
print("Available GPUs:", gpus)
current_gpu = tf.config.experimental.get_visible_devices('GPU')
print("Current GPU:", current_gpu)

import tensorflow as tf
import tensorflow_datasets as tfds
import os
import numpy as np
import time as ti
from PIL import Image

def preprocess_data(image, label, channels, num_classes, size, resize):
    """Bring one ``(image, label)`` sample into the model's input format.

    Args:
        image: Image tensor whose last axis is the channel axis.
        label: Integer class index.
        channels: Number of channels the returned image must have.
        num_classes: Depth of the one-hot encoded label.
        size: Edge length used when ``resize`` is true.
        resize: When true the image is resized to ``size`` x ``size``.

    Returns:
        Tuple ``(image, one_hot_label)``.

    Raises:
        ValueError: If the image's channel count differs from ``channels`` and
            is neither 1 (tiled up) nor 3 with ``channels == 1`` (greyscaled).
    """
    img_channel = image.shape[-1]
    if resize == True:
      image = tf.image.resize(image, (size, size))
    if img_channel != channels:
      if img_channel == 1:
        # tf.concat needs a list of tensors; the original passed the integer
        # `channels` itself. Tile the single channel `channels` times.
        image = tf.concat([image] * channels, axis=-1)
      elif img_channel == 3 and channels == 1:
        image = tf.image.rgb_to_grayscale(image)
      else:
        raise ValueError(
            f"Cannot convert an image with {img_channel} channels to {channels} channels."
        )
    label = tf.one_hot(label, depth=num_classes)
    return image, label

def dataset(dataset, path="", channels = 3, augment=0, split = ['train','validation']):
    """Load a TensorFlow Datasets dataset into NumPy train/validation/test arrays.

    The first split is used for training; the second split is cut in half,
    the first half becoming the test set and the remainder the validation set.

    Args:
        dataset: Name passed to ``tfds.load``.
        path: If non-empty, the test images are exported as PNG files to
            ``<path>/<folder>/<class index>/`` unless that folder already exists.
        channels: Number of channels the returned images should have.
        augment: Values above 1 repeat every split that many times (plain
            repetition, no random transformation is applied).
        split: The two split names passed to ``tfds.load``.

    Returns:
        ``(x_train, y_train), (x_valid, y_valid), (x_test, y_test), width, height, folder, num_classes``
        with float32 arrays, images divided by 255 and one-hot labels.
    """
    t_start = ti.time()

    train, test = tfds.load(dataset, split=split, as_supervised = True)

    # The number of classes is inferred from the labels present in the second split.
    test_labels = [label.numpy() for _, label in test]
    num_classes = len(set(test_labels))

    # Split before repeating: repeating first would put the very same samples
    # into both halves.
    total_test_images = len(test)
    valid = test.skip(total_test_images // 2)
    test = test.take(total_test_images // 2)

    if augment > 1:
      # Dataset.repeat returns a new dataset; the original discarded the
      # result, so `augment` had no effect.
      train = train.repeat(augment)
      valid = valid.repeat(augment)
      test = test.repeat(augment)

    total_images = len(train) + len(valid) + len(test)

    # Derive the target size from the first test image: non-square images are
    # resized to a square whose edge is the shorter side.
    width, height = 0, 0
    resize = False
    for image, label in test:
        rows, cols, _ = image.shape  # image tensors are (height, width, channels)
        resize = rows != cols
        width = height = min(rows, cols)
        break

    folder = dataset
    if width > 0:
        folder += f"-{width}x{height}"

    print("Number of classes:", num_classes)
    print("Total images:", total_images)
    print("Target Width:", width)
    print("Target height:", height)

    train_preprocessed = train.map(lambda image, label: preprocess_data(image, label, channels, num_classes, width, resize))
    test_preprocessed = test.map(lambda image, label: preprocess_data(image, label, channels, num_classes, width, resize))
    valid_preprocessed = valid.map(lambda image, label: preprocess_data(image, label, channels, num_classes, width, resize))

    if path:
      path_export = os.path.join(path, folder)
      print("Dataset export path:", path_export)
      if not os.path.exists(path_export):
          os.makedirs(path_export)
          no = 0
          for image, label in test_preprocessed:
            current_class = str(tf.argmax(label, axis=-1).numpy())
            class_dir = os.path.join(path_export, current_class)
            if not os.path.exists(class_dir):
                os.makedirs(class_dir)
            img_array = image.numpy()
            # Values within [0, 1] are taken to be normalized and scaled up to 0..255.
            if np.min(img_array) >= 0 and np.max(img_array) <= 1:
                img_array = (img_array * 255).astype(np.uint8)
            else:
                img_array = img_array.astype(np.uint8)
            # Image.fromarray cannot handle a trailing channel axis of size 1;
            # export single-channel images as plain 2-D arrays.
            if img_array.ndim == 3 and img_array.shape[-1] == 1:
                img_array = img_array[..., 0]
            im = Image.fromarray(img_array)
            im.save(os.path.join(class_dir, f"{no}.png"))
            no += 1

    # Unpack the preprocessed datasets
    x_train, y_train = zip(*train_preprocessed)
    x_test, y_test = zip(*test_preprocessed)
    x_valid, y_valid = zip(*valid_preprocessed)

    # Convert to numpy arrays
    x_train = np.array(x_train).astype('float32') / 255
    x_valid = np.array(x_valid).astype('float32') / 255
    x_test = np.array(x_test).astype('float32') / 255

    y_train = np.array(y_train).astype('float32')
    y_valid = np.array(y_valid).astype('float32')
    y_test = np.array(y_test).astype('float32')

    t_stop = ti.time()
    print('Dataset loading time:', round((t_stop - t_start), 2), 'seconds')

    return (x_train, y_train), (x_valid, y_valid), (x_test, y_test), width, height, folder, num_classes

# Example usage:
# dataset_export_path = '/path/to/your/dataset'
# (x_train, y_train), (x_valid, y_valid), (x_test, y_test), target_width, target_height, folder, num_classes = dataset('mnist', path=dataset_export_path, channels = 3, augment = 2)


In [None]:
#@title Training
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import matplotlib.pyplot as plt
import openpyxl
import os
import time as ti

def _bold_best_in_column(sheet, column, best_is_max):
  """Bold the best numeric cell of `column` (row 2 downwards) and un-bold the rest.

  Nothing happens when the cell in row 2 is not numeric. `best_is_max` selects
  whether the largest or the smallest value counts as best.
  """
  first_value = sheet[column + '2'].value
  if not isinstance(first_value, (int, float)):
    return

  best_value = float(first_value)
  best_row = 2
  for row in range(2, len(sheet[column]) + 1):
    cell = sheet[column + str(row)]
    cell.font = openpyxl.styles.Font(bold=False)
    if isinstance(cell.value, (int, float)):
      value = float(cell.value)
      is_better = value > best_value if best_is_max else value < best_value
      if is_better:
        best_value = value
        best_row = row

  sheet[column + str(best_row)].font = openpyxl.styles.Font(bold=True)

def train(model_name, model, epochs, train_data, validation_data, test_data, dataset_name, dataset_path, batch_size = 20, use_generator_from_dir = False, patience=20):
  """Train and evaluate a model, then log the results to an Excel workbook.

  Next to the dataset folder this creates/updates
  ``trained-models-<folder>/<model_name>-<dataset_name>.keras`` (best
  checkpoint), the matching ``.tflite`` file and ``centralizator-<folder>.xlsx``.
  The row of ``model_name`` in the sheet ``dataset_name`` is only rewritten
  when the new test accuracy beats the stored one; column P always records
  the latest attempt. Finally the validation accuracy/loss curves are plotted.

  Args:
    model_name: Name used for the saved files and the workbook row.
    model: Compiled Keras model; its second metric is read as accuracy.
    epochs: Maximum number of training epochs.
    train_data, validation_data, test_data: ``tf.data`` datasets when
      ``use_generator_from_dir`` is true, otherwise ``(x, y)`` tuples.
    dataset_name: Name of the workbook sheet; also part of the file names.
    dataset_path: Path of the dataset folder; outputs go to its parent.
    batch_size: Batch size for ``fit``; in dataset mode it is also used to
      estimate the number of images from the number of batches.
    use_generator_from_dir: Selects dataset mode (see above).
    patience: Early-stopping patience on ``val_accuracy``.
  """
  # os.path.split copes with a missing separator (a bare name used to resolve
  # to the filesystem root) and, after normpath, with a trailing separator.
  path_export, folder = os.path.split(os.path.normpath(dataset_path))
  if not folder:
    folder = 'dataset'

  models_dir = os.path.join(path_export, 'trained-models-' + folder)
  keras_path = os.path.join(models_dir, model_name + '-' + dataset_name + ".keras")
  tflite_path = os.path.join(models_dir, model_name + '-' + dataset_name + '.tflite')
  workbook_path = os.path.join(path_export, 'centralizator-' + folder + '.xlsx')
  os.makedirs(models_dir, exist_ok=True)

  early_stopping = EarlyStopping(monitor='val_accuracy', patience=patience, restore_best_weights=True, verbose=0)
  checkpoint = ModelCheckpoint(keras_path, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

  # t1/t2 bracket training, t_start/t_stop bracket evaluation on the test set.
  if use_generator_from_dir:
    t1=ti.time()
    history = model.fit(train_data, validation_data=validation_data, epochs=epochs, shuffle=True, verbose=1, batch_size=batch_size, callbacks=[checkpoint, early_stopping])
    t2=ti.time()

    t_start = ti.time()
    score = model.evaluate(test_data, verbose=0)
    t_stop = ti.time()
  else:
    print("Input shape:", train_data[0].shape)
    t1=ti.time()
    history = model.fit(train_data[0], train_data[1], validation_data=validation_data, epochs=epochs, shuffle=True, verbose=1, batch_size=batch_size, callbacks=[checkpoint, early_stopping])
    t2=ti.time()

    t_start = ti.time()
    score = model.evaluate(test_data[0],test_data[1], verbose=0)
    t_stop = ti.time()

  if not os.path.exists(workbook_path):
    workbook = openpyxl.Workbook()
  else:
    workbook = openpyxl.load_workbook(workbook_path)

  sheet_name = dataset_name

  # Drop the default sheet a fresh workbook starts with.
  if 'Sheet' in workbook.sheetnames:
    del workbook['Sheet']

  if not sheet_name in workbook.sheetnames:
    workbook.create_sheet(sheet_name)

  sheet = workbook[sheet_name]

  # Reuse the row of this model if it is already listed, otherwise append.
  model_idx = ''
  for i in range(2, len(sheet['A'])+1):
    if sheet['A' + str(i)].value == model_name:
      model_idx = str(i)

  if model_idx == '':
    model_idx = str(len(sheet['A']) + 1)

  accuracy = round(100*score[1],2)
  sheet['P' + model_idx] = str(accuracy) + "%, " + str(ti.ctime(ti.time()))
  print('Test accuracy: ' + str(accuracy) + "%")

  if sheet['D' + model_idx].value is None or float(sheet['D' + model_idx].value) < accuracy:
    # Work with the best checkpoint written during training.
    model = tf.keras.models.load_model(keras_path)

    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.target_spec.supported_ops = [
      tf.lite.OpsSet.TFLITE_BUILTINS, # enable TensorFlow Lite ops.
      tf.lite.OpsSet.SELECT_TF_OPS # enable TensorFlow ops.
    ]
    tflite_model = converter.convert()
    # Context manager so the file is flushed and closed before its size is read below.
    with open(tflite_path, "wb") as tflite_file:
      tflite_file.write(tflite_model)

    if use_generator_from_dir:
      # Number of batches times batch size: an upper bound when the last
      # batch is only partially filled.
      train_data = train_data.take(-1)
      train_dataset_length = tf.data.experimental.cardinality(train_data).numpy() * batch_size
      validation_data = validation_data.take(-1)
      valid_dataset_length = tf.data.experimental.cardinality(validation_data).numpy() * batch_size
      test_data = test_data.take(-1)
      test_dataset_length = tf.data.experimental.cardinality(test_data).numpy() * batch_size
    else:
      # Each split is an (x, y) tuple: len() of the tuple is always 2, the
      # number of samples is the length of x.
      train_dataset_length = len(train_data[0])
      valid_dataset_length = len(validation_data[0])
      test_dataset_length = len(test_data[0])

    sheet['A1'] = 'Model'
    sheet['B1'] = 'Training Acc [%]'
    sheet['C1'] = 'Valid Acc [%]'
    sheet['D1'] = 'Test Acc (on Colab) [%]'
    sheet['E1'] = 'Latency (on Colab) [ms]'
    sheet['F1'] = 'Training time [s]'
    sheet['G1'] = 'Validation time [s]'

    sheet['H1'] = 'Params (M)'
    sheet['I1'] = 'TFLite size [MB]'
    sheet['J1'] = 'Model size [MB]'
    sheet['K1'] = 'FLOPS (M)'

    sheet['L1'] = 'Test Acc (on Android) [%]'
    sheet['M1'] = 'Latency (on Android) [ms]'
    sheet['N1'] = 'Test time (on Android) [s]'

    sheet['O1'] = 'Updated at'
    sheet['P1'] = 'Last try'
    sheet['Q1'] = 'Observatii'

    sheet['A' + model_idx] = model_name
    sheet['B' + model_idx] = round(100*max(history.history['accuracy']),2)
    sheet['C' + model_idx] = round(100*max(history.history['val_accuracy']),2)
    sheet['D' + model_idx] = accuracy
    sheet['E' + model_idx] = round(((t_stop-t_start)/test_dataset_length)*1000,2)
    sheet['F' + model_idx] = round((t2-t1),2)
    sheet['G' + model_idx] = round((t_stop-t_start),2)

    sheet['H' + model_idx] = round(model.count_params()/1000000,2)
    sheet['I' + model_idx] = round(os.path.getsize(tflite_path)/1024/1024,2)
    sheet['J' + model_idx] = round(os.path.getsize(keras_path)/1024/1024,2)
    sheet['K' + model_idx] = round(get_flops(keras_path)/1000000,2)

    sheet['O' + model_idx] = ti.ctime(ti.time())
    sheet['Q' + model_idx] = "Total images: " + str(train_dataset_length+valid_dataset_length+test_dataset_length) + ", Train images: " + str(train_dataset_length) + ", Validation images: " + str(valid_dataset_length) + ", Test images: " + str(test_dataset_length)

    # Highlight the best training accuracy, test accuracy and latency.
    _bold_best_in_column(sheet, 'B', best_is_max=True)
    _bold_best_in_column(sheet, 'D', best_is_max=True)
    _bold_best_in_column(sheet, 'E', best_is_max=False)

  workbook.save(workbook_path)

  val_accuracy = history.history['val_accuracy']
  val_loss = history.history['val_loss']
  epoch_axis = range(1, len(val_accuracy) + 1)
  # Create subplots for accuracy and loss
  plt.figure(figsize=(12, 5))

  # Plot Validation Accuracy
  plt.subplot(1, 2, 1)
  plt.plot(epoch_axis, val_accuracy, 'b', label='Validation Accuracy')
  plt.title('Validation Accuracy')
  plt.xlabel('Epochs')
  plt.ylabel('Accuracy')
  plt.legend()

  # Plot Validation Loss
  plt.subplot(1, 2, 2)
  plt.plot(epoch_axis, val_loss, 'r', label='Validation Loss')
  plt.title('Validation Loss')
  plt.xlabel('Epochs')
  plt.ylabel('Loss')
  plt.legend()

  # Show the plots
  plt.tight_layout()
  plt.show()

def get_size(start_path = '.'):
    """Return the total size in bytes of all files below ``start_path``.

    The tree is walked recursively; entries that are symbolic links are not
    counted.
    """
    return sum(
        os.path.getsize(candidate)
        for parent, _subdirs, names in os.walk(start_path)
        for candidate in (os.path.join(parent, name) for name in names)
        if not os.path.islink(candidate)
    )

def get_flops(model_path):
    """Return the float-operation count the TF1 profiler reports for a saved model.

    Args:
        model_path: Path of a model loadable by ``tf.keras.models.load_model``.

    Returns:
        ``total_float_ops`` of the profile of the graph the model was loaded into.
    """
    # A private graph per call: the process-wide default graph keeps every op
    # ever added to it, so profiling it would also count models loaded by
    # earlier calls.
    graph = tf.Graph()

    with graph.as_default():
        # The context manager makes the session the default one and closes it on exit.
        with tf.compat.v1.Session(graph=graph):
            model = tf.keras.models.load_model(model_path)

            run_meta = tf.compat.v1.RunMetadata()
            opts = tf.compat.v1.profiler.ProfileOptionBuilder.float_operation()

            flops = tf.compat.v1.profiler.profile(graph=graph, run_meta=run_meta, cmd='op', options=opts)

            return flops.total_float_ops

In [None]:
#@title FFTConv2D Tensorflow Keras

import functools

from tensorflow.python.eager import context
from tensorflow.python.framework import tensor_shape
from tensorflow.python.keras import activations
from tensorflow.python.keras import backend
from tensorflow.python.keras import constraints
from tensorflow.python.keras import initializers
from tensorflow.python.keras import regularizers
from tensorflow.keras.layers import Layer
from tensorflow.keras.layers import InputSpec
# imports for backwards namespace compatibility
# pylint: disable=unused-import
from tensorflow.python.keras.layers.pooling import AveragePooling1D
from tensorflow.python.keras.layers.pooling import AveragePooling2D
from tensorflow.python.keras.layers.pooling import AveragePooling3D
from tensorflow.python.keras.layers.pooling import MaxPooling1D
from tensorflow.python.keras.layers.pooling import MaxPooling2D
from tensorflow.python.keras.layers.pooling import MaxPooling3D
# pylint: enable=unused-import
from tensorflow.python.keras.utils import conv_utils
from tensorflow.python.keras.utils import tf_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import array_ops_stack
from tensorflow.python.ops import nn
from tensorflow.python.ops import nn_ops
# pylint: disable=g-classes-have-attributes
from tensorflow.keras.utils import register_keras_serializable

@register_keras_serializable()
class FFTConv2D(Layer):
  """Keras layer with a `Conv2D`-like constructor whose forward pass uses FFTs.

  The layer owns a `kernel` weight of shape
  `kernel_size + (input_channels // groups, filters)` and, when `use_bias` is
  true, a `bias` weight of shape `(filters,)`. `call` hands the input batch and
  the kernel to `fft_op`, which multiplies the 2D real FFTs of every
  (image, filter, channel) combination, inverts the product and sums the
  results over the channels.

  NOTE(review): `call` forwards only the inputs and the kernel to `fft_op`, so
  `strides`, `dilation_rate`, `groups`, `data_format` and `padding` do not
  reach the FFT computation; they are only used by `compute_output_shape`
  (and by the unused `_convolution_op`). The FFT result appears to keep the
  spatial size of the input, which agrees with `compute_output_shape` only for
  `padding='same'` with unit strides -- confirm before using other settings.
  """

  def __init__(self,
               filters,
               kernel_size,
               strides=1,
               padding='valid',
               data_format=None,
               dilation_rate=1,
               groups=1,
               activation=None,
               use_bias=True,
               kernel_initializer='glorot_uniform',
               bias_initializer='zeros',
               kernel_regularizer=None,
               bias_regularizer=None,
               activity_regularizer=None,
               kernel_constraint=None,
               bias_constraint=None,
               trainable=True,
               name=None,
               conv_op=None,
               **kwargs):
    """Normalizes and stores the layer configuration.

    Args:
      filters: Number of output filters. A float is truncated with `int()`;
        a negative value raises `ValueError`.
      kernel_size: Int or pair of ints, normalized to a 2-tuple.
      strides: Int or pair of ints, normalized to a 2-tuple.
      padding: Padding mode, normalized by `conv_utils.normalize_padding`.
        `'causal'` is rejected by `_validate_init`.
      data_format: Data format, normalized by
        `conv_utils.normalize_data_format`.
      dilation_rate: Int or pair of ints, normalized to a 2-tuple.
      groups: Number of groups; a falsy value is replaced by 1.
      activation: Activation identifier resolved through `activations.get`.
      use_bias: Whether a bias weight is created and added in `call`.
      kernel_initializer: Initializer identifier for the kernel.
      bias_initializer: Initializer identifier for the bias.
      kernel_regularizer: Regularizer identifier for the kernel.
      bias_regularizer: Regularizer identifier for the bias.
      activity_regularizer: Regularizer identifier forwarded to `Layer`.
      kernel_constraint: Constraint identifier for the kernel.
      bias_constraint: Constraint identifier for the bias.
      trainable: Forwarded to `Layer`.
      name: Forwarded to `Layer`.
      conv_op: Stored on the instance; not used anywhere else in this class.
      **kwargs: Forwarded to `Layer`.

    Raises:
      ValueError: If `filters` is negative or `_validate_init` rejects the
        configuration.
    """
    super(FFTConv2D, self).__init__(
        trainable=trainable,
        name=name,
        activity_regularizer=regularizers.get(activity_regularizer),
        **kwargs)
    rank = 2
    self.rank = rank

    if isinstance(filters, float):
      filters = int(filters)
    if filters is not None and filters < 0:
      raise ValueError(f'Received a negative value for `filters`.'
                       f'Was expecting a positive value, got {filters}.')
    self.filters = filters
    self.groups = groups or 1
    self.kernel_size = conv_utils.normalize_tuple(
        kernel_size, rank, 'kernel_size')
    self.strides = conv_utils.normalize_tuple(strides, rank, 'strides')
    self.padding = conv_utils.normalize_padding(padding)
    self.data_format = conv_utils.normalize_data_format(data_format)
    self.dilation_rate = conv_utils.normalize_tuple(
        dilation_rate, rank, 'dilation_rate')

    self.activation = activations.get(activation)
    self.use_bias = use_bias
    self.conv_op = conv_op

    self.kernel_initializer = initializers.get(kernel_initializer)
    self.bias_initializer = initializers.get(bias_initializer)
    self.kernel_regularizer = regularizers.get(kernel_regularizer)
    self.bias_regularizer = regularizers.get(bias_regularizer)
    self.kernel_constraint = constraints.get(kernel_constraint)
    self.bias_constraint = constraints.get(bias_constraint)
    self.input_spec = InputSpec(min_ndim=self.rank + 2)

    self._validate_init()
    self._is_causal = self.padding == 'causal'
    self._channels_first = self.data_format == 'channels_first'
    self._tf_data_format = conv_utils.convert_data_format(
        self.data_format, self.rank + 2)

  def _validate_init(self):
    """Checks the constructor arguments and raises `ValueError` if invalid."""
    if self.filters is not None and self.filters % self.groups != 0:
      raise ValueError(
          'The number of filters must be evenly divisible by the number of '
          'groups. Received: groups={}, filters={}'.format(
              self.groups, self.filters))

    if not all(self.kernel_size):
      raise ValueError('The argument `kernel_size` cannot contain 0(s). '
                       'Received: %s' % (self.kernel_size,))

    if not all(self.strides):
      raise ValueError('The argument `strides` cannot contains 0(s). '
                       'Received: %s' % (self.strides,))

    # The previous check tested `isinstance(self, (FFTConv1D,
    # SeparableConv1D))`. Those names are not defined alongside this class, so
    # causal padding ended in a NameError; an FFTConv2D instance can never be
    # one of those 1D layers anyway, so causal padding is always invalid here.
    if self.padding == 'causal':
      raise ValueError('Causal padding is only supported for 1D convolution '
                       'layers; `FFTConv2D` does not support it.')

  def build(self, input_shape):
    """Creates the kernel and bias weights for the given input shape.

    Args:
      input_shape: Shape of the input; its channel dimension must be defined
        and divisible by `groups`.

    Raises:
      ValueError: If the channel count is undefined or not divisible by
        `groups`.
    """
    input_shape = tensor_shape.TensorShape(input_shape)
    input_channel = self._get_input_channel(input_shape)
    if input_channel % self.groups != 0:
      raise ValueError(
          'The number of input channels must be evenly divisible by the number '
          'of groups. Received groups={}, but the input has {} channels '
          '(full input shape is {}).'.format(self.groups, input_channel,
                                             input_shape))
    # Kernel layout: (kernel_h, kernel_w, input_channels // groups, filters).
    kernel_shape = self.kernel_size + (input_channel // self.groups,
                                       self.filters)

    self.kernel = self.add_weight(
        name='kernel',
        shape=kernel_shape,
        initializer=self.kernel_initializer,
        regularizer=self.kernel_regularizer,
        constraint=self.kernel_constraint,
        trainable=True,
        dtype=self.dtype)
    if self.use_bias:
      self.bias = self.add_weight(
          name='bias',
          shape=(self.filters,),
          initializer=self.bias_initializer,
          regularizer=self.bias_regularizer,
          constraint=self.bias_constraint,
          trainable=True,
          dtype=self.dtype)
    else:
      self.bias = None
    channel_axis = self._get_channel_axis()
    self.input_spec = InputSpec(min_ndim=self.rank + 2,
                                axes={channel_axis: input_channel})

    # Convert Keras formats to TF native formats.
    if self.padding == 'causal':
      tf_padding = 'VALID'  # Causal padding handled in `call`.
    elif isinstance(self.padding, str):
      tf_padding = self.padding.upper()
    else:
      tf_padding = self.padding
    tf_dilations = list(self.dilation_rate)
    tf_strides = list(self.strides)

    tf_op_name = self.__class__.__name__
    if tf_op_name == 'FFTConv1D':
      tf_op_name = 'fftconv1d'  # Backwards compat.

    # A regular spatial convolution op is still prepared here, but `call`
    # below goes through `fft_op` and never invokes it.
    self._convolution_op = functools.partial(
        nn_ops.convolution_v2,
        strides=tf_strides,
        padding=tf_padding,
        dilations=tf_dilations,
        data_format=self._tf_data_format,
        name=tf_op_name)
    self.built = True

  def call(self, inputs):
    """Runs the FFT-based convolution, then adds bias and applies activation.

    Args:
      inputs: Input tensor.

    Returns:
      The output of `fft_op`, with the bias added when `use_bias` is set and
      passed through `self.activation` when one is configured.
    """
    input_shape = inputs.shape

    if self._is_causal:  # Apply causal padding to inputs for Conv1D.
      inputs = array_ops.pad(inputs, self._compute_causal_padding(inputs))

    # Only the inputs and the kernel are forwarded: `fft_op` therefore runs
    # with its own default `strides`, `padding`, `dilation_rate` and
    # `data_format` rather than with the values configured on the layer.
    outputs = self.fft_op(inputs, self.kernel)

    if self.use_bias:
      output_rank = outputs.shape.rank
      if self.rank == 1 and self._channels_first:
        # nn.bias_add does not accept a 1D input tensor.
        bias = array_ops.reshape(self.bias, (1, self.filters, 1))
        outputs += bias
      else:
        # Handle multiple batch dimensions.
        if output_rank is not None and output_rank > 2 + self.rank:

          def _apply_fn(o):
            return nn.bias_add(o, self.bias, data_format=self._tf_data_format)

          outputs = conv_utils.squeeze_batch_dims(
              outputs, _apply_fn, inner_rank=self.rank + 1)
        else:
          outputs = nn.bias_add(
              outputs, self.bias, data_format=self._tf_data_format)

    if not context.executing_eagerly():
      # Infer the static output shape:
      out_shape = self.compute_output_shape(input_shape)
      outputs.set_shape(out_shape)

    if self.activation is not None:
      return self.activation(outputs)
    return outputs

  @tf.function
  def fft_op(self, batch_images, kernels_outputs, strides=(1, 1), padding='valid', dilation_rate=(1, 1), data_format='channels_last'):
    """Multiplies image and kernel spectra and returns the inverse transform.

    The work is organised as three nested `tf.vectorized_map` calls: over the
    images of the batch, over the filters, and over the (channel, kernel
    channel) pairs. The per-channel results are summed for every filter.

    Args:
      batch_images: Batch of images; each element is transposed with
        `perm=[2, 0, 1]`, so it is treated as rank 3 with the channel axis
        last.
      kernels_outputs: Rank-4 kernel whose last axis indexes the filters (the
        layout created in `build`).
      strides: Not used by the body.
      padding: When `'same'`, each channel is zero-padded by
        `(kernel_dim - 1) // 2` on both sides of each spatial axis first.
      dilation_rate: Not used by the body.
      data_format: Not used by the body.

    Returns:
      A rank-4 tensor with the filter axis moved to the last position.

    NOTE(review): `tf.signal.irfft2d` is called without `fft_length`; the
    inferred inner length is presumably always even, so inputs with an odd
    width may come back one column short -- confirm.
    """
    # Move the filter axis first so the filters can be mapped over.
    kernels = tf.transpose(kernels_outputs, perm=[3,0,1,2])

    def process_image(image):
      # Channel axis first, so channels can be paired with kernel channels.
      channels = tf.transpose(image, perm=[2,0,1])

      def process_kernel(kernel):
        kernel_channels = tf.transpose(kernel, perm=[2,0,1])

        def process_channel(args):
          channel, kernel_channel = args

          channel = tf.cast(channel, dtype=tf.float32)
          kernel_channel = tf.cast(kernel_channel, dtype=tf.float32)

          if padding == 'same':
            pad_height = max((kernel_channel.shape[0] - 1) // 2, 0)
            pad_width = max((kernel_channel.shape[1] - 1) // 2, 0)
            channel = tf.pad(channel, paddings=[[pad_height, pad_height], [pad_width, pad_width]])
          # Bring the kernel to the spatial size of the channel so both
          # spectra have the same shape. Only possible when the static shape
          # is known.
          if channel.shape[0] is not None and channel.shape[1] is not None:
            if channel.shape[0] > kernel_channel.shape[0] or channel.shape[1] > kernel_channel.shape[1]:
              # Zero-pad the kernel at the bottom/right.
              pad_height_kernel = max(channel.shape[0] - kernel_channel.shape[0], 0)
              pad_width_kernel = max(channel.shape[1] - kernel_channel.shape[1], 0)
              kernel_channel = tf.pad(kernel_channel, paddings=[[0, pad_height_kernel], [0, pad_width_kernel]])
            elif channel.shape[0] < kernel_channel.shape[0] or channel.shape[1] < kernel_channel.shape[1]:
              # Kernel larger than the image: keep only its top-left corner.
              kernel_channel = kernel_channel[:channel.shape[0], :channel.shape[1]]

          signal_fft = tf.signal.rfft2d(channel)
          kernel_fft = tf.signal.rfft2d(kernel_channel)

          # Element-wise product of the two spectra.
          result_fft = signal_fft * kernel_fft

          return tf.signal.irfft2d(result_fft)

        output = tf.concat(tf.vectorized_map(process_channel, (channels, kernel_channels)), axis=-1)
        output = tf.transpose(output, perm=[1,2,0])
        # Sum the per-channel responses of this filter.
        return tf.reduce_sum(output, axis=-1)

      return tf.concat(tf.vectorized_map(process_kernel, kernels), axis=-1)

    result_batch = tf.concat(tf.vectorized_map(process_image, batch_images), axis=-1)
    # Move the filter axis (position 1 after the maps) to the end.
    result_batch = tf.transpose(result_batch, perm=[0,2,3,1])

    return result_batch

  def _spatial_output_shape(self, spatial_input_shape):
    """Returns the output length of every spatial dimension as a list."""
    return [
        conv_utils.conv_output_length(
            length,
            self.kernel_size[i],
            padding=self.padding,
            stride=self.strides[i],
            dilation=self.dilation_rate[i])
        for i, length in enumerate(spatial_input_shape)
    ]

  def compute_output_shape(self, input_shape):
    """Computes the static output shape from the layer configuration.

    Args:
      input_shape: Shape of the input.

    Returns:
      A `TensorShape` with the leading batch dimensions kept, the spatial
      dimensions taken from `_spatial_output_shape`, and `filters` on the
      channel axis selected by `data_format`.
    """
    input_shape = tensor_shape.TensorShape(input_shape).as_list()
    batch_rank = len(input_shape) - self.rank - 1
    if self.data_format == 'channels_last':
      return tensor_shape.TensorShape(
          input_shape[:batch_rank]
          + self._spatial_output_shape(input_shape[batch_rank:-1])
          + [self.filters])
    else:
      return tensor_shape.TensorShape(
          input_shape[:batch_rank] + [self.filters] +
          self._spatial_output_shape(input_shape[batch_rank + 1:]))

  def _recreate_conv_op(self, inputs):  # pylint: disable=unused-argument
    """Always returns False."""
    return False

  def get_config(self):
    """Returns the layer configuration merged with the base `Layer` config."""
    config = {
        'filters':
            self.filters,
        'kernel_size':
            self.kernel_size,
        'strides':
            self.strides,
        'padding':
            self.padding,
        'data_format':
            self.data_format,
        'dilation_rate':
            self.dilation_rate,
        'groups':
            self.groups,
        'activation':
            activations.serialize(self.activation),
        'use_bias':
            self.use_bias,
        'kernel_initializer':
            initializers.serialize(self.kernel_initializer),
        'bias_initializer':
            initializers.serialize(self.bias_initializer),
        'kernel_regularizer':
            regularizers.serialize(self.kernel_regularizer),
        'bias_regularizer':
            regularizers.serialize(self.bias_regularizer),
        'activity_regularizer':
            regularizers.serialize(self.activity_regularizer),
        'kernel_constraint':
            constraints.serialize(self.kernel_constraint),
        'bias_constraint':
            constraints.serialize(self.bias_constraint),
        'trainable':
            self.trainable
    }
    base_config = super(FFTConv2D, self).get_config()
    # Entries defined here take precedence over same-named base entries.
    full_config = dict(list(base_config.items()) + list(config.items()))
    return full_config

  def _compute_causal_padding(self, inputs):
    """Calculates padding for 'causal' option for 1-d conv layers."""
    left_pad = self.dilation_rate[0] * (self.kernel_size[0] - 1)
    if getattr(inputs.shape, 'ndims', None) is None:
      batch_rank = 1
    else:
      batch_rank = len(inputs.shape) - 2
    if self.data_format == 'channels_last':
      causal_padding = [[0, 0]] * batch_rank + [[left_pad, 0], [0, 0]]
    else:
      causal_padding = [[0, 0]] * batch_rank + [[0, 0], [left_pad, 0]]
    return causal_padding

  def _get_channel_axis(self):
    """Returns the (negative) index of the channel axis for `data_format`."""
    if self.data_format == 'channels_first':
      return -1 - self.rank
    else:
      return -1

  def _get_input_channel(self, input_shape):
    """Returns the input channel count, raising `ValueError` if undefined."""
    channel_axis = self._get_channel_axis()
    if input_shape.dims[channel_axis].value is None:
      raise ValueError('The channel dimension of the inputs '
                       'should be defined. Found `None`.')
    return int(input_shape[channel_axis])

  def _get_padding_op(self):
    """Returns the padding in op form: upper-cased, with 'causal' as 'VALID'."""
    if self.padding == 'causal':
      op_padding = 'valid'
    else:
      op_padding = self.padding
    if not isinstance(op_padding, (list, tuple)):
      op_padding = op_padding.upper()
    return op_padding

In [None]:
#@title VFFT-CNN

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, Flatten, Activation, BatchNormalization, Add, Attention
from tensorflow.keras.layers import Conv2D, DepthwiseConv2D, MaxPooling2D, AveragePooling2D, GlobalAveragePooling2D, SeparableConv2D  # straturi convolutionale si max-pooling
from tensorflow.keras.optimizers import RMSprop, SGD, Adadelta, Adam, Nadam
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.initializers import RandomNormal, HeNormal, GlorotUniform, GlorotNormal
from tensorflow.keras.regularizers import L2, L1L2

# Hyper-parameters shared by the block builders defined below.
kernel_regularizer = L2(1e-4)
kernel_initializer = GlorotUniform(seed=None)
drop_rate = 0.35  # Best value for CIFAR-100 after tuning in range 0.25 - 0.75!
# Max-pooling window size and stride used by fft_conv_block.
psiz, stri = 4, 2

#--------------------------  ------------------------------
# Define a convolutional block with FFTConv2D
def fft_conv_block(inputs, inputs_x, filters, kernel_size, padding, input_shape):
    """Apply FFTConv2D -> BatchNorm -> ReLU -> MaxPooling2D -> Dropout.

    Args:
        inputs: Passed through untouched and returned as the second element.
        inputs_x: Tensor the block is applied to.
        filters: Number of filters of the FFTConv2D layer.
        kernel_size: Side length of the square kernel.
        padding: Padding mode used by both the convolution and the pooling.
        input_shape: Forwarded to FFTConv2D as its `input_shape` argument.

    Returns:
        Tuple `(x, inputs)` where `x` is the output of the block.
    """
    stages = (
        FFTConv2D(
            filters=filters,
            kernel_size=(kernel_size, kernel_size),
            padding=padding,
            input_shape=input_shape,
            kernel_initializer=kernel_initializer,
            kernel_regularizer=kernel_regularizer,
        ),
        BatchNormalization(),
        Activation('relu'),
        # Pool size, stride and dropout rate come from the module-level
        # settings psiz, stri and drop_rate.
        MaxPooling2D(pool_size=(psiz, psiz), strides=(stri, stri), padding=padding),
        Dropout(drop_rate),
    )

    features = inputs_x
    for stage in stages:
        features = stage(features)

    return features, inputs

def fft_block(inputs, filters, kernel_size, padding,input_shape):
    """Apply FFTConv2D -> BatchNorm -> ReLU, without pooling or dropout.

    Args:
        inputs: Tensor the block is applied to.
        filters: Number of filters of the FFTConv2D layer.
        kernel_size: Side length of the square kernel.
        padding: Padding mode of the convolution.
        input_shape: Forwarded to FFTConv2D as its `input_shape` argument.

    Returns:
        The activated output tensor.
    """
    convolution = FFTConv2D(
        filters=filters,
        kernel_size=(kernel_size, kernel_size),
        padding=padding,
        input_shape=input_shape,
        kernel_initializer=kernel_initializer,
        kernel_regularizer=kernel_regularizer,
    )
    normalized = BatchNormalization()(convolution(inputs))
    return Activation('relu')(normalized)

def create_v_cnn_fft_model(input_shape, num_classes, flat=1, fil=[20], nl=[1], hid=[], csize=15, padding='same'):
    """Build and compile the VFFT-CNN classifier.

    The network is a stack of macro-layers, one per entry of `fil`. Each
    macro-layer is a run of `fft_block`s followed by one `fft_conv_block`
    (which adds pooling and dropout). The stack feeds an optional dense head
    and a softmax output.

    Args:
        input_shape: Shape of one input sample, passed to `Input`.
        num_classes: Number of units of the softmax output layer.
        flat: 1 flattens the feature maps, 0 applies global average pooling;
            any other value leaves the feature maps as they are.
        fil: Number of filters of each macro-layer; must not be empty.
        nl: Number of `fft_block`s per macro-layer; needs at least one entry
            per entry of `fil`. For the first macro-layer a positive `nl[0]`
            produces `nl[0] + 1` blocks (one extra block attached to the
            input), while 0 produces none.
        hid: Sizes of the ReLU dense layers inserted before the output layer.
        csize: Side length of the square FFTConv2D kernels.
        padding: Padding mode forwarded to every block.

    Returns:
        A compiled `Model` (Adam with a constant learning rate of 1e-4 and
        `clipvalue=1.0`, categorical cross-entropy loss, accuracy metric).

    Raises:
        ValueError: If `fil` is empty or `nl` is shorter than `fil`.

    The list defaults are only read, never mutated.
    """
    # Fail before any layer is created; a short `nl` used to surface as an
    # IndexError only after part of the network had already been built.
    if len(fil) == 0:
        raise ValueError('`fil` must contain at least one entry.')
    if len(nl) < len(fil):
        raise ValueError(
            f'`nl` needs one entry per macro-layer: got {len(nl)} entries '
            f'for {len(fil)} macro-layers.')

    inputs = Input(shape=input_shape)
    original_inputs = inputs
    x = inputs

    # First macro-layer - connected to input
    if nl[0] > 0:
        x = fft_block(x, fil[0], csize, padding, input_shape)
        for _ in range(nl[0]):
            x = fft_block(x, fil[0], csize, padding, input_shape)
    x, inputs = fft_conv_block(inputs, x, fil[0], csize, padding, input_shape)

    # The remaining macro-layers
    for layer in range(1, len(fil)):
        for _ in range(nl[layer]):
            x = fft_block(x, fil[layer], csize, padding, input_shape)
        x, inputs = fft_conv_block(inputs, x, fil[layer], csize, padding, input_shape)

    # Exit classifier
    if flat == 1:
        x = Flatten()(x)
    elif flat == 0:
        x = GlobalAveragePooling2D()(x)

    for units in hid:
        x = Dense(units, activation='relu')(x)

    outputs = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=original_inputs, outputs=outputs)

    # The optimizer uses a constant learning rate. An ExponentialDecay schedule
    # (decay_steps=10000, decay_rate=0.96, staircase=True) used to be built
    # here but was never handed to the optimizer, so it has been dropped; pass
    # such a schedule as `learning_rate` if decay is wanted.
    initial_learning_rate = 0.0001

    model.compile(
        optimizer=tf.keras.optimizers.Adam(clipvalue=1.0, learning_rate=initial_learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model


In [None]:
#@title Benchmark dataset load

dataset_path = '/mnt/c/local/path'

# Load the benchmark dataset from a folder on disk.
(
    train_data,
    validation_data,
    test_data,
    target_width,
    target_height,
    dataset_name,
    n_class,
) = dataset(
    dataset_path,
    reload=False,
    width=32,
    batch_size=10,
    augment=0,
    load_from_dir=True,
)
# Alternative: load a dataset through the Tensorflow API instead.
# train_data, validation_data, test_data, target_width, target_height, dataset_name, n_class = dataset('imagenette', dataset_path, channels = 3, augment = 0)

In [None]:
#@title Benchmark

epochs = 500

# NOTE(review): `shape` is not assigned in this cell -- presumably it is set
# by an earlier cell; confirm it exists after a fresh kernel restart.
vfft_cnn = create_v_cnn_fft_model(
    shape,
    num_classes=n_class,
    flat=1,
    fil=[3,6,10],
    nl=[1,0,0],
    hid=[],
    csize=32,
    padding="same",
)
# Author's note kept from the original cell: flat=1, fil=[3,6,10],
# nl=[1,0,0], hid=[], csize = 64 - 91%
vfft_cnn.summary()
train(
    'VFFT-CNN',
    vfft_cnn,
    epochs,
    train_data,
    validation_data,
    test_data,
    dataset_name,
    dataset_path,
    batch_size=64,
    use_generator_from_dir=True,
)

In [None]:
#@title Confusion Matrix
import tensorflow as tf
import numpy as np

from sklearn.metrics import classification_report, confusion_matrix

model_path = '/mnt/c/local/path/model.keras'
model = tf.keras.models.load_model(model_path)

batch_size = 20

# Turn the label rows of test_data[1] into class indices by taking the dot
# product with [0, 1, ..., n_class - 1].
# Assumes the rows are one-hot encoded -- TODO confirm against the loader.
labels = (np.dot(test_data[1], np.array(range(n_class)).T)).astype('int16')
pred = model.predict(test_data[0], batch_size=batch_size)
predicted_class_indices = np.argmax(pred, axis=1)

# scikit-learn expects (y_true, y_pred). The arguments used to be swapped,
# which transposed the matrix and made it inconsistent with the
# classification report below.
C = confusion_matrix(labels, predicted_class_indices)
print(C)
print('Classification Report')
print(classification_report(labels, predicted_class_indices))


In [None]:
#@title CNN Filters plot

import tensorflow as tf
import matplotlib.pyplot as plt

# Load your trained model
model = tf.keras.models.load_model('/mnt/c/local/path/model.keras')

# Get the convolutional layers
# (for a standard CNN, filter on tf.keras.layers.Conv2D instead)
conv_layers = [layer for layer in model.layers if isinstance(layer, FFTConv2D)]

# The kernel is the first weight of the layer; taking index 0 instead of
# unpacking (kernel, bias) also works for layers built without a bias.
layer_kernels = [layer.get_weights()[0] for layer in conv_layers]

# Visualize filters: one column per layer, one row per filter.
if layer_kernels:
    n_cols = len(layer_kernels)
    # Layers can have different filter counts, so size the grid for the
    # largest one and leave the unused cells blank.
    n_rows = max(kernel.shape[3] for kernel in layer_kernels)
    fig, axes = plt.subplots(n_rows, n_cols, squeeze=False)
    for ax in axes.ravel():
        ax.axis('off')

    for i, filters in enumerate(layer_kernels):
        # Normalize filter values to 0-1 so they can be visualized
        filters_min, filters_max = filters.min(), filters.max()
        value_range = filters_max - filters_min
        if value_range > 0:
            filters = (filters - filters_min) / value_range
        else:
            # Constant kernel: avoid dividing by zero.
            filters = filters - filters_min

        # Plot filters. Each filter gets its own axes; previously every filter
        # of a layer was drawn onto the same subplot, so only the last one
        # stayed visible. Only the first input channel is shown.
        for j in range(filters.shape[3]):
            axes[j, i].imshow(filters[:, :, 0, j], cmap='gray')
plt.show()