# Data Science Competition (DSC) LOGIKA UI 2025

### **Klasifikasi Rumah Adat Nusantara - Improved Version**

---

## Anggota Tim: *wets belom dicoba*

- **Bryant Farrel Titanius**
- **Evans Kizito**
- **Franklin Daniel Situmorang**

---

## Improvements in this notebook:

1. **Near-duplicate detection and removal** - Detects and removes near-duplicate images using perceptual hashing
2. **Comprehensive visualization** - Shows duplicate images side-by-side with their similarity scores
3. **Error handling** - Robust error handling for corrupted images
4. **Class balance analysis** - Before and after duplicate removal
5. **Improved training pipeline** - Uses clean dataset after duplicate removal

In [None]:
# ========================================# 1. IMPORT NECESSARY LIBRARIES# ========================================import osimport sysimport mathimport randomimport jsonimport shutilimport warningsimport gcfrom pathlib import Pathfrom glob import globfrom collections import Counter, defaultdictfrom typing import List, Tuple, Dict, Optionalimport numpy as npimport pandas as pdfrom PIL import Imageimport cv2from tqdm.auto import tqdm# Deep Learning Librariesimport torchimport torch.nn as nnimport torch.nn.functional as Ffrom torch.utils.data import Dataset, DataLoader, WeightedRandomSamplerfrom torch.cuda.amp import GradScaler, autocast# Model and Augmentationsimport timmimport albumentations as Afrom albumentations.pytorch import ToTensorV2# Metrics and Visualizationfrom sklearn.model_selection import StratifiedKFoldfrom sklearn.metrics import f1_score, classification_report, confusion_matriximport matplotlib.pyplot as pltimport seaborn as sns# Image Hashing for Near-Duplicate Detectionimport imagehashimport matplotlib.gridspec as gridspecwarnings.filterwarnings('ignore')

In [None]:
# ========================================
# 2. MOUNT GOOGLE DRIVE (for Colab)
# ========================================
# COLAB records whether the Colab import and Drive mount succeeded; later
# cells (e.g. setup_kaggle) branch on it.
try:
    from google.colab import drive
    drive.mount('/content/drive')
    COLAB = True
except Exception:
    # Best-effort detection: any import or mount failure is treated as
    # "not in Colab". Catching Exception (not a bare except) keeps
    # Ctrl-C / SystemExit working while the mount prompt is open.
    COLAB = False
    print("Not running in Colab environment")

In [None]:
# ========================================
# 3. DOWNLOAD KAGGLE DATASET (Optional)
# ========================================
def setup_kaggle():
    """Install Kaggle credentials and download the competition data (Colab only).

    Does nothing when ``COLAB`` is false. Otherwise it asks the user to upload
    ``kaggle.json``, installs it as ``~/.kaggle/kaggle.json`` with mode 600,
    downloads the ``dsc-logika-ui-2025`` competition archive with the Kaggle
    CLI and extracts it into ``./data/``.

    Raises:
        subprocess.CalledProcessError: if the ``kaggle`` CLI exits non-zero.
    """
    if not COLAB:
        return

    # Local imports: only needed (and only available) on this code path.
    import subprocess
    from google.colab import files

    print("Please upload your kaggle.json file:")
    uploaded = files.upload()
    if "kaggle.json" not in uploaded:
        # Nothing to install; continuing would only fail later with a less
        # helpful message.
        print("kaggle.json was not uploaded; skipping dataset download.")
        return

    # Setup Kaggle credentials
    kaggle_dir = os.path.expanduser("~/.kaggle")
    os.makedirs(kaggle_dir, exist_ok=True)
    credentials = os.path.join(kaggle_dir, "kaggle.json")
    shutil.copy("kaggle.json", credentials)
    os.chmod(credentials, 0o600)

    # Download dataset (check=True surfaces CLI failures instead of ignoring them)
    subprocess.run(
        ["kaggle", "competitions", "download", "-c", "dsc-logika-ui-2025"],
        check=True,
    )

    # Unzip dataset
    shutil.unpack_archive("dsc-logika-ui-2025.zip", "./data/")
    print("Dataset downloaded and extracted!")


# Uncomment to download dataset
# setup_kaggle()

In [None]:
# ========================================
# 4. CONFIGURATION
# ========================================
class CFG:
    """Single namespace holding every path and hyperparameter of the notebook."""

    # --- raw dataset locations ---
    DATA_ROOT = "data"
    TRAIN_DIR = os.path.join(DATA_ROOT, "Train", "Train")
    TEST_DIR = os.path.join(DATA_ROOT, "Test", "Test")

    # --- de-duplicated copy of the dataset ---
    CLEAN_ROOT = "data_clean"
    CLEAN_TRAIN = os.path.join(CLEAN_ROOT, "Train", "Train")
    CLEAN_TEST = os.path.join(CLEAN_ROOT, "Test", "Test")

    # --- model ---
    BACKBONE = 'eva02_large_patch14_448.mim_in22k_ft_in22k'
    IMG_SIZE = 448
    NUM_CLASSES = 5

    # --- training ---
    BATCH_SIZE = 14
    EPOCHS = 50
    N_SPLITS = 5

    # --- learning-rate schedule / progressive unfreezing ---
    UNFREEZE_HEAD_EPOCH = 1
    UNFREEZE_FULL_EPOCH = 8
    HEAD_LR = 1e-3
    BODY_LR = 2e-5
    MIN_LR = 1e-6
    WEIGHT_DECAY = 1e-3
    PATIENCE = 10

    # --- regularisation ---
    MIXUP_ALPHA = 0.6
    LABEL_SMOOTHING = 0.15

    # --- near-duplicate detection ---
    HASH_SIZE = 16  # side length of each perceptual hash; larger = more precise
    SIMILARITY_THRESHOLD = 0.95  # pairs scoring at or above this count as duplicates

    # --- system ---
    SEED = 42
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    NUM_WORKERS = 2


