In [None]:
import os

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import keras_tuner as kt
from keras_tuner.tuners import RandomSearch, Hyperband, BayesianOptimization

import numpy as np

import preprocessing as pp
import machine_learning as ml

In [None]:
def get_model(hp):
    """
    Build and compile the tunable 1D-CNN classifier used for the hyperparameter search.

    It is a modified duplicate of the model from machine_learning, with the
    layer sizes replaced by hyperparameters that are registered on ``hp``.

    Architecture: Gaussian noise on the reshaped input, three
    Conv1D -> MaxPooling1D -> BatchNormalization stages (causal padding, GELU
    activation, tunable dilation rate), spatial dropout, one hidden dense layer
    and a final dense layer that emits ``num_classes`` raw logits.

    Args:
        hp: Hyperparameter container handed in by the tuner. Only ``hp.Int``,
            ``hp.Float`` and ``hp.Choice`` are called on it.

    Returns:
        The compiled ``Sequential`` model (Adam optimizer with a tunable
        learning rate, sparse categorical cross-entropy on logits, accuracy
        metric).
    """
    input_shape = 500
    num_classes = 4

    # NOTE: the registration order and the names of the hyperparameters are
    # kept stable on purpose, so previously stored trials stay comparable.
    filter1 = hp.Int('filter1', 3, 128, step=2)
    filter2 = hp.Int('filter2', 3, 128, step=1)
    filter3 = hp.Int('filter3', 3, 128, step=1)
    filtersize1 = hp.Int('filtersize1', 1, 15, step=2)
    filtersize2 = hp.Int('filtersize2', 1, 15, step=2)
    filtersize3 = hp.Int('filtersize3', 1, 4, step=1)
    poolsize1 = hp.Int('poolsize1', 1, 4, step=1)
    poolsize2 = hp.Int('poolsize2', 1, 4, step=1)
    poolsize3 = hp.Int('poolsize3', 1, 4, step=1)
    gaussian_noise = hp.Float('noise', 0.0, 0.3, step=0.05, default=0.1)
    dropout = hp.Float('dropout', 0.0, 0.5, step=0.1, default=0.2)
    dense_size = hp.Int('dense', 32, 1024, step=16)

    dilation_1 = hp.Int('dilation_1', 1, 3, step=1, default=1)
    dilation_2 = hp.Int('dilation_2', 1, 3, step=1, default=2)
    dilation_3 = hp.Int('dilation_3', 1, 3, step=1, default=3)

    activation = 'gelu'
    padding = 'causal'

    model = models.Sequential([
        # Keras documents `shape` as a tuple of ints (batch dimension excluded),
        # so pass a 1-tuple instead of a bare int.
        layers.Input(shape=(input_shape,)),
        # Conv1D needs an explicit channel axis: (steps,) -> (steps, 1).
        layers.Reshape((input_shape, 1)),

        layers.GaussianNoise(gaussian_noise),

        layers.Conv1D(filter1, filtersize1, activation=activation, padding=padding, dilation_rate=dilation_1),
        layers.MaxPooling1D(pool_size=poolsize1),
        layers.BatchNormalization(),

        layers.Conv1D(filter2, filtersize2, activation=activation, padding=padding, dilation_rate=dilation_2),
        layers.MaxPooling1D(pool_size=poolsize2),
        layers.BatchNormalization(),

        layers.Conv1D(filter3, filtersize3, activation=activation, padding=padding, dilation_rate=dilation_3),
        layers.MaxPooling1D(pool_size=poolsize3),
        layers.BatchNormalization(),

        layers.SpatialDropout1D(dropout),
        layers.Flatten(),
        layers.Dense(dense_size, activation=activation),

        # No activation here: the loss below is configured with from_logits=True.
        layers.Dense(num_classes),
    ])

    model.summary()

    hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=hp_learning_rate),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )

    return model

In [None]:
# Run switches for this notebook.
debug = False
use_new_data = False
num_test_data_sets = 5

# Folder that contains one sub-folder per measured dataset.
data_directory = os.path.join("..", "DownloadedData", "Rev230meas2.5cm/")

# Header offset per hardware revision; the revision is detected from the
# (case-insensitive) directory path, checking "rev1" before "rev2".
offset_header_rev1 = -200
offset_header_rev2 = 200
for revision_tag, revision_offset in (
    ("rev1", offset_header_rev1),
    ("rev2", offset_header_rev2),
):
    if revision_tag in data_directory.lower():
        offset_header = revision_offset
        break
else:
    # Neither revision tag occurs in the path.
    raise NotImplementedError("Specify which offset should be used!")

In [None]:
# Technical details of your experimental setup
technical_details = {
    "header_length": 1,  # Number of symbols used to identify the header
    "clock_freq": int(100e6),  # Clock frequency (Hz) of the sender electronics
    "sample_freq": int(1e10),  # Sample frequency (Hz) used by oscilloscope
    "signal_length": 2_000_002,  # Total number of data points of one measurement
    "steps_to_left": 2,  # start for the cut position in symbols to the left from the header position
    "steps_to_right": 3,  # end for the cut position in symbols to the right from the header position
    "do_normalize_data": True,  # Use if data should be normalized to have zero mean and std 1
}

# Array of the different distances in cm (folder names) used for the measurement.
# os.listdir returns entries in arbitrary order, so the names are sorted to make
# the dataset selected below reproducible across machines and runs.
position = sorted(
    dirname for dirname in os.listdir(data_directory)
    if os.path.isdir(os.path.join(data_directory, dirname))
)

