In [1]:
import pandas as pd
import os

In [19]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
class DataGenerator(tf.keras.utils.Sequence):

    def __init__(self, df, batch_size=32,shuffle=True,image_dir=""):
        self.batch_size = batch_size
        self.df = df
        self.image_dir = image_dir
        self.indices = self.df.index.tolist()
        self.shuffle = shuffle
        self.on_epoch_end()


    def __len__(self):
        return len(self.indices) // self.batch_size

    def __getitem__(self, index):
        index = self.index[index * self.batch_size:(index + 1) * self.batch_size]
        batch = [self.indices[k] for k in index]
        X, y = self.__get_data(batch)
        return X, y

    def on_epoch_end(self):
        self.index = np.arange(len(self.indices))
        if self.shuffle == True:
            np.random.shuffle(self.index)
    
    

    def load_embedding(self, slice):
        data = np.load(self.df.iloc[slice]['data'], allow_pickle=True)
        embedding = data[0]
        vector_size = len(embedding)//2 
        embedding = embedding[:vector_size] - embedding[vector_size:]
        label = data[1]
        return embedding, label

    def __get_data(self, batch):
        X = []
        y = []
        for i, id in enumerate(batch):
            img, label = self.load_embedding(batch[i])
       
            X.append(img)
            y.append(label)
        return np.array(X),np.array(y)

In [20]:

df_validation = pd.DataFrame({'data':[f"test_car/embeddings/{x}" for x in os.listdir('test_car/embeddings/')]})

In [21]:
# from load_model import get_reid_embeddings

In [22]:
val_dataloader = DataGenerator(df_validation, batch_size=32)

In [23]:
from tqdm import tqdm

In [7]:
cosine_loss = tf.keras.losses.CosineSimilarity(axis=1, reduction=tf.keras.losses.Reduction.NONE)

In [8]:
# test_embedding = None
# test_label = None
# preds = []
# ground_truth = []
# for i, j  in  tqdm(val_dataloader):
#     test_embedding = i[0] 
#     test_label = j[0]
#     embedding1 = test_embedding[:len(test_embedding)//2]
#     embedding2 = test_embedding[len(test_embedding)//2:]
#     label = cosine_loss(embedding1[np.newaxis], embedding2[np.newaxis]).numpy()[0] < -0.35
#     preds.append(label)
#     ground_truth.append(test_label)

In [9]:
# sum(ground_truth)/len(ground_truth)

In [10]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

In [13]:
# accuracy_score(preds, ground_truth)

0.5292389068468998

In [14]:
# precision_recall_fscore_support(preds, ground_truth,average='macro')

(0.5292389068468998, 0.6043241086503741, 0.4259483415676265, None)

In [20]:
# embedding1.shape

(768,)

In [48]:
# label = cosine_loss(embedding1[np.newaxis], embedding2[np.newaxis]).numpy()[0] < -0.35

In [11]:
# label==test_label

In [12]:
df = pd.DataFrame({'data':[f"data/embeddings/{x}" for x in os.listdir('data/embeddings/')]})

In [13]:
df.head()

Unnamed: 0,data
0,data/embeddings/validation-1.npy
1,data/embeddings/validation-10.npy
2,data/embeddings/validation-100.npy
3,data/embeddings/validation-1000.npy
4,data/embeddings/validation-10000.npy


In [14]:
df_validation.head()

Unnamed: 0,data
0,test_car/embeddings/validation-1.npy
1,test_car/embeddings/validation-10.npy
2,test_car/embeddings/validation-100.npy
3,test_car/embeddings/validation-1000.npy
4,test_car/embeddings/validation-10000.npy


In [15]:
train_dataloader = DataGenerator(df)
val_dataloader = DataGenerator(df_validation)

In [16]:
import tensorflow as tf


In [17]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv1D, Flatten, MaxPooling1D,Dropout
from numpy import unique

In [18]:
from keras import backend as K

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision


In [24]:
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Flatten, Dense,BatchNormalization