def seed_everything(seed=CFG.SEED):
    """Seed the Python, NumPy and PyTorch RNGs and make cuDNN deterministic."""
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seeder in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seeder(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


# Seed once at definition time so every later cell starts from a known state.
seed_everything()

## Near-Duplicate Detection and Removal

This section detects near-duplicate images using perceptual hashing and removes them to create a cleaner training dataset.

In [None]:
# ========================================
# 5. NEAR-DUPLICATE DETECTION FUNCTIONS
# ========================================
class DuplicateDetector:
    """Detect and visualise near-duplicate images using perceptual hashes.

    Each image is described by three hashes (dhash, phash, whash). Two images
    are near-duplicates when the mean of the three per-hash similarities is at
    least ``similarity_threshold``. A per-hash similarity is
    ``1 - hamming_distance / number_of_hash_bits``, so 1.0 means identical
    hashes and about 0.5 is what unrelated images score.
    """

    # Keys of a hash record (see compute_hash) that hold an ImageHash.
    _HASH_KEYS = ('dhash', 'phash', 'whash')

    def __init__(self, hash_size=CFG.HASH_SIZE, similarity_threshold=CFG.SIMILARITY_THRESHOLD):
        self.hash_size = hash_size
        self.similarity_threshold = similarity_threshold
        # Kept for callers that want to stash results on the instance; the
        # methods below do not read or write these two attributes.
        self.duplicates = defaultdict(list)
        self.hash_dict = {}

    def compute_hash(self, image_path: str) -> Optional[dict]:
        """Return the hash record for one image, or None if it cannot be read.

        The record is ``{'dhash', 'phash', 'whash', 'path'}``. Failures are
        printed rather than raised so one corrupted file does not abort a scan.
        """
        try:
            # The context manager closes the file handle once hashing is done.
            with Image.open(image_path) as img:
                if img.mode != 'RGB':
                    img = img.convert('RGB')
                # Use multiple hash methods for better accuracy
                record = {
                    'dhash': imagehash.dhash(img, hash_size=self.hash_size),
                    'phash': imagehash.phash(img, hash_size=self.hash_size),
                    'whash': imagehash.whash(img, hash_size=self.hash_size),
                }
            record['path'] = image_path
            return record
        except Exception as e:
            print(f"Error processing {image_path}: {e}")
            return None

    def compute_similarity(self, hash1: dict, hash2: dict) -> float:
        """Return the mean similarity of two hash records.

        Subtracting two hashes gives their Hamming distance. It is normalised
        by the number of bits in a hash (``hash_size ** 2``), so the result is
        the fraction of matching bits averaged over the three hash types.
        """
        n_bits = self.hash_size * self.hash_size
        similarities = [
            1 - (hash1[key] - hash2[key]) / n_bits
            for key in self._HASH_KEYS
        ]
        return sum(similarities) / len(similarities)

    def find_duplicates(self, image_paths: List[str]) -> Dict[str, List[Tuple[str, float]]]:
        """Group near-duplicate images.

        Returns a mapping ``kept_path -> [(duplicate_path, similarity), ...]``.
        The first image of each group (in ``image_paths`` order) is the one
        kept; an image already assigned to a group is never compared again.
        Images that could not be hashed are ignored.
        """
        print(f"Computing hashes for {len(image_paths)} images...")

        # Compute hashes for all images
        hashes = []
        for path in tqdm(image_paths, desc="Computing hashes"):
            hash_data = self.compute_hash(path)
            if hash_data is not None:
                hashes.append(hash_data)

        print(f"Finding duplicates among {len(hashes)} images...")
        duplicates = defaultdict(list)
        processed = set()

        # Compare all pairs of images
        for i, anchor in enumerate(tqdm(hashes, desc="Finding duplicates")):
            if anchor['path'] in processed:
                continue

            for candidate in hashes[i + 1:]:
                if candidate['path'] in processed:
                    continue

                similarity = self.compute_similarity(anchor, candidate)
                if similarity >= self.similarity_threshold:
                    duplicates[anchor['path']].append((candidate['path'], similarity))
                    processed.add(candidate['path'])

        return duplicates

    @staticmethod
    def _draw_image(ax, image_path, title):
        """Draw one image with a title into ``ax``, or a placeholder on failure."""
        try:
            with Image.open(image_path) as img:
                ax.imshow(img)
            ax.set_title(title, fontsize=10)
        except Exception:
            ax.text(0.5, 0.5, "Error loading image", ha='center', va='center')
        ax.axis('off')

    def visualize_duplicates(self, duplicates: Dict, num_examples: int = 5):
        """Plot the first ``num_examples`` groups: the kept image plus up to 4 duplicates."""
        examples = list(duplicates.items())[:num_examples]

        if not examples:
            print("No duplicates found!")
            return

        fig = plt.figure(figsize=(20, 4 * len(examples)))
        gs = gridspec.GridSpec(len(examples), 5, width_ratios=[1, 1, 1, 1, 1])

        for idx, (original, duplicate_list) in enumerate(examples):
            # Show original image
            self._draw_image(
                plt.subplot(gs[idx, 0]),
                original,
                f"Original\n{Path(original).parent.name}",
            )

            # Show up to 4 duplicates
            for dup_idx, (dup_path, similarity) in enumerate(duplicate_list[:4]):
                self._draw_image(
                    plt.subplot(gs[idx, dup_idx + 1]),
                    dup_path,
                    f"Duplicate {dup_idx+1}\nSim: {similarity:.3f}\n{Path(dup_path).parent.name}",
                )

        plt.suptitle(f"Near-Duplicate Examples (Threshold: {self.similarity_threshold})", fontsize=16, y=1.02)
        plt.tight_layout()
        plt.show()

In [None]:
# ========================================
# 6. ANALYZE DATASET AND FIND DUPLICATES
# ========================================
def analyze_dataset():
    """Index the training set, plot its class balance and find near-duplicates.

    Training images are expected at ``CFG.TRAIN_DIR/<class>/<file>``.
    Duplicates are searched within each class only.

    Returns:
        (train_df, all_duplicates): ``train_df`` has columns ``path``,
        ``label`` and ``filename``; ``all_duplicates`` maps each kept image
        path to a list of ``(duplicate_path, similarity)`` pairs.

    Raises:
        FileNotFoundError: if no training image is found.
    """
    # Sorted so the image kept from each duplicate group does not depend on
    # filesystem listing order; directories are skipped.
    train_images = sorted(
        p for p in glob(os.path.join(CFG.TRAIN_DIR, "*", "*")) if os.path.isfile(p)
    )
    test_images = sorted(
        p for p in glob(os.path.join(CFG.TEST_DIR, "*")) if os.path.isfile(p)
    )

    print(f"Found {len(train_images)} training images")
    print(f"Found {len(test_images)} test images")

    if not train_images:
        # Fail with a clear message instead of an obscure plotting error below.
        raise FileNotFoundError(f"No training images found under {CFG.TRAIN_DIR}")

    # Analyze class distribution
    train_df = pd.DataFrame({
        'path': train_images,
        'label': [Path(p).parent.name for p in train_images],
        'filename': [Path(p).name for p in train_images]
    })

    print("\n=== Class Distribution (Original) ===")
    class_counts = train_df['label'].value_counts()
    print(class_counts)

    # Visualize class distribution
    plt.figure(figsize=(10, 6))
    class_counts.plot(kind='bar')
    plt.title('Class Distribution (Before Duplicate Removal)')
    plt.xlabel('Class')
    plt.ylabel('Number of Images')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

    # Find duplicates within each class
    detector = DuplicateDetector()
    all_duplicates = {}

    for class_name in train_df['label'].unique():
        print(f"\n=== Finding duplicates in class: {class_name} ===")
        class_images = train_df.loc[train_df['label'] == class_name, 'path'].tolist()
        class_duplicates = detector.find_duplicates(class_images)

        if class_duplicates:
            all_duplicates.update(class_duplicates)
            print(f"Found {len(class_duplicates)} images with duplicates in {class_name}")

            # Count total duplicates
            total_dups = sum(len(dups) for dups in class_duplicates.values())
            print(f"Total duplicate images to remove: {total_dups}")

    # Visualize some duplicates
    if all_duplicates:
        print("\n=== Visualizing Duplicate Examples ===")
        detector.visualize_duplicates(all_duplicates, num_examples=5)

    return train_df, all_duplicates


# Run analysis
train_df, duplicates = analyze_dataset()

In [None]:
# ========================================
# 7. CREATE CLEAN DATASET
# ========================================
def create_clean_dataset(train_df: pd.DataFrame, duplicates: Dict):
    """Drop the duplicate images from ``train_df`` and report the effect.

    Args:
        train_df: frame with at least ``path`` and ``label`` columns.
        duplicates: mapping ``kept_path -> [(duplicate_path, similarity), ...]``.
            Every ``duplicate_path`` is removed; the keys are kept.

    Returns:
        A copy of ``train_df`` without the duplicate rows.
    """
    # Get all duplicate paths (images to remove)
    duplicate_paths = {
        dup_path
        for dup_list in duplicates.values()
        for dup_path, _ in dup_list
    }

    print(f"Total images to remove: {len(duplicate_paths)}")

    # Create clean dataframe
    clean_df = train_df[~train_df['path'].isin(duplicate_paths)].copy()

    removed = len(train_df) - len(clean_df)
    # Guard the percentage so an empty input does not raise ZeroDivisionError.
    removed_pct = removed / len(train_df) * 100 if len(train_df) else 0.0

    print(f"Original dataset: {len(train_df)} images")
    print(f"Clean dataset: {len(clean_df)} images")
    print(f"Removed: {removed} images ({removed_pct:.1f}%)")

    # Show class distribution after cleaning
    print("\n=== Class Distribution (After Cleaning) ===")
    clean_class_counts = clean_df['label'].value_counts()
    print(clean_class_counts)

    # Compare before and after. A class with no surviving image would be NaN
    # after index alignment, so it is reported as 0 instead.
    comparison_df = pd.DataFrame({
        'Original': train_df['label'].value_counts(),
        'Clean': clean_class_counts
    }).fillna(0).astype(int)
    comparison_df['Removed'] = comparison_df['Original'] - comparison_df['Clean']
    comparison_df['% Removed'] = (comparison_df['Removed'] / comparison_df['Original'] * 100).round(1)

    print("\n=== Comparison Table ===")
    print(comparison_df)

    if comparison_df.empty:
        # Nothing to draw; pandas raises when asked to plot an empty frame.
        return clean_df

    # Visualize comparison
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))

    comparison_df[['Original', 'Clean']].plot(kind='bar', ax=axes[0])
    axes[0].set_title('Class Distribution: Before vs After Cleaning')
    axes[0].set_xlabel('Class')
    axes[0].set_ylabel('Number of Images')
    axes[0].legend(['Original', 'After Cleaning'])
    axes[0].tick_params(axis='x', rotation=45)

    comparison_df['% Removed'].plot(kind='bar', ax=axes[1], color='red', alpha=0.7)
    axes[1].set_title('Percentage of Images Removed per Class')
    axes[1].set_xlabel('Class')
    axes[1].set_ylabel('% Removed')
    axes[1].tick_params(axis='x', rotation=45)

    plt.tight_layout()
    plt.show()

    return clean_df


