In [None]:
# Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.stats as stats
from scipy.signal import savgol_filter
from sklearn.model_selection import train_test_split, StratifiedKFold, StratifiedShuffleSplit
from sklearn.preprocessing import LabelEncoder, LabelBinarizer
from sklearn.metrics import r2_score, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc, RocCurveDisplay, classification_report
from sklearn.utils import compute_class_weight
from sklearn.decomposition import TruncatedSVD
from sklearn.cross_decomposition import PLSRegression
from tensorflow.keras.utils import plot_model
import tensorflow_model_optimization as tfmot
import tensorflow as tf
import tempfile
from tensorflow import keras
from tensorflow_model_optimization.python.core.keras.compat import keras # Note: needed for quantization
import joblib
import pathlib
import os
from data_parser import parse_json_data, process_path, parse_label

# Debug switches: run tf.function-wrapped code eagerly instead of as a compiled graph.
# NOTE(review): this makes stepping through training easier but is slower — confirm it is still wanted.
tf.config.run_functions_eagerly(True)
# Puts tf.data pipelines into debug mode as well (presumably so dataset ops also execute eagerly — verify against the TF docs).
tf.data.experimental.enable_debug_mode()

In [None]:
# Dataset location plus preprocessing / split configuration
data_path = "../data/"
window_size, poly_order = 30, 3  # Savitzky-Golay settings (smoothing is currently disabled)
test_size_split, validation_size_split = 0.20, 0.20

# Accumulators: the complete dataset, and the scans grouped per fabric
all_features, all_labels = [], []
pes_features, wool_features, cotton_features = [], [], []

# Parsed label -> (per-fabric bucket, one-hot vector in pes / puuvilla / villa order)
class_table = {
    "pes": (pes_features, (1, 0, 0)),
    "puuvilla": (cotton_features, (0, 1, 0)),
    "villa": (wool_features, (0, 0, 1)),
}

files = process_path(data_path)
for file_path in files:
    file_label = parse_label(file_path)
    file_features = parse_json_data(file_path)
    # Smoothing hook: savgol_filter(file_features, window_size, poly_order) could be applied here.
    if file_label in class_table:
        bucket, one_hot = class_table[file_label]
        bucket.append(file_features)
        file_label = list(one_hot)
    # Files with an unrecognised label keep their raw label and only land in the full lists.
    all_labels.append(file_label)
    all_features.append(file_features)
# An equal class distribution is limited by the class with the FEWEST scans:
# every class is later sliced to the same length, and the one-hot label arrays
# are generated with exactly that many rows per class. Using a larger count
# would leave the smaller classes short and misalign features and labels.
# `max_size` is therefore the maximum size usable by all three classes.
max_size = min(len(pes_features), len(cotton_features), len(wool_features))
print(f"Max Size for equal distribution {max_size}")

# Per-class scan count left for train/test once the validation share is set aside,
# rounded down to a multiple of 10.
feature_samples_count = max_size - int(max_size * validation_size_split)
feature_samples_count -= feature_samples_count % 10
print("Nearest 10 scans count for all features after validation split")
print(feature_samples_count)

# Search for a test share whose absolute size is a multiple of 10: start from the
# configured share and lower it by 0.001 per step, giving up below 1 %.
all_eq_max = 3 * feature_samples_count  # three fabric classes of equal size
nearest_split_count = int(all_eq_max * test_size_split)
print(f"Current train-test split {int(nearest_split_count)} with {test_size_split}")
current_perc = test_size_split
while nearest_split_count % 10 != 0 and current_perc >= 0.01:
    current_perc -= 0.001
    nearest_split_count = int(all_eq_max * current_perc)
# Either current_perc or nearest_split_count can be handed to the splitter, but they do
# NOT always agree: passing the percentage may be rounded differently inside
# train_test_split, so the absolute count is the dependable one.
print(f"Nearest test size: {nearest_split_count} samples - percentage of features {current_perc}")
test_size_split = current_perc

# Per-class validation scan count, also rounded down to a multiple of 10
validation_samples_count = max_size - feature_samples_count
validation_samples_count -= validation_samples_count % 10
print("Nearest 10 scans count for validation split")
print(validation_samples_count)

