In [1]:
import cv2
import torch
import numpy as np
from torchvision import transforms
from retinaface import RetinaFace
from PIL import Image
from torchvision.models import mobilenet_v3_large
from torchvision.models import vit_b_32
from torch import nn
from pycocotools.coco import COCO
from mean_average_precision import MetricBuilder
import time
from tqdm import tqdm
import json
import os
from sklearn_extra.cluster import KMedoids

# Side length (in pixels) every face crop is resized to before classification.
IMG_SIZE = 224
# Number of output classes requested from the classifier head.
NUM_CLASSES = 7
# Run on the GPU when CUDA is available, otherwise on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Input image folder, ground-truth annotation file and prediction output file.
image_dir = "E://Kuliah//UPI//SEMESTER 8//dataset skripsi//dataset ruang kelas final//temp//sesi3 front"
gt_json_path = "E://Kuliah//UPI//SEMESTER 8//dataset skripsi//dataset ruang kelas final//temp//front_annotations.json"
output_json = "E://Kuliah//UPI//SEMESTER 8//dataset skripsi//dataset ruang kelas final//temp//predict5//hybrid_pool_merge_front_predict2.json"

# Preprocessing for a PIL face crop: resize to IMG_SIZE x IMG_SIZE, then convert to a tensor.
transform = transforms.Compose([
    transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
])

class TokenPooling(nn.Module):
    """Keep the CLS token plus the ``keep_tokens`` highest-scoring other tokens.

    The first token along dim 1 is treated as the CLS token and is always
    kept. The remaining tokens are ranked by ``significance`` and only the
    top ``keep_tokens`` of them survive, ordered by descending score.

    Args:
        keep_tokens: Number of non-CLS tokens to keep.
        use_weighted: When False, the supplied scores are ignored and uniform
            scores are used instead.
    """

    def __init__(self, keep_tokens: int, use_weighted: bool = True):
        super().__init__()
        self.keep_tokens = keep_tokens
        self.use_weighted = use_weighted

    def forward(self, x: torch.Tensor, significance: torch.Tensor = None) -> torch.Tensor:
        """Pool the token sequence.

        Args:
            x: Tensor of shape (B, N + 1, D); index 0 along dim 1 is the CLS token.
            significance: Per-token scores of shape (B, N) for the non-CLS
                tokens. When omitted, uniform scores are used.

        Returns:
            ``x`` itself when it already has at most ``keep_tokens`` non-CLS
            tokens, otherwise a tensor of shape (B, keep_tokens + 1, D).
        """
        cls_token, tokens = x[:, :1, :], x[:, 1:, :]  # (B, 1, D), (B, N, D)

        if self.keep_tokens >= tokens.shape[1]:
            return x  # nothing to drop

        # Fall back to uniform scores when weighting is disabled or when the
        # caller did not provide scores (previously this crashed in topk).
        if not self.use_weighted or significance is None:
            significance = torch.ones(tokens.shape[:2], device=x.device)

        # Indices of the top-k tokens by score, shape (B, K).
        _, topk_indices = torch.topk(significance, self.keep_tokens, dim=1)

        # A (B, 1) batch index broadcasts against the (B, K) token indices.
        batch_idx = torch.arange(x.shape[0], device=x.device).unsqueeze(1)
        pooled_tokens = tokens[batch_idx, topk_indices]  # (B, K, D)

        return torch.cat([cls_token, pooled_tokens], dim=1)  # (B, K+1, D)

class MultiheadSelfAttentionBlock(nn.Module):
    """Self-attention layer that returns its per-head attention weights.

    Wraps ``nn.MultiheadAttention`` (batch-first) and uses the input as
    query, key and value. The most recent attention weights are also stored
    on ``self.attn_weights``.
    """

    def __init__(self,
                 embedding_dim:int=768,
                 num_heads:int=12,
                 attn_dropout:float=0.):
        super().__init__()
        self.multihead_attn = nn.MultiheadAttention(
            embedding_dim,
            num_heads,
            dropout=attn_dropout,
            batch_first=True,
        )
        # Filled in by forward() with the weights of the latest call.
        self.attn_weights = None

    def forward(self, x):
        """Return ``(attention_output, attention_weights)`` for input ``x``."""
        # average_attn_weights=False asks for one attention map per head.
        output, weights = self.multihead_attn(
            x, x, x,
            need_weights=True,
            average_attn_weights=False,
        )
        self.attn_weights = weights
        return output, weights
    