# Create clean dataset
clean_df = create_clean_dataset(train_df, duplicates)

In [None]:
# ========================================
# 8. COPY CLEAN FILES TO NEW DIRECTORY
# ========================================
def copy_clean_dataset(clean_df: pd.DataFrame):
    """Materialise the cleaned dataset under ``CFG.CLEAN_ROOT``.

    Training files listed in ``clean_df`` (columns ``path``, ``label``,
    ``filename``) are copied to ``CFG.CLEAN_TRAIN/<label>/<filename>``; every
    regular file in ``CFG.TEST_DIR`` is copied to ``CFG.CLEAN_TEST``.
    Existing destination files are left untouched.

    NOTE: files already in the clean directories from an earlier run are
    never deleted, so re-running with a different duplicate set can leave
    stale images behind.
    """
    # Create clean directories
    os.makedirs(CFG.CLEAN_TRAIN, exist_ok=True)
    os.makedirs(CFG.CLEAN_TEST, exist_ok=True)

    print("Copying clean training files...")
    for row in tqdm(clean_df.itertuples(index=False), total=len(clean_df)):
        class_dir = os.path.join(CFG.CLEAN_TRAIN, row.label)
        os.makedirs(class_dir, exist_ok=True)
        dst = os.path.join(class_dir, row.filename)

        if not os.path.exists(dst):
            shutil.copy2(row.path, dst)

    # Copy test files (no changes needed). Sub-directories are skipped because
    # shutil.copy2 only handles regular files.
    print("Copying test files...")
    test_files = [p for p in glob(os.path.join(CFG.TEST_DIR, "*")) if os.path.isfile(p)]
    for src in tqdm(test_files):
        dst = os.path.join(CFG.CLEAN_TEST, os.path.basename(src))
        if not os.path.exists(dst):
            shutil.copy2(src, dst)

    print(f"Clean dataset created in {CFG.CLEAN_ROOT}")


