In [None]:
# نصب کتابخانه‌های مورد نیاز
!pip install gdown
!pip install tqdm
!pip install huggingface_hub
!pip install timm
!pip install torchmetrics
!pip install albumentations
!pip install torchsummary




In [None]:
import os
from huggingface_hub import hf_hub_download
import tarfile

# دانلود دیتاست
hf_hub_download(repo_id='RayanAi/inat_train_modified',
               filename="inat_train_modified.tar.gz",
               repo_type="dataset",
               local_dir=".")

# استخراج فایل tar.gz
with tarfile.open("inat_train_modified.tar.gz", "r:gz") as tar:
    tar.extractall(path=".")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


inat_train_modified.tar.gz:   0%|          | 0.00/11.4G [00:00<?, ?B/s]

In [None]:
from typing import Iterator
from torch.utils.data import Dataset
from PIL import Image
from torchvision import transforms

class Node:
    def __init__(self, name):
        self.name = name
        self._count = 0
        self.children = {}
        self._entities = []

    def add_to_node(self, path, entity, level=0):
        if level >= len(path):
            self._entities.append(entity)
            return
        part = path[level]
        if part not in self.children:
            self.children[part] = Node(path[:level+1])
        self.children[part].add_to_node(path, entity, level=level+1)
        self._count += 1

    @property
    def is_leaf(self):
        return len(self._entities) > 0

    @property
    def count(self):
        if self.is_leaf:
            return len(self._entities)
        else:
            return self._count

    @property
    def entities(self):
        if self.is_leaf:
            return list((entity, self.name) for entity in self._entities)
        else:
            child_entities = []
            for child in self.children.values():
                child_entities.extend(child.entities)
        return child_entities

    def level_iterator(self, level=None):
        if level == 0:
            yield self
        elif level is None and self.is_leaf:
            yield self
        elif self.is_leaf and level != 0:
            raise Exception("Incorrect level is specified in tree.")
        else:
            if level is not None:
                level -= 1
            for child in self.children.values():
                for v in child.level_iterator(level):
                    yield v

    def print_node(self, level=0, max_level=None):
        print(' ' * (level * 4) + f"{self.name[-1]} ({self.count})")
        for node in self.children.values():
            if max_level is None or level < max_level:
                node.print_node(level + 1, max_level=max_level)
        return

class HierarchicalDataset(Dataset):
    def __init__(self, dataset_path, level=None, transform=None, verbose=True):
        self.tree = Node(("Dataset",))  # Initialize with root
        self.level = level if level is not None else 2  # تغییر به سطح 2 بر اساس قوانین مسابقه
        self.classes = set()
        self.data = []
        self.transform = transform

        index = 0
        for group_name in sorted(os.listdir(dataset_path)):
            group_dir = os.path.join(dataset_path, group_name)
            if not os.path.isdir(group_dir):
                continue
            for image_name in sorted(os.listdir(group_dir)):
                image_path = os.path.join(group_dir, image_name)
                group = tuple(group_name.split("_")[1:])  # Assuming format like 'class_name'
                if len(group) < self.level:
                    continue  # Skip if group path is shorter than required level
                group = group[:self.level]
                self.data.append({
                    "image_path": image_path,
                    "group": group,
                })
                self.tree.add_to_node(group, index)
                index += 1
                self.classes.add(group)

        self.classes = {group: idx for idx, group in enumerate(sorted(list(self.classes)))}

        if verbose:
            print(f"Dataset Length: {len(self.data)}")
            print("Hierarchical Structure (up to level 2):")
            self.tree.print_node(max_level=2)
            print(f"Number of classes: {len(self.classes)}")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        image = Image.open(sample["image_path"]).convert('RGB')
        group = sample["group"][:self.level]
        target = self.classes[group]
        if self.transform:
            image = self.transform(image)
        return image, target

    def get_group_iterator(self, level=None) -> Iterator[Node]:
        for group in self.tree.level_iterator(level):
            yield group


In [None]:
from torchvision import transforms
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np
import torch

# تعریف MixUp
def mixup_data(x, y, alpha=0.2):
    '''Returns mixed inputs, pairs of targets, and lambda'''
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    if torch.cuda.is_available():
        index = torch.randperm(batch_size).cuda()
    else:
        index = torch.randperm(batch_size)

    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