# Train/test pool: the first `feature_samples_count` scans of every fabric, stacked
# in pes -> cotton -> wool order.
pes, cotton, wool = (
    pd.DataFrame(scans[:feature_samples_count])
    for scans in (pes_features, cotton_features, wool_features)
)
all_eq_features = pd.concat([pes, cotton, wool], axis=0).reset_index(drop=True)

# Validation pool: the next `validation_samples_count` scans of every fabric
last = feature_samples_count + validation_samples_count
pes_val, cotton_val, wool_val = (
    pd.DataFrame(scans[feature_samples_count:last])
    for scans in (pes_features, cotton_features, wool_features)
)
all_val_features = pd.concat([pes_val, cotton_val, wool_val], axis=0).reset_index(drop=True)

# One-hot labels laid out in the same class order as the stacked feature frames;
# this is the target format matching the softmax output layer.
fabric_types = ["pes", "puuvilla", "villa"]
one_hot_rows = ([1, 0, 0], [0, 1, 0], [0, 0, 1])

label_array = np.array(
    [row for row in one_hot_rows for _ in range(feature_samples_count)]
)
labels_eq = pd.DataFrame(label_array, columns=fabric_types)

y_val_labels = np.array(
    [row for row in one_hot_rows for _ in range(validation_samples_count)]
)
val_labels_eq = pd.DataFrame(y_val_labels, columns=fabric_types)
val_labels_int = np.argmax(val_labels_eq, axis=1)

# The complete, unbalanced dataset as frames for convenience
features = pd.DataFrame(all_features)
labels = pd.DataFrame(all_labels)


In [None]:
# Class name of every row, used to keep the class ratio identical in both splits
y_labels = labels_eq.idxmax(axis=1)

# Split into train and test sets; the test size is given as an absolute sample count
split_options = dict(
    test_size=nearest_split_count,
    random_state=42,
    stratify=y_labels,
)
X_train, X_test, y_train, y_test = train_test_split(
    all_eq_features, labels_eq, **split_options
)

# Integer class ids (column position of the 1 in each one-hot row)
y_train_int, y_test_int = (np.argmax(part, axis=1) for part in (y_train, y_test))

def plot_class_distribution(y_data, label="Dataset"):
    """Show a bar chart with the number of samples per class found in `y_data`.

    `label` names the dataset in the chart title.
    """
    ax = pd.Series(y_data).value_counts().plot(kind='bar')
    ax.set_title(f"Class Distribution in {label}")
    ax.set_xlabel("Class")
    ax.set_ylabel("Count")
    plt.show()

# Turn the one-hot rows back into class names (the column holding the 1)
y_train_labels, y_test_labels = (part.idxmax(axis=1) for part in (y_train, y_test))

# Split sizes first, then one distribution chart per split
for split_labels in (y_train_labels, y_test_labels):
    print(len(split_labels))
for split_labels, split_name in (
    (y_train_labels, "Training Set"),
    (y_test_labels, "Testing Set"),
):
    plot_class_distribution(split_labels, split_name)


In [None]:
# Define max total epochs
total_epochs = 20

# The input width follows the training data instead of being hard-coded, so the
# cell keeps working if the number of spectral points per scan changes.
n_features = X_train.shape[1]

# Build Model: fully connected funnel ending in a 3-way softmax (one unit per fabric)
model = keras.models.Sequential([
    keras.Input(shape=(n_features,)),
    keras.layers.Dense(256, activation='relu'),
    keras.layers.Dense(128, activation='relu'),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(32, activation='relu'),
    keras.layers.Dense(3, activation='softmax')
])

# Early stopping once the training loss has not improved for 3 consecutive epochs
callback = keras.callbacks.EarlyStopping(monitor='loss', patience=3)

# Targets are integer class ids, hence the sparse variant of the cross-entropy loss
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(
    X_train,
    y_train_int,
    validation_data=(all_val_features, val_labels_int),
    epochs=total_epochs,
    callbacks=[callback],
    batch_size=10,  # Not tuned; the effect of the batch size has not been investigated yet
)

