In [1]:
import os
import random
import warnings
import numpy as np
import pandas as pd
import pickle as pkl
import tensorflow as tf
from tensorboard.plugins import projector


In [None]:
from ipynb.fs.full._01_Data_Augmentation import augment

from ipynb.fs.full._02_Train_Val_Test import create_speaker_DF
from ipynb.fs.full._02_Train_Val_Test import choose, choose_train_only

from ipynb.fs.full._03_Data import create_data
from ipynb.fs.full._03_Data  import getAbsoluteCounts

from ipynb.fs.full._04_Model import do_all, do_all_train, do_all_test

from ipynb.fs.full._05_Eval import boxplot
from ipynb.fs.full._05_Eval import speakerFalse
from ipynb.fs.full._05_Eval import confusionMatrix
from ipynb.fs.full._05_Eval import weighted_f1
from ipynb.fs.full._05_Eval import combined_scores
from ipynb.fs.full._05_Eval import plot_dialect_boxplots_test
from ipynb.fs.full._05_Eval import plot_dialect_predictions
from ipynb.fs.full._05_Eval import plot_dialect_predictions_samples

from ipynb.fs.full._06_Compare import boxplot_diff
from ipynb.fs.full._06_Compare import significant
from ipynb.fs.full._06_Compare import raincloud_diff
from ipynb.fs.full._06_Compare import compareConfusionMatrix


## Variables

In [3]:
# Root directory of all audio folders.
# The paths below are built by plain string concatenation, so this value needs a trailing path separator.
general_path = "..."
# name of the folder inside general_path that holds the audios
name = '...'
data_path = general_path + name
# name of the augmentation; an empty string skips the augmentation cells
# Keywords: 'best'->SeR and FM, 'shifting_pitch', 'segment_removal', 'background_noise'
# 'segment_swap', 'volume_confusion', 'time_reversing', 'time_stretching', 'speed_confusion', 'time_masking'
# 'frequency_masking', 'frequency_swap', 'frequency_insertion', 'speaker_insertion'
name_aug = ''
data_path_aug = general_path + name_aug
# name of the folder inside general_path that holds the audios used when test_only is True
name_test = "not_used"
data_path_test = general_path + name_test
# name of baseline to compare with (there should be a pkl with the name './Results_' + name_base)
# The comparison cells run whenever this is not None, and the test_only / train_only checks require None.
# NOTE(review): the empty string set here is not None, so the comparison cells run - confirm whether None was intended.
name_base = ''


# number of augmented files per original (checked to be > 0 when augmenting)
aug_num = 6
# percentage for augmentation per file (checked to lie between 0 and 1)
aug_perc = 1.0
# segment length for each augmentation (checked to lie between 0 and audio_length)
aug_len = 1.0

# length of one segment
audio_length = 10.0
# maximum number of seconds for one speaker/dialect (must be a multiple of audio_length or None)
# can only be used with augmentation, because only the augmented files get reduced
max_length_speaker = None # x*audio_length
max_length_dialect = None # y*audio_length

# model path for extracting embeddings (machine-specific absolute path - adjust per environment)
model_path = "C:\\Users\\Lea\\Desktop\\Doktor\\Models\\trillsson4"
# learning rate
lr = 0.0005
# dropout rate
dr = 0.2
# units dense layer
units = 128
# size of one batch for trillsson Model
batch_size_embedding = 10
# size of one batch for CNN
batch_size = 1024
# L1 regularization parameter used in the dense layers
l1 = 0.018
# L2 regularization parameter used in the dense layers
l2 = 0.01
# alpha for LeakyReLU
alpha = 0.1
# maximal number of epochs
max_epochs = 250

# number of individual runs (iterations of the run loop when neither train_only nor test_only is set)
runs = 250
# pictures of first epoch (flag forwarded to do_all / do_all_train)
first_pictures = True
# if not None, the speakers for test and validation will not be chosen randomly
# give a list of lists like [['OLALT3', 'HBALT3', 'FLALT2', 'CUXALT4'], ['BLALT1', 'BBALT2', 'CWALT', 'ULALT1']]
# each sublist represents a dialect; the first half of each sublist holds the speakers for testing,
# the second half the speakers for validation
test = None # [['...', '...'], ['...', '...', '...', '...'], ...]
# test with dummy classifier
dummy = False
# project the embeddings to TensorBoard (files are written to log_dir)
tb = True
log_dir = '...'
# Test different hyperparameters.
# When True, the run loop indexes lr, dr, l1 and l2 as [low, high] and picks alpha, units and batch_size
# from sequences, so those seven values must be sequences instead of the scalars set above.
hyper_test = False
# when True the Model gets trained and weights get saved
train_only = False
# when True the Model makes predictions on Audios in 'data_path_test'
test_only = False
# value between 0 and 1 used to determine the minimum prediction probability
# predictions below this threshold are considered as 'not properly classified'
classification_threshold = 0.15