# تعریف CutMix
def cutmix_data(x, y, alpha=1.0):
    '''Returns cutmixed inputs, pairs of targets, and lambda'''
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size, C, H, W = x.size()
    if torch.cuda.is_available():
        index = torch.randperm(batch_size).cuda()
    else:
        index = torch.randperm(batch_size)

    # تعریف منطقه برای برش
    cx = np.random.randint(W)
    cy = np.random.randint(H)
    rw = int(W * np.sqrt(1 - lam))
    rh = int(H * np.sqrt(1 - lam))

    x1 = np.clip(cx - rw // 2, 0, W)
    y1 = np.clip(cy - rh // 2, 0, H)
    x2 = np.clip(cx + rw // 2, 0, W)
    y2 = np.clip(cy + rh // 2, 0, H)

    x[:, :, y1:y2, x1:x2] = x[index, :, y1:y2, x1:x2]
    lam = 1 - ((x2 - x1) * (y2 - y1) / (W * H))
    y_a, y_b = y, y[index]
    return x, y_a, y_b, lam

# تعریف داده‌افزایی برای آموزش
train_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomRotation(15),
    transforms.ToTensor(),  # تبدیل به تنسور قبل از RandomErasing
    transforms.Normalize((0.4556, 0.4714, 0.3700), (0.2370, 0.2318, 0.2431)),
    transforms.RandomErasing(p=0.1)  # اعمال RandomErasing بعد از تبدیل به تنسور
])

# پیش‌پردازش برای مجموعه اعتبارسنجی
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize((0.4556, 0.4714, 0.3700), (0.2370, 0.2318, 0.2431))
])


In [None]:
import torch
from torchvision import models
from torch import nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
import numpy as np
from torch.amp import GradScaler  # استفاده از torch.amp
from torchmetrics import Accuracy

def get_model(num_classes):
    model = models.resnet50(pretrained=True)
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, num_classes)
    return model

# تنظیم دستگاه
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# بارگذاری مدل و دیتاست آموزشی
dataset_path = 'train'  # مسیر دیتاست
full_dataset = HierarchicalDataset(dataset_path=dataset_path, level=2, transform=train_transform)
model = get_model(num_classes=len(full_dataset.classes)).to(device)

# تنظیمات آموزش
learning_rate = 1e-4
batch_size = 256
num_epochs = 30
validation_split = 0.1
random_seed = 42

# تقسیم‌بندی به مجموعه آموزشی و اعتبارسنجی
dataset_size = len(full_dataset)
indices = list(range(dataset_size))
split = int(np.floor(validation_split * dataset_size))

np.random.seed(random_seed)
np.random.shuffle(indices)

train_indices, val_indices = indices[split:], indices[:split]

train_subset = torch.utils.data.Subset(full_dataset, train_indices)

# ایجاد یک مجموعه جدید برای اعتبارسنجی بدون اعمال transforms آموزشی
val_dataset = HierarchicalDataset(dataset_path=dataset_path, level=2, transform=val_transform, verbose=False)
val_subset = torch.utils.data.Subset(val_dataset, val_indices)

# ایجاد یک مجموعه جدید بدون اعمال transforms برای محاسبه وزن‌های کلاس
train_dataset_no_transform = HierarchicalDataset(dataset_path=dataset_path, level=2, transform=None, verbose=False)
train_subset_no_transform = torch.utils.data.Subset(train_dataset_no_transform, train_indices)

# محاسبه وزن‌های کلاس
def compute_class_weights(dataset):
    class_counts = {}
    for _, label in dataset:
        class_counts[label] = class_counts.get(label, 0) + 1
    total_samples = len(dataset)
    num_classes = len(full_dataset.classes)
    class_weights = [total_samples / (num_classes * class_counts[i]) for i in range(num_classes)]
    return torch.tensor(class_weights, dtype=torch.float).to(device)

class_weights = compute_class_weights(train_subset_no_transform)

# بارگذاری داده‌ها
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, num_workers=8, pin_memory=True)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, num_workers=8, pin_memory=True)

# تعریف تابع هزینه و بهینه‌ساز
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=1e-5)

# تنظیم‌کننده نرخ یادگیری
scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=learning_rate, steps_per_epoch=len(train_loader), epochs=num_epochs, anneal_strategy='linear')


Using device: cuda
Dataset Length: 99970
Hierarchical Structure (up to level 2):
Dataset (99970)
    Animalia (49112)
        Annelida (13)
        Arthropoda (29675)
        Chordata (18518)
        Cnidaria (124)
        Echinodermata (83)
        Mollusca (699)
    Fungi (1812)
        Ascomycota (396)
        Basidiomycota (1416)
    Plantae (49046)
        Bryophyta (133)
        Chlorophyta (13)
        Marchantiophyta (22)
        Rhodophyta (22)
        Tracheophyta (48856)
Number of classes: 13


In [None]:
from tqdm import tqdm
import torch
import torch.nn.functional as F
from torchmetrics import Accuracy

