In [None]:

!pip install -q tensorflow keras scikit-learn matplotlib seaborn

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import matplotlib.pyplot as plt
import seaborn as sns
import random

print(f"TensorFlow version: {tf.__version__}")
print(f"Keras version: {keras.__version__}")

In [None]:
train_data = np.load("/kaggle/input/facenet-train/vggface2_facenet_train.npz")
val_data   = np.load("/kaggle/input/facenet-val/vggface2_facenet_val.npz")

print("Train persons:", len(train_data.files))
print("Val persons:", len(val_data.files))


In [None]:

import numpy as np
import random

def create_triplet_dataset(embeddings_dict, 
                          num_triplets_per_person=100, 
                          max_neg_people_sample=100,
                          max_neg_emb_per_person=100,
                          margin=1.0,
                          strategy='semi-hard'):
    """
    T·∫°o triplet v·ªõi semi-hard mining nh∆∞ng gi·ªõi h·∫°n t√†i nguy√™n:
    - Ch·ªâ subsample m·ªôt ph·∫ßn nh·ªè negative candidates
    - Gi·∫£m s·ªë l∆∞·ª£ng triplet ƒë·ªÉ tr√°nh OOM
    
    Args:
        embeddings_dict: {person_id: [{'embedding': array}, ...]}
        num_triplets_per_person: s·ªë triplet m·ªói ng∆∞·ªùi (8-12 l√† an to√†n)
        max_neg_people_sample: s·ªë ng∆∞·ªùi kh√°c ƒë∆∞·ª£c xem x√©t (gi·∫£m RAM)
        max_neg_emb_per_person: max embedding l·∫•y t·ª´ m·ªói ng∆∞·ªùi kh√°c
        margin: d√πng cho semi-hard
        strategy: 'semi-hard', 'hard', 'random'
    
    Returns:
        anchors, positives, negatives (numpy arrays)
    """
    print(f"üîÑ Creating low-RAM {strategy} triplets (margin={margin})...")
    print(f"   Settings: {num_triplets_per_person} triplets/person | "
          f"max {max_neg_people_sample} neg people | max {max_neg_emb_per_person} emb/person")
    
    anchors = []
    positives = []
    negatives = []
    
    people = list(embeddings_dict.keys())
    
    for person_id in people:
        person_data = embeddings_dict[person_id]
        if len(person_data) < 2:
            continue
        
        person_embs = np.array([d['embedding'] for d in person_data])
        
        for _ in range(num_triplets_per_person):
            idx_a, idx_p = random.sample(range(len(person_embs)), 2)
            anchor = person_embs[idx_a]
            positive = person_embs[idx_p]
            
            pos_dist = np.linalg.norm(anchor - positive)
            
            other_people = [p for p in people if p != person_id]
            if len(other_people) > max_neg_people_sample:
                other_people = random.sample(other_people, max_neg_people_sample)
            
            neg_candidates = []
            for other_id in other_people:
                other_embs = [d['embedding'] for d in embeddings_dict[other_id]]
                if len(other_embs) > max_neg_emb_per_person:
                    other_embs = random.sample(other_embs, max_neg_emb_per_person)
                neg_candidates.extend(other_embs)
            
            if len(neg_candidates) < 1:
                continue 
            
            neg_candidates = np.array(neg_candidates)
            neg_dists = np.linalg.norm(neg_candidates - anchor[None, :], axis=1)
            if strategy == 'semi-hard':
                valid_mask = (neg_dists > pos_dist) & (neg_dists < pos_dist + margin)
                if np.any(valid_mask):
                    neg_idx = np.random.choice(np.where(valid_mask)[0])
                else:
                    neg_idx = np.argmin(neg_dists)
            elif strategy == 'hard':
                neg_idx = np.argmin(neg_dists)
            else: 
                neg_idx = random.randint(0, len(neg_candidates) - 1)
            
            anchors.append(anchor)
            positives.append(positive)
            negatives.append(neg_candidates[neg_idx])
    
    if len(anchors) == 0:
        raise ValueError("Kh√¥ng t·∫°o ƒë∆∞·ª£c triplet n√†o. Ki·ªÉm tra d·ªØ li·ªáu ho·∫∑c tƒÉng subsample.")
    
    anchors    = np.array(anchors,    dtype=np.float32)
    positives  = np.array(positives,  dtype=np.float32)
    negatives  = np.array(negatives,  dtype=np.float32)
    
    indices = np.random.permutation(len(anchors))
    anchors   = anchors[indices]
    positives = positives[indices]
    negatives = negatives[indices]
    
    print(f"‚úÖ Created {len(anchors):,} triplets (low-RAM mode)")
    print(f"   Shapes ‚Üí Anchor: {anchors.shape} | Pos: {positives.shape} | Neg: {negatives.shape}")
    
    return anchors, positives, negatives