In [None]:
def check_valid_path(path, path_name):
    """Ensure that ``path`` points to an existing directory.

    Args:
        path: Filesystem location to verify.
        path_name: Name of the configuration variable, used only in the error text.

    Raises:
        ValueError: If ``path`` is not an existing directory.
    """
    if os.path.isdir(path):
        return
    raise ValueError(f"Invalid path: {path_name} -> {path}")

# Sanity checks for the configuration cell. Everything is validated up front so that a bad
# setting fails here instead of after the embedding extraction has already run.

def _is_multiple_of(value, base):
    """Return True if ``value`` is a whole multiple of ``base``.

    The ratio is compared with its nearest integer instead of using ``%``,
    because float modulo rejects exact-looking multiples (0.3 % 0.1 != 0).
    """
    ratio = value / base
    return abs(ratio - round(ratio)) < 1e-9


def _has_at_least(value, count):
    """Return True if ``value`` is a non-string container holding at least ``count`` items."""
    if isinstance(value, str):
        return False
    try:
        return len(value) >= count
    except TypeError:
        return False


# Check for test_only
if test_only:
    if train_only or hyper_test or tb or dummy:
        raise ValueError("If test_only is True, then train_only, hyper_test, tb, and dummy must be False.")
    if name_base is not None:
        raise ValueError("If test_only is True, then name_base must be None.")
    if name_aug != '':
        raise ValueError("If test_only is True, then name_aug must be empty.")
    check_valid_path(data_path_test, "data_path_test")

# Check for train_only
if train_only:
    if test_only or hyper_test or dummy:
        raise ValueError("If train_only is True, then test_only, hyper_test, and dummy must be False.")
    if name_base is not None:
        raise ValueError("If train_only is True, then name_base must be None.")
    check_valid_path(data_path, "data_path")

# The segment length is used as a divisor below and must be a positive duration.
if audio_length <= 0:
    raise ValueError("audio_length must be greater than 0.")

# Check for augmentation
if name_aug:
    if not (0 <= aug_perc <= 1):
        raise ValueError("aug_perc must be between 0 and 1.")
    if not (0 <= aug_len <= audio_length):
        raise ValueError("aug_len must be between 0 and audio_length.")
    if aug_num <= 0:
        raise ValueError("aug_num must be greater than 0.")
    if aug_perc * audio_length < aug_len:
        warnings.warn("aug_perc * audio_length is less than aug_len.", UserWarning)

# Check for max_length_speaker and max_length_dialect
if max_length_speaker is not None and not _is_multiple_of(max_length_speaker, audio_length):
    raise ValueError("max_length_speaker must be a multiple of audio_length.")
if max_length_dialect is not None and not _is_multiple_of(max_length_dialect, audio_length):
    raise ValueError("max_length_dialect must be a multiple of audio_length.")
# The limits are only handed to create_data for the augmented data; without an augmentation
# name they would be ignored silently.
if not name_aug and (max_length_speaker is not None or max_length_dialect is not None):
    warnings.warn("max_length_speaker and max_length_dialect have no effect without augmentation (name_aug is empty).", UserWarning)

# Check for hyper_test
if hyper_test:
    if dummy:
        raise ValueError("If hyper_test is True, then dummy must be False.")
    # The run loop samples from lr[0]..lr[1] etc. and picks alpha/units/batch_size from a
    # sequence; scalar values would only fail there with a TypeError.
    for range_name, range_value in (("lr", lr), ("dr", dr), ("l1", l1), ("l2", l2)):
        if not _has_at_least(range_value, 2):
            raise ValueError(f"If hyper_test is True, then {range_name} must be a [low, high] range.")
    for choices_name, choices_value in (("alpha", alpha), ("units", units), ("batch_size", batch_size)):
        if not _has_at_least(choices_value, 1):
            raise ValueError(f"If hyper_test is True, then {choices_name} must be a non-empty list of candidates.")
    if test is None:
        warnings.warn("If hyper_test is True, test should not be None", UserWarning)

