In [None]:
import numpy as np
import pandas as pd
import os
import random
import tensorflow as tf
from CNN.utility import train_routine
from ds_creation.ds_utility import get_file_count, get_other_class
from ds_creation.split_config import phisical_split
from ds_creation.plot_utility import process_metrics
from prototypical.train.train_setup import train



# Train/validation fractions. The 'train' fraction is used further down to split
# the prototypical-network classes, and the whole dict is passed to train_routine.
SPLIT_PERC = {'train': 0.8, 'val': 0.2}
# Fraction of files held out for testing; only used by the FIRST_RUN physical split below.
TEST_SPLIT = 0.2

# Filesystem layout, relative to the working directory.
# DATA_DIR is read as one sub-directory per species by the cells below.
DATA_DIR = os.path.join('data', 'mammals_calls', 'data')
TEST_DIR = os.path.join('data', 'mammals_calls_test')
# Destination of the train.txt / val.txt / test.txt class-split files.
SPLIT_DIR = os.path.join('data', 'mammals_calls', 'splits', 'custom')
AUDIO_DIR = os.path.join('data', 'audio')
MODELS_METRICS_DIR = os.path.join("models_metrics")
CNN_CACHE_DIR = os.path.join("data_cache", "CNN")

# Forwarded to train_routine (presumably the early-stopping patience; confirm in CNN.utility).
PATIENCE = 5
# NOTE(review): TO_TRAIN is not referenced by any visible cell; the train_routine
# call passes to_train=True literally. Confirm whether this flag should drive it.
TO_TRAIN = False
# Forwarded to train_routine as from_start.
FROM_START = True
# Make sure the output directories exist before any cell writes into them.
os.makedirs(MODELS_METRICS_DIR, exist_ok=True)
os.makedirs(SPLIT_DIR, exist_ok=True)
    
# Image height and width: images are resized to [h, w] before being embedded,
# and the prototypical model is configured with x_dim "164,397,3".
h = 164
w = 397

# Seed every random number generator used by the notebook (Python, NumPy, TensorFlow).
seed = 2025
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)

# Run the physical file split only on the very first run, to divide the files into train and test.
FIRST_RUN = False
if FIRST_RUN:
    # phisical_split receives the fraction that is NOT held out for testing.
    perc = 1-TEST_SPLIT
    phisical_split(DATA_DIR, perc)

Le specie vengono divise in tre gruppi in base al numero di sample disponibili:
* Quelle per il training della CNN: hanno almeno 1000 sample; tutte le classi restanti confluiscono nella classe `other`.
* Quelle per il training della Prototypical: hanno tra 100 e 999 sample.
* Quelle per il test della Prototypical: hanno meno di 100 sample.

Vengono quindi creati i file di split (`train.txt`, `val.txt`, `test.txt`) che la Prototypical userà per suddividere le classi tra train, validation e test.

In [None]:
# Number of files available for each species under DATA_DIR.
count_df = get_file_count(DATA_DIR)

# Species are partitioned by file count into three DISJOINT groups:
#   >= CNN_MIN_FILES                     -> CNN training classes
#   PROTO_MIN_FILES .. CNN_MIN_FILES - 1 -> prototypical train/val classes
#   <  PROTO_MIN_FILES                   -> prototypical test classes
# The upper bound of the middle group is strict so that a species with exactly
# CNN_MIN_FILES files is not assigned to both the CNN and the prototypical group.
CNN_MIN_FILES = 1000
PROTO_MIN_FILES = 100
file_counts = count_df['file_count']

CNN_training = count_df[file_counts >= CNN_MIN_FILES]['species'].tolist()
proto_training = count_df[(file_counts < CNN_MIN_FILES) & (file_counts >= PROTO_MIN_FILES)]['species'].tolist()
proto_test = count_df[file_counts < PROTO_MIN_FILES]['species'].tolist()

# Shuffle (seeded in the configuration cell) and split the prototypical
# training classes into train and validation classes.
random.shuffle(proto_training)
split_idx = int(len(proto_training) * SPLIT_PERC['train'])
proto_train = proto_training[:split_idx]
proto_val = proto_training[split_idx:]
print(len(proto_train), len(proto_val), len(proto_test))

# Write one species name per line into <split>.txt inside SPLIT_DIR.
for split_name, split_species in (('test', proto_test), ('train', proto_train), ('val', proto_val)):
    with open(os.path.join(SPLIT_DIR, f'{split_name}.txt'), 'w') as f:
        for species in split_species:
            f.write(f"{species}\n")