# With verbose=0 evaluate() prints nothing, and it is not the last expression of the
# cell, so the metrics have to be printed explicitly to be visible at all.
keras_test_loss, keras_test_accuracy = model.evaluate(X_test, y_test_int, verbose=0)
print(f"Test loss: {keras_test_loss:.4f} - test accuracy: {keras_test_accuracy:.4f}")

model.summary()

In [None]:
# Class probabilities for the validation scans, reduced to the most likely class id
y_val_pred = model.predict(all_val_features)
y_val_pred_classes = y_val_pred.argmax(axis=1)

val_report = classification_report(val_labels_int, y_val_pred_classes)
print(val_report)

In [None]:
# NOTE: Right click on cell after exec -> Disable scrolling for outputs

# Helper function to make statistics and analytics from both Test and Validation datasets
def analyze_model_performance(true_labels, predictions, predicted_labels, dataset_name):
    """Plot a confusion matrix and per-class ROC curves, then print per-class metrics.

    Args:
        true_labels: 1-D sequence of integer class ids (ground truth).
        predictions: 2-D array of per-class scores, one row per sample. Column
            ``k`` is used as the score of class id ``k``.
        predicted_labels: 1-D sequence of integer class ids chosen by the model.
        dataset_name: Name of the dataset, used in titles and printed headers.
    """
    true_labels = np.asarray(true_labels)
    predicted_labels = np.asarray(predicted_labels)
    predictions = np.asarray(predictions)
    # Class ids are the column indices of the score matrix. Fixing the label set up
    # front keeps matrix rows, tick labels and score columns aligned even when a
    # class is missing from this particular dataset.
    class_ids = np.arange(predictions.shape[1])

    print(f"{dataset_name} Dataset")

    # Confusion Matrix (raw counts, and proportions of the whole dataset)
    raw_cm = confusion_matrix(true_labels, predicted_labels, labels=class_ids)
    normalized_cm = confusion_matrix(
        true_labels, predicted_labels, labels=class_ids, normalize='all'
    )
    num_classes = raw_cm.shape[0]

    # Plotting the confusion matrix
    plt.figure(figsize=(8, 6))
    plt.imshow(normalized_cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.colorbar()

    # Annotate every cell with its raw count and its normalized value
    for i in range(num_classes):
        for j in range(num_classes):
            text = f"{raw_cm[i, j]}\n({normalized_cm[i, j]:.2f})"
            plt.text(
                j, i, text,
                ha="center", va="center", fontsize=10,
                bbox=dict(boxstyle="round", facecolor='white', edgecolor='0.3')
            )

    # Labels and title
    plt.title(f"Confusion Matrix Against {dataset_name}", fontsize=16)
    plt.xlabel("Predicted Label", fontsize=12)
    plt.ylabel("True Label", fontsize=12)
    plt.xticks(class_ids, labels=class_ids)
    plt.yticks(class_ids, labels=class_ids)
    plt.show()

    # ROC: one one-vs-rest curve per class
    plt.figure(figsize=(8, 6))
    for class_label in class_ids:
        is_class = (true_labels == class_label).astype(int)
        # A ROC curve needs both positives and negatives in the ground truth;
        # skip classes that are absent from (or the only class in) this dataset.
        if np.unique(is_class).size < 2:
            continue
        # Index the scores by class id so the curve always uses that class's column.
        fpr, tpr, _ = roc_curve(is_class, predictions[:, class_label])
        roc_auc = auc(fpr, tpr)
        display = RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc,
                                  estimator_name=f"Class {class_label}")
        display.plot(ax=plt.gca())
    plt.title(f"ROC Curves for {dataset_name} Dataset", fontsize=16)
    plt.xlabel("False Positive Rate", fontsize=12)
    plt.ylabel("True Positive Rate", fontsize=12)
    plt.show()

    # Metrics
    print(f"Metrics for {dataset_name} Dataset:")
    # One-vs-rest metrics for each class present in the true labels
    for i in np.unique(true_labels):
        is_true = true_labels == i
        is_predicted = predicted_labels == i
        precision = precision_score(is_true, is_predicted, zero_division=0)
        recall = recall_score(is_true, is_predicted, zero_division=0)
        f1 = f1_score(is_true, is_predicted, zero_division=0)
        print(f"Metrics for Class {i}:")
        print(f"  Precision: {precision:.4f}")
        print(f"  Recall: {recall:.4f}")
        print(f"  F1-Score: {f1:.4f}")
        print("\n")