print("All checks passed successfully.")


## Augmentation

In [4]:
# Run the augmentation step only when an augmentation name is configured.
if name_aug != '':
    augment(
        name_aug, data_path, data_path_aug,
        aug_perc, aug_num, aug_len, audio_length,
        test,
    )
    

## DF with all Audios

In [5]:
# Build the speaker table for the audios in data_path, then load './All_Files_.pkl'.
# NOTE(review): the value returned by create_speaker_DF is overwritten right away by the pickle -
# presumably create_speaker_DF writes that file itself; confirm in _02_Train_Val_Test.
df_speaker = create_speaker_DF(data_path, data_path_aug, name_aug, False)
df_speaker = pd.read_pickle('./All_Files_.pkl')

if test_only:
    # Same two steps for the audios in data_path_test: no augmentation arguments and True as last argument.
    df_speaker_test = create_speaker_DF(data_path_test, '', '', True)
    df_speaker_test = pd.read_pickle('./All_Files_test.pkl')
    

In [None]:
# Display the speaker table that was loaded from './All_Files_.pkl'.
df_speaker


In [None]:
# df_speaker_test only exists in test_only mode, hence the guard.
if test_only:
    print(df_speaker_test)

## Extract Embeddings

In [None]:
# Run create_data for every data set that is needed, then load the frames from the pickles below.
# The statement order matters: all create_data calls come before the read_pickle calls.
# NOTE(review): the values returned by create_data are overwritten by the read_pickle results -
# presumably create_data writes these pickle files itself; confirm in _03_Data.
if (not test_only):
    # original audios: empty augmentation name and no length limits
    df_learn = create_data(model_path, audio_length, batch_size_embedding, '', None, None)
if (name_aug != ''):
    # augmented audios: the only call that receives max_length_speaker / max_length_dialect
    df_learn_aug = create_data(model_path, audio_length, batch_size_embedding, name_aug, max_length_speaker, max_length_dialect)

if (test_only):
    # same arguments as for the original audios plus a trailing True
    df_test = create_data(model_path, audio_length, batch_size_embedding, '', None, None, True)

# './Data_.pkl' is read in every mode. In test_only mode the first create_data call above is skipped,
# so the file has to exist already.
df_learn = pd.read_pickle('./Data_.pkl')
if (name_aug != ''):
    df_learn_aug = pd.read_pickle('./Data_' + name_aug + '_aug.pkl')
else:
    # later cells pass df_learn_aug on unconditionally, so it is set to None without augmentation
    df_learn_aug = None
if (test_only):
    df_test = pd.read_pickle('./Data_test.pkl')
    

In [None]:
# Display the frame that was loaded from './Data_.pkl'.
df_learn

In [None]:
# df_learn_aug is None when no augmentation is configured, so it is only printed otherwise.
if (name_aug != ''):
    print(df_learn_aug)

In [None]:
# df_test only exists in test_only mode, hence the guard.
if test_only:
    print(df_test)

In [None]:
# Counts for the original data (called with None) are skipped in test_only mode;
# counts for the augmented data are requested by augmentation name.
if not test_only:
    getAbsoluteCounts(None)

if name_aug != '':
    getAbsoluteCounts(name_aug)

### Projection of embeddings to TB

