In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Lazily enumerate every file below /kaggle/input (directory by directory, in
# os.walk order) and print its full path, one per line.
input_file_paths = (
    os.path.join(folder, file_name)
    for folder, _subfolders, file_names in os.walk('/kaggle/input')
    for file_name in file_names
)
for input_file_path in input_file_paths:
    print(input_file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
!pip install torch torchvision transformers pillow pandas tqdm scikit-learn umap-learn plotly

In [None]:
import os
import torch
import pandas as pd
import numpy as np
from pathlib import Path
from PIL import Image
from tqdm import tqdm
from transformers import ViTImageProcessor, ViTModel
from sklearn.manifold import TSNE
from torchvision import transforms
import umap
import plotly.express as px
import warnings
# Silence every warning category for the rest of the session.
warnings.filterwarnings('ignore')

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Config

In [None]:
# Training images, one sub-folder per class (read by prepare_data and
# process_train_dataset).
TRAIN_DIR = "/kaggle/input/suku-indonesia-new/Dataset_Final_Split/Train"
# Test images, read as a flat directory by process_test_dataset.
TEST_DIR = "/kaggle/input/suku-indonesia-new/Dataset_Final_Split/Test"
# CSV files the extracted embeddings are written to (relative to the working directory).
TRAIN_OUTPUT = "train_embeddings.csv"
TEST_OUTPUT = "test_embeddings.csv"

# Hugging Face checkpoint name passed to every from_pretrained call in this notebook.
MODEL_NAME = "google/vit-base-patch16-224-in21k"
# Number of embed_<i> columns written to the CSVs.
# Presumably equal to the hidden size of MODEL_NAME -- TODO confirm.
EMBEDDING_DIM = 768

# Load VIT Pre-Trained

In [None]:
# Image processor and bare ViT encoder for the checkpoint named in the config cell.
processor = ViTImageProcessor.from_pretrained(MODEL_NAME)
base_vit = ViTModel.from_pretrained(MODEL_NAME)

# Wrap in DataParallel only when more than one GPU is visible.
multiple_gpus = torch.cuda.device_count() > 1
model = torch.nn.DataParallel(base_vit) if multiple_gpus else base_vit

# Move to the selected device and switch to inference mode.
model = model.to(device)
model.eval()

# Hyperparameter Tuning

In [None]:
import optuna
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim
from sklearn.model_selection import train_test_split

class ImageDataset(Dataset):
    """Image-file dataset that yields ``(pixel_values, label_index)`` pairs.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of class names, aligned with ``image_paths``.
        processor: Callable invoked as ``processor(images=..., return_tensors="pt")``
            whose result has a ``'pixel_values'`` entry with a leading batch axis.
        augment: When True, apply random flip / rotation / colour jitter before
            the resize step.
        label_to_idx: Optional class-name -> index mapping to use instead of one
            derived from ``labels``. Pass the training dataset's mapping when
            building a validation dataset so both use the same indices.

    Raises:
        ValueError: If ``image_paths`` and ``labels`` differ in length, or if
            ``labels`` contains a class missing from the supplied ``label_to_idx``.
    """

    def __init__(self, image_paths, labels, processor, augment=False, label_to_idx=None):
        if len(image_paths) != len(labels):
            raise ValueError(
                f"image_paths ({len(image_paths)}) and labels ({len(labels)}) must have the same length"
            )

        self.image_paths = image_paths
        self.labels = labels
        self.processor = processor
        self.augment = augment

        # Label mapping: derived from this dataset's own labels unless the caller
        # supplies a shared one. A derived mapping only matches another dataset's
        # mapping when both contain exactly the same set of classes.
        if label_to_idx is None:
            label_to_idx = {label: idx for idx, label in enumerate(sorted(set(labels)))}
        else:
            unknown = set(labels) - set(label_to_idx)
            if unknown:
                raise ValueError(f"labels not present in label_to_idx: {sorted(unknown)}")
        self.label_to_idx = dict(label_to_idx)

        # Augmentation pipeline; always built so `augment` can be toggled later,
        # but only applied in __getitem__ when `self.augment` is True.
        self.transform = transforms.Compose([
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(10),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        ])

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` and return ``(pixel_values, label_index)``."""
        # convert() returns a fully loaded copy, so the file can be closed
        # immediately instead of staying open until garbage collection.
        with Image.open(self.image_paths[idx]) as source:
            image = source.convert('RGB')

        if self.augment:
            image = self.transform(image)

        # Resize after augmentation.
        image = image.resize((224, 224), Image.Resampling.LANCZOS)

        inputs = self.processor(images=image, return_tensors="pt")
        pixel_values = inputs['pixel_values'].squeeze(0)  # drop the batch axis of size 1
        label = self.label_to_idx[self.labels[idx]]
        return pixel_values, label

def prepare_data(train_dir, test_size=0.2, random_state=42):
    """Collect labelled image paths and split them into train/validation sets.

    Every sub-directory of ``train_dir`` is treated as one class; its name is
    the label of each image file directly inside it.

    Args:
        train_dir: Directory containing one sub-folder per class.
        test_size: Validation share, forwarded to ``train_test_split``.
        random_state: Split seed, forwarded to ``train_test_split``.

    Returns:
        The result of ``train_test_split``, unpacked by callers as
        ``(train_paths, val_paths, train_labels, val_labels)``.

    Raises:
        ValueError: If no image files are found under ``train_dir``.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp')
    image_paths = []
    labels = []

    for class_folder in sorted(Path(train_dir).iterdir()):
        if not class_folder.is_dir():
            continue
        # glob() order depends on the filesystem; sorting makes the seeded split
        # identical on every call and on every machine.
        for img_file in sorted(class_folder.glob("*")):
            if img_file.suffix.lower() in image_suffixes:
                image_paths.append(str(img_file))
                labels.append(class_folder.name)

    if not image_paths:
        raise ValueError(f"No image files found in class folders under {train_dir}")

    return train_test_split(
        image_paths,
        labels,
        test_size=test_size,
        random_state=random_state,
        stratify=labels,
    )

def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device, restore_best=True):
    """Train ``model`` for ``num_epochs`` and return the best validation accuracy.

    Each epoch runs one optimisation pass over ``train_loader`` and one
    gradient-free evaluation pass over ``val_loader``, then prints a summary.

    Args:
        model: Module called as ``model(pixel_values=batch)`` whose result
            exposes a ``.logits`` attribute.
        train_loader: Iterable of ``(inputs, labels)`` batches supporting ``len()``.
        val_loader: Iterable of ``(inputs, labels)`` batches supporting ``len()``.
        criterion: Loss called as ``criterion(logits, labels)``.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of epochs to run.
        device: Device each batch is moved to before the forward pass.
        restore_best: When True (default), reload the weights from the epoch with
            the highest validation accuracy before returning, so the model left
            in memory matches the returned score. When False the model keeps
            its last-epoch weights.

    Returns:
        The highest validation accuracy (percent) seen in any epoch, or 0.0 if
        no epoch scored above zero.
    """
    best_val_acc = 0.0
    best_state = None  # CPU snapshot of the weights at the best epoch

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(pixel_values=inputs).logits
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += labels.size(0)
            train_correct += predicted.eq(labels).sum().item()

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(pixel_values=inputs).logits
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()

        # max(..., 1) turns an empty loader into 0.0 instead of a ZeroDivisionError.
        train_acc = 100. * train_correct / max(train_total, 1)
        val_acc = 100. * val_correct / max(val_total, 1)

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            if restore_best:
                # Clone to CPU so later optimiser steps cannot alter the snapshot
                # and no extra accelerator memory is held.
                best_state = {
                    name: tensor.detach().cpu().clone()
                    for name, tensor in model.state_dict().items()
                }

        print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss/max(len(train_loader), 1):.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {val_loss/max(len(val_loader), 1):.4f}, Val Acc: {val_acc:.2f}%")

    # Leave the model holding the weights that produced the returned accuracy.
    if best_state is not None:
        model.load_state_dict(best_state)

    return best_val_acc

def objective(trial):
    """Optuna objective: fine-tune a fresh ViT classifier and score it.

    Samples a learning rate, batch size and epoch count from ``trial``, trains
    a new ``ViTForImageClassification`` on the split returned by
    ``prepare_data(TRAIN_DIR)`` and reports the result of ``train_model``.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        Best validation accuracy (percent) reached during this trial.
    """
    from transformers import ViTForImageClassification

    # suggest_float(log=True) replaces the deprecated suggest_loguniform and
    # keeps the parameter name, so study.best_params keys are unchanged.
    lr = trial.suggest_float('lr', 1e-5, 1e-3, log=True)
    batch_size = trial.suggest_categorical('batch_size', [8, 16, 32])
    num_epochs = trial.suggest_int('num_epochs', 3, 10)

    train_imgs, val_imgs, train_lbls, val_lbls = prepare_data(TRAIN_DIR)

    processor_temp = ViTImageProcessor.from_pretrained(MODEL_NAME)
    # NOTE(review): `augment` is left at its default (False), so the dataset's
    # augmentation pipeline is not applied during tuning -- confirm this is intended.
    train_dataset = ImageDataset(train_imgs, train_lbls, processor_temp)
    val_dataset = ImageDataset(val_imgs, val_lbls, processor_temp)
    # Score validation images with the training label indices, even if the
    # validation split happens to miss a class.
    val_dataset.label_to_idx = train_dataset.label_to_idx

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

    num_labels = len(train_dataset.label_to_idx)
    # Only the classification model is built; the bare ViTModel that used to be
    # loaded here was discarded immediately and cost a full checkpoint load per trial.
    model = ViTForImageClassification.from_pretrained(MODEL_NAME, num_labels=num_labels, ignore_mismatched_sizes=True)

    if torch.cuda.device_count() > 1:
        model = torch.nn.DataParallel(model)

    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr)

    try:
        return train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device)
    finally:
        # Drop this trial's model and optimizer state so consecutive trials do
        # not accumulate GPU memory.
        del model, optimizer
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

print("Starting Optuna hyperparameter tuning...")
# A seeded sampler removes the sampler's own randomness as a source of
# run-to-run variation in which hyperparameters get tried.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=10)

print(f"\nBest trial: {study.best_trial.number}")
print(f"Best validation accuracy: {study.best_value:.2f}%")
print(f"Best hyperparameters: {study.best_params}")

# Retrain once from the pretrained checkpoint with the winning hyperparameters.
best_params = study.best_params
train_imgs, val_imgs, train_lbls, val_lbls = prepare_data(TRAIN_DIR)

processor = ViTImageProcessor.from_pretrained(MODEL_NAME)
train_dataset = ImageDataset(train_imgs, train_lbls, processor)
val_dataset = ImageDataset(val_imgs, val_lbls, processor)
# Validation must use the training label indices.
val_dataset.label_to_idx = train_dataset.label_to_idx

train_loader = DataLoader(train_dataset, batch_size=best_params['batch_size'], shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=best_params['batch_size'], shuffle=False, num_workers=2)

num_labels = len(train_dataset.label_to_idx)

# Needed at module level: the import inside objective() is local to that function.
from transformers import ViTForImageClassification
model = ViTForImageClassification.from_pretrained(MODEL_NAME, num_labels=num_labels, ignore_mismatched_sizes=True)

if torch.cuda.device_count() > 1:
    model = torch.nn.DataParallel(model)

model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=best_params['lr'])

print("\nTraining final model with best hyperparameters...")
final_acc = train_model(model, train_loader, val_loader, criterion, optimizer, best_params['num_epochs'], device)

# Save the weights of the underlying model so the checkpoint keys carry no
# DataParallel "module." prefix and load directly into an unwrapped model.
unwrapped_model = model.module if isinstance(model, torch.nn.DataParallel) else model
torch.save(unwrapped_model.state_dict(), 'vit_finetuned.pth')
# final_acc is the best accuracy seen in any epoch, so the message says so.
print(f"\nFinal model saved (best validation accuracy during training: {final_acc:.2f}%)")

# Load Fine-Tuned Model

In [None]:
from transformers import ViTForImageClassification

processor = ViTImageProcessor.from_pretrained(MODEL_NAME)

# Rebuild the training split only to recover the number of classes the
# classification head was trained with.
train_imgs, val_imgs, train_lbls, val_lbls = prepare_data(TRAIN_DIR)
train_dataset = ImageDataset(train_imgs, train_lbls, processor)
num_labels = len(train_dataset.label_to_idx)

model = ViTForImageClassification.from_pretrained(MODEL_NAME, num_labels=num_labels, ignore_mismatched_sizes=True)

# map_location='cpu' lets a checkpoint written on a GPU load in a CPU-only
# session; the model is moved to `device` afterwards.
state_dict = torch.load('vit_finetuned.pth', map_location='cpu')

# A checkpoint saved from a DataParallel wrapper prefixes every key with
# "module.". Strip only that leading prefix, never inner occurrences.
dp_prefix = 'module.'
state_dict = {
    (key[len(dp_prefix):] if key.startswith(dp_prefix) else key): value
    for key, value in state_dict.items()
}
model.load_state_dict(state_dict)

if torch.cuda.device_count() > 1:
    model = torch.nn.DataParallel(model)

model = model.to(device)
model.eval()

print("Fine-tuned model loaded successfully")

# Feature Extraction

In [None]:
def preprocess_image(image_path, target_size=(224, 224)):
    """Load an image as RGB and resize it for the ViT image processor.

    Mean/std normalisation is deliberately not done here. The previous version
    normalised and then immediately de-normalised the pixels, which had no
    intended effect but could shift values by one through float-to-uint8
    truncation.

    Args:
        image_path: Path to the image file.
        target_size: Target size tuple (width, height).

    Returns:
        A PIL RGB image of ``target_size``, or None if the file could not be
        read or decoded (the error is printed).
    """
    try:
        # convert() returns a fully loaded copy (also for images that are
        # already RGB), so the file handle can be released right away.
        with Image.open(image_path) as source:
            rgb_image = source.convert('RGB')

        return rgb_image.resize(target_size, Image.Resampling.LANCZOS)

    except Exception as e:
        # Best effort: report the problem and let the caller skip this image.
        print(f"Could not load {image_path}: {e}")
        return None

# Update the extract_embedding function to use preprocessing
def extract_embedding_with_preprocessing(image_path):
    """Return the ViT [CLS] embedding of one image as a flat NumPy array.

    Uses the notebook-level ``processor``, ``model`` and ``device``.

    Args:
        image_path: Path to the image file.

    Returns:
        1-D array taken from position 0 of ``last_hidden_state``, or None when
        the image cannot be preprocessed or any step raises (the error is printed).
    """
    try:
        prepared = preprocess_image(image_path)
        if prepared is None:
            return None

        batch = processor(images=prepared, return_tensors="pt")
        batch = {name: tensor.to(device) for name, tensor in batch.items()}

        # A DataParallel wrapper keeps the real model on `.module`.
        classifier = model.module if hasattr(model, 'module') else model

        with torch.no_grad():
            # Run only the ViT backbone (`.vit`), not the classification head.
            hidden_states = classifier.vit(**batch).last_hidden_state
            cls_vector = hidden_states[:, 0, :].cpu().numpy().flatten()

        return cls_vector

    except Exception as err:
        print(f"Error processing {image_path}: {err}")
        return None

# Process Train Set

In [None]:
def process_train_dataset(train_dir, output_file):
    """Extract an embedding for every training image and save them as CSV.

    Args:
        train_dir: Path to train directory (contains one folder per class).
        output_file: Path to save CSV file.

    Returns:
        DataFrame with columns ``id`` (file name without extension), ``label``
        (class folder name) and ``embed_0 .. embed_{EMBEDDING_DIM-1}``. Images
        whose embedding could not be extracted are left out.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp')

    # Collect (path, id, label) for every image. Both directory levels are
    # sorted so the row order of the CSV does not depend on the filesystem.
    samples = []
    for class_folder in sorted(Path(train_dir).iterdir()):
        if not class_folder.is_dir():
            continue
        for img_file in sorted(class_folder.glob("*")):
            if img_file.suffix.lower() in image_suffixes:
                samples.append((str(img_file), img_file.stem, class_folder.name))

    # Extract embeddings, keeping only the images that succeeded.
    embeddings = []
    valid_ids = []
    valid_labels = []

    for img_path, img_id, label in tqdm(samples, total=len(samples)):
        embedding = extract_embedding_with_preprocessing(img_path)
        if embedding is not None:
            embeddings.append(embedding)
            valid_ids.append(img_id)
            valid_labels.append(label)

    print(f"\nCreating DataFrame...")

    # Build the frame once (the original constructed it twice).
    embed_cols = [f'embed_{i}' for i in range(EMBEDDING_DIM)]
    df = pd.DataFrame(embeddings, columns=embed_cols)
    df.insert(0, 'id', valid_ids)
    df.insert(1, 'label', valid_labels)

    df.to_csv(output_file, index=False)

    return df

# Extract embeddings for every image under TRAIN_DIR and write them to TRAIN_OUTPUT.
train_df = process_train_dataset(TRAIN_DIR, TRAIN_OUTPUT)

# Process Test Set

In [None]:
def process_test_dataset(test_dir, output_file):
    """Embed every image in a flat test directory and write the result to CSV.

    Args:
        test_dir: Directory whose image files are embedded (sub-folders are not
            descended into).
        output_file: Destination path of the CSV file.

    Returns:
        DataFrame with an ``id`` column (file name without extension) followed
        by ``embed_0 .. embed_{EMBEDDING_DIM-1}``; images whose embedding could
        not be extracted are omitted.
    """
    allowed_suffixes = ['.jpg', '.jpeg', '.png', '.bmp']

    # Image files directly inside test_dir, in sorted path order.
    image_files = [
        entry
        for entry in sorted(Path(test_dir).glob("*"))
        if entry.suffix.lower() in allowed_suffixes
    ]

    # Embed each image, skipping the ones that fail.
    kept_ids = []
    vectors = []
    for entry in tqdm(image_files, total=len(image_files)):
        vector = extract_embedding_with_preprocessing(str(entry))
        if vector is None:
            continue
        vectors.append(vector)
        kept_ids.append(entry.stem)

    print(f"\nCreating DataFrame...")

    # One column per embedding dimension, with the id column in front.
    frame = pd.DataFrame(vectors, columns=[f'embed_{i}' for i in range(EMBEDDING_DIM)])
    frame.insert(0, 'id', kept_ids)

    frame.to_csv(output_file, index=False)

    return frame

# Extract embeddings for every image in TEST_DIR and write them to TEST_OUTPUT.
test_df = process_test_dataset(TEST_DIR, TEST_OUTPUT)

# TSNE

In [None]:
# Sanity check: reload the training embeddings from the configured output path
# (rather than a hard-coded file name) and show (rows, columns).
df = pd.read_csv(TRAIN_OUTPUT)
print(df.shape)


In [None]:
train_data = pd.read_csv(TRAIN_OUTPUT)

# The first two CSV columns are 'id' and 'label'; everything after is the embedding.
X = train_data.iloc[:, 2:].values
labels = train_data['label'].values

tsne = TSNE(n_components=3, random_state=42, perplexity=30, max_iter=1000, verbose=0)
X_tsne = tsne.fit_transform(X)

vis_df = pd.DataFrame({
    'x': X_tsne[:, 0],
    'y': X_tsne[:, 1],
    'z': X_tsne[:, 2],
    'label': labels,
    'id': train_data['id'].values
})

# Calculate centroids for each class
centroids = []
unique_labels = sorted(vis_df['label'].unique())

for label in unique_labels:
    class_data = vis_df[vis_df['label'] == label]
    centroid = {
        'x': class_data['x'].mean(),
        'y': class_data['y'].mean(),
        'z': class_data['z'].mean(),
        'label': label,
        'type': 'Centroid',
        'id': f'{label}_centroid'
    }
    centroids.append(centroid)

centroid_df = pd.DataFrame(centroids)

# Combine data points and centroids
vis_df['type'] = 'Data Point'
combined_df = pd.concat([vis_df, centroid_df], ignore_index=True)

# Calculate inter-class distances (Euclidean, in the 3-D t-SNE space)
print("\nInter-class centroid distances:")
centroid_xyz = centroid_df.set_index('label')[['x', 'y', 'z']]
for i, label1 in enumerate(unique_labels):
    for label2 in unique_labels[i+1:]:
        dist = np.linalg.norm(centroid_xyz.loc[label1].values - centroid_xyz.loc[label2].values)
        print(f"  {label1} <-> {label2}: {dist:.2f}")

# Create visualization with centroids
fig = px.scatter_3d(
    combined_df,
    x='x', y='y', z='z',
    color='label',
    symbol='type',
    hover_data=['id'],
    title='Train Embeddings with Centroids (t-SNE 3D)',
    labels={'x': 't-SNE 1', 'y': 't-SNE 2', 'z': 't-SNE 3'},
    opacity=0.6,
    height=700
)

# Update marker sizes - larger points for better visibility.
# The selector must be a function of the trace: a lambda placed inside a dict
# selector is compared by equality against trace.name, never matches, and the
# styling would be silently skipped.
fig.update_traces(
    marker=dict(size=7, opacity=0.6),
    selector=lambda trace: str(trace.name).endswith('Data Point')
)
fig.update_traces(
    marker=dict(
        size=35,
        symbol='diamond',
        line=dict(width=4, color='white'),
        opacity=1.0
    ),
    selector=lambda trace: str(trace.name).endswith('Centroid')
)

# Add buttons to toggle classes
buttons = [{'label': 'All', 'method': 'update', 'args': [{'visible': [True] * len(fig.data)}]}]

for label in unique_labels:
    # Trace names combine the colour and symbol groups ("<label>, <type>").
    # Compare whole name components so a class whose name is a substring of
    # another class name does not toggle both.
    visible = [str(label) in str(trace.name).split(', ') for trace in fig.data]

    buttons.append({
        'label': str(label),
        'method': 'update',
        'args': [{'visible': visible}]
    })

fig.update_layout(
    updatemenus=[{
        'buttons': buttons,
        'direction': 'down',
        'showactive': True,
        'x': 0.02,
        'y': 0.98,
        'xanchor': 'left',
        'yanchor': 'top'
    }]
)

fig.show()

# LDA

In [None]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

# Apply LDA to reduce to 3 dimensions directly for visualization.
# Uses X, labels, train_data and unique_labels from the t-SNE cell.
# scikit-learn requires n_components <= min(n_features, n_classes - 1), so this
# needs at least 4 classes.
lda_3d = LinearDiscriminantAnalysis(n_components=3)
X_lda_only = lda_3d.fit_transform(X, labels)

print(f"LDA reduced dimensions: {X.shape[1]} -> {X_lda_only.shape[1]}")
print(f"Explained variance ratio: {lda_3d.explained_variance_ratio_}")
print(f"Total explained variance: {lda_3d.explained_variance_ratio_.sum():.4f}")

vis_df_lda_only = pd.DataFrame({
    'x': X_lda_only[:, 0],
    'y': X_lda_only[:, 1],
    'z': X_lda_only[:, 2],
    'label': labels,
    'id': train_data['id'].values
})

# Calculate centroids for LDA only
centroids_lda_only = []
for label in unique_labels:
    class_data = vis_df_lda_only[vis_df_lda_only['label'] == label]
    centroid = {
        'x': class_data['x'].mean(),
        'y': class_data['y'].mean(),
        'z': class_data['z'].mean(),
        'label': label,
        'type': 'Centroid',
        'id': f'{label}_centroid'
    }
    centroids_lda_only.append(centroid)

centroid_df_lda_only = pd.DataFrame(centroids_lda_only)

# Combine data and centroids
vis_df_lda_only['type'] = 'Data Point'
combined_df_lda_only = pd.concat([vis_df_lda_only, centroid_df_lda_only], ignore_index=True)

# Calculate inter-class distances for LDA only (Euclidean, in the 3-D LDA space)
print("\nLDA inter-class centroid distances:")
centroid_xyz_lda_only = centroid_df_lda_only.set_index('label')[['x', 'y', 'z']]
for i, label1 in enumerate(unique_labels):
    for label2 in unique_labels[i+1:]:
        dist = np.linalg.norm(
            centroid_xyz_lda_only.loc[label1].values - centroid_xyz_lda_only.loc[label2].values
        )
        print(f"  {label1} <-> {label2}: {dist:.2f}")

fig_lda_only = px.scatter_3d(
    combined_df_lda_only,
    x='x', y='y', z='z',
    color='label',
    symbol='type',
    hover_data=['id'],
    title='Train Embeddings with Centroids (LDA 3D)',
    labels={'x': 'LDA 1', 'y': 'LDA 2', 'z': 'LDA 3'},
    opacity=0.6,
    height=700
)

# The selector must be a function of the trace: a lambda placed inside a dict
# selector is compared by equality against trace.name, never matches, and the
# styling would be silently skipped.
fig_lda_only.update_traces(
    marker=dict(size=7, opacity=0.6),
    selector=lambda trace: str(trace.name).endswith('Data Point')
)
fig_lda_only.update_traces(
    marker=dict(
        size=35,
        symbol='diamond',
        line=dict(width=4, color='white'),
        opacity=1.0
    ),
    selector=lambda trace: str(trace.name).endswith('Centroid')
)

# Add buttons to toggle classes
buttons_lda_only = [{'label': 'All', 'method': 'update', 'args': [{'visible': [True] * len(fig_lda_only.data)}]}]

for label in unique_labels:
    # Trace names combine the colour and symbol groups ("<label>, <type>").
    # Compare whole name components so a class whose name is a substring of
    # another class name does not toggle both.
    visible = [str(label) in str(trace.name).split(', ') for trace in fig_lda_only.data]

    buttons_lda_only.append({
        'label': str(label),
        'method': 'update',
        'args': [{'visible': visible}]
    })

fig_lda_only.update_layout(
    updatemenus=[{
        'buttons': buttons_lda_only,
        'direction': 'down',
        'showactive': True,
        'x': 0.02,
        'y': 0.98,
        'xanchor': 'left',
        'yanchor': 'top'
    }]
)

fig_lda_only.show()