# Test dataset: class scores, winning class per sample, then the full report
test_predictions = model.predict(X_test)
test_predicted_labels_keras = test_predictions.argmax(axis=1)
analyze_model_performance(
    true_labels=y_test_int,
    predictions=test_predictions,
    predicted_labels=test_predicted_labels_keras,
    dataset_name="Test",
)

# Validation dataset: same procedure
val_predictions = model.predict(all_val_features)
val_predicted_labels_keras = val_predictions.argmax(axis=1)
analyze_model_performance(
    true_labels=val_labels_int,
    predictions=val_predictions,
    predicted_labels=val_predicted_labels_keras,
    dataset_name="Validation",
)


In [None]:
# Source 1: https://www.tensorflow.org/model_optimization/guide/quantization/training_example
# Source 2: https://www.tensorflow.org/model_optimization/guide/quantization/post_training

# Make the trained model quantization aware so it can be shrunk and converted to TFLite
quantize_model = tfmot.quantization.keras.quantize_model
q_aware_model = quantize_model(model)

# A model returned by `quantize_model` has to be compiled again before use.
q_aware_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# Fine-tune for one epoch to regain accuracy after quantization. Quantizing before
# the initial training would also work; it is done afterwards here on purpose so the
# accuracy difference between the two models can be shown.
q_aware_model.fit(
    X_train,
    y_train_int,
    validation_data=(all_val_features, val_labels_int),
    epochs=1,
    batch_size=10,
    verbose=0,
)

# evaluate() returns [loss, accuracy]; only the accuracy is compared
baseline_model_accuracy = model.evaluate(X_test, y_test_int, verbose=0)[1]
q_aware_model_accuracy = q_aware_model.evaluate(X_test, y_test_int, verbose=0)[1]

print('Baseline test accuracy:', baseline_model_accuracy)
print('Quant test accuracy:', q_aware_model_accuracy)

# Convert the fine-tuned quantization aware model to TFLite. According to TensorFlow,
# 8-bit integer weights are recommended for CPU execution, which is the target here.
converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quantized_tflite_model = converter.convert()

# Helper function for getting the accuracy of a TFLite model on a labelled dataset
def evaluate_model(interpreter, features=None, labels=None):
    """Return the fraction of samples the TFLite `interpreter` classifies correctly.

    Args:
        interpreter: TFLite interpreter whose tensors are already allocated.
        features: 2-D samples (DataFrame or array-like), one row per sample.
            Defaults to the notebook's `X_test`.
        labels: Integer class id per sample. Defaults to the notebook's `y_test_int`.

    Returns:
        Accuracy as a float between 0 and 1.

    Raises:
        ValueError: If `features` and `labels` differ in length.
    """
    if features is None:
        features = X_test
    if labels is None:
        labels = y_test_int

    # The interpreter is fed float32 input, one sample (batch of 1) at a time.
    samples = np.asarray(features, dtype=np.float32)
    expected = np.asarray(labels)
    if len(samples) != len(expected):
        raise ValueError(
            f"features and labels differ in length: {len(samples)} != {len(expected)}"
        )

    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]

    # Run a prediction for every sample and keep the most likely class id.
    predicted = []
    for row in samples:
        interpreter.set_tensor(input_index, np.expand_dims(row, axis=0))
        interpreter.invoke()
        output = interpreter.tensor(output_index)
        predicted.append(np.argmax(output()[0]))

    # Compare prediction results with ground truth labels to calculate accuracy.
    return (np.asarray(predicted) == expected).mean()