In [11]:
# Export the embeddings (plus the augmented ones, if any) for the TensorBoard projector:
# a metadata.tsv with one "<dialect>_<speaker>" label per row, a checkpoint holding the
# embedding matrix, and the projector config that ties both together.
if tb:
    # open() and the checkpoint writer below need the directory to exist.
    os.makedirs(log_dir, exist_ok=True)

    dialect_labels = np.array(df_learn['dialect'].tolist())
    speaker_labels = np.array(df_learn['speaker'].tolist())
    embeddings = np.array(df_learn['trillsson'].values.tolist())
    # Rows from this index on come from the augmented frame and get an "_aug" suffix.
    n_original = len(dialect_labels)

    if name_aug != '':
        dialect_labels = np.concatenate(
            (dialect_labels, np.array(df_learn_aug['dialect'].tolist())), axis=None)
        speaker_labels = np.concatenate(
            (speaker_labels, np.array(df_learn_aug['speaker'].tolist())), axis=None)
        embeddings = np.concatenate(
            (embeddings, np.array(df_learn_aug['trillsson'].values.tolist())), axis=0)

    # Written as UTF-8 explicitly so that non-ASCII labels do not depend on the platform
    # default encoding (e.g. cp1252 on Windows).
    with open(os.path.join(log_dir, 'metadata.tsv'), "w", encoding="utf-8") as metadata:
        for row, (dialect, speaker) in enumerate(zip(dialect_labels, speaker_labels)):
            suffix = "_aug" if row >= n_original else ""
            metadata.write(f'{dialect}_{speaker}{suffix}\n')

    embeddings_tensor = tf.Variable(embeddings)

    checkpoint = tf.train.Checkpoint(embedding=embeddings_tensor)
    checkpoint.save(os.path.join(log_dir, "embedding.ckpt"))

    config = projector.ProjectorConfig()
    embedding = config.embeddings.add()
    # Name under which tf.train.Checkpoint stores the `embedding` attribute saved above.
    embedding.tensor_name = "embedding/.ATTRIBUTES/VARIABLE_VALUE"
    # Relative to log_dir, where the file was written above.
    embedding.metadata_path = 'metadata.tsv'
    projector.visualize_embeddings(log_dir, config)


## Test

In [12]:
# Empty result table with one row per run; it is not needed in test_only mode.
if not test_only:
    # acc/loss/f1/f1_macro/f1_w, each for train, val and test (in that order)
    metric_columns = [
        f'{metric}_{split}'
        for metric in ('acc', 'loss', 'f1', 'f1_macro', 'f1_w')
        for split in ('train', 'val', 'test')
    ]
    bookkeeping_columns = [
        'speaker_val', 'speaker_test',
        'false_names', 'false_speaker',
        'false_segments_begin', 'false_segments_end', 'false_simplified',
        'classes_x', 'classes_true',
        'y_test_names', 'y_test_speaker',
    ]
    df_learned = pd.DataFrame(columns=metric_columns + bookkeeping_columns)


In [None]:
# Three mutually exclusive modes:
#   train_only - one training run via do_all_train, the result row is stored in df_learned
#   test_only  - predictions via do_all_test, written to predictions.csv / predictions.pkl
#   otherwise  - `runs` independent runs via do_all, one df_learned row per run
if train_only:
    if test is None:
        test_val = choose_train_only(df_speaker)
        print(test_val)
    else:
        test_val = test

    list_row, label_mapping = do_all_train(first_pictures, df_learn, df_learn_aug, test_val, name_aug,
                                           lr, dr, units, l1, l2, alpha, batch_size, max_epochs)
    df_learned.loc[len(df_learned)] = list_row

elif test_only:
    predictions, speaker, dialect = do_all_test(df_learn, df_test, lr, dr, units, l1, l2, alpha)
    predictions_df = pd.DataFrame({
        'predictions': [list(pred) for pred in predictions],
        'speaker': speaker,
        'dialect': dialect
    })
    predictions_df.to_csv('predictions.csv', index=False)
    predictions_df.to_pickle('predictions.pkl')

else:
    for i in range(runs):
        print('Iteration ', i, ' of ' + str(runs))

        # Use the fixed test/validation speakers if given, otherwise draw new ones for every run.
        if test is None:
            test_val = choose(df_speaker)
            print(test_val)
        else:
            test_val = test

        if hyper_test:
            # Random search: lr, dr, l1 and l2 are [low, high] ranges, while alpha, units and
            # batch_size are candidate lists from which one entry is picked.
            lr_run = random.uniform(lr[0], lr[1])
            dr_run = random.uniform(dr[0], dr[1])
            l1_run = random.uniform(l1[0], l1[1])
            l2_run = random.uniform(l2[0], l2[1])
            alpha_run = random.choice(alpha)
            units_run = random.choice(units)
            batch_size_run = random.choice(batch_size)
            # Report the draw so that the result row of this run can be attributed to it.
            print('Sampled hyperparameters: ', {'lr': lr_run, 'dr': dr_run, 'units': units_run,
                                                'l1': l1_run, 'l2': l2_run, 'alpha': alpha_run,
                                                'batch_size': batch_size_run})
        else:
            lr_run, dr_run, units_run = lr, dr, units
            l1_run, l2_run, alpha_run = l1, l2, alpha
            batch_size_run = batch_size

        list_row, label_mapping = do_all(first_pictures, df_learn, df_learn_aug, test_val, name_aug, i,
                                         lr_run, dr_run, units_run, l1_run, l2_run, alpha_run, batch_size_run,
                                         tb, log_dir, dummy, max_epochs)
        df_learned.loc[len(df_learned)] = list_row
    

