In [None]:
from google.colab import files
files.upload()


In [None]:
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


In [None]:
!kaggle competitions download -c sheep-classification-challenge-2025

In [None]:
## Data Loading & Understanding
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import glob
import os
from tqdm import tqdm
import seaborn as sns

## Data EDA & Preprocessing
from tqdm import tqdm
from PIL import Image
import cv2
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.preprocessing import LabelEncoder
from skimage.feature import graycomatrix, graycoprops
from skimage import io, color, feature, exposure
from skimage.util import img_as_ubyte
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier


##  Modelling
import timm
import torchvision.models as models
from torchvision.datasets import ImageFolder
from torchvision import datasets, transforms
import torch.nn as nn
import torch.optim as optim
import torch

## Evaluation
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix



In [None]:
!mkdir -p model
!mkdir -p pretrained
!mkdir -p dataset

In [None]:
##################### Saving Prereained Models ######################

import torch
import timm
from torch import nn
from torch import optim
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim

models_to_save = [
    'efficientnet_b0',
    'mobilenetv3_large_100',
    'resnet18',
    'convnext_tiny',
    'swin_tiny_patch4_window7_224',
    'vit_base_patch16_224',
    'convnext_base',
    "densenet121",
    "efficientnetv2_s"
]

for name in models_to_save:
    print(f"ðŸ”¹ Downloading & saving {name}...")
    model = timm.create_model(name, pretrained=True)
    torch.save(model.state_dict(), f"pretrained/{name}_pretrained.pth")

print("âœ… All models saved locally!")




In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

device

In [None]:
!unzip sheep-classification-challenge-2025 -d dataset

In [None]:
################ Load Data
MODE = "folder"  # "folder" atau "csv"
IMAGE_DIR = "/content/dataset/Sheep Classification Images/train"
train_csv = "/content/dataset/Sheep Classification Images/train_labels.csv"

img="filename"
label="label"

def load_dataset(image_path="filename", labels="genus", data_dir=None, csv_path=None):
    df = pd.read_csv(csv_path)
    assert all(c in df.columns for c in [image_path, labels])

    df = df.sample(frac=1).reset_index(drop=True)
    print(f"Terdapat {len(df)} data citra dan {df[labels].nunique()} label")
    return df

df_train = load_dataset(csv_path=train_csv, labels="label")


In [None]:
def show_samples(df, data_dir , n_per_class=3, image_paths = "filename", labels = "genus"):
    classes = df[labels].unique()
    for cls in classes:
        subset = df[df[labels] == cls].sample(min(n_per_class, len(df[df[labels] == cls])))
        plt.figure(figsize = (n_per_class * 2, 2))
        for i, (_, row) in enumerate (subset.iterrows()):
            if data_dir is not None:
                img = Image.open(os.path.join(data_dir, row[image_paths]))
            else:
                img = Image.open(row[image_paths])
            plt.subplot(1, n_per_class, i + 1)
            plt.imshow(img)
            plt.axis('off')
            plt.title(cls)
        plt.tight_layout
        plt.show



In [None]:
show_samples(df_train, data_dir = IMAGE_DIR, labels = label)

In [None]:
print(df_train.head())
print("\n Class Distribution")
print(df_train['label'].value_counts())
df_train['label'].nunique()

In [None]:
width, heights, ratio = [], [], []

for path in tqdm(df_train["filename"], desc = "Analyzing image sizes"):
    try:
        with Image.open(os.path.join(IMAGE_DIR, path)) as img:
            w, h = img.size
            width.append(w)
            heights.append(h)
            ratio.append(w / h)
    except:
        print(f"Image {os.path.join(IMAGE_DIR, path)} cant be opened adding null data")
        width.append(None)
        heights.append(None)
        ratio.append(None)
df_train["width"] = width
df_train["heights"] = heights
df_train["ratio"] = ratio

df_train[['width', 'heights', 'ratio']].head()

In [None]:
df_train[['width', 'heights', 'ratio']].describe()