# Uncomment to copy files
# copy_clean_dataset(clean_df)

## Model Implementation with EVA-02

In [None]:
# ========================================
# 9. DATA AUGMENTATION
# ========================================
def get_transforms(mode='train'):
    """Build the albumentations pipeline for ``mode``.

    ``'train'`` gives the full augmentation stack, ``'val'`` a plain resize,
    and any other value the light test-time-augmentation pipeline.
    """
    size = CFG.IMG_SIZE

    def _to_model_input():
        # Shared tail of every pipeline: ImageNet normalisation + tensor conversion.
        return [
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ]

    if mode == 'val':
        return A.Compose([A.Resize(size, size)] + _to_model_input())

    if mode != 'train':  # TTA
        tta_ops = [
            A.RandomResizedCrop(size, size, scale=(0.85, 1.0)),
            A.HorizontalFlip(p=0.5),
        ]
        return A.Compose(tta_ops + _to_model_input())

    # --- training pipeline ---
    geometric_ops = [
        A.RandomResizedCrop(size, size, scale=(0.7, 1.0)),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.1),
        A.RandomRotate90(p=0.5),
        A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.15, rotate_limit=30, p=0.5),
    ]
    # Exactly one colour perturbation, applied 80% of the time.
    color_op = A.OneOf([
        A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1, p=1),
        A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=1),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=1),
    ], p=0.8)
    # Exactly one noise/blur degradation, applied 30% of the time.
    degrade_op = A.OneOf([
        A.GaussNoise(var_limit=(10, 50), p=1),
        A.GaussianBlur(blur_limit=3, p=1),
        A.MotionBlur(blur_limit=3, p=1),
    ], p=0.3)
    cutout_op = A.CoarseDropout(max_holes=8, max_height=32, max_width=32, fill_value=0, p=0.3)

    return A.Compose(geometric_ops + [color_op, degrade_op, cutout_op] + _to_model_input())

