In [None]:
import os
import numpy as np
from PIL import Image
import tensorflow as tf
import random
import lr_scheduler
import func
from tensorflow.keras import layers, models
from sklearn.preprocessing import StandardScaler


folder_path = "F:/code/barlow/UR5"
file_pairs = func.load_file_pairs(folder_path)

AUTO = tf.data.AUTOTUNE
CROP_TO = 32
SEED = 200

PROJECT_DIM = 2048
BATCH_SIZE = 32
EPOCHS = 50

In [3]:
train_similar, train_dissimilar, test_similar, test_dissimilar = func.split_and_shuffle_pairs(file_pairs, folder_path)

In [4]:
def process_data(data_list):
    a = []
    b = []
    for jpg_path, npy_path in data_list:
        a.append(func.read_jpg_files(jpg_path))
        b.append(func.read_and_parse_npy_file(npy_path))
    return a,b

In [5]:
xtrain1 = process_data(train_similar)
xtrain2 = process_data(train_dissimilar)
xtest1 = process_data(test_similar)
xtest2 = process_data(test_dissimilar)

# label 1 - similar, 0 - disimilar
ytrain1 = [1] * 500
ytrain2 = [0] * 500
ytest1 = [1] *79
ytest2 = [0] *79

In [6]:
#
t1 = np.array([np.expand_dims(item, axis=-1) for item in xtrain1[0]])  # Expand dims
t1 = t1.astype("float32") / 255.0  # Normalize to [0, 1]
scaler = StandardScaler()
t2 = np.array([item for item in xtrain1[1]])  # NPY input
t2 = scaler.fit_transform(t2)  # Standardize to mean=0, std=1
train_ds1 = tf.data.Dataset.from_tensor_slices(((t1, t2), ytrain1))

#
t3 = np.array([np.expand_dims(item, axis=-1) for item in xtrain2[0]])  # Expand dims
t3 = t3.astype("float32") / 255.0  # Normalize to [0, 1]
t4 = np.array([item for item in xtrain2[1]])  # NPY input
t4 = scaler.transform(t4)  # Sử dụng scaler đã fit từ trước
train_ds2 = tf.data.Dataset.from_tensor_slices(((t3, t4), ytrain2))


# concat
train_ds = train_ds1.concatenate(train_ds2)
train_ds = train_ds.shuffle(buffer_size=1000, seed=226)
train_ds = train_ds.batch(32).prefetch(tf.data.AUTOTUNE)


In [7]:
#
t5 = np.array([np.expand_dims(item, axis=-1) for item in xtest1[0]])  # Expand dims
t5 = t5.astype("float32") / 255.0  # Normalize to [0, 1]
t6 = np.array([item for item in xtest1[1]])  # NPY input
t6 = scaler.transform(t6)  # Sử dụng scaler đã fit từ dữ liệu training
test_ds1 = tf.data.Dataset.from_tensor_slices(((t5, t6), ytest1))

#
t7 = np.array([np.expand_dims(item, axis=-1) for item in xtest2[0]])  # Expand dims
t7 = t7.astype("float32") / 255.0  # Normalize to [0, 1]
t8 = np.array([item for item in xtest2[1]])  # NPY input
t8 = scaler.transform(t8)  # Sử dụng scaler đã fit từ dữ liệu training
test_ds2 = tf.data.Dataset.from_tensor_slices(((t7, t8), ytest2))

#
test_ds = test_ds1.concatenate(test_ds2)
test_ds = test_ds.shuffle(buffer_size=1000, seed=226)
test_ds = test_ds.batch(32).prefetch(tf.data.AUTOTUNE)

In [None]:
def concat_encoder():
    # Input đầu vào cho ảnh và npy_file
    image_input = layers.Input(shape=(128, 128, 1), name="image_input")  # Đầu vào ảnh 128x128x1
    npy_input = layers.Input(shape=(8,), name="npy_input")  # Đầu vào npy_file shape (8,)

    # x1: CNN 
    x1 = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(image_input)
    x1 = layers.MaxPooling2D((2, 2))(x1)
    x1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x1)
    x1 = layers.MaxPooling2D((2, 2))(x1)
    x1 = layers.Flatten()(x1)
    x1 = layers.Dense(20, activation='relu', name="cnn_output")(x1)

    # x2: Đọc npy_file với shape (8)
    # x2 = layers.Dense(8, activation='relu', name="npy_output")(npy_input)
    # x3: Kết hợp x1 + x2
    #x3 = layers.Concatenate(name="combined_features")([x1, x2])
    
    # Đọc thẳng npy_file với shape (8)
    x3 = layers.Concatenate(name="combined_features")([x1, npy_input])

    # Fully connected layers
    x = layers.Dense(128, activation="relu", name="dense_128")(x3)
    x = layers.Dense(64, activation="relu", name="dense_64")(x)
    x = layers.Dense(32, activation=None, name="projection_head")(x)  # Projection head

    # Model
    model = models.Model(inputs=[image_input, npy_input], outputs=x, name="encoder")
    return model