def train_one_epoch(model, dataloader, criterion, optimizer, device, scaler, scheduler):
    model.train()
    running_loss = 0.0
    # مشخص کردن task و تعداد کلاس‌ها
    num_classes = len(full_dataset.classes)
    accuracy = Accuracy(task="multiclass", num_classes=num_classes).to(device)
    progress_bar = tqdm(dataloader, desc="Training", leave=False)

    for inputs, labels in progress_bar:
        inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)

        # اعمال MixUp
        mixed_inputs, targets_a, targets_b, lam = mixup_data(inputs, labels, alpha=0.2)

        optimizer.zero_grad()

        with torch.amp.autocast(device_type='cuda'):  # استفاده از autocast برای mixed precision
            outputs = model(mixed_inputs)
            loss = lam * criterion(outputs, targets_a) + (1 - lam) * criterion(outputs, targets_b)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # به‌روزرسانی نرخ یادگیری
        scheduler.step()

        # آمارگیری
        running_loss += loss.item() * inputs.size(0)
        preds = torch.argmax(outputs, dim=1)
        # برای محاسبه دقت اصلی، نیاز به استفاده از targets_a و targets_b نیست
        accuracy.update(preds, labels)

        progress_bar.set_postfix(loss=loss.item(), accuracy=accuracy.compute().item()*100)

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = accuracy.compute().item()
    return epoch_loss, epoch_acc

def validate(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    num_classes = len(full_dataset.classes)
    accuracy = Accuracy(task="multiclass", num_classes=num_classes).to(device)
    progress_bar = tqdm(dataloader, desc="Validation", leave=False)

    # ایجاد دیکشنری برای ذخیره دقت هر گروه
    group_correct = {}
    group_total = {}

    with torch.no_grad():
        for inputs, labels in progress_bar:
            inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)

            with torch.amp.autocast(device_type='cuda'):
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            preds = torch.argmax(outputs, dim=1)
            accuracy.update(preds, labels)

            # به‌روزرسانی دقت هر گروه
            for i in range(len(labels)):
                label = labels[i].item()
                pred = preds[i].item()
                if label not in group_correct:
                    group_correct[label] = 0
                    group_total[label] = 0
                if pred == label:
                    group_correct[label] += 1
                group_total[label] += 1

            progress_bar.set_postfix(loss=loss.item(), accuracy=accuracy.compute().item()*100)

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = accuracy.compute().item()

    # محاسبه دقت بر اساس K پایین‌ترین گروه‌ها (فرض K=10%)
    group_acc = {label: group_correct.get(label, 0) / group_total.get(label, 1) for label in group_total}
    sorted_groups = sorted(group_acc.items(), key=lambda x: x[1])
    K = max(1, int(0.1 * len(group_acc)))  # K=10%
    top_k_groups = sorted_groups[:K]
    top_k_avg_acc = sum([acc for _, acc in top_k_groups]) / K

    return epoch_loss, top_k_avg_acc


In [None]:
import torch
import os
import zipfile
from torch.amp import GradScaler

# ایجاد پوشه برای چک‌پوینت‌ها
checkpoint_dir = './checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

best_val_acc = 0.0
scaler = GradScaler()  # استفاده از GradScaler بدون پارامتر device

# اضافه کردن Early Stopping
early_stopping_patience = 5
early_stopping_counter = 0

for epoch in range(1, num_epochs + 1):
    print(f"Epoch {epoch}/{num_epochs}")

    train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer, device, scaler, scheduler)
    val_loss, val_acc = validate(model, val_loader, criterion, device)

    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%")
    print(f"Val Loss: {val_loss:.4f} | Val Acc (Top K): {val_acc:.2f}%")

    # ذخیره چک‌پوینت بهترین مدل
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        checkpoint_path = os.path.join(checkpoint_dir, 'best_checkpoint.pth')
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_loss': val_loss,
            'val_acc': val_acc,
        }, checkpoint_path)
        print(f"New best model saved with Val Acc: {best_val_acc:.2f}%")
        early_stopping_counter = 0  # Reset counter
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

    print("-" * 30)

# بارگذاری بهترین مدل
best_checkpoint = torch.load(os.path.join(checkpoint_dir, 'best_checkpoint.pth'), map_location=device)
model.load_state_dict(best_checkpoint['model_state_dict'])

# ذخیره وزن‌های نهایی مدل
torch.save(model.state_dict(), 'resnet.pth')

# ایجاد فایل ZIP برای ارسال
with zipfile.ZipFile('submission.zip', 'w') as zipf:
    zipf.write('resnet.pth')