In [14]:
# Persist the label mapping and the result table (pickle and ';'-separated CSV).
# Nothing is written in test_only mode.
if not test_only:
    with open('label_mapping.pkl', 'wb') as mapping_file:
        pkl.dump(label_mapping, mapping_file)

    results_stem = './Results_' + name + '_' + name_aug
    df_learned.to_pickle(results_stem + '.pkl')
    df_learned.to_csv(results_stem + '.csv', sep=';')


## Evaluation

In [None]:
# Reload the label mapping from disk. In test_only mode no mapping is produced by this notebook run,
# so 'label_mapping.pkl' must already exist from an earlier run.
# NOTE: pickle.load executes arbitrary code from the file - only load files you created yourself.
with open('label_mapping.pkl', 'rb') as f:
    label_mapping = pkl.load(f)

In [None]:
# Plots for the predictions made in test_only mode.
if test_only:
    plot_dialect_boxplots_test(predictions_df, df_test, label_mapping)
    plot_dialect_predictions(predictions_df, label_mapping)
    # Per-sample plot twice: first without, then with the classification threshold.
    for extra_args in ((), (classification_threshold,)):
        plot_dialect_predictions_samples(predictions_df, label_mapping, *extra_args)

In [None]:
# One boxplot per test metric; only available after the multi-run mode.
if not (test_only or train_only):
    for metric in ('f1_test', 'f1_macro_test', 'f1_w_test', 'acc_test'):
        boxplot(metric, name, name_aug)

In [None]:
# Only evaluated after the multi-run mode (neither test_only nor train_only).
if not (test_only or train_only):
    weighted_f1(name, name_aug, label_mapping)

In [19]:
# Only evaluated after the multi-run mode (neither test_only nor train_only).
if not (test_only or train_only):
    speakerFalse(name, name_aug)

In [None]:
# Combined scores followed by the confusion matrices, first for the plain results and then
# for the "_combined" variant; only evaluated after the multi-run mode.
if not (test_only or train_only):
    combined_scores(name, name_aug)

    for flags in ((True, False), (False, False), (False, True)):
        confusionMatrix(name, name_aug, label_mapping, *flags)

    combined_name = name_aug + "_combined"
    for flags in ((True, False), (False, False)):
        confusionMatrix(name, combined_name, label_mapping, *flags)


In [21]:
# Comparison against the baseline, once with True and once with False as last argument.
if name_base is not None:
    for flag in (True, False):
        compareConfusionMatrix(name, name_aug, name_base, label_mapping, flag)
    

In [22]:
# Difference plots of 'f1_w_test' between the baseline and the current results.
if name_base is not None:
    compared_metric = 'f1_w_test'
    boxplot_diff(compared_metric, name_base, name, name_aug)
    raincloud_diff(compared_metric, name_base, name, name_aug)
    

In [None]:
# Compares the results of the base model with those of the new model (two-sided).
if name_base is not None:
    base_results = './Results_' + name_base + '.pkl'
    new_results = './Results_' + name + '_' + name_aug + '.pkl'
    U_base, U_aug, p_U, res_T, var_base, var_aug = significant(
        base_results, new_results, 'f1_w_test', 'two-sided')
    print("two-sided U-test: ", format(p_U, '.20f'))
    print("two-sided T-test: ", format(res_T.pvalue, '.20f'))
    

In [None]:
# Tests whether the results of the base model are less (worse) than those of the new model.
if (name_base is not None):
    U_base, U_aug, p_U, res_T, var_base, var_aug = significant('./Results_' + name_base + '.pkl', './Results_' + name + '_' + name_aug + '.pkl', 'f1_w_test', 'less')
    print("one-sided U-test: ", f'{p_U:.20f}')
    # The label was copied from the two-sided cell; 'less' is passed here.
    # NOTE(review): assumes significant() applies the alternative to the T-test as well - confirm in _06_Compare.
    print("one-sided T-test: ", f'{res_T.pvalue:.20f}')
    