print(f"Evaluating datasets: {position}")

# Fail with a clear message instead of an IndexError on position[0].
if not position:
    raise FileNotFoundError(f"No dataset folders found in {data_directory!r}")

# Only the first (alphabetically smallest) dataset folder is used in this notebook.
actual_dataset_used = position[0]
print(f"{actual_dataset_used=}")

In [None]:
def indices_to_one_hot(data, nb_classes):
    """Convert an iterable of indices to one-hot encoded labels.

    Args:
        data: Class indices as a list, tuple or NumPy array. Values must be
            whole numbers; float-typed inputs such as ``[0.0, 2.0]`` are
            accepted and cast to integers.
        nb_classes: Number of classes, i.e. the length of each one-hot vector.

    Returns:
        Float array of shape ``data.shape + (nb_classes,)`` where each index
        is replaced by the matching row of the identity matrix.

    Raises:
        ValueError: If ``data`` contains values that are not whole numbers.
        IndexError: If an index lies outside ``[-nb_classes, nb_classes)``
            (raised by NumPy indexing).
    """
    indices = np.asarray(data)
    if not np.issubdtype(indices.dtype, np.integer):
        # Plain lists, empty inputs and float label arrays end up here; NumPy
        # refuses non-integer arrays as indices, so cast after validating.
        as_int = indices.astype(np.intp)
        if not np.array_equal(as_int, indices):
            raise ValueError("indices_to_one_hot expects whole-number class indices")
        indices = as_int
    return np.eye(nb_classes)[indices]

In [None]:
# Load the training/validation arrays for the selected dataset folder.
dataset_path = os.path.join(data_directory, actual_dataset_used)
training_data, training_labels, validation_data, validation_labels = pp.get_datasets(
    dataset_path,
    technical_details=technical_details,
    offset_header=offset_header,
    data_augmentation_halflength=1,
    debug=debug,
    force_create_npy_files=use_new_data,
)

# Quick sanity check of what was loaded.
for loaded_array in (training_data, training_labels):
    print(loaded_array.shape)

# Switch between the custom 1D model (False) and the stock 2D hypermodel (True).
resnet = False
if resnet:
    # Add two singleton axes so the 1D traces look like 2D data to the
    # image hypermodel from kt.applications.
    image_like_shape = (-1, 500, 1, 1)
    training_data = training_data.reshape(image_like_shape)
    validation_data = validation_data.reshape(image_like_shape)

# Wrap both splits the same way (training first, then validation).
train_ds, val_ds = (
    ml.prepare_datasets_nosplit(samples, labels, batch_size=128)
    for samples, labels in (
        (training_data, training_labels),
        (validation_data, validation_labels),
    )
)

In [None]:
# Hyperband search over `get_model`; with `resnet` enabled the stock
# HyperXception hypermodel from keras_tuner is tuned instead.
# Alternative tuners that were tried: kt.RandomSearch, kt.BayesianOptimization.
if resnet:
    # The per-sample input shape has to match the (-1, 500, 1, 1) reshape that
    # is applied to the data when `resnet` is set, i.e. (500, 1, 1).
    hypermodel = kt.applications.HyperXception(input_shape=(500, 1, 1), classes=4)
else:
    hypermodel = get_model

tuner = kt.Hyperband(
    hypermodel,
    objective='val_accuracy',
    max_epochs=10,
    hyperband_iterations=3,
    project_name='hyperparam_tuning_rev2_fixed_dilation',
    seed=42,
)

In [None]:
# Set to True to actually run the (long) hyperparameter search.
train = False
if train:
    # Created but currently not handed to the search; add it to
    # `search_callbacks` to stop once val_accuracy reaches the threshold.
    halt_callback = ml.HaltThresholdCallback(metric='val_accuracy', threshold=.99)

    search_callbacks = [tf.keras.callbacks.EarlyStopping(patience=2)]

    # Optional fit arguments that were tried and disabled:
    # use_multiprocessing=True, workers=6
    tuner.search(
        train_ds,
        validation_data=val_ds,
        shuffle=True,
        epochs=10,
        callbacks=search_callbacks,
    )

In [None]:
# Best model found by the tuner (read from the tuner's stored trials).
best_model = tuner.get_best_models(1)[0]

# Evaluate the best model on each of the test datasets in turn.
for j in range(num_test_data_sets):
    test_data, test_labels, test_start = pp.load_test_datasets(
        os.path.join(data_directory, actual_dataset_used), technical_details,
        data_index=j, offset_header=offset_header,
    )

    if resnet:
        # Apply the same reshape as for the training data so the 2D hypermodel
        # can process it. This has to happen BEFORE the evaluation; the
        # previous code reshaped an undefined name after the model was tested.
        # NOTE(review): assumes test_data holds 500-sample rows like the
        # training data - confirm against pp.load_test_datasets / ml.test_model.
        test_data = test_data.reshape((-1, 500, 1, 1))

    print(ml.test_model(best_model, test_data, test_labels, test_start, technical_details))

In [None]:
# Uses test_data / test_labels / test_start as left behind by the LAST
# iteration of the evaluation loop in the previous cell.
ml.test_model_all_displacements(
    best_model,
    test_data,
    test_labels,
    test_start,
    technical_details=technical_details,
)

In [None]:
# Show the hyperparameter values of the best trial known to the tuner.
top_trials = tuner.get_best_hyperparameters(num_trials=1)
best_hyperparameters = top_trials[0]
print(best_hyperparameters.values)