In [None]:
def image_stats(df, sample_size = 200, image_col="filename", data_dir = None, per_class = False, label_col = 'genus'):
    sample_df = df.sample(min(sample_size, len(df))).reset_index(drop=True)
    mean_rgb, std_rgb = [], []
    per_class_stats = {}

    for _, row in tqdm (sample_df.iterrows(), total = len(sample_df), desc="computing RGB stats"):
        path = row[image_col]
        label = row[label_col] if label_col in row else "unknown"
        if data_dir is not None:
            path = os.path.join(data_dir, path)
        img = cv2.imread(path)
        if img is None:
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) / 255.0

        mean = img.mean(axis=(0, 1))
        std = img.std(axis=(0, 1))

        mean_rgb.append(mean)
        std_rgb.append(std)

        if per_class:
            if label not in per_class_stats:
                per_class_stats[label] = {"mean": [], "std": []}
            per_class_stats[label]["mean"].append(mean)
            per_class_stats[label]["std"].append(std)

    mean_rgb = np.mean(mean_rgb, axis = 0)
    std_rgb = np.mean(std_rgb, axis = 0)

    if per_class:
        for cls in per_class_stats:
            per_class_stats[cls]["mean"] = np.mean(per_class_stats[cls]["mean"], axis = 0)
            per_class_stats[cls]["std"] = np.mean(per_class_stats[cls]["std"], axis = 0)
        return mean_rgb, std_rgb, per_class_stats
    else:
        return mean_rgb, std_rgb

In [None]:
mean_rgb, std_rgb, per_class_stats = image_stats(df_train, data_dir = IMAGE_DIR, per_class = True, label_col=label)

print(f"Global mean RGB: {mean_rgb}")
print(f"Global std RGB: {std_rgb}")
for cls, stats in per_class_stats.items():
    print(f"{cls}: Mean {stats["mean"]}, Std {stats["std"]}")

In [None]:
##### A. Brightness & Kontras###
def brightness_contrast(img):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    brightness = np.mean(gray)
    contrast = np.std(gray)
    return brightness, contrast

######## Blur / Focus Level (Variance of Laplacian)####

def blur_score(img):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return cv2.Laplacian(gray, cv2.CV_64F).var()

########C. Edge Density (Canny Edge Detector)####

def edge_density(img):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 100, 200)
    return np.sum(edges > 0) / edges.size


######## D. Entropy (skimage)####
from skimage.measure import shannon_entropy
def image_entropy(img):
    gray = color.rgb2gray(img)
    return shannon_entropy(gray)

##### E. GLCM (Gray Level Co-occurrence Matrix) Texture Contrast######

def glcm_contrast(img):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray = img_as_ubyte(gray)
    glcm = graycomatrix(gray, distances=[1], angles=[0], symmetric=True, normed=True)
    return graycoprops(glcm, 'contrast')[0,0]


######## F. Dominant Color (KMeans)####

def dominant_color(img, k=3):
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_flat = img_rgb.reshape(-1, 3)
    km = KMeans(n_clusters=k, n_init=10)
    km.fit(img_flat)
    return km.cluster_centers_[np.argmax(np.bincount(km.labels_))]

In [None]:

######## Calculate per Image ####
features = []
for i, row in tqdm(df_train.iterrows(), total=200):
    img = cv2.imread(os.path.join(IMAGE_DIR, row['filename']))
    if img is None:
        continue
    img = cv2.resize(img, (224,224))
    bright, contrast = brightness_contrast(img)
    blur = blur_score(img)
    edge = edge_density(img)
    entropy = image_entropy(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    glcm_c = glcm_contrast(img)
    dom_col = dominant_color(img)

    features.append({
        'label': row['label'],
        'brightness': bright,
        'contrast': contrast,
        'blur': blur,
        'edges': edge,
        'entropy': entropy,
        'texture_contrast': glcm_c,
        'dom_R': dom_col[0],
        'dom_G': dom_col[1],
        'dom_B': dom_col[2]
    })

eda_df = pd.DataFrame(features)
eda_df.head()


In [None]:
eda_df.describe()

In [None]:
desc = eda_df.groupby('label').describe()
desc.columns = ['_'.join(col) for col in desc.columns]
desc

In [None]:
le_eda = LabelEncoder()
eda_df["label"] = le.fit_transform(eda_df["label"])
corr = eda_df.corr()
sns.heatmap(corr, annot=True, cmap='coolwarm')


In [None]:
x = eda_df.drop(columns=[label])
y = eda_df[label]

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)

rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(x_train, y_train)

importances = rf.feature_importances_
feature_importances = pd.DataFrame({
    'Feature': x.columns,
    'Importance': importances
}).sort_values(by='Importance', ascending=False)

