In [1]:
import os

import torch
import torch.nn as nn
import torch.optim as optim
from PIL import ImageFile
from sklearn.metrics import accuracy_score, f1_score
from torch.utils.data import DataLoader
from torchvision import transforms, datasets
from tqdm import tqdm
from transformers import CLIPProcessor, CLIPModel

In [2]:
# Turn on cuDNN's benchmark flag (an algorithm autotuner, not a profiler).
torch.backends.cudnn.benchmark = True

# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
DEVICE

device(type='cuda')

In [3]:
def resolve_data_dir(env_var, default):
    """Return the directory named by ``env_var``, or ``default`` when it is unset or empty."""
    return os.environ.get(env_var) or default


# Locations of the animals dataset splits. The defaults are the original local
# paths; set ANIMALS_TRAIN_DIR / ANIMALS_TEST_DIR to run the notebook elsewhere
# without editing this cell.
train_path = resolve_data_dir("ANIMALS_TRAIN_DIR", 'D:\\ProgPrj\\dsProjects\\gazprom-media\\ml\\train')

test_path = resolve_data_dir("ANIMALS_TEST_DIR", 'D:\\ProgPrj\\dsProjects\\gazprom-media\\ml\\test')

# TODO: try adding something here

In [4]:
# Преобразования для тренировочного и валидационного наборов данных
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomApply(torch.nn.ModuleList([transforms.ColorJitter()]), p=0.25),
    transforms.Resize((224, 224)),
    #transforms.CenterCrop(224),
    transforms.RandomRotation(degrees=(-10, 10)),
    transforms.RandomGrayscale(p=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    transforms.RandomErasing(p=0.1, value='random')
])

In [5]:
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    #transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [6]:
# Load the datasets; ImageFolder treats every sub-folder of the root as one class.
train_dataset = datasets.ImageFolder(train_path, transform=train_transform)

val_dataset = datasets.ImageFolder(test_path, transform=val_transform)

# Each ImageFolder builds its own class -> index mapping from the folders it
# finds. If the two roots do not contain the same class folders, the label
# indices disagree and every validation metric is silently wrong, so fail early.
if train_dataset.class_to_idx != val_dataset.class_to_idx:
    mismatched = sorted(set(train_dataset.classes) ^ set(val_dataset.classes))
    raise ValueError(
        "Train and validation class folders differ; "
        f"classes present in only one split: {mismatched}"
    )

In [7]:
# Number of classes discovered under the training root.
print(len(train_dataset.classes))

101


In [8]:
batch_size = 64

num_w = 12

# Settings shared by both loaders; only the shuffling differs between them.
loader_kwargs = dict(batch_size=batch_size, num_workers=num_w, pin_memory=True)

# Training batches are reshuffled every epoch.
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)

# Validation keeps the dataset order.
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)

In [None]:
# Load the pretrained CLIP model and its processor from the same checkpoint.
CLIP_CHECKPOINT = "openai/clip-vit-base-patch32"
model = CLIPModel.from_pretrained(CLIP_CHECKPOINT)
processor = CLIPProcessor.from_pretrained(CLIP_CHECKPOINT)



In [None]:
# Dimensionality of the feature vectors produced by the CLIP model, read from
# the loaded model's config (projection_dim).
hidden_size = model.config.projection_dim

In [None]:
# Disabled experiment: a classification head with a self-attention layer, kept
# as a bare string literal so that none of it is executed. The CustomCLIPModel
# that is actually used is defined in a later cell.
# NOTE(review): if the tensor reaching Attention.forward is 2-D (batch, dim),
# as the output of fc1 on pooled image features presumably is, then
# q @ k.transpose(-2, -1) mixes different samples of the batch - confirm before
# reviving this code.
"""class Attention(nn.Module):
    def __init__(self, dim):
        super(Attention, self).__init__()
        self.query = nn.Linear(dim, dim)
        self.key = nn.Linear(dim, dim)
        self.value = nn.Linear(dim, dim)
        self.scale = dim ** -0.5

    def forward(self, x):
        q = self.query(x)
        k = self.key(x)
        v = self.value(x)
        attn_weights = torch.softmax(q @ k.transpose(-2, -1) * self.scale, dim=-1)
        return attn_weights @ v

class CustomCLIPModel(nn.Module):
    def __init__(self, clip_model, num_classes):
        super(CustomCLIPModel, self).__init__()
        self.clip_model = clip_model
        self.fc1 = nn.Linear(hidden_size, 512)
        self.attention = Attention(512)
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)

    def forward(self, x):
        with torch.no_grad():
            features = self.clip_model.get_image_features(x)
        x = self.fc1(features)
        x = self.attention(x)
        x = self.dropout(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        return x"""