def load_and_prepare_triplets():
    print("üìÇ Loading embeddings...")

    train_npz = np.load('/kaggle/input/facenet-train/vggface2_facenet_train.npz')
    val_npz   = np.load('/kaggle/input/facenet-val/vggface2_facenet_val.npz')

    train_dict = {
        k: [{'embedding': emb} for emb in train_npz[k]]
        for k in train_npz.files
        if train_npz[k].shape[0] >= 2
    }

    val_dict = {
        k: [{'embedding': emb} for emb in val_npz[k]]
        for k in val_npz.files
        if val_npz[k].shape[0] >= 2
    }

    print(f"Train: {len(train_dict)} persons")
    print(f"Val: {len(val_dict)} persons")

    train_anchor, train_pos, train_neg = create_triplet_dataset(
        train_dict,
        num_triplets_per_person=10,
        max_neg_people_sample=30,
        max_neg_emb_per_person=40,
        margin=0.4,
        strategy='semi-hard'
    )

    val_anchor, val_pos, val_neg = create_triplet_dataset(
        val_dict,
        num_triplets_per_person=6,
        max_neg_people_sample=20,
        max_neg_emb_per_person=30,
        margin=0.4,
        strategy='semi-hard'
    )

    return train_anchor, train_pos, train_neg, val_anchor, val_pos, val_neg
train_anchor, train_pos, train_neg, val_anchor, val_pos, val_neg = load_and_prepare_triplets()


üìÇ Loading embeddings...
Train: 480 persons
Val: 60 persons
üîÑ Creating low-RAM semi-hard triplets (margin=0.4)...
   Settings: 10 triplets/person | max 30 neg people | max 40 emb/person
‚úÖ Created 4,800 triplets (low-RAM mode)
   Shapes ‚Üí Anchor: (4800, 512) | Pos: (4800, 512) | Neg: (4800, 512)
üîÑ Creating low-RAM semi-hard triplets (margin=0.4)...
   Settings: 6 triplets/person | max 20 neg people | max 30 emb/person
‚úÖ Created 360 triplets (low-RAM mode)
   Shapes ‚Üí Anchor: (360, 512) | Pos: (360, 512) | Neg: (360, 512)


In [None]:
class TripletLoss(keras.losses.Loss):
    """
    Triplet Loss with margin
    
    Loss = max(0, ||f(anchor) - f(positive)||¬≤ - ||f(anchor) - f(negative)||¬≤ + margin)
    """
    def __init__(self, margin=0.3, **kwargs):
        super().__init__(**kwargs)
        self.margin = margin
    
    def call(self, y_true, y_pred):
        """
        y_pred contains: [anchor_embedding, positive_embedding, negative_embedding]
        """
        anchor, positive, negative = y_pred[:, 0, :], y_pred[:, 1, :], y_pred[:, 2, :]
        
        # Compute distances
        pos_dist = tf.reduce_sum(tf.square(anchor - positive), axis=1)
        neg_dist = tf.reduce_sum(tf.square(anchor - negative), axis=1)
        
        basic_loss = pos_dist - neg_dist + self.margin
        loss = tf.maximum(basic_loss, 0.0)
        
        return tf.reduce_mean(loss)


