In [2]:
# =======================
# COMMON FUNCTIONS
# =======================
import numpy as np
import tarfile

def loadTGZ(tgz, fPath, dtype, skip_header=False):
    """
    Load a CSV file from the open tarfile 'tgz' given its member path 'fPath'.

    Args:
        tgz: An open tarfile.TarFile object.
        fPath: Path of the CSV member inside the archive.
        dtype: NumPy dtype the returned array is cast to.
        skip_header: If True, the first row of the CSV is skipped.

    Returns:
        A flat (1-D) array of 256*256 values cast to 'dtype'.

    Raises:
        KeyError: If 'fPath' is not present in the archive (raised by tarfile).
        ValueError: If 'fPath' is not a regular file, or if the CSV does not
            contain exactly 256*256 values.
    """
    # extractfile() returns None for members that are not regular files
    # (e.g. directories); fail with a clear message instead of an opaque
    # error from "with None".
    member = tgz.extractfile(fPath)
    if member is None:
        raise ValueError(f"Cannot read {fPath}: archive member is not a regular file")

    # Parse while the member is open, then release it before validating.
    with member as csvfile:
        data = np.genfromtxt(csvfile, delimiter=",", skip_header=1 if skip_header else 0)

    if data.size != 256*256:
        raise ValueError(f"Data size mismatch in {fPath}: expected {256*256}, got {data.size}")
    return data.reshape(256*256).astype(dtype)

def removeMaskNaN(outSet):
    """
    Drop every entry that is masked or NaN in any of the arrays.

    An entry is kept only if outSet["mask"] equals 0 at that index and no
    array in 'outSet' holds NaN there. The dict is filtered in place and
    the same dict object is returned.
    """
    keep = outSet["mask"] == 0
    for values in outSet.values():
        # Accumulate in place: an index survives only if it is finite everywhere.
        np.logical_and(keep, np.logical_not(np.isnan(values)), out=keep)

    outSet.update({name: values[keep] for name, values in outSet.items()})
    return outSet

def normSet(outSet):
    """
    Shift and scale the four noise variables so sensors can be compared.

    The mean of "tMean0" (cast to float16) is used as a per-dataset offset:
    "tMean0" is centred on it and "tMeanF" on the offset plus 150, both
    scaled by 75. The two width variables are shifted by 10 and scaled by
    7.5. The dict is modified in place and returned; calling this twice on
    the same dict normalizes twice.
    """
    # Must be computed before "tMean0" is overwritten below.
    center = np.mean(outSet["tMean0"]).astype(np.float16)

    shifts = {
        "tMean0": center,
        "tMeanF": center + 150,
        "tWidth0": 10,
        "tWidthF": 10,
    }
    scales = {
        "tMean0": 75.,
        "tMeanF": 75.,
        "tWidth0": 7.5,
        "tWidthF": 7.5,
    }
    for key in ("tMean0", "tMeanF", "tWidth0", "tWidthF"):
        outSet[key] = (outSet[key] - shifts[key]) / scales[key]
    return outSet

# =======================
# CELL 1: OLD DATA LOADING
# =======================
tgzFileName = "ASideFirstModules.tgz"

# Modules available in the old-data archive
moduleFiles = [
    "equalisation_M98_A1",
    "equalisation_M94_A2",
    "equalisation_M96_A3",
    "equalisation_M59_A4",
    "equalisation_N22_A5",
    "equalisation_M116_A6",
    "equalisation_N030_A7",
    "equalisation_M90_A8",
    "equalisation_N029_A9",
    "equalisation_N013_A10",
]

# (module, VP) pairs used for training; these are left out of the evaluation
trainingSet = [
    ("equalisation_N030_A7", "VP0-1"),  # offset = 2
    ("equalisation_M59_A4", "VP0-0"),   # offset = 1
    ("equalisation_M98_A1", "VP0-1"),   # offset = 0
    ("equalisation_N030_A7", "VP3-2"),  # offset = -1
    ("equalisation_M98_A1", "VP2-0"),   # offset = -2
]

# All VP labels "VP<i>-<j>" with i in 0..3 and j in 0..2
vpList = []
for i in range(4):
    for j in range(3):
        vpList.append(f"VP{i}-{j}")

# Every (module, VP) combination that was not used for training
evaluationSet = [
    (module, vp)
    for module in moduleFiles
    for vp in vpList
    if (module, vp) not in trainingSet
]

print(f"Old Data:\n Training set: {trainingSet}\n {len(evaluationSet)} evaluation sets to test.")
print("First:", evaluationSet[0], "Last:", evaluationSet[-1])