In [15]:
# Tạo encoder mới
encoder = concat_encoder()
barlow_twins = func.BarlowTwins(encoder=encoder)


backbone = tf.keras.Model(
    inputs=barlow_twins.encoder.input,
    outputs=barlow_twins.encoder.layers[-1].output  # output từ lớp Dense(32)
)
# Freeze 
# barlow_twins.load_weights('bl_test2.weights.h5')
# backbone.trainable = False

In [16]:
inputs = backbone.input
x = backbone.output
# Thêm lớp classifier phía sau
outputs = layers.Dense(2, activation="relu", name="classifier")(x)

# Xây dựng model mới
test_model = tf.keras.Model(inputs, outputs, name="test_model")

# Hiển thị kiến trúc model
test_model.summary()

In [17]:
# Compile model
test_model.compile(
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
    optimizer=tf.keras.optimizers.SGD(learning_rate=0.00005, momentum=0.9)
)

history = test_model.fit(
    train_ds, validation_data=test_ds, epochs=100
)

_, test_acc = test_model.evaluate(test_ds)
print("Test accuracy: {:.2f}%".format(test_acc * 100))

Epoch 1/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 69ms/step - accuracy: 0.5252 - loss: 3.1414 - val_accuracy: 0.5633 - val_loss: 0.7445
Epoch 2/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 68ms/step - accuracy: 0.5010 - loss: 0.7591 - val_accuracy: 0.5506 - val_loss: 0.6955
Epoch 3/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 68ms/step - accuracy: 0.5263 - loss: 0.6955 - val_accuracy: 0.5506 - val_loss: 0.6932
Epoch 4/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 68ms/step - accuracy: 0.4938 - loss: 0.6976 - val_accuracy: 0.5506 - val_loss: 0.6935
Epoch 5/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 70ms/step - accuracy: 0.4867 - loss: 0.6959 - val_accuracy: 0.5506 - val_loss: 0.6948
Epoch 6/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 67ms/step - accuracy: 0.4963 - loss: 0.6925 - val_accuracy: 0.5506 - val_loss: 0.6950
Epoch 7/100
[1m32/32[0m [

In [20]:
# Predict on the test dataset
predictions = test_model.predict(test_ds)

# Extract the true labels from the test dataset
true_labels = np.concatenate([y for x, y in test_ds], axis=0)

# Print predictions and true labels
for i, prediction in enumerate(predictions):
    predicted_label = np.argmax(prediction)  # Get the predicted class index
    print(f"Sample {i}: Predicted Label = {predicted_label}, True Label = {true_labels[i]}")

[1m1/5[0m [32m━━━━[0m[37m━━━━━━━━━━━━━━━━[0m [1m0s[0m 27ms/step

[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
Sample 0: Predicted Label = 0, True Label = 1
Sample 1: Predicted Label = 0, True Label = 1
Sample 2: Predicted Label = 1, True Label = 0
Sample 3: Predicted Label = 0, True Label = 0
Sample 4: Predicted Label = 0, True Label = 0
Sample 5: Predicted Label = 0, True Label = 0
Sample 6: Predicted Label = 0, True Label = 0
Sample 7: Predicted Label = 0, True Label = 1
Sample 8: Predicted Label = 0, True Label = 1
Sample 9: Predicted Label = 0, True Label = 0
Sample 10: Predicted Label = 1, True Label = 1
Sample 11: Predicted Label = 0, True Label = 1
Sample 12: Predicted Label = 0, True Label = 1
Sample 13: Predicted Label = 1, True Label = 0
Sample 14: Predicted Label = 0, True Label = 0
Sample 15: Predicted Label = 0, True Label = 1
Sample 16: Predicted Label = 1, True Label = 0
Sample 17: Predicted Label = 1, True Label = 0
Sample 18: Predicted Label = 0, True Label = 0
Sample 19: Predicted Label = 0, True Label = 