In [None]:

def build_triplet_network(embedding_dim=512, output_dim=128, margin=0.4):
    def create_embedding_network():
        """M·∫°ng embedding c·∫£i ti·∫øn h∆°n"""
        inputs = layers.Input(shape=(embedding_dim,))
        
        x = layers.Dense(512, activation='relu', kernel_regularizer='l2')(inputs)
        x = layers.BatchNormalization()(x)
        x = layers.Dropout(0.4)(x)
        
        x = layers.Dense(256, activation='relu', kernel_regularizer='l2')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Dropout(0.4)(x)
        
        x = layers.Dense(output_dim, activation=None)(x)
        outputs = layers.Lambda(lambda x: tf.nn.l2_normalize(x, axis=1))(x)
        
        return keras.Model(inputs, outputs, name='embedding_network')
    
    embedding_network = create_embedding_network()
    
    anchor_input = layers.Input(shape=(embedding_dim,), name='anchor')
    positive_input = layers.Input(shape=(embedding_dim,), name='positive')
    negative_input = layers.Input(shape=(embedding_dim,), name='negative')
    
    anchor_emb = embedding_network(anchor_input)
    positive_emb = embedding_network(positive_input)
    negative_emb = embedding_network(negative_input)
    
    merged = layers.Lambda(
        lambda x: tf.stack(x, axis=1),
        name='triplet_embeddings'
    )([anchor_emb, positive_emb, negative_emb])
    
    model = Model(
        inputs=[anchor_input, positive_input, negative_input],
        outputs=merged,
        name='triplet_network'
    )
    
    return model, embedding_network

print("\nüèóÔ∏è Building Triplet Network (margin = 1.0)...")
triplet_model, embedding_network = build_triplet_network(
    embedding_dim=512,
    output_dim=128,
    margin=0.4         
)

triplet_model.summary()

In [None]:

def compile_triplet_model(model, learning_rate=3e-4, margin=1.2):
    optimizer = keras.optimizers.Adam(
        learning_rate=learning_rate,
        weight_decay=1e-5   
    )
    
    model.compile(
        optimizer=optimizer,
        loss=TripletLoss(margin=margin)
    )
    
    print(f"‚úÖ Model compiled | LR={learning_rate} | Margin={margin}")
    return model


# Compile l·∫°i
triplet_model = compile_triplet_model(triplet_model, learning_rate=3e-4, margin=0.4)

In [None]:
class TripletMetricsCallback(keras.callbacks.Callback):
    """Calculate accuracy metrics during training"""
    
    def __init__(self, val_data, threshold=0.5):
        super().__init__()
        self.val_anchor, self.val_pos, self.val_neg = val_data
        self.threshold = threshold
        self.history = {'val_accuracy': [], 'val_pos_dist': [], 'val_neg_dist': []}
    
    def on_epoch_end(self, epoch, logs=None):
        
        embeddings = self.model.predict(
            [self.val_anchor, self.val_pos, self.val_neg],
            verbose=0
        )
        
        anchor_emb = embeddings[:, 0, :]
        pos_emb = embeddings[:, 1, :]
        neg_emb = embeddings[:, 2, :]
        
        pos_dist = np.sum(np.square(anchor_emb - pos_emb), axis=1)
        neg_dist = np.sum(np.square(anchor_emb - neg_emb), axis=1)
        
        accuracy = np.mean(pos_dist < neg_dist)
        
        avg_pos_dist = np.mean(pos_dist)
        avg_neg_dist = np.mean(neg_dist)
        
        self.history['val_accuracy'].append(accuracy)
        self.history['val_pos_dist'].append(avg_pos_dist)
        self.history['val_neg_dist'].append(avg_neg_dist)
        
        print(f"\nüìä Val Accuracy: {accuracy:.4f} | "
              f"Pos Dist: {avg_pos_dist:.4f} | Neg Dist: {avg_neg_dist:.4f}")