print(feature_importances)

In [None]:
le_train = LabelEncoder()
df_train["label"] = le_train.fit_transform(df_train["label"])

In [None]:
class SquarePad:
    def __call__(self, image):
        w, h = image.size
        max_wh = max(w, h)
        pad_left = (max_wh - w) // 2
        pad_top = (max_wh - h) // 2
        pad_right = max_wh - w - pad_left
        pad_bottom = max_wh - h - pad_top
        return transforms.functional.pad(image, (pad_left, pad_top, pad_right, pad_bottom), 0, 'constant')

transforms_train = transforms.Compose([
    SquarePad(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),

])

transforms_val = transforms.Compose([
    SquarePad(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225]
),
])




In [None]:
########## Normal Image Dataset ##########
class ImageDataset(Dataset):
    def __init__(self, df, transform, dir_path, special_label=None,  mode="train"):
        self.df = df.reset_index()
        self.transform = transform
        self.dir_path = dir_path
        self.special_label = special_label
        self.mode = mode

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_path = self.df.loc[idx, 'filename']
        label = self.df.loc[idx, 'label']
        image = Image.open(os.path.join(self.dir_path, img_path)).convert("RGB")
        image = self.transform(image)
        return image, label

In [None]:
train_data, val_data = train_test_split(df_train, test_size=0.2, random_state=42, stratify =df_train["label"])

train_data = ImageDataset(train_data, transforms_train, IMAGE_DIR, mode="train")
val_data = ImageDataset(val_data, transforms_val, IMAGE_DIR, mode="val")

In [None]:
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)

In [None]:
class Trainer():
     def __init__(self, model, device, criterion, optimizer, early_stopping_patience, save_path, wscheduler=None):
         self.model = model.to(device)
         self.device = device
         self.criterion = criterion
         self.optimizer = optimizer
         self.scheduler =scheduler
         self.early_stopping_patience = early_stopping_patience
         self.save_path = save_path
         self.best_val_loss = 0.0
         self.epochs_no_improve = 0

     def train_one_epoch(self, train_loader):
            self.model.train()

            running_loss = 0.0
            correct, total = 0,0

            for images, labels in tqdm(train_loader, desc="Training", leave=False):
                images, labels = images.to(self.device), labels.to(self.device)

                self.optimizer.zero_grad()
                outputs = self.model(images)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()

                running_loss += loss.item() * images.size(0)
                _, preds = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (preds == labels).sum().item()

            train_acc = 100 * correct / total
            train_loss = running_loss / total

            return train_loss, train_acc

     def validate_per_epoch(self, val_loader):
            val_loss, val_correct, val_total = 0.00, 0, 0
            with torch.no_grad():
                for images, labels in tqdm(val_loader, desc="Validation"):
                    images, labels = images.to(self.device), labels.to(self.device)
                    outputs = model(images)
                    loss = criterion(outputs, labels)

                    val_loss += loss.item() * images.size(0)
                    _, preds = torch.max(outputs, 1)
                    val_total += labels.size(0)
                    val_correct += (preds == labels).sum().item()

            epoch_loss = val_loss/val_total
            epoch_acc = 100 * val_correct / val_total

            return epoch_loss, epoch_acc


     def fit(self, train_loader, val_loader, num_epochs):
            history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

            for epoch in range(num_epochs):
                print(f"epochs {epoch}/{num_epochs}")
                train_loss, train_acc = self.train_one_epoch(train_loader)
                val_loss, val_acc = self.validate_per_epoch(val_loader)

                history["train_loss"].append(train_loss)
                history["train_acc"].append(train_acc)
                history["val_loss"].append(val_loss)
                history["val_acc"].append(val_acc)

                print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%"
                     f"\nValidation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.2f}%")

                if val_loss < self.best_val_loss:
                    self.best_val_loss = val_loss
                    self.save = f"model/model-epoch{epoch}-best.pth"
                    torch.save(self.model.state_dict(), self.save_path)
                    self.epochs_no_improve = 0
                else:
                    self.epochs_no_improve += 1
                    if self.early_stopping_patience and self.epochs_no_improve > self.early_stopping_patience:
                        self.save_path = "model/finetuned-model.pth"
                        torch.save(self.model.state_dict(), self.save_path)
                        print(f"Early stopped latest model saved {self.save_path}")




