In [1]:
import os
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
from torchvision import transforms
from torch.utils.data import TensorDataset, DataLoader, random_split
from torchreid import models
import torch.optim as optim
import cv2
from torchvision import transforms
from torchinfo import summary
from tqdm import tqdm
import matplotlib.pyplot as plt



In [2]:
def create_image_dataframe(base_path):
    data = []

    # Define label mapping
    label_map = {
        'employees': 1,
        'customers': 0
    }

    for category in ['employees', 'customers']:
        category_path = os.path.join(base_path, category)
        label = label_map[category]

        for subfolder in os.listdir(category_path):
            subfolder_path = os.path.join(category_path, subfolder)
            if not os.path.isdir(subfolder_path):
                continue

            for filename in os.listdir(subfolder_path):
                if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
                    img_path = os.path.join(subfolder_path, filename)
                    data.append({'img_path': img_path, 'label': label})

    df = pd.DataFrame(data)
    return df

# Example usage
# Replace 'data/' with your actual base directory path
df = create_image_dataframe('/Users/saptarshimallikthakur/Desktop/tracking/Bluetokai/IN OUT/data')
df

Unnamed: 0,img_path,label
0,/Users/saptarshimallikthakur/Desktop/tracking/...,1
1,/Users/saptarshimallikthakur/Desktop/tracking/...,1
2,/Users/saptarshimallikthakur/Desktop/tracking/...,1
3,/Users/saptarshimallikthakur/Desktop/tracking/...,1
4,/Users/saptarshimallikthakur/Desktop/tracking/...,1
...,...,...
1829,/Users/saptarshimallikthakur/Desktop/tracking/...,0
1830,/Users/saptarshimallikthakur/Desktop/tracking/...,0
1831,/Users/saptarshimallikthakur/Desktop/tracking/...,0
1832,/Users/saptarshimallikthakur/Desktop/tracking/...,0


In [3]:
df['label'].value_counts()

label
0    1253
1     581
Name: count, dtype: int64

In [3]:
# 1) Build & load the Market1501-pretrained OSNet ×1.0
def build_reid_model(checkpoint_path, device='cpu'):
    
    # a) instantiate the architecture
    model = models.osnet_x1_0(
        num_classes=1041,
        loss='softmax',
        pretrained=False,
        use_pretrained_backbone=False
    )

    # b) load the checkpoint
    ckpt = torch.load(checkpoint_path, map_location='cpu')
    state_dict = ckpt.get('state_dict', ckpt)

    # c) strip "module." if present
    clean_state = {k.replace('module.', ''): v for k, v in state_dict.items()}
    model.load_state_dict(clean_state)
    return model.to(device).eval()