Viene generata fisicamente la classe `other`, contenente i file di tutte le classi che hanno meno di 1000 sample.
Viene quindi avviato il training della CNN su queste classi, con uno split train/val 80/20, per stabilire il numero di epoche ottimale per l'addestramento.

In [None]:
# Species with fewer than 1000 files are merged into the catch-all "other" class.
# The bound is 1000 (not 999) so that it complements the ">= 1000" rule used to
# pick the CNN classes: a species with exactly 999 files must not be left out
# of both groups.
all_counts_df = get_file_count(DATA_DIR)
count_df_truncated = all_counts_df[all_counts_df['file_count'] < 1000]

other_species_list = count_df_truncated['species'].tolist()
# Report the number of species (not the DataFrame shape tuple) next to the file total.
print(f'Other total files: {count_df_truncated["file_count"].sum()}, species count: {len(other_species_list)}, species: {other_species_list}')
get_other_class(DATA_DIR, other_species_list)

# Count again after get_other_class has run, so count_df describes DATA_DIR as
# the following cells will see it.
count_df = get_file_count(DATA_DIR)

In [None]:
# Run the CNN training routine on the per-species counts computed above.
# NOTE(review): to_train is passed as a literal True; the TO_TRAIN flag from the
# configuration cell is not consulted here. Confirm this is intended.
other_ds, history = train_routine(
    count_df,
    PATIENCE,
    SPLIT_PERC,
    DATA_DIR,
    (w, h),
    (0, 0),
    cardinality=1000,
    subfolder='14-10_training',
    from_start=FROM_START,
    to_train=True,
)

In [None]:
# Pick the epoch with the highest validation accuracy (epochs are reported 1-based).
val_acc_per_epoch = history.history['val_accuracy']
train_acc_per_epoch = history.history['accuracy']
best_idx = np.argmax(val_acc_per_epoch)
best_epoch = best_idx + 1
print(f'Best epoch: {best_epoch}, val_accuracy: {val_acc_per_epoch[best_idx]}, accuracy: {train_acc_per_epoch[best_idx]}')

In [None]:
# Date tag of the training run whose metrics are processed; combined with the
# "_training" suffix it yields the same string used as `subfolder` in the
# train_routine call ('14-10_training').
curr_training_date = '14-10'
training_metrics_dir = os.path.join(MODELS_METRICS_DIR, f'{curr_training_date}_training')
# NOTE(review): the meaning of the literal 13 is defined by process_metrics,
# which is not visible here. Confirm and consider naming it.
process_metrics(count_df, 13, training_metrics_dir, MODELS_METRICS_DIR)

In [None]:
# Path handed to the prototypical training routine as "model.save_path".
proto_model_save_path = 'data_cache/proto/test_mammals_calls.keras'
# Ensure the target directory exists, as is done for the other output
# directories in the configuration cell.
os.makedirs(os.path.dirname(proto_model_save_path), exist_ok=True)

# Settings consumed by prototypical.train.train_setup.train; the meaning of each
# key is defined by that module, which is not visible in this notebook.
config = {
    "data.dataset": "mammals_calls",
    "data.split": "custom",
    "data.train_way": 4,
    "data.train_support": 2,
    "data.train_query": 3,
    "data.test_way": 4,
    "data.test_support": 2,
    "data.test_query": 3,
    "data.episodes": 10,
    "data.gpu": 0,
    "data.cuda": True,
    # Built from the h / w constants ("164,397,3" with the current values) so the
    # input shape cannot drift from the (w, h, 3) size used when embedding images.
    "model.x_dim": f"{h},{w},3",
    "model.z_dim": 64,
    "train.epochs": 50,
    "train.optim_method": "Adam",
    "train.lr": 0.001,
    "train.patience": 5,
    "model.save_path": proto_model_save_path,
}

train(config)

In [None]:

def load_and_preprocess_image(img_path, sizes):
    """
    Read a PNG from disk, resize it and scale it by 1/255.

    Args:
        img_path (str): path to the image on disk.
        sizes (tuple): (width, height, channels); only width and height are
            used, the image is always decoded with 3 channels.
    Returns (np.ndarray): float32 array of shape (height, width, 3).
    """
    width, height, _ = sizes
    raw_bytes = tf.io.read_file(img_path)
    decoded = tf.image.decode_png(raw_bytes, channels=3)
    # tf.image.resize takes the target size as [rows, cols], i.e. [height, width].
    resized = tf.image.resize(decoded, [height, width])
    scaled = tf.cast(resized, tf.float32) / 255.0
    return scaled.numpy()