def extractFromTGZ(tgzName, dSet):
    """
    Extract the old data files for a given dataset dSet.

    Args:
        tgzName: Path of the gzip-compressed tar archive.
        dSet: Tuple (module_name, vp), e.g. ("equalisation_M98_A1", "VP0-1").
            The files are read from "<module_name>/Module0_<vp>_*.csv".

    Returns:
        Dict with keys "tMean0", "tMeanF", "tWidth0", "tWidthF", "mask"
        (float16 arrays) and "trim" (int8 array), restricted to the entries
        kept by removeMaskNaN.
    """
    prefix = f"{dSet[0]}/Module0_{dSet[1]}_"
    sources = [
        ("tMean0", "Trim0_Noise_Mean.csv"),
        ("tMeanF", "TrimF_Noise_Mean.csv"),
        ("tWidth0", "Trim0_Noise_Width.csv"),
        ("tWidthF", "TrimF_Noise_Width.csv"),
        ("mask", "Matrix_Mask.csv"),
        ("trim", "Matrix_Trim.csv"),
    ]

    # Read every file (including trim) as float first: casting to int8 at
    # load time would turn NaN entries into arbitrary integers that
    # removeMaskNaN can no longer detect.
    with tarfile.open(tgzName, 'r:gz') as tgz:
        outSet = {key: loadTGZ(tgz, prefix + suffix, np.float16) for key, suffix in sources}

    outSet = removeMaskNaN(outSet)
    # NaNs are gone now, so the integer cast is well defined.
    outSet["trim"] = outSet["trim"].astype(np.int8)
    return outSet


Old Data:
 Training set: [('equalisation_N030_A7', 'VP0-1'), ('equalisation_M59_A4', 'VP0-0'), ('equalisation_M98_A1', 'VP0-1'), ('equalisation_N030_A7', 'VP3-2'), ('equalisation_M98_A1', 'VP2-0')]
 115 evaluation sets to test.
First: ('equalisation_M98_A1', 'VP0-0') Last: ('equalisation_N013_A10', 'VP3-2')


In [3]:
from matplotlib import pyplot as plt
# Keras network used as front end to Tensorflow
# See https://keras.io/ for details of the package
# import Keras overall
import keras
# input normalization layer [fixes input variables to useful range]
from keras.layers import BatchNormalization
# a single NN layer of type "Dense" i.e. all inputs connected to all outputs
from keras.layers import Dense
# The input layer, takes x and starts NN processing
from keras.layers import Input
# Keras functional methods for defining a NN model
from keras.models import Model
import time  # Import time module
import keras_tuner as kt
from keras.saving import register_keras_serializable

@register_keras_serializable()
def adjacent_accuracy(y_true, y_pred):
    """
    Custom accuracy metric that accepts predictions within ±1 of the true class.
    Args:
        y_true: True labels (integer values), shaped like the predicted
            classes or with an extra trailing dimension of size 1.
        y_pred: Predicted probabilities for each class (softmax output).
    Returns:
        Fraction of samples whose predicted class differs from the true
        class by at most 1.
    """
    import tensorflow.keras.backend as K

    # Predicted class = index of the largest score along the class axis
    y_pred_classes = K.cast(K.argmax(y_pred, axis=-1), dtype='float32')

    # Give the labels the same shape as the predicted classes. Without this,
    # labels of shape (batch, 1) would broadcast against (batch,) into a
    # (batch, batch) matrix and the metric would compare every label with
    # every prediction.
    y_true = K.reshape(K.cast(y_true, dtype='float32'), K.shape(y_pred_classes))

    # A prediction counts as correct when it is within ±1 of the true label
    correct = K.abs(y_true - y_pred_classes) <= 1

    return K.mean(K.cast(correct, dtype='float32'))

# Define training data set, slot post then ASIC
custom_objects = {"adjacent_accuracy": adjacent_accuracy}
loadFile = f"NN_{trainingSet[0][0]}_{trainingSet[0][1]}.keras"
loadModel = keras.saving.load_model(loadFile, custom_objects=custom_objects)
%timeit loadModel = keras.saving.load_model(loadFile, custom_objects=custom_objects)

import random

random_eval_sets = random.sample(evaluationSet, 1)  #select random datasets
eval_set = random_eval_sets[0]

# Extract and process the evaluation set
dSet = extractFromTGZ(tgzFileName, eval_set)
dSet = removeMaskNaN(dSet)
dSet = normSet(dSet)

# Create the input feature matrix and target vector
X = np.column_stack([dSet["tMean0"], dSet["tMeanF"],
                     dSet["tWidth0"], dSet["tWidthF"]])
y = dSet["trim"]

import tensorflow as tf

#XT = tf.convert_to_tensor(X)
#tf.compat.v1.disable_eager_execution()

# Now, measure evaluation time using %timeit:
##%timeit loadModel.evaluate(X, y, verbose=0)

# Or if you want to time prediction:
%timeit loadModel.predict(XT, verbose=0)


57.1 ms ± 3.74 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


NameError: name 'XT' is not defined

In [9]:
# Debug check: show the Python type of the feature matrix X
print(type(X))

<class 'numpy.ndarray'>