model = Sequential()
model.add(Conv1D(filters=256, kernel_size=5, activation='relu', padding='same', input_shape=(1536//2, 1)))
model.add(BatchNormalization())
model.add(MaxPooling1D(pool_size=2))
model.add(Flatten())
model.add(Dense(128, activation='relu', kernel_regularizer='l2', bias_regularizer='l2'))
model.add(Dropout(0.2))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy',precision_m])
# model.load_weights('D_vector.h5')

In [40]:
# TP = 0
# FP = 0
# FN = 0

# # Iterate over the batches in the Sequence
# for batch_data in tqdm(val_dataloader):
#     batch_inputs, batch_true_labels = batch_data  # Assuming the batch data contains inputs and true labels
    
#     # Make predictions using the model
#     batch_pred_labels = model.predict(batch_inputs) > 0.85
    
#     # Convert labels to the appropriate data type
#     batch_pred_labels = tf.cast(batch_pred_labels, tf.int32)
#     batch_true_labels = tf.cast(batch_true_labels, tf.int32)
    
#     # Calculate TP, FP, FN for the batch
#     batch_TP = tf.math.count_nonzero(tf.logical_and(tf.equal(batch_pred_labels, 1), tf.equal(batch_true_labels, 1)))
#     batch_FP = tf.math.count_nonzero(tf.logical_and(tf.equal(batch_pred_labels, 1), tf.equal(batch_true_labels, 0)))
#     batch_FN = tf.math.count_nonzero(tf.logical_and(tf.equal(batch_pred_labels, 0), tf.equal(batch_true_labels, 1)))
#     batch_TN = tf.math.count_nonzero(tf.logical_and(tf.equal(batch_pred_labels, 0), tf.equal(batch_true_labels, 0)))

# #     print(batch_TP,batch_FP,batch_FN,batch_TN)
#     # Accumulate TP, FP, FN across batches
#     TP += batch_TP
#     FP += batch_FP
#     FN += batch_FN

# # Calculate precision and recall
# precision = TP / (TP + FP)
# recall = TP / (TP + FN)

# # Print the results
# print("Precision:", precision.numpy())
# print("Recall:", recall.numpy())

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 425/425 [01:51<00:00,  3.80it/s]

Precision: 0.507085553215215
Recall: 0.6021314550537001





In [22]:
# model.load_weights('conv1d_1D128.h5')

In [25]:
es = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=50)
mcp = tf.keras.callbacks.ModelCheckpoint(
    filepath='d_vector.h5',
    save_weights_only=True,
    monitor='val_precision_m',
    mode='max',
    save_best_only=True)
import datetime
log_dir = f"logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

In [None]:
model.fit(train_dataloader, validation_data=val_dataloader, epochs = 100, batch_size = 32, callbacks=[mcp,tensorboard_callback,es])

In [None]:
model.sa

In [None]:
model.evaluate(val_dataloader)



In [None]:
xtest[0].shape

In [None]:
dummy_input-np.random.rand(1536)

In [None]:
dummy_input = np.random.rand(1536)
model(dummy_input[np.newaxis])

In [6]:
from load_model import get_reid_embeddings

  from .autonotebook import tqdm as notebook_tqdm


using Transformer_type: vit_base_patch16_224_TransReID as a backbone
using stride: [14, 14], and part number is num_y18 * num_x18
using drop_path_rate is : 0.1
using aie_xishu is : 2.5
Loading finetune model......from model-weights/transformer_40.pth
base.cls_token
base.pos_embed
base.patch_embed.proj.weight
base.patch_embed.proj.bias
base.blocks.0.norm1.weight
base.blocks.0.norm1.bias
base.blocks.0.attn.qkv.weight
base.blocks.0.attn.qkv.bias
base.blocks.0.attn.proj.weight
base.blocks.0.attn.proj.bias
base.blocks.0.norm2.weight
base.blocks.0.norm2.bias
base.blocks.0.mlp.fc1.weight
base.blocks.0.mlp.fc1.bias
base.blocks.0.mlp.fc2.weight
base.blocks.0.mlp.fc2.bias
base.blocks.1.norm1.weight
base.blocks.1.norm1.bias
base.blocks.1.attn.qkv.weight
base.blocks.1.attn.qkv.bias
base.blocks.1.attn.proj.weight
base.blocks.1.attn.proj.bias
base.blocks.1.norm2.weight
base.blocks.1.norm2.bias
base.blocks.1.mlp.fc1.weight
base.blocks.1.mlp.fc1.bias
base.blocks.1.mlp.fc2.weight
base.blocks.1.mlp.fc2.

In [7]:
cars = ['car1','car2','car3','car4']

In [8]:
import os

In [9]:
imgs = {}
for car in cars:
    imgs[car] = [f'test_car/{car}/{x}' for x in os.listdir(f'test_car/{car}')]

In [10]:
import random
test_images = []
for car in cars:
    test_images.extend(imgs[car])

In [11]:
from itertools import combinations
combinations = list(combinations(test_images, 2))

In [12]:
from tqdm import tqdm
import cv2
import numpy as np

In [13]:
combinations[0]

('test_car/car1/1.jpeg', 'test_car/car1/2.jpeg')

In [14]:
x = []
y_hat = []
y_pred = []
for pair in tqdm(combinations):
    img1 = cv2.imread(pair[0])
    img2 = cv2.imread(pair[1])
    img1_embedding = get_reid_embeddings(cv2.resize(img1,(256,256)))
    img2_embedding = get_reid_embeddings(cv2.resize(img2,(256,256)))
    embedding = np.concatenate((img1_embedding,img2_embedding), axis=0)
    pred = model(embedding[np.newaxis])[0]
    y_pred.append(pred)
    y_hat.append(pair[0].split('/')[-2].split('_')[-1] == pair[1].split('/')[-2].split('_')[-1])
#     x.append(embedding)
#     print(pair[0].split('/')[-2].split('_')[-1] == pair[1].split('/')[-2].split('_')[-1])
#     print(pair[0].split('/')[-2].split('_')[-1],pair[1].split('/')[-2].split('_')[-1])
#     print(pair)

#     y.append(pair[0].split('/')[-2].split('_')[-1] == pair[1].split('/')[-2].split('_')[-1])

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 325/325 [00:40<00:00,  7.95it/s]


In [15]:
# y_pred = [y[0] for y in y_pred]
y_pred[0]

<tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.9912661], dtype=float32)>