print("وزن‌های مدل به نام 'resnet.pth' ذخیره شده و در 'submission.zip' بسته‌بندی شدند.")


Epoch 1/30




Train Loss: 1.5993 | Train Acc: 0.45%
Val Loss: 1.1964 | Val Acc (Top K): 0.00%
------------------------------
Epoch 2/30




Train Loss: 1.5619 | Train Acc: 0.44%
Val Loss: 1.3702 | Val Acc (Top K): 0.00%
------------------------------
Epoch 3/30




Train Loss: 1.4824 | Train Acc: 0.42%
Val Loss: 1.3586 | Val Acc (Top K): 0.00%
------------------------------
Epoch 4/30




Train Loss: 1.5788 | Train Acc: 0.44%
Val Loss: 1.4874 | Val Acc (Top K): 0.00%
------------------------------
Epoch 5/30


                                                                                      

Train Loss: 1.5734 | Train Acc: 0.42%
Val Loss: 1.2419 | Val Acc (Top K): 0.00%
Early stopping triggered.


  best_checkpoint = torch.load(os.path.join(checkpoint_dir, 'best_checkpoint.pth'), map_location=device)


FileNotFoundError: [Errno 2] No such file or directory: './checkpoints/best_checkpoint.pth'

In [None]:
from torch.utils.data import Sampler
import math

class GroupBalancedSampler(Sampler):
    def __init__(self, dataset, batch_size):
        self.dataset = dataset
        self.batch_size = batch_size
        self.groups = {}
        for idx in range(len(self.dataset)):
            _, label = self.dataset[idx]
            if label not in self.groups:
                self.groups[label] = []
            self.groups[label].append(idx)
        self.group_keys = list(self.groups.keys())
        self.num_groups = len(self.group_keys)
        self.samples_per_group = math.ceil(batch_size / self.num_groups)

    def __iter__(self):
        indices = []
        for group in self.group_keys:
            group_indices = self.groups[group]
            if len(group_indices) < self.samples_per_group:
                group_indices = group_indices * (self.samples_per_group // len(group_indices) + 1)
            selected = group_indices[:self.samples_per_group]
            indices.extend(selected)
        np.random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        return self.batch_size * math.ceil(len(self.dataset) / self.batch_size)


In [None]:
import torch.nn.functional as F

class FocalLoss(nn.Module):
    def __init__(self, alpha=None, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        ce_loss = F.cross_entropy(inputs, targets, reduction='none', weight=self.alpha)
        pt = torch.exp(-ce_loss)
        focal_loss = ((1 - pt) ** self.gamma) * ce_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch

# مسیر مدل اصلی شما
model_path = '/content/resnet.pth'

# بارگذاری مدل
model = torch.load(model_path)

# تبدیل پارامترهای مدل به نیمه‌دقت
model.half()

# ذخیره مدل با دقت نیمه
torch.save(model, '/content/resnet_fp16.pth')


  model = torch.load(model_path)


AttributeError: 'collections.OrderedDict' object has no attribute 'half'

In [None]:
import zipfile
import os

def zip_multiple_files(input_paths, output_zip_path, compression_level=5):
    # Ensure compression level is within 1-9
    compression_level = max(1, min(compression_level, 1))

    # Create a zip file with the specified compression level
    compression = zipfile.ZIP_DEFLATED

    # Create the zip file
    with zipfile.ZipFile(output_zip_path, 'w', compression) as zipf:
        for input_path in input_paths:
            # Check if the file or directory exists
            if not os.path.exists(input_path):
                print(f"{input_path} does not exist.")
                continue

            # If it's a directory, recursively add files
            if os.path.isdir(input_path):
                for root, dirs, files in os.walk(input_path):
                    for file in files:
                        file_full_path = os.path.join(root, file)
                        zipf.write(file_full_path,
                                   os.path.relpath(file_full_path,
                                                   os.path.join(input_path, '..')))
            # If it's a single file, add it to the zip file
            else:
                zipf.write(input_path, os.path.basename(input_path))

    print(f"Successfully zipped files to {output_zip_path} with compression level {compression_level}")

# Example usage:
input_paths = ['/content/model.py', '/content/model.pth']  # List of files or directories to zip
output_zip_path = '/content/submission1.zip'  # Path to save the output zip file
compression_level = 5  # Compression level from 1 (fastest) to 9 (most compressed)
zip_multiple_files(input_paths, output_zip_path, compression_level)


In [None]:
!rm -rf __pycache__

In [None]:
!rm -rf model.py /content/resnet.pth /content/submission2.zip /content/checkpoints

In [None]:
!cp /content/submission4.zip /content/drive/MyDrive/ML/Rayan/Q3/submission



---