In [None]:
# ========================================
# 10. DATASET CLASS
# ========================================
class RumahAdatDataset(Dataset):
    """Image dataset backed by a DataFrame of file paths (and labels).

    Args:
        df: frame with a ``path`` (or ``image_path``) column and, for the
            ``'train'``/``'val'`` modes, a ``label`` column.
        transform: optional albumentations-style callable, invoked as
            ``transform(image=img)['image']``.
        mode: ``'train'`` or ``'val'`` yields ``(image, label_index)``; any
            other value yields the image only.

    Label indices follow the sorted unique labels of ``df``.
    """

    def __init__(self, df, transform=None, mode='train'):
        self.df = df
        self.transform = transform
        self.mode = mode

        # Create label mapping. Labels are mandatory for train/val (a missing
        # column still raises KeyError); an unlabeled test frame is accepted.
        if mode in ('train', 'val') or 'label' in df.columns:
            self.labels = sorted(df['label'].unique())
        else:
            self.labels = []
        self.label2idx = {label: idx for idx, label in enumerate(self.labels)}
        self.idx2label = {idx: label for label, idx in self.label2idx.items()}

        # Paths already reported as unreadable, so each is reported once per
        # dataset object instead of on every access.
        self._reported_unreadable = set()

    def __len__(self):
        return len(self.df)

    def _load_rgb(self, img_path):
        """Read ``img_path`` as an RGB array; fall back to a black image.

        cv2.imread signals an unreadable file by returning None rather than
        raising, so that case is checked explicitly. The black placeholder
        keeps a batch usable when a single file is corrupted.
        """
        try:
            img = cv2.imread(img_path)
        except (cv2.error, TypeError):
            img = None

        if img is None:
            if img_path not in self._reported_unreadable:
                self._reported_unreadable.add(img_path)
                print(f"Could not read image, using a blank placeholder: {img_path}")
            return np.zeros((CFG.IMG_SIZE, CFG.IMG_SIZE, 3), dtype=np.uint8)

        return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]

        # Load image
        img_path = row['path'] if 'path' in row else row['image_path']
        img = self._load_rgb(img_path)

        # Apply transforms
        if self.transform:
            img = self.transform(image=img)['image']

        if self.mode in ('train', 'val'):
            return img, self.label2idx[row['label']]
        return img

In [None]:
# ========================================
# 11. MODEL ARCHITECTURE
# ========================================
class EVA02Model(nn.Module):
    """timm backbone (EVA-02 by default) followed by a two-layer MLP head and a linear classifier."""

    def __init__(self, backbone_name=CFG.BACKBONE, num_classes=CFG.NUM_CLASSES,
                 pretrained=True, dropout=0.3):
        super().__init__()

        # Backbone as a pure feature extractor (num_classes=0 strips timm's own head).
        self.backbone = timm.create_model(
            backbone_name,
            pretrained=pretrained,
            num_classes=0,
        )

        # Width of the backbone's output features, and of the narrowed head output.
        self.feat_dim = self.backbone.num_features
        width = self.feat_dim
        narrow = width // 2

        # Two blocks of Linear -> ReLU -> BatchNorm -> Dropout; the second
        # halves the width and uses half the dropout rate.
        self.head = nn.Sequential(
            nn.Linear(width, width),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(width),
            nn.Dropout(dropout),
            nn.Linear(width, narrow),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(narrow),
            nn.Dropout(dropout / 2),
        )

        # Final classifier
        self.classifier = nn.Linear(narrow, num_classes)

        self._init_weights()

    def _init_weights(self):
        """Kaiming-init the head's Linear layers, reset its BatchNorms, small-normal-init the classifier."""
        for layer in self.head.modules():
            if isinstance(layer, nn.Linear):
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                if layer.bias is not None:
                    nn.init.zeros_(layer.bias)
                continue
            if isinstance(layer, nn.BatchNorm1d):
                nn.init.ones_(layer.weight)
                nn.init.zeros_(layer.bias)

        nn.init.normal_(self.classifier.weight, mean=0, std=0.01)
        nn.init.zeros_(self.classifier.bias)

    def forward(self, x):
        """Return class logits for a batch of images."""
        features = self.backbone(x)
        hidden = self.head(features)
        return self.classifier(hidden)

In [None]:
# ========================================
# 12. LOSS FUNCTIONS
# ========================================
class FocalLoss(nn.Module):
    """Focal loss with optional label smoothing.

    The per-sample cross entropy ``ce`` is scaled by
    ``alpha * (1 - exp(-ce)) ** gamma`` and the batch mean is returned.
    """

    def __init__(self, alpha=1, gamma=2, label_smoothing=0.0):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.label_smoothing = label_smoothing

    def _per_sample_ce(self, inputs, targets):
        """Cross entropy per sample, against smoothed targets when smoothing is on."""
        eps = self.label_smoothing
        if not eps > 0:
            return F.cross_entropy(inputs, targets, reduction='none')

        num_classes = inputs.size(-1)
        one_hot = torch.zeros_like(inputs).scatter_(1, targets.unsqueeze(1), 1)
        soft_targets = one_hot * (1 - eps) + eps / num_classes
        log_probs = F.log_softmax(inputs, dim=-1)
        return -(soft_targets * log_probs).sum(dim=-1)

    def forward(self, inputs, targets):
        """Return the mean focal loss for logits ``inputs`` and class indices ``targets``."""
        ce = self._per_sample_ce(inputs, targets)
        # exp(-ce) stands in for the probability assigned to the target.
        confidence = torch.exp(-ce)
        weighted = self.alpha * (1 - confidence) ** self.gamma * ce
        return weighted.mean()

In [None]:
# ========================================
# 13. MIXUP AUGMENTATION
# ========================================
def mixup_data(x, y, alpha=1.0):
    """Blend each sample of a batch with a randomly chosen partner.

    Returns ``(mixed_x, y_a, y_b, lam)``: the blended inputs, the original
    targets, the partners' targets and the mixing weight. ``lam`` is drawn
    from Beta(alpha, alpha), or is 1 (no mixing) when ``alpha <= 0``.
    """
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1

    # One random partner per sample, on the same device as the inputs.
    partner = torch.randperm(x.size(0)).to(x.device)

    blended = lam * x + (1 - lam) * x[partner]
    return blended, y, y[partner], lam