In [16]:
y_pred = np.array(y_pred)
y_hat = np.array(y_hat)

In [20]:
# def calculate_metrics(predictions, ground_truth):
#     """
#     Calculates accuracy, precision, recall, and F1-score given the prediction and ground truth lists.
#     """
#     true_positives = sum(1 for pred, true in zip(predictions, ground_truth) if pred == 1 and true == 1)
#     false_positives = sum(1 for pred, true in zip(predictions, ground_truth) if pred == 1 and true == 0)
#     false_negatives = sum(1 for pred, true in zip(predictions, ground_truth) if pred == 0 and true == 1)
#     true_negatives = sum(1 for pred, true in zip(predictions, ground_truth) if pred == 0 and true == 0)

#     accuracy = (true_positives + true_negatives) / len(predictions)
#     precision = true_positives / (true_positives + false_positives)
#     recall = true_positives / (true_positives + false_negatives)
#     f1_score = 2 * (precision * recall) / (precision + recall)

#     return accuracy, precision, recall, f1_score


# accuracy, precision, recall, f1_score = calculate_metrics(y_pred>0.7, y_hat)
# print("Accuracy:", accuracy)
# print("Precision:", precision)
# print("Recall:", recall)
# print("F1-score:", f1_score)

Accuracy: 0.5938461538461538
Precision: 0.3564356435643564
Recall: 0.972972972972973
F1-score: 0.5217391304347826


In [None]:
x = np.array(x)
y = np.array(y)

In [None]:
y = y.reshape(-1,1)

In [None]:
x.shape

In [None]:
np.save(f'data/validation_x.npy', x)
np.save(f'data/validation_y.npy', y)

In [None]:
model.evaluate(x,y)

In [None]:
x[:,0][0].reshape(-1,1).shape