class MLPBlock(nn.Module):
    """Two-layer feed-forward network: Linear -> GELU -> Dropout -> Linear -> Dropout.

    Args:
        embedding_dim: Size of the input and output features.
        mlp_size: Size of the hidden layer.
        dropout: Dropout probability used after the activation and after the
            second linear layer.
    """

    def __init__(self,
                 embedding_dim:int=768,
                 mlp_size:int=3072,
                 dropout:float=0.):
        super().__init__()
        # The layer order fixes the sub-module indices (mlp.0 ... mlp.4)
        # that appear in saved state dicts.
        layers = [
            nn.Linear(embedding_dim, mlp_size),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(mlp_size, embedding_dim),
            nn.Dropout(dropout),
        ]
        self.mlp = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the feed-forward network to ``x``."""
        return self.mlp(x)

class TransformerEncoderBlock(nn.Module):
    """Encoder block: LayerNorm + self-attention, then LayerNorm + MLP, each with a residual add.

    ``forward`` returns the block output together with the attention
    weights produced by the self-attention sub-layer.
    """

    def __init__(self,
                 embedding_dim:int=768,
                 num_heads:int=12,
                 mlp_size:int=3072,
                 mlp_dropout:float=0.,
                 attn_dropout:float=0.):
        super().__init__()

        # Attention sub-layer: normalisation followed by self-attention.
        self.layer_norm1 = nn.LayerNorm(embedding_dim, eps=1e-6)
        self.msa_block = MultiheadSelfAttentionBlock(
            embedding_dim=embedding_dim,
            num_heads=num_heads,
            attn_dropout=attn_dropout,
        )

        # Feed-forward sub-layer: normalisation followed by the MLP.
        self.layer_norm2 = nn.LayerNorm(embedding_dim, eps=1e-6)
        self.mlp_block = MLPBlock(
            embedding_dim=embedding_dim,
            mlp_size=mlp_size,
            dropout=mlp_dropout,
        )

    def forward(self, x):
        """Return ``(output, attention_weights)`` for the token sequence ``x``."""
        normed_in = self.layer_norm1(x)
        attended, attn_weights = self.msa_block(normed_in)
        after_attn = attended + x  # residual connection around attention

        mlp_out = self.mlp_block(self.layer_norm2(after_attn))
        result = mlp_out + after_attn  # residual connection around the MLP

        return result, attn_weights

class ViTMobilenet(nn.Module):
    """Hybrid classifier: MobileNetV3-Large features fed to a transformer encoder with token pooling.

    The MobileNetV3-Large feature extractor produces a feature map that is
    projected to ``embedding_dim`` channels with a 1x1 convolution and
    flattened into a token sequence. A learnable class token and position
    embedding are added, then the sequence passes through the encoder
    blocks. After each block the non-CLS tokens are pruned according to the
    fixed ``keep_tokens`` schedule, using the attention each token received
    as its score. The final class token is normalised and classified.

    Args:
        img_size: Input resolution; must be divisible by 32.
        in_channels: Unused; kept for interface compatibility.
        patch_size: Unused; kept for interface compatibility.
        num_transformer_layers: Number of encoder blocks; may not exceed the
            length of the pooling schedule (12).
        embedding_dim: Token feature size.
        mlp_size: Hidden size of each block's MLP.
        num_heads: Attention heads per block.
        attn_dropout: Dropout inside the attention layers.
        mlp_dropout: Dropout inside the MLP layers.
        embedding_dropout: Dropout applied after adding position embeddings.
        num_classes: Number of output logits.
        pretrained_backbone: Passed to ``mobilenet_v3_large(pretrained=...)``.
            Set to False to skip loading pretrained backbone weights, e.g.
            when a full checkpoint is loaded right afterwards.

    Raises:
        ValueError: If ``num_transformer_layers`` exceeds the length of the
            pooling schedule.
    """

    def __init__(self,
                 img_size:int=224, # Training resolution from Table 3 in ViT paper
                 in_channels:int=3, # Number of channels in input image
                 patch_size:int=16, # Patch size
                 num_transformer_layers:int=12, # Layers from Table 1 for ViT-Base
                 embedding_dim:int=768, # Hidden size D from Table 1 for ViT-Base
                 mlp_size:int=3072, # MLP size from Table 1 for ViT-Base
                 num_heads:int=12, # Heads from Table 1 for ViT-Base
                 attn_dropout:float=0., # Dropout for attention projection
                 mlp_dropout:float=0., # Dropout for dense/MLP layers 
                 embedding_dropout:float=0., # Dropout for patch and position embeddings
                 num_classes:int=1000, # Default for ImageNet but can customize this
                 pretrained_backbone:bool=True): # Load pretrained MobileNetV3 weights
        super().__init__()

        assert img_size % 32 == 0, f"Image size must be divisible by 32, image size: {img_size}"

        self.mobilenet = mobilenet_v3_large(pretrained=pretrained_backbone).features

        # 1x1 convolution mapping the 960 backbone channels to embedding_dim.
        self.projection = nn.Conv2d(in_channels=960,
                                    out_channels=embedding_dim,
                                    kernel_size=1)

        self.class_embedding = nn.Parameter(data=torch.randn(1, 1, embedding_dim),
                                            requires_grad=True)

        self.num_patches = (img_size // 32) ** 2  # MobileNet reduces spatial size by 32x

        self.position_embedding = nn.Parameter(data=torch.randn(1, self.num_patches+1, embedding_dim),
                                               requires_grad=True)

        self.embedding_dropout = nn.Dropout(p=embedding_dropout)

        # attn_dropout is forwarded here; previously it was accepted by the
        # constructor but silently ignored.
        # The blocks return (x, attn_weights), so forward() iterates over
        # this container instead of calling it directly.
        self.transformer_encoder = nn.Sequential(*[TransformerEncoderBlock(embedding_dim=embedding_dim,
                                                                            num_heads=num_heads,
                                                                            mlp_size=mlp_size,
                                                                            mlp_dropout=mlp_dropout,
                                                                            attn_dropout=attn_dropout) for _ in range(num_transformer_layers)])

        # Number of non-CLS tokens kept after each encoder block.
        self.keep_tokens = [49, 35, 35, 35, 26, 26, 20, 20, 20, 12, 12, 12]
        if num_transformer_layers > len(self.keep_tokens):
            # Fail at construction time rather than with an IndexError in forward().
            raise ValueError(
                f"num_transformer_layers ({num_transformer_layers}) exceeds the "
                f"token pooling schedule length ({len(self.keep_tokens)})"
            )
        # One TokenPooling module per schedule entry.
        self.token_pools = nn.ModuleList([
            TokenPooling(keep_tokens=k, use_weighted=True) for k in self.keep_tokens
        ])

        self.norm = nn.LayerNorm(normalized_shape=embedding_dim, eps=1e-6)
        self.head = nn.Linear(in_features=embedding_dim, out_features=num_classes)

    def forward(self, pixel_values, labels=None):
        """Classify a batch of images.

        Args:
            pixel_values: Image batch whose first dimension is the batch size.
            labels: Unused; kept for interface compatibility.

        Returns:
            Logits of shape (batch_size, num_classes).
        """
        batch_size = pixel_values.shape[0]

        # Extract features using MobileNet
        features = self.mobilenet(pixel_values)  # (batch_size, 960, H', W')
        features = self.projection(features)  # (batch_size, embedding_dim, H', W')

        # Flatten the feature maps into a sequence of tokens
        features = features.flatten(2).transpose(1, 2)  # (batch_size, num_patches, embedding_dim)

        class_token = self.class_embedding.expand(batch_size, -1, -1)

        x = torch.cat((class_token, features), dim=1)  # (batch_size, num_patches + 1, embedding_dim)

        x = x + self.position_embedding

        x = self.embedding_dropout(x)

        for block, token_pool in zip(self.transformer_encoder, self.token_pools):
            x, attn_weights = block(x)

            # Significance score: total attention received by each token,
            # summed over heads and over query positions, CLS column dropped.
            # Assumes attn_weights is (B, heads, query, key).
            score = attn_weights.sum(dim=1).sum(dim=1)[:, 1:]  # (B, N-1)

            if token_pool.keep_tokens > 0:
                x = token_pool(x, significance=score)
            else:
                x = x[:, :1, :]  # keep only the CLS token

        x = self.norm(x)

        cls_token_final = x[:, 0]

        logits = self.head(cls_token_final)

        return logits
# ==========================================================

# class MultiheadSelfAttentionBlock(nn.Module):
#     def __init__(self,
#                  embedding_dim:int=768,
#                  num_heads:int=12,
#                  attn_dropout:float=0.):
#         super().__init__()
#         self.multihead_attn = nn.MultiheadAttention(embed_dim=embedding_dim,
#                                                     num_heads=num_heads,
#                                                     dropout=attn_dropout,
#                                                     batch_first=True)
#     def forward(self, x):
#         attn_output, _ = self.multihead_attn(query=x,
#                                              key=x,
#                                              value=x,
#                                              need_weights=False)
#         return attn_output
    
# class MLPBlock(nn.Module):
#     def __init__(self,
#                  embedding_dim:int=768,
#                  mlp_size:int=3072,
#                  dropout:float=0.):
#         super().__init__()
#         self.mlp = nn.Sequential(
#             nn.Linear(in_features=embedding_dim,
#                       out_features=mlp_size),
#             nn.GELU(),
#             nn.Dropout(p=dropout),
#             nn.Linear(in_features=mlp_size,
#                       out_features=embedding_dim),
#             nn.Dropout(p=dropout)
#         )
#     def forward(self, x):
#         x = self.mlp(x)
#         return x

# class TransformerEncoderBlock(nn.Module):
#     def __init__(self,
#                  embedding_dim:int=768,
#                  num_heads:int=12,
#                  mlp_size:int=3072,
#                  mlp_dropout:float=0.,
#                  attn_dropout:float=0.):
#         super().__init__()
#         self.layer_norm1 = nn.LayerNorm(normalized_shape=embedding_dim, eps=1e-6)
        
#         self.msa_block = MultiheadSelfAttentionBlock(embedding_dim=embedding_dim,
#                                                      num_heads=num_heads,
#                                                      attn_dropout=attn_dropout)
        
#         self.layer_norm2 = nn.LayerNorm(normalized_shape=embedding_dim, eps=1e-6)
        
#         self.mlp_block =  MLPBlock(embedding_dim=embedding_dim,
#                                    mlp_size=mlp_size,
#                                    dropout=mlp_dropout)
#     def forward(self, x):
#         x = self.msa_block(self.layer_norm1(x)) + x 
        
#         x = self.mlp_block(self.layer_norm2(x)) + x 
        
#         return x

# class ViTMobilenet(nn.Module):
#     def __init__(self,
#                  img_size:int=224, # Training resolution from Table 3 in ViT paper
#                  in_channels:int=3, # Number of channels in input image
#                  patch_size:int=16, # Patch size
#                  num_transformer_layers:int=12, # Layers from Table 1 for ViT-Base
#                  embedding_dim:int=768, # Hidden size D from Table 1 for ViT-Base
#                  mlp_size:int=3072, # MLP size from Table 1 for ViT-Base
#                  num_heads:int=12, # Heads from Table 1 for ViT-Base
#                  attn_dropout:float=0., # Dropout for attention projection
#                  mlp_dropout:float=0., # Dropout for dense/MLP layers 
#                  embedding_dropout:float=0., # Dropout for patch and position embeddings
#                  num_classes:int=1000): # Default for ImageNet but can customize this
#         super().__init__()
         
#         assert img_size % 32 == 0, f"Image size must be divisible by 32, image size: {img_size}"
        
#         self.mobilenet = mobilenet_v3_large(pretrained=True).features
        
#         self.projection = nn.Conv2d(in_channels=960, 
#                                     out_channels=embedding_dim,
#                                     kernel_size=1)
                 
#         self.class_embedding = nn.Parameter(data=torch.randn(1, 1, embedding_dim),
#                                             requires_grad=True)

#         self.num_patches = (img_size // 32) ** 2  # MobileNet reduces spatial size by 32x
        
#         self.position_embedding = nn.Parameter(data=torch.randn(1, self.num_patches+1, embedding_dim),
#                                                requires_grad=True)
                
#         self.embedding_dropout = nn.Dropout(p=embedding_dropout)
        
#         self.transformer_encoder = nn.Sequential(*[TransformerEncoderBlock(embedding_dim=embedding_dim,
#                                                                             num_heads=num_heads,
#                                                                             mlp_size=mlp_size,
#                                                                             mlp_dropout=mlp_dropout) for _ in range(num_transformer_layers)])
       
#         self.norm = nn.LayerNorm(normalized_shape=embedding_dim, eps=1e-6)
#         self.head = nn.Linear(in_features=embedding_dim, out_features=num_classes)
    
#     def forward(self, pixel_values, labels=None):
        
#         batch_size = pixel_values.shape[0]

#         # Extract features using MobileNet
#         features = self.mobilenet(pixel_values)  # Output shape: (batch_size, 1280, H', W')
#         features = self.projection(features)  # Project to embedding_dim: (batch_size, embedding_dim, H', W')

#         # Flatten the feature maps into a sequence of tokens
#         features = features.flatten(2).transpose(1, 2)  # Shape: (batch_size, num_patches, embedding_dim)
        
#         class_token = self.class_embedding.expand(batch_size, -1, -1)

#         x = torch.cat((class_token, features), dim=1)  # Shape: (batch_size, num_patches + 1, embedding_dim)

#         x = x + self.position_embedding

#         x = self.embedding_dropout(x)

#         x = self.transformer_encoder(x)

#         x = self.norm(x)
        
#         cls_token_final = x[:, 0]

#         logits = self.head(cls_token_final)

#         return logits

# ================== Hybrid ============================
# model = ViTMobilenet(num_classes=NUM_CLASSES, 
#                      in_channels=3,  
#                      num_heads=12, 
#                      embedding_dim=768, 
#                      num_transformer_layers=12,
#                      mlp_size=3072)

# checkpoint = torch.load("E://Kuliah//UPI//SEMESTER 8//coba coba//hybrid_mobilenet_vit_pooling_SAM_best.pt")
# model.load_state_dict(checkpoint["model_state_dict"])
# model.eval().to(DEVICE)

# Build the hybrid model with a NUM_CLASSES-way classification head.
model = ViTMobilenet(num_classes=NUM_CLASSES, 
                     in_channels=3,  
                     num_heads=12, 
                     embedding_dim=768, 
                     num_transformer_layers=12,
                     mlp_size=3072)

# map_location remaps the stored tensors onto DEVICE, so a checkpoint saved
# on a GPU also loads on a CPU-only machine.
checkpoint = torch.load("E://Kuliah//UPI//SEMESTER 8//coba coba//hybrid_mobilenet_vit_merge_all_best.pt",
                        map_location=DEVICE)
model.load_state_dict(checkpoint["model_state_dict"])
# Inference mode (disables dropout), then move the weights to the target device.
model.eval().to(DEVICE)

# model = ViTMobilenet(num_classes=NUM_CLASSES, 
#                      in_channels=3,  
#                      num_heads=12, 
#                      embedding_dim=768, 
#                      num_transformer_layers=12,
#                      mlp_size=3072)

# checkpoint = torch.load("E://Kuliah//UPI//SEMESTER 8//coba coba//hybrid_mobilenet_vit_pooling_SAM_finetune_best.pt")
# model.load_state_dict(checkpoint["model_state_dict"])
# model.eval().to(DEVICE)

# model = ViTMobilenet(num_classes=NUM_CLASSES, 
#                      in_channels=3,  
#                      num_heads=12, 
#                      embedding_dim=768, 
#                      num_transformer_layers=12,
#                      mlp_size=3072)

# checkpoint = torch.load("E://Kuliah//UPI//SEMESTER 8//coba coba//hybrid_mobilenet_vit_SAM_best.pt")
# model.load_state_dict(checkpoint["model_state_dict"])
# model.eval().to(DEVICE)

# ================== Mobilenetv3 ========================
# model = mobilenet_v3_large(pretrained=False)
# in_features = model.classifier[0].in_features
# model.classifier = nn.Linear(in_features, NUM_CLASSES)
# model = model.to(DEVICE)

# checkpoint = torch.load("E://Kuliah//UPI//SEMESTER 8//coba coba//percobaan10_only-mobilenetv3_no_balancing_best.pt")
# model.load_state_dict(checkpoint["model_state_dict"])
# model.eval()

# model = mobilenet_v3_large(pretrained=False)
# model.classifier[3] = nn.Linear(model.classifier[3].in_features, NUM_CLASSES)
# model = model.to(DEVICE)

# checkpoint = torch.load("E://Kuliah//UPI//SEMESTER 8//coba coba//mobilenetv3_pretrained_best.pt")
# model.load_state_dict(checkpoint["model_state_dict"])
# model.eval()

# ======================= Vit ==========================
# model = vit_b_32(pretrained=False)
# model.heads.head = nn.Linear(model.heads.head.in_features, NUM_CLASSES)
# model = model.to(DEVICE)

# checkpoint = torch.load("E://Kuliah//UPI//SEMESTER 8//coba coba//vit_pretrained_SAM_best.pt")
# model.load_state_dict(checkpoint["model_state_dict"])
# model.eval()

# Load the ground-truth annotation JSON (needs an "images" list whose entries
# carry "file_name" and "id").
with open(gt_json_path, 'r', encoding="utf-8") as f:
    gt_data = json.load(f)

# Build mapping from file_name to image_id
filename_to_id = {entry["file_name"]: entry["id"] for entry in gt_data["images"]}

image_extensions = ('.jpg', '.jpeg', '.png')

# One dict per detected face: image_id, category_id, bbox [x, y, w, h], score.
prediction_output = []

for filename in tqdm(sorted(os.listdir(image_dir))):
    if not filename.lower().endswith(image_extensions):
        continue

    img_path = os.path.join(image_dir, filename)
    img = cv2.imread(img_path)

    if img is None:
        print(f"Skipping {filename}: could not be read as an image.")
        continue

    current_image_id = filename_to_id.get(filename)

    if current_image_id is None:
        print(f"Skipping {filename}: not found in ground truth.")
        continue

    # Face detection
    faces = RetinaFace.detect_faces(img_path)
    if not isinstance(faces, dict):
        # NOTE(review): some retinaface releases appear to return a non-dict
        # value when no face is found - confirm against the installed version.
        faces = {}

    img_h, img_w = img.shape[:2]
    face_tensors = []
    boxes = []

    for face_data in faces.values():
        x1, y1, x2, y2 = (int(v) for v in face_data["facial_area"])

        # Clamp the box to the image: a negative start index would otherwise
        # wrap around in the slice below and give a wrong or empty crop.
        x1, y1 = max(x1, 0), max(y1, 0)
        x2, y2 = min(x2, img_w), min(y2, img_h)
        if x2 <= x1 or y2 <= y1:
            print(f"Skipping face in {filename}: empty crop after clamping to image bounds.")
            continue

        face_crop = img[y1:y2, x1:x2]

        # Best-effort: a face that cannot be converted is reported and skipped
        # so that one bad crop does not abort the whole run.
        try:
            face_img = Image.fromarray(cv2.cvtColor(face_crop, cv2.COLOR_BGR2RGB))
            tensor = transform(face_img)
        except Exception as e:
            print(f"Skipping face due to error: {e}")
            continue

        face_tensors.append(tensor)
        boxes.append([x1, y1, x2 - x1, y2 - y1])  # [x, y, width, height]

    if face_tensors:
        # Classify all faces of this image in a single batch.
        batch_tensor = torch.stack(face_tensors).to(DEVICE)
        with torch.no_grad():
            logits = model(batch_tensor)
            probs = torch.softmax(logits, dim=1)
            scores, class_ids = torch.max(probs, dim=1)

        # NOTE(review): the predicted class index is written unchanged as
        # category_id - confirm the ground truth uses the same numbering.
        for box, class_id, score in zip(boxes, class_ids.tolist(), scores.tolist()):
            prediction_output.append({
                "image_id": int(current_image_id),
                "category_id": int(class_id),
                "bbox": [int(v) for v in box],
                "score": float(score)
            })

# Save to a JSON file; create the target folder first so the results of the
# whole run are not lost to a missing directory.
output_dir = os.path.dirname(output_json)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)
with open(output_json, "w", encoding="utf-8") as f:
    json.dump(prediction_output, f, indent=2)

print(f"Deteksi wajah selesai. Anotasi disimpan ke {output_json}")

100%|████████████████████████████████████████████████████████████████████████████████| 290/290 [08:24<00:00,  1.74s/it]


Deteksi wajah selesai. Anotasi disimpan ke E://Kuliah//UPI//SEMESTER 8//dataset skripsi//dataset ruang kelas final//temp//predict5//hybrid_pool_merge_front_predict2.json