# Load the quantized TFLite model into an interpreter and score it on the test set
interpreter = tf.lite.Interpreter(model_content=quantized_tflite_model)
interpreter.allocate_tensors()
test_accuracy = evaluate_model(interpreter)

for caption, value in (
    ('Quant TFLite test_accuracy:', test_accuracy),
    ('Quant TF test accuracy:', q_aware_model_accuracy),
):
    print(caption, value)

# Create TFLite model from the original non-quantized model for comparison
float_converter = tf.lite.TFLiteConverter.from_keras_model(model)
float_tflite_model = float_converter.convert()

# Measure sizes of models by writing both to temporary files.
# mkstemp returns an already-open OS-level file descriptor together with the path.
# Writing through os.fdopen takes ownership of that descriptor and closes it when
# the `with` block ends, instead of leaking it. The temporary files themselves are
# left in place.
float_fd, float_file = tempfile.mkstemp('.tflite')
quant_fd, quant_file = tempfile.mkstemp('.tflite')

with os.fdopen(quant_fd, 'wb') as f:
    f.write(quantized_tflite_model)

with os.fdopen(float_fd, 'wb') as f:
    f.write(float_tflite_model)

print("Float model in Mb:", os.path.getsize(float_file) / float(2**20))
print("Quantized model in Mb:", os.path.getsize(quant_file) / float(2**20))

# Save the quantized tflite model into models dir
quant_model_path = "../models/model.tflite"
# Create the target directory first; open() does not create missing parent
# directories and would otherwise fail with FileNotFoundError on a fresh checkout.
pathlib.Path(quant_model_path).parent.mkdir(parents=True, exist_ok=True)
# Write quantized model to file
with open(quant_model_path, 'wb') as f:
    f.write(quantized_tflite_model)


In [None]:
# Usage example:
def predict_textile(feature_list: list[float]):
    """Classify a single scan with the trained Keras `model`.

    Args:
        feature_list: Feature values of one scan.

    Returns:
        A tuple ``(predictions, fabric_types)``: the model's class probabilities
        with shape (1, n_classes), and a one-element array holding the predicted
        fabric name.
    """
    # Output unit k of the model corresponds to the k-th name below; this is the
    # one-hot order used when the training labels were built
    # ([1, 0, 0] = pes, [0, 1, 0] = puuvilla, [0, 0, 1] = villa).
    class_names = np.array(["pes", "puuvilla", "villa"])

    # The model expects a batch dimension, so wrap the single scan into a batch of 1.
    features = np.expand_dims(feature_list, axis=0)
    predictions = model.predict(features, verbose=0)
    flattened = np.argmax(predictions, axis=1)
    fabric_types = class_names[flattened]
    return predictions, fabric_types

def run_validations():
    """Run the model against every file in the dataset and print the misses per fabric.

    Each file under ``../data/`` is parsed, classified with `predict_textile` and
    compared with the label parsed from its path. Every sample is checked, including
    the last one. Returns None; all results are printed.
    """
    # Import dataset
    data_path = "../data/"
    all_features = []
    all_labels = []
    for file_path in process_path(data_path):
        all_labels.append(parse_label(file_path))
        all_features.append(parse_json_data(file_path))

    # Number of wrong predictions per true fabric label
    incorrect = {"pes": 0, "puuvilla": 0, "villa": 0}
    for true_label, sample in zip(all_labels, all_features):
        _, fabric_types = predict_textile(sample)
        predicted_label = fabric_types[0]
        print(f"Validating {true_label} - Out: {predicted_label}\n")
        # Samples whose true label is not one of the three fabrics are not counted.
        if true_label != predicted_label and true_label in incorrect:
            incorrect[true_label] += 1

    pes_incorrect = incorrect["pes"]
    cotton_incorrect = incorrect["puuvilla"]
    wool_incorrect = incorrect["villa"]
    print(f"Results:\n Pes Incorrect: {pes_incorrect}\n Cotton Incorrect: {cotton_incorrect}\n Wool Incorrect: {wool_incorrect}")
    return

# Uncomment to run model against all samples in the dataset
# The results report the number of incorrect predictions per category
# run_validations()