# 2) Standard ReID preprocessing (256×128 + ImageNet norm)
preprocess = transforms.Compose([
    transforms.Resize((256, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std =[0.229, 0.224, 0.225])
])

# @torch.no_grad()
# def extract_embeddings_both(model, img_path, device='cpu'):
#     pil_img = Image.open(img_path).convert('RGB')

#     # Original
#     x_orig = preprocess(pil_img).unsqueeze(0).to(device)
#     f_orig = model(x_orig).squeeze(0).cpu().numpy()

#     # Flipped
#     pil_flip = transforms.functional.hflip(pil_img)
#     x_flip = preprocess(pil_flip).unsqueeze(0).to(device)
#     f_flip = model(x_flip).squeeze(0).cpu().numpy()

#     return f_orig, f_flip

@torch.no_grad()
def extract_embeddings_both(model, img_path, device='cpu'):
    pil_img = Image.open(img_path).convert('RGB')

    # Original
    x_orig = preprocess(pil_img).unsqueeze(0).to(device)
    f_orig = model(x_orig).squeeze(0).cpu().numpy()

    return f_orig

In [4]:
# # 1) load & freeze your 512-D extractor
# backbone = build_reid_model("osnet_x1_0_msmt17_256x128_amsgrad_ep150_stp60_lr0.0015_b64_fb10_softmax_labelsmooth_flip.pth", device='mps')
# backbone.eval()

# # 2) precompute all embeddings
# embs, labs = [], []
# for _, row in tqdm(df.iterrows(), total=len(df)):
#     f1, f2 = extract_embeddings_both(backbone, row.img_path, device='mps')
    
#     embs.extend([f1, f2])              # add both embeddings
#     labs.extend([int(row.label)] * 2)  # same label for both

# embs = torch.from_numpy(np.stack(embs)).float()  # [N,512]
# labs = torch.tensor(labs).long()                 # [N]


# 1) load & freeze your 512-D extractor
backbone = build_reid_model("osnet_x1_0_msmt17_256x128_amsgrad_ep150_stp60_lr0.0015_b64_fb10_softmax_labelsmooth_flip.pth", device='mps')
backbone.eval()

# 2) precompute all embeddings
embs, labs = [], []
for _, row in tqdm(df.iterrows(), total=len(df)):
    f1 = extract_embeddings_both(backbone, row.img_path, device='mps')
    
    embs.append(f1)            # add both embeddings
    labs.append(row.label) # same label for both

embs = torch.from_numpy(np.stack(embs)).float()  # [N,512]
labs = torch.tensor(labs).long()  

100%|██████████| 1834/1834 [00:40<00:00, 44.84it/s]


In [11]:
embs.shape

torch.Size([1834, 512])

In [12]:
# 3) make train/val splits
dataset = TensorDataset(embs, labs)
val_sz = int(len(dataset)*0.1)
train_ds, val_ds = random_split(dataset, [len(dataset)-val_sz, val_sz])

train_loader = DataLoader(train_ds, batch_size=8)
val_loader   = DataLoader(val_ds,   batch_size=8)

# 4) define your head
hidden_dim = 16
head = nn.Sequential(
    nn.Linear(512, hidden_dim),
    nn.ReLU(inplace=True),
    nn.Dropout(0.1),
    nn.Linear(hidden_dim, 2)
).to('mps')

opt = optim.Adam(head.parameters(), lr=1e-3, weight_decay=1e-4)
crit = nn.CrossEntropyLoss()

In [13]:
# Hyper-params for early stopping
patience = 5           # how many epochs to wait after last improvement
best_acc = 0.0
epochs_no_improve = 0

for epoch in range(1, 51):
    head.train()
    for x, y in train_loader:
        x, y = x.to('mps'), y.to('mps')
        opt.zero_grad()
        loss = crit(head(x), y)
        loss.backward()
        opt.step()

    # evaluation
    head.eval()
    correct = 0
    with torch.no_grad():
        for x, y in val_loader:
            x, y = x.to('mps'), y.to('mps')
            preds = head(x).argmax(1)
            correct += (preds == y).sum().item()
    acc = correct / len(val_ds)

    # check for improvement
    if acc > best_acc:
        best_acc = acc
        epochs_no_improve = 0
        # (optional) save the best model
        torch.save(head.state_dict(), "best_head.pt")
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print(f"Early stopping after {epoch} epochs (no improvement in {patience} epochs)")
            break

    print(f"Epoch {epoch:02d}  Val Acc: {acc:.4f}  (best: {best_acc:.4f})")

print("Best val acc:", best_acc)

head.load_state_dict(torch.load("best_head.pt"))

Epoch 01  Val Acc: 0.8852  (best: 0.8852)
Epoch 02  Val Acc: 0.9071  (best: 0.9071)
Epoch 03  Val Acc: 0.9126  (best: 0.9126)
Epoch 04  Val Acc: 0.9071  (best: 0.9126)
Epoch 05  Val Acc: 0.9071  (best: 0.9126)
Epoch 06  Val Acc: 0.9071  (best: 0.9126)
Epoch 07  Val Acc: 0.9126  (best: 0.9126)
Early stopping after 8 epochs (no improvement in 5 epochs)
Best val acc: 0.912568306010929


<All keys matched successfully>

In [8]:
class EmployeeClassifier(nn.Module):
    def __init__(self, backbone, head):
        super().__init__()
        self.backbone = backbone.eval()     # 512-D features
        self.head     = head.eval()         # 2-class logits

    def forward(self, x):                   # x: [B,3,256,128]
        feats = self.backbone(x)            # [B,512]
        return self.head(feats)             # [B,2]
    
full_model = EmployeeClassifier(backbone, head).cpu()   # ONNX export works on CPU
dummy      = torch.randn(1, 3, 256, 128)                # BCHW

torch.onnx.export(
    full_model,                            # model
    dummy,                                 # example input
    "employee_classifier.onnx",            # file to write
    opset_version = 12,                    # 11+ works with ONNX-Runtime
    input_names  = ["images"],
    output_names = ["logits"],
    dynamic_axes = {"images": {0: "batch"}, "logits": {0: "batch"}}
)
print("✓ exported → employee_classifier.onnx")


✓ exported → employee_classifier.onnx


In [9]:
import onnx

model = onnx.load("employee_classifier.onnx")
onnx.checker.check_model(model)      # throws if something is wrong

In [10]:
import cv2
import numpy as np
import onnxruntime as ort

# Imagenet means/std you used
IMNET_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
IMNET_STD  = np.array([0.229, 0.224, 0.225], dtype=np.float32)

def preprocess(img_path:str) -> np.ndarray:
    img = cv2.imread(img_path)                 # BGR uint8
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (128, 256))          # (W,H)
    img = img.astype(np.float32) / 255.0
    img = (img - IMNET_MEAN) / IMNET_STD
    img = np.transpose(img, (2,0,1))           # CHW
    return np.expand_dims(img, 0)              # [1,3,256,128]

sess = ort.InferenceSession("employee_classifier.onnx",
                            providers=["CPUExecutionProvider"])   # or CUDAExecutionProvider

def predict(img_path:str) -> int:
    inp  = preprocess(img_path)                         # float32
    logits = sess.run(None, {"images": inp})[0]         # [1,2]
    return int(logits.argmax(1)[0])                     # 1 = employee, 0 = customer