In [None]:
# Add a new classification layer on top of the CLIP image encoder.
class CustomCLIPModel(nn.Module):
    """Frozen CLIP image encoder followed by a trainable linear classifier.

    Args:
        clip_model: model exposing ``get_image_features(pixel_values=...)``,
            ``config.projection_dim`` and the usual ``nn.Module`` API
            (for example ``transformers.CLIPModel``).
        num_classes: number of output logits.
    """

    def __init__(self, clip_model, num_classes):
        super(CustomCLIPModel, self).__init__()
        self.clip_model = clip_model
        # forward() runs the backbone under no_grad, so it can never be trained
        # through this module; mark its weights as frozen to make that explicit.
        for param in self.clip_model.parameters():
            param.requires_grad_(False)
        self.clip_model.eval()
        # Size the head from the backbone that was actually passed in rather
        # than from a notebook-level global.
        self.fc = nn.Linear(clip_model.config.projection_dim, num_classes)

    def train(self, mode=True):
        """Set the training mode of the head while keeping the backbone in eval mode."""
        super(CustomCLIPModel, self).train(mode)
        # The backbone is a fixed feature extractor: train-only layers such as
        # dropout must stay disabled even after custom_model.train().
        self.clip_model.eval()
        return self

    def forward(self, x):
        """Return class logits for a batch of preprocessed images ``x``."""
        with torch.no_grad():
            features = self.clip_model.get_image_features(pixel_values=x)
        x = self.fc(features)
        return x

In [None]:
num_classes = len(train_dataset.classes)
custom_model = CustomCLIPModel(model, num_classes).to(DEVICE)

criterion = nn.CrossEntropyLoss()
# CustomCLIPModel.forward runs the CLIP backbone under torch.no_grad(), so only
# the linear head `fc` ever receives gradients. Give the optimizer just those
# parameters instead of the whole backbone.
optimizer = optim.RAdam(custom_model.fc.parameters(), lr=1e-3, weight_decay=1e-4)
#optimizer = optim.AdamW(custom_model.fc.parameters(), lr=1e-4, weight_decay=1e-5)

In [None]:
from torch.cuda.amp import autocast, GradScaler

In [None]:
# Number of passes over the training set.
num_epochs = 50

# Metric histories, each starting as its own empty list.
train_loss_history, val_accuracy_history, val_f1_history = [], [], []

# Gradient scaler used together with autocast in the training loop.
scaler = torch.cuda.amp.GradScaler()

In [None]:
# Let PIL load truncated image files instead of raising. The flag is global to
# PIL, so it is set once here rather than at the start of every epoch.
# NOTE(review): with num_workers > 0 images are decoded in worker processes;
# with spawn-based multiprocessing this flag may not reach them - confirm.
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Mixed precision is only enabled when running on CUDA.
use_amp = DEVICE.type == "cuda"

# NOTE(review): only train_loss_history is updated in this loop;
# val_accuracy_history and val_f1_history are not appended to here.
for epoch in range(num_epochs):
    custom_model.train()
    running_loss = 0.0
    train_loader_tqdm = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}')

    for inputs, labels in train_loader_tqdm:
        inputs, labels = inputs.to(DEVICE, non_blocking=True), labels.to(DEVICE, non_blocking=True)

        optimizer.zero_grad()

        # torch.autocast replaces the deprecated torch.cuda.amp.autocast and is
        # explicit about the device it applies to.
        with torch.autocast(device_type=DEVICE.type, enabled=use_amp):
            outputs = custom_model(inputs)
            loss = criterion(outputs, labels)

        # Scale the loss before backward; the scaler steps the optimizer and
        # then updates its scale factor.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Read the loss value once: each .item() call forces a device sync.
        batch_loss = loss.item()
        # Weight by batch size so the epoch loss is a per-sample average.
        running_loss += batch_loss * inputs.size(0)
        train_loader_tqdm.set_postfix(loss=batch_loss)

    epoch_loss = running_loss / len(train_dataset)
    train_loss_history.append(epoch_loss)

    # Validate the model.
    custom_model.eval()
    val_loss = 0.0
    all_labels = []
    all_predictions = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(DEVICE, non_blocking=True), labels.to(DEVICE, non_blocking=True)

            with torch.autocast(device_type=DEVICE.type, enabled=use_amp):
                outputs = custom_model(inputs)
                loss = criterion(outputs, labels)

            val_loss += loss.item() * inputs.size(0)
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(outputs.argmax(dim=1).cpu().numpy())

    val_loss /= len(val_dataset)
    accuracy = accuracy_score(all_labels, all_predictions)
    f1 = f1_score(all_labels, all_predictions, average='weighted')

    print(f'Epoch {epoch+1}/{num_epochs} - Train Loss: {epoch_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {accuracy:.4f}, Val F1-Score: {f1:.4f}')

In [None]:
# Final evaluation pass over the validation loader (no autocast here).
custom_model.eval()
all_preds, all_labels = [], []

with torch.no_grad():
    for inputs, labels in val_loader:
        inputs = inputs.to(DEVICE, non_blocking=True)
        labels = labels.to(DEVICE, non_blocking=True)
        outputs = custom_model(inputs)
        # Predicted class = index of the largest logit in each row.
        preds = outputs.argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Accuracy and macro-averaged F1 over the whole validation set.
accuracy = accuracy_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds, average='macro')

# Record the final scores in the history lists and report them.
val_accuracy_history.append(accuracy)
val_f1_history.append(f1)
print(f'Validation Accuracy: {accuracy:.4f}, F1 Score: {f1:.4f}')