def embedding(support, w_h_c, model_dir):
    """
    Compute the prototype of one class: the mean embedding of its support images.

    Args:
        support (list): preprocessed images of a single class, each laid out as
            (height, width, channels), which is the layout produced by
            load_and_preprocess_image.
        w_h_c (tuple): (width, height, channels) of every support image.
        model_dir (str): path of the saved Keras encoder to load.
    Returns (tf.Tensor): element-wise mean of the encoder outputs over all
        support images (for an encoder returning one flat vector per image,
        a 1-D tensor of length z_dim; TODO confirm the encoder output shape).
    Raises:
        ValueError: if `support` is empty.
    """
    w, h, c = w_h_c
    if len(support) == 0:
        raise ValueError("support must contain at least one image")
    model = tf.keras.models.load_model(model_dir)
    z = []
    for cat in support:
        # Only add the batch axis. The image is stored as (h, w, c); reshaping it
        # to (w, h, c) would not transpose it but scramble the pixel layout, and
        # would not match the "164,397,3" input shape the model is trained with.
        cat = tf.reshape(cat, [1, h, w, c])
        z.append(model(cat))
    # One row per support image.
    z = tf.concat(z, axis=0)
    # The prototype is the mean over the support examples (axis 0), not over the
    # embedding features of each example.
    z_prototypes = tf.math.reduce_mean(z, axis=0)
    return z_prototypes

def get_samples(classes, n_support_dict, w_h_c, model_dir):
    """
    Build one prototype per class from randomly sampled support images.

    For every class, `n_support_dict[class]` files are drawn without replacement
    from DATA_DIR/<class>, preprocessed with load_and_preprocess_image and
    reduced to a prototype with embedding.

    Args:
        classes (list): class (sub-directory) names under DATA_DIR.
        n_support_dict (dict): class name -> number of support images to draw.
        w_h_c (tuple): (width, height, channels) forwarded to the image loader
            and to embedding.
        model_dir (str): path of the saved encoder, forwarded to embedding.
    Returns (pd.DataFrame): columns 'class' and 'embeddings' (the prototype
        returned by embedding for that class).
    Raises:
        ValueError: if a class has fewer files than the requested support size.
    """
    support_paths = {}
    for curr_class in classes:
        n_support = n_support_dict[curr_class]
        main_dir = os.path.join(DATA_DIR, curr_class)
        # os.listdir returns entries in arbitrary order; sorting makes the seeded
        # random.sample below pick the same files on every machine and run.
        files = sorted(os.listdir(main_dir))
        if n_support > len(files):
            raise ValueError(
                f"class '{curr_class}': requested {n_support} support images "
                f"but only {len(files)} files are available in {main_dir}"
            )
        selected_files = random.sample(files, n_support)
        # Keep only the paths here; images are decoded one class at a time below
        # so that the decoded images of all classes are never in memory at once.
        support_paths[curr_class] = [os.path.join(main_dir, name) for name in selected_files]

    def _class_prototype(img_paths):
        # Load the support images of one class and reduce them to its prototype.
        images = [load_and_preprocess_image(img_path, w_h_c) for img_path in img_paths]
        return embedding(images, w_h_c, model_dir)

    embedding_df = pd.DataFrame(list(support_paths.items()), columns=['class', 'embeddings'])
    embedding_df['embeddings'] = embedding_df['embeddings'].apply(_class_prototype)

    return embedding_df
# Build prototypes for the prototypical train and validation classes, using
# every file found on disk for each class as its support set.
classes = proto_val + proto_train
n_support_dict = dict(
    (species_name, len(os.listdir(os.path.join(DATA_DIR, species_name))))
    for species_name in classes
)
results = get_samples(
    classes,
    n_support_dict,
    (w, h, 3),
    os.path.join('data_cache', 'proto', 'test_mammals_calls.keras'),
)
print(results)


In [None]:
def calc_euclidian_dists(x, y):
    """
    Pairwise mean squared difference between the rows of two 2-D tensors.

    Despite the name, no square root is taken and the squared differences are
    averaged (not summed) over the feature axis; the result therefore equals the
    squared Euclidean distance divided by the number of features.

    Args:
        x (tf.Tensor): tensor of shape (n, d).
        y (tf.Tensor): tensor of shape (m, d).

    Returns (tf.Tensor): 2-dim tensor of shape (n, m) whose entry [i, j] is
        the mean over k of (x[i, k] - y[j, k]) ** 2.

    """
    # (n, 1, d) - (1, m, d) broadcasts to (n, m, d): same values as explicitly
    # tiling both operands, without materialising the tiled copies and without
    # needing statically known n and m.
    diff = tf.expand_dims(x, 1) - tf.expand_dims(y, 0)
    return tf.reduce_mean(tf.math.pow(diff, 2), 2)