def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Combine the loss against both target sets with weights ``lam`` and ``1 - lam``."""
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

In [None]:
# ========================================
# 14. TRAINING FUNCTIONS
# ========================================
def train_epoch(model, loader, criterion, optimizer, scaler, epoch, use_mixup=True):
    """Train for one epoch with mixed precision and gradient clipping.

    When ``use_mixup`` is true, each batch is mixed with probability 0.5.
    The training F1 is computed only from batches that were NOT mixed,
    because a mixed batch has no single ground-truth label per sample.

    Returns:
        (epoch_loss, epoch_f1): mean batch loss and macro F1 (0.0 if every
        batch was mixed).
    """
    model.train()
    running_loss = 0.0
    predictions = []
    targets = []

    pbar = tqdm(loader, desc=f'Training Epoch {epoch}')
    for batch_idx, (images, labels) in enumerate(pbar):
        images = images.to(CFG.DEVICE)
        labels = labels.to(CFG.DEVICE)

        # Decide once per batch; the same flag drives both the loss and the
        # metric bookkeeping below.
        apply_mixup = use_mixup and np.random.random() > 0.5
        if apply_mixup:
            images, labels_a, labels_b, lam = mixup_data(
                images, labels, alpha=CFG.MIXUP_ALPHA
            )

        # Forward pass with mixed precision
        with autocast():
            outputs = model(images)
            if apply_mixup:
                loss = mixup_criterion(criterion, outputs, labels_a, labels_b, lam)
            else:
                loss = criterion(outputs, labels)

        # Backward pass
        optimizer.zero_grad()
        scaler.scale(loss).backward()

        # Gradient clipping (gradients must be unscaled first)
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        scaler.step(optimizer)
        scaler.update()

        # Update metrics
        running_loss += loss.item()

        if not apply_mixup:
            _, preds = torch.max(outputs, 1)
            predictions.extend(preds.cpu().numpy())
            targets.extend(labels.cpu().numpy())

        # Update progress bar
        pbar.set_postfix({'loss': running_loss / (batch_idx + 1)})

    # Calculate metrics (max(..., 1) avoids dividing by zero on an empty loader)
    epoch_loss = running_loss / max(len(loader), 1)
    if predictions:
        epoch_f1 = f1_score(targets, predictions, average='macro')
    else:
        epoch_f1 = 0.0

    return epoch_loss, epoch_f1


def validate_epoch(model, loader, criterion):
    """Evaluate the model on ``loader`` and print a classification report.

    ``loader.dataset.labels`` supplies the class names for the report.

    Returns:
        (epoch_loss, epoch_f1, predictions, targets)
    """
    model.eval()
    running_loss = 0.0
    predictions = []
    targets = []

    with torch.no_grad():
        pbar = tqdm(loader, desc='Validation')
        for batch_idx, (images, labels) in enumerate(pbar):
            images = images.to(CFG.DEVICE)
            labels = labels.to(CFG.DEVICE)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Update metrics
            running_loss += loss.item()
            _, preds = torch.max(outputs, 1)

            predictions.extend(preds.cpu().numpy())
            targets.extend(labels.cpu().numpy())

            # Running mean over the batches seen so far.
            pbar.set_postfix({'loss': running_loss / (batch_idx + 1)})

    # Calculate metrics
    epoch_loss = running_loss / max(len(loader), 1)
    epoch_f1 = f1_score(targets, predictions, average='macro')

    # Detailed classification report. Passing the full index list keeps the
    # report aligned with the class names even when a class never occurs in
    # this fold's targets or predictions.
    class_names = list(loader.dataset.labels)
    print("\n" + "="*50)
    print("Classification Report:")
    print(classification_report(
        targets,
        predictions,
        labels=list(range(len(class_names))),
        target_names=class_names,
        zero_division=0,
    ))

    return epoch_loss, epoch_f1, predictions, targets

In [None]:
# ========================================
# 15. PROGRESSIVE UNFREEZING
# ========================================
def unfreeze_layers(model, epoch, optimizer):
    """Enable gradients stage by stage and register the new params with the optimizer.

    From ``CFG.UNFREEZE_HEAD_EPOCH`` on, any still-frozen head/classifier
    parameter is unfrozen and added as a param group at ``CFG.HEAD_LR``.
    From ``CFG.UNFREEZE_FULL_EPOCH`` on, the same happens for the backbone at
    ``CFG.BODY_LR``. Parameters that already require gradients are skipped,
    so repeated calls add nothing twice.
    """

    def _thaw(modules):
        # Flip requires_grad on every frozen parameter and return those flipped.
        thawed = []
        for module in modules:
            for param in module.parameters():
                if param.requires_grad:
                    continue
                param.requires_grad = True
                thawed.append(param)
        return thawed

    # Unfreeze head first
    if epoch >= CFG.UNFREEZE_HEAD_EPOCH:
        head_params = _thaw((model.head, model.classifier))
        if head_params:
            optimizer.add_param_group({'params': head_params, 'lr': CFG.HEAD_LR})
            print(f"Unfroze {len(head_params)} head parameters")

    # Unfreeze backbone
    if epoch >= CFG.UNFREEZE_FULL_EPOCH:
        body_params = _thaw((model.backbone,))
        if body_params:
            optimizer.add_param_group({'params': body_params, 'lr': CFG.BODY_LR})
            print(f"Unfroze {len(body_params)} backbone parameters")

In [None]:
# ========================================
# 16. MAIN TRAINING LOOP
# ========================================
def train_fold(fold, train_idx, val_idx):
    """Train and validate one cross-validation fold.

    Reads the module-level ``clean_df``. The backbone starts frozen and is
    unfrozen by ``unfreeze_layers``; mixup starts at epoch 5. The best
    weights (by validation macro F1) are saved to
    ``best_model_fold{fold+1}.pth``, and training stops early after
    ``CFG.PATIENCE`` epochs without improvement.

    Args:
        fold: zero-based fold number.
        train_idx, val_idx: positional row indices into ``clean_df``.

    Returns:
        (history, best_f1): per-epoch loss/F1 lists and the best validation F1.
    """
    print(f"\n{'='*50}")
    print(f"Training Fold {fold + 1}/{CFG.N_SPLITS}")
    print(f"{'='*50}")

    # Prepare data
    train_data = clean_df.iloc[train_idx].reset_index(drop=True)
    val_data = clean_df.iloc[val_idx].reset_index(drop=True)

    print(f"Train samples: {len(train_data)}")
    print(f"Val samples: {len(val_data)}")

    # Create datasets
    train_dataset = RumahAdatDataset(train_data, transform=get_transforms('train'), mode='train')
    val_dataset = RumahAdatDataset(val_data, transform=get_transforms('val'), mode='val')

    # Each dataset derives its label->index map from its own rows. Pin both to
    # the mapping of the full dataset so a class missing from one split cannot
    # shift the indices (and so indices match the ones used at inference).
    all_labels = sorted(clean_df['label'].unique())
    shared_label2idx = {label: idx for idx, label in enumerate(all_labels)}
    for dataset in (train_dataset, val_dataset):
        dataset.labels = all_labels
        dataset.label2idx = shared_label2idx
        dataset.idx2label = {idx: label for label, idx in shared_label2idx.items()}

    # Calculate class weights for balanced sampling
    class_counts = train_data['label'].value_counts()
    class_weights = {label: 1.0 / count for label, count in class_counts.items()}
    sample_weights = [class_weights[label] for label in train_data['label']]
    sampler = WeightedRandomSampler(sample_weights, len(sample_weights))

    # Create dataloaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=CFG.BATCH_SIZE,
        sampler=sampler,
        num_workers=CFG.NUM_WORKERS,
        pin_memory=True,
        # A trailing batch of size 1 makes BatchNorm1d raise in train mode.
        # Drop the incomplete batch unless that would leave no batch at all.
        drop_last=len(train_dataset) > CFG.BATCH_SIZE,
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=CFG.BATCH_SIZE * 2,
        shuffle=False,
        num_workers=CFG.NUM_WORKERS,
        pin_memory=True
    )

    # Initialize model
    model = EVA02Model().to(CFG.DEVICE)

    # Freeze backbone initially
    for param in model.backbone.parameters():
        param.requires_grad = False

    # Loss and optimizer
    criterion = FocalLoss(gamma=2.0, label_smoothing=CFG.LABEL_SMOOTHING)
    optimizer = torch.optim.AdamW(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=CFG.HEAD_LR,
        weight_decay=CFG.WEIGHT_DECAY
    )

    # Learning rate scheduler
    # NOTE(review): the scheduler is built before unfreeze_layers adds the
    # backbone param group, so that later group is presumably not annealed
    # by it - confirm against the installed PyTorch version.
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=CFG.EPOCHS, eta_min=CFG.MIN_LR
    )

    # Mixed precision scaler
    scaler = GradScaler()

    # Training history
    history = {
        'train_loss': [], 'train_f1': [],
        'val_loss': [], 'val_f1': []
    }

    best_f1 = 0
    patience_counter = 0

    # Training loop
    for epoch in range(1, CFG.EPOCHS + 1):
        # Progressive unfreezing
        unfreeze_layers(model, epoch, optimizer)

        # Train
        train_loss, train_f1 = train_epoch(
            model, train_loader, criterion, optimizer, scaler, epoch,
            use_mixup=(epoch >= 5)  # Start mixup after initial training
        )

        # Validate
        val_loss, val_f1, _, _ = validate_epoch(model, val_loader, criterion)

        # Update scheduler
        scheduler.step()

        # Save history
        history['train_loss'].append(train_loss)
        history['train_f1'].append(train_f1)
        history['val_loss'].append(val_loss)
        history['val_f1'].append(val_f1)

        # Print metrics
        print(f"\nEpoch {epoch}/{CFG.EPOCHS}")
        print(f"Train Loss: {train_loss:.4f}, Train F1: {train_f1:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val F1: {val_f1:.4f}")
        print(f"LR: {optimizer.param_groups[0]['lr']:.6f}")

        # Save best model
        if val_f1 > best_f1:
            best_f1 = val_f1
            patience_counter = 0
            torch.save(
                model.state_dict(),
                f"best_model_fold{fold+1}.pth"
            )
            print(f"✓ Saved best model (F1: {best_f1:.4f})")
        else:
            patience_counter += 1

        # Early stopping
        if patience_counter >= CFG.PATIENCE:
            print(f"Early stopping at epoch {epoch}")
            break

    return history, best_f1

In [None]:
# ========================================
# 17. K-FOLD CROSS VALIDATION
# ========================================
def run_training():
    """Train every stratified fold of ``clean_df`` and summarise the scores.

    Returns:
        (all_histories, fold_scores): one history dict and one best
        validation F1 per fold, in fold order.
    """
    splitter = StratifiedKFold(n_splits=CFG.N_SPLITS, shuffle=True, random_state=CFG.SEED)
    fold_indices = splitter.split(clean_df, clean_df['label'])

    all_histories, fold_scores = [], []

    for fold, (train_idx, val_idx) in enumerate(fold_indices):
        fold_history, fold_best = train_fold(fold, train_idx, val_idx)
        all_histories.append(fold_history)
        fold_scores.append(fold_best)

        # Release host and GPU memory before the next fold builds a new model.
        gc.collect()
        torch.cuda.empty_cache()

    # Final summary
    rule = "="*50
    print("\n" + rule)
    print("FINAL RESULTS")
    print(rule)
    for fold_number, score in enumerate(fold_scores, start=1):
        print(f"Fold {fold_number}: F1 = {score:.4f}")
    print(f"Mean F1: {np.mean(fold_scores):.4f} ± {np.std(fold_scores):.4f}")

    return all_histories, fold_scores


# Uncomment to run training
# histories, scores = run_training()

In [None]:
# ========================================
# 18. INFERENCE WITH TTA
# ========================================
def inference_with_tta(num_tta=5):
    """Predict the test set with every fold model and test-time augmentation.

    For each of the ``CFG.N_SPLITS`` checkpoints (``best_model_fold{k}.pth``)
    the test images are predicted ``num_tta`` times with the random TTA
    transform. Softmax probabilities are averaged over TTA rounds, then over
    folds. The result is written to ``submission.csv`` (columns ``image``,
    ``label``).

    Test images are read from ``CFG.CLEAN_TEST``; if that directory holds no
    files (the copy step is optional) ``CFG.TEST_DIR`` is used instead.

    Returns:
        The submission DataFrame.

    Raises:
        FileNotFoundError: if no test image is found in either directory.
    """
    def _list_files(directory):
        return sorted(p for p in glob(os.path.join(directory, "*")) if os.path.isfile(p))

    # Load test images
    test_images = _list_files(CFG.CLEAN_TEST) or _list_files(CFG.TEST_DIR)
    if not test_images:
        raise FileNotFoundError(
            f"No test images found in {CFG.CLEAN_TEST} or {CFG.TEST_DIR}"
        )

    print(f"Found {len(test_images)} test images")

    def _load_rgb(img_path):
        # cv2.imread returns None for unreadable files. Use a black image so
        # the submission still gets one row per test file.
        img = cv2.imread(img_path)
        if img is None:
            print(f"Could not read image, using a blank placeholder: {img_path}")
            return np.zeros((CFG.IMG_SIZE, CFG.IMG_SIZE, 3), dtype=np.uint8)
        return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # Prepare for predictions
    all_predictions = []
    tta_transform = get_transforms('tta')

    # Load each fold model
    for fold in range(CFG.N_SPLITS):
        print(f"\nPredicting with Fold {fold+1} model...")

        # The checkpoint overwrites every weight, so skip fetching the
        # pretrained ones.
        model = EVA02Model(pretrained=False).to(CFG.DEVICE)
        model.load_state_dict(torch.load(f"best_model_fold{fold+1}.pth", map_location=CFG.DEVICE))
        model.eval()

        fold_predictions = []

        # TTA predictions
        for tta_idx in range(num_tta):
            current_preds = []

            with torch.no_grad():
                for img_path in tqdm(test_images, desc=f"TTA {tta_idx+1}/{num_tta}"):
                    # Load and transform image
                    augmented = tta_transform(image=_load_rgb(img_path))
                    img_tensor = augmented['image'].unsqueeze(0).to(CFG.DEVICE)

                    # Predict
                    logits = model(img_tensor)
                    probs = F.softmax(logits, dim=1)
                    current_preds.append(probs.cpu().numpy())

            fold_predictions.append(np.vstack(current_preds))

        # Average TTA predictions
        all_predictions.append(np.mean(fold_predictions, axis=0))

        # Free this fold's model before the next one is built, so two large
        # models are never resident at once.
        del model
        gc.collect()
        torch.cuda.empty_cache()

    # Average all fold predictions
    final_predictions = np.mean(all_predictions, axis=0)
    final_classes = np.argmax(final_predictions, axis=1)

    # Create submission
    submission = pd.DataFrame({
        'image': [os.path.basename(p) for p in test_images],
        'label': final_classes
    })

    # Map back to label names
    dataset = RumahAdatDataset(clean_df, transform=None)
    submission['label'] = submission['label'].map(dataset.idx2label)

    # Save submission
    submission.to_csv('submission.csv', index=False)
    print(f"\nSubmission saved to submission.csv")
    print(submission['label'].value_counts())

    return submission


# Uncomment to run inference
# submission = inference_with_tta(num_tta=5)

## Summary and Next Steps

This notebook implements a complete pipeline for the DSC Rumah Adat classification competition with the following key features:

1. **Near-Duplicate Detection**: Uses perceptual hashing to find and remove duplicate images
2. **Data Cleaning**: Removes duplicates to create a cleaner training dataset
3. **Visualization**: Shows duplicate examples and class distribution analysis
4. **EVA-02 Model**: State-of-the-art Vision Transformer architecture
5. **Advanced Training**: Progressive unfreezing, mixup augmentation, focal loss
6. **K-Fold Cross Validation**: Robust evaluation with 5-fold CV
7. **Test Time Augmentation**: Improved predictions through TTA

### Potential Improvements:

- Experiment with different similarity thresholds for duplicate detection
- Try other Vision Transformer models (DeiT, Swin, etc.)
- Add more aggressive augmentations for underrepresented classes
- Implement pseudo-labeling with high-confidence test predictions
- Use ensemble methods with different architectures

Good luck with your competition!