metrics_callback = TripletMetricsCallback(
    val_data=(val_anchor, val_pos, val_neg),
    threshold=0.5
)


In [None]:
callbacks = [
    metrics_callback,
    
    EarlyStopping(
        monitor='loss',
        patience=10,
        restore_best_weights=True,
        verbose=1
    ),
    
    ModelCheckpoint(
        'best_triplet_model.keras',
        monitor='loss',
        save_best_only=True,
        mode='min',
        verbose=1
    ),
    
    ReduceLROnPlateau(
        monitor='loss',
        factor=0.5,
        patience=5,
        min_lr=1e-7,
        verbose=1
    )
]

print("‚úÖ Callbacks configured!")


‚úÖ Callbacks configured!


In [None]:
print("\n" + "="*50)
print("üöÄ STARTING TRAINING WITH TRIPLET LOSS")
print("="*50)

dummy_train_labels = np.zeros((len(train_anchor), 3, 128))
dummy_val_labels = np.zeros((len(val_anchor), 3, 128))

history = triplet_model.fit(
    [train_anchor, train_pos, train_neg],
    dummy_train_labels,
    validation_data=([val_anchor, val_pos, val_neg], dummy_val_labels),
    epochs=50,
    batch_size=128,
    callbacks=callbacks,
    verbose=1
)

print("\n‚úÖ Training completed!")