In [None]:
import torch
import torch.nn as nn
import timm

class HybridFineTune(nn.Module):
    def __init__(self, num_classes, convnext_weight_path=None, freeze_backbone=True, dropout=0.3):
        super(HybridFineTune, self).__init__()

        # === EfficientNet backbone (fine-grained features) ===
        self.eff = timm.create_model('efficientnetv2_s', pretrained=False, num_classes=0)
        eff_features = self.eff.num_features

        # === ConvNeXt backbone (general features) ===
        self.convnext = timm.create_model('convnext_tiny', pretrained=False, num_classes=0)
        if convnext_weight_path is not None:
            print(f"Loading ConvNeXt weights from {convnext_weight_path} ...")
            state_dict = torch.load(convnext_weight_path, map_location='cpu')
            self.convnext.load_state_dict(state_dict, strict=False)
        conv_features = self.convnext.num_features

        # === Freeze backbone (optional) ===
        if freeze_backbone:
            for p in self.eff.parameters():
                p.requires_grad = False
            for p in self.convnext.parameters():
                p.requires_grad = False

        # === Fusion layer ===
        total_features = eff_features + conv_features
        self.classifier = nn.Sequential(
            nn.Linear(total_features, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        eff_out = self.eff(x)
        conv_out = self.convnext(x)

        # Concatenate kedua feature
        combined = torch.cat((eff_out, conv_out), dim=1)
        out = self.classifier(combined)
        return out


In [None]:
model = HybridFineTune(convnext_weight_path= "/content/pretrained/convnext_tiny_pretrained.pth", num_classes = int(df_train["label"].nunique()), freeze_backbone= False)

model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

In [None]:


trainer = Trainer(
    model = model,
    device = device,
    criterion = criterion,
    optimizer = optimizer,
    early_stopping_patience = 10,
    save_path = "model/hybridfinetuned.pth"
)

In [None]:
history = trainer.fit(train_loader, val_loader, num_epochs = 15)

In [None]:
model = HybridFineTune(convnext_weight_path= "/content/pretrained/convnext_tiny_pretrained.pth", num_classes = int(df_train["label"].nunique()), freeze_backbone= False)
model.load_state_dict(torch.load("/content/model/finetuned-model.pth"))

In [None]:
pred_val = []
label_val = []

model.to(device)

val_loss, val_correct, val_total = 0.00, 0, 0
with torch.no_grad():
      for images, labels in tqdm(val_loader, desc="Validation"):
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)

        val_loss += loss.item() * images.size(0)
        _, preds = torch.max(outputs, 1)
        val_total += labels.size(0)
        val_correct += (preds == labels).sum().item()

        label_val.extend(labels.cpu().numpy())
        pred_val.extend(preds.cpu().numpy())

epoch_loss = val_loss/val_total
epoch_acc = 100 * val_correct / val_total

print(classification_report(label_val, pred_val))




In [None]:
image_paths = "/content/dataset/Sheep Classification Images/test"


images = []
for image in os.listdir(image_paths):
    files = os.path.join(image_paths, image)
    images.append(image)

df_test = pd.DataFrame({"filename": images})
df_test

In [None]:

class TestDataset(Dataset):
    def __init__(self, df, transform, dir_path):
        self.df = df.reset_index()
        self.transform = transform
        self.dir_path = dir_path


    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_path = self.df.loc[idx, 'filename']
        image = Image.open(os.path.join(self.dir_path, img_path)).convert("RGB")
        image = self.transform(image)
        return image

In [None]:
test_data = TestDataset(df_test, transform = transforms_val, dir_path = image_paths)

In [None]:
test_loader = DataLoader(test_data)

In [None]:
model.eval()

In [None]:
pred = []

model.to(device)
with torch.no_grad():
    for images in tqdm(test_loader, desc="Testing"):
        images = images.to(device)
        outputs = model(images)
        _, preds = torch.max(outputs, 1)
        pred.append(int(preds.item()))



In [None]:
df_test["label"] = le_train.inverse_transform(pred)


df_test[["filename", "label"]].to_csv("result_pred.csv",  index=False)

In [None]:
!kaggle competitions submit -c sheep-classification-challenge-2025 -f result_pred.csv -m "eddicientnetxconvnexttint"