In [None]:
def plot_triplet_training(history, metrics_callback):
    """Plot training metrics"""
    
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    
    # Loss
    axes[0].plot(history.history['loss'], label='Train Loss')
    axes[0].plot(history.history['val_loss'], label='Val Loss')
    axes[0].set_title('Triplet Loss Over Epochs')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].legend()
    axes[0].grid(True)
    
    # Accuracy
    axes[1].plot(metrics_callback.history['val_accuracy'], label='Val Accuracy', color='green')
    axes[1].set_title('Validation Accuracy Over Epochs')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Accuracy')
    axes[1].legend()
    axes[1].grid(True)
    
    # Distances
    axes[2].plot(metrics_callback.history['val_pos_dist'], label='Positive Distance', color='blue')
    axes[2].plot(metrics_callback.history['val_neg_dist'], label='Negative Distance', color='red')
    axes[2].set_title('Distances Over Epochs')
    axes[2].set_xlabel('Epoch')
    axes[2].set_ylabel('L2 Distance')
    axes[2].legend()
    axes[2].grid(True)
    
    plt.tight_layout()
    plt.savefig('triplet_training_history.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    print("üìä Training history plot saved!")

plot_triplet_training(history, metrics_callback)


In [None]:
def evaluate_triplet_model(model, val_anchor, val_pos, val_neg):
    """Evaluate triplet model"""
    
    print("\n" + "="*50)
    print("üìä MODEL EVALUATION")
    print("="*50)
    
    # Get embeddings
    embeddings = model.predict(
        [val_anchor, val_pos, val_neg],
        verbose=0
    )
    
    anchor_emb = embeddings[:, 0, :]
    pos_emb = embeddings[:, 1, :]
    neg_emb = embeddings[:, 2, :]
    
    pos_dist = np.sum(np.square(anchor_emb - pos_emb), axis=1)
    neg_dist = np.sum(np.square(anchor_emb - neg_emb), axis=1)
    
    print(f"\nüìè Distance Statistics:")
    print(f"   Positive pairs (same person):")
    print(f"     Mean: {np.mean(pos_dist):.4f} | Std: {np.std(pos_dist):.4f}")
    print(f"     Min: {np.min(pos_dist):.4f} | Max: {np.max(pos_dist):.4f}")
    print(f"\n   Negative pairs (different person):")
    print(f"     Mean: {np.mean(neg_dist):.4f} | Std: {np.std(neg_dist):.4f}")
    print(f"     Min: {np.min(neg_dist):.4f} | Max: {np.max(neg_dist):.4f}")
    
    # Accuracy
    accuracy = np.mean(pos_dist < neg_dist)
    print(f"\n‚úÖ Triplet Accuracy: {accuracy:.4f}")
    print(f"   (% of triplets where positive < negative)")
    
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.hist(pos_dist, bins=50, alpha=0.7, label='Positive', color='blue')
    plt.hist(neg_dist, bins=50, alpha=0.7, label='Negative', color='red')
    plt.xlabel('L2 Distance')
    plt.ylabel('Frequency')
    plt.title('Distance Distributions')
    plt.legend()
    plt.grid(True)
    
    plt.subplot(1, 2, 2)
    plt.scatter(pos_dist, neg_dist, alpha=0.5, s=10)
    plt.plot([0, max(neg_dist)], [0, max(neg_dist)], 'r--', label='y=x')
    plt.xlabel('Positive Distance')
    plt.ylabel('Negative Distance')
    plt.title('Positive vs Negative Distances')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.savefig('distance_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    all_dists = np.concatenate([pos_dist, neg_dist])
    all_labels = np.concatenate([np.ones(len(pos_dist)), np.zeros(len(neg_dist))])
    
    thresholds = np.linspace(0, max(all_dists), 100)
    accuracies = []
    
    for thresh in thresholds:
        pred = (all_dists < thresh).astype(int)
        acc = np.mean(pred == all_labels)
        accuracies.append(acc)
    
    optimal_idx = np.argmax(accuracies)
    optimal_threshold = thresholds[optimal_idx]
    optimal_accuracy = accuracies[optimal_idx]
    
    print(f"\nüéØ Optimal Threshold: {optimal_threshold:.4f}")
    print(f"   Accuracy: {optimal_accuracy:.4f}")
    
    return optimal_threshold, pos_dist, neg_dist

optimal_threshold, pos_dist, neg_dist = evaluate_triplet_model(
    triplet_model, val_anchor, val_pos, val_neg
)

from sklearn.metrics import roc_curve, auc

y_true = np.concatenate([np.ones(len(pos_dist)), np.zeros(len(neg_dist))])

similarity_scores = -np.concatenate([pos_dist, neg_dist])

fpr, tpr, thresholds_sim = roc_curve(y_true, similarity_scores)

dist_thresholds = -thresholds_sim

auc_score = auc(fpr, tpr)
print(f"AUC: {auc_score:.4f} (c√†ng g·∫ßn 1 c√†ng t·ªët)")

target_fars = [0.01, 0.001, 0.0001, 0.00001]

print("\nüéØ Threshold theo FAR (tr√™n distance):")
for target in target_fars:
    idx = np.argmin(np.abs(fpr - target))
    if idx < len(dist_thresholds):
        thresh = dist_thresholds[idx]
        actual_far = fpr[idx]
        frr = 1 - tpr[idx]
        print(f"   Target FAR {target*100:.4f}% ‚Üí Actual FAR {actual_far*100:.4f}% | "
              f"Threshold distance = {thresh:.4f} | FRR {frr*100:.4f}%")
    else:
        print(f"   Kh√¥ng ƒë·∫°t ƒë∆∞·ª£c FAR {target*100:.4f}% (max FAR ƒë·∫°t {fpr.max()*100:.2f}%)")

if len(dist_thresholds) > 0:
    strict_idx = np.argmin(fpr)  
    recommended_thresh = dist_thresholds[np.argwhere(fpr <= 0.001)[0][0]] if np.any(fpr <= 0.001) else dist_thresholds[np.argmin(fpr)]
    print(f"\nRecommended threshold (FAR th·∫•p nh·∫•t): {recommended_thresh:.4f}")

In [None]:
def build_comparison_model(embedding_network, input_dim=512):
    """
    Build model for comparing two faces
    
    Input: 2 ArcFace embeddings
    Output: Distance score
    """
    
    input_1 = layers.Input(shape=(input_dim,), name='embedding_1')
    input_2 = layers.Input(shape=(input_dim,), name='embedding_2')
    
    emb_1 = embedding_network(input_1)
    emb_2 = embedding_network(input_2)
    
    distance = layers.Lambda(
        lambda x: tf.sqrt(tf.reduce_sum(tf.square(x[0] - x[1]), axis=1, keepdims=True)),
        name='l2_distance'
    )([emb_1, emb_2])
    
    model = Model(inputs=[input_1, input_2], outputs=distance, name='face_comparison')
    
    return model

comparison_model = build_comparison_model(embedding_network, input_dim=512)

print("\n‚úÖ Comparison model created!")
comparison_model.summary()


In [None]:
def test_comparison(model, val_anchor, val_pos, val_neg, threshold, n_samples=10):
    """Test the comparison model"""
    
    print("\n" + "="*50)
    print("üîç TESTING COMPARISON MODEL")
    print("="*50)
    
    # Test positive pairs
    print("\n‚úÖ POSITIVE PAIRS (Same Person):")
    for i in range(min(n_samples, len(val_anchor))):
        dist = model.predict(
            [val_anchor[[i]], val_pos[[i]]],
            verbose=0
        )[0][0]
        
        result = "SAME" if dist < threshold else "DIFFERENT"
        status = "‚úÖ" if dist < threshold else "‚ùå"
        
        print(f"  {status} Pair {i+1}: Distance={dist:.4f} ‚Üí {result}")
    
    print("\n‚ùå NEGATIVE PAIRS (Different Person):")
    for i in range(min(n_samples, len(val_anchor))):
        dist = model.predict(
            [val_anchor[[i]], val_neg[[i]]],
            verbose=0
        )[0][0]
        
        result = "SAME" if dist < threshold else "DIFFERENT"
        status = "‚úÖ" if dist >= threshold else "‚ùå"
        
        print(f"  {status} Pair {i+1}: Distance={dist:.4f} ‚Üí {result}")

test_comparison(comparison_model, val_anchor, val_pos, val_neg, 
                optimal_threshold, n_samples=5)


In [None]:
def save_models_for_production():
    """Save models for production use"""
    
    print("\nüíæ Saving models...")
    
    embedding_network.save('embedding_network.keras')
    print("‚úÖ Saved: embedding_network.keras")
    
    comparison_model.save('face_comparison_model.keras')
    print("‚úÖ Saved: face_comparison_model.keras")
    
    np.save('optimal_threshold.npy', optimal_threshold)
    print(f"‚úÖ Saved: optimal_threshold.npy (value: {optimal_threshold:.4f})")
    
    print("\n" + "="*50)
    print("üì¶ MODELS SAVED SUCCESSFULLY")
    print("="*50)

save_models_for_production()

print("\n" + "="*70)
print("üéâ TRAINING COMPLETE!")
print("="*70)


In [None]:
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt

print("üîç Evaluating Triplet Model using EMBEDDINGS (no images)")

X1 = np.vstack([val_anchor, val_anchor])
X2 = np.vstack([val_pos, val_neg])

y_true = np.array(
    [1] * len(val_anchor) +   # anchor‚Äìpositive
    [0] * len(val_anchor)     # anchor‚Äìnegative
)

distances = np.linalg.norm(X1 - X2, axis=1)

thresholds = np.linspace(distances.min(), distances.max(), 300)

best_acc = 0
best_th = 0

for th in thresholds:
    y_pred = (distances < th).astype(int)
    acc = np.mean(y_pred == y_true)
    if acc > best_acc:
        best_acc = acc
        best_th = th
y_pred_final = (distances < best_th).astype(int)

print(f"‚úÖ Best Threshold: {best_th:.4f}")
print(f"‚úÖ Validation Accuracy: {best_acc:.4f}")

print("\nüìä Classification Report:")
print(classification_report(
    y_true,
    y_pred_final,
    target_names=["Kh√¥ng kh·ªõp", "Kh·ªõp"]
))

cm = confusion_matrix(y_true, y_pred_final)

plt.figure(figsize=(4,4))
plt.imshow(cm, cmap="Blues")
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.colorbar()
plt.show()
