In [46]:
from transformers import Swinv2Config, Swinv2Model
from transformers import AutoModel
from transformers import (
    AutoImageProcessor,
    AutoModelForImageClassification,
    TrainingArguments,
    Trainer
)
from datasets import load_dataset, ClassLabel

import numpy as np
import torch
import torch.nn as nn
import os
import math
import cv2
from random import randint
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch import einsum
from einops import rearrange, repeat
from einops.layers.torch import Rearrange

import torchvision
from torchvision import datasets
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data import Dataset, DataLoader
import torch.utils.data as Data
from torchvision import models
import torch.optim as optim

import urllib.request
import zipfile
import os
import time
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [111]:
num_classes = 5  
model_name = "microsoft/swinv2-tiny-patch4-window8-256"

model = AutoModelForImageClassification.from_pretrained(
    model_name,
    num_labels=num_classes,
    id2label={i: str(i) for i in range(num_classes)},
    label2id={str(i): i for i in range(num_classes)},
    ignore_mismatched_sizes=True,
)

Some weights of Swinv2ForImageClassification were not initialized from the model checkpoint at microsoft/swinv2-tiny-patch4-window8-256 and are newly initialized because the shapes did not match:
- classifier.weight: found shape torch.Size([1000, 768]) in the checkpoint and torch.Size([5, 768]) in the model instantiated
- classifier.bias: found shape torch.Size([1000]) in the checkpoint and torch.Size([5]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [89]:
# в случае с обычным MLP пропускается
kernel_size = 3
window_size = 2* model.swinv2.config.window_size -1
conv_pos_1 = nn.Conv1d(2, 512, kernel_size, padding='same')
for stage in model.swinv2.encoder.layers:
    for block in stage.blocks:
        out = block.attention.self.continuous_position_bias_mlp[2].out_features
        conv_pos_2 = nn.Conv1d(512, out, kernel_size, padding='same')
        block.attention.self.continuous_position_bias_mlp = nn.Sequential(Rearrange('b h w c -> b c (h w)'),
                                                                            conv_pos_1,
                                                                            nn.ReLU(),
                                                                            conv_pos_2,
                                                                            Rearrange('b c (h w) -> b h w c', h=window_size, w=window_size))

In [112]:
# для случайной инициализации весов (для более честного сравнения с conv1d), пропускается если используется свертка
pos_1 = nn.Linear(2, 512)
for stage in model.swinv2.encoder.layers:
    for block in stage.blocks:
        out = block.attention.self.continuous_position_bias_mlp[2].out_features
        pos_2 = nn.Linear(512, out)
        block.attention.self.continuous_position_bias_mlp = nn.Sequential(pos_1,
                                                                          nn.ReLU(),
                                                                          pos_2,
                                                                          )

In [113]:
for param in model.parameters():
    param.requires_grad = False

for param in model.classifier.parameters():
    param.requires_grad = True

for stage in model.swinv2.encoder.layers:
    for block in stage.blocks:
        for param in block.attention.self.continuous_position_bias_mlp.parameters():
            param.requires_grad = True

In [114]:
data_path = 'D:\\air_qual\\dataset'
for dirname, _, filenames in os.walk(data_path):
    for filename in filenames:
        os.path.join(dirname, filename)
        
class_names = sorted(os.listdir(data_path))
num_classes = len(class_names)

img_size = (224, 224, 3)

print('classes: ', class_names)

classes:  ['Daphne', 'Fred', 'Scooby', 'Shaggy', 'Velma']


In [115]:
labels = []
images = []

print('images:\n')
for cl in class_names:
    print(cl, end=' -> ')
    for img in os.listdir(data_path +'/'+ cl):
        label = np.zeros(num_classes)
        label[class_names.index(cl)] = 1
        labels.append(label)
        image = cv2.imread(data_path + "/" + cl + '/' + img, cv2.IMREAD_COLOR)
        image = cv2.resize(image,(224,224))[:, :, ::-1]
        image = np.asarray(image)
        images.append(image)
    print('done')


labels = np.asarray(labels)
images = np.asarray(images)

print(f'\n\nlabels shape: {labels.shape}')
print(f'images shape: {images.shape}')

images:

Daphne -> done
Fred -> done
Scooby -> done
Shaggy -> done
Velma -> done


labels shape: (221, 5)
images shape: (221, 224, 224, 3)


In [116]:
class DataSet():
  def __init__(self,images,labels,transform = None):
    self.image = images
    self.label = labels
    self.transform = transform

  def __getitem__(self, index):
      image = self.image[index]
      if self.transform:
          image = self.transform(self.image[index])
      return image, self.label[index]

  def __len__(self):
    return len(self.image)

In [117]:
X_train, X_val, y_train, y_val = train_test_split(images, labels, test_size=0.2, random_state=42)

y_train = torch.from_numpy(y_train).long()
y_val = torch.from_numpy(y_val).long()

if len(y_train.shape) > 1 and y_train.shape[1] > 1:
    y_train = torch.argmax(y_train, dim=1)
if len(y_val.shape) > 1 and y_val.shape[1] > 1:
    y_val = torch.argmax(y_val, dim=1)


DATA_MEANS = (X_train / 255.0).mean(axis=(0, 1, 2))
DATA_STD = (X_train / 255.0).std(axis=(0, 1, 2))
print("Data mean", DATA_MEANS)
print("Data std", DATA_STD)

Data mean [0.48534315 0.4474048  0.40226592]
Data std [0.35551016 0.32614718 0.33380135]


In [118]:
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=DATA_MEANS, std=DATA_STD)
])

In [119]:
batch_size = 16

dataset_train = DataSet(X_train, y_train,transform)
dataset_test = DataSet(X_val, y_val,transform)


train_loader = DataLoader(dataset_train, batch_size=batch_size, shuffle=True, num_workers=0)#num_workers=0 важно из-за проблем с многопоточностью в windows
test_loader = DataLoader(dataset_test, batch_size=batch_size, shuffle=False, num_workers=0)

In [120]:
num_epochs = 100
batch_size = 12
learning_rate = 0.0003
num_classes = 5
patience = 12 # количество эпох которые loss может не уменьшаться перед остановкой обучения

In [121]:
time_start = time.time()

model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = ReduceLROnPlateau(optimizer, 'min',patience=8)
total_step = len(train_loader)
total_step_test = len(test_loader)

best_loss = float('inf')
test_losses = []
train_losses = []
test_accuracies = []
train_accuracies = []
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    train_acc = 0
    test_acc = 0
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        
        outputs = model(images).logits
        loss = criterion(outputs , labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        ver, predicted = torch.max(outputs, 1)
        
        train_acc += accuracy_score(predicted.cpu().numpy(),labels.cpu().numpy())
    final_train_acc = train_acc/total_step
    train_accuracies.append(final_train_acc)
    final_train_loss = running_loss / total_step

    train_losses.append(final_train_loss)
    with torch.no_grad():
        correct = 0
        total = 0
        running_loss_test = 0
        for (images, labels) in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images).logits 
            ver, predicted = torch.max(outputs, 1)
            loss_test = criterion(outputs, labels)
            running_loss_test += loss.item()
            test_acc += accuracy_score(predicted.cpu().numpy(),labels.cpu().numpy())
        final_test_acc = test_acc/total_step
        val_loss = running_loss_test/total_step_test
        scheduler.step(val_loss)
        test_accuracies.append(final_test_acc)
        test_losses.append(val_loss)
        print(f'Epoch [{epoch+1}/{num_epochs}], Test Accuracy: {final_test_acc}, train acc: {final_train_acc} ')
        print(f"test loss: {val_loss}, train loss: {final_train_loss}")
time_end = time.time()
print(f"время обучения: {time_end-time_start}")

Epoch [1/100], Test Accuracy: 0.060751748251748255, train acc: 0.22727272727272727 
test loss: 1.5578160285949707, train loss: 1.6206666231155396
Epoch [2/100], Test Accuracy: 0.06512237762237762, train acc: 0.2840909090909091 
test loss: 1.5189154148101807, train loss: 1.5589484734968706
Epoch [3/100], Test Accuracy: 0.0777972027972028, train acc: 0.3068181818181818 
test loss: 1.5383647680282593, train loss: 1.5245891484347256
Epoch [4/100], Test Accuracy: 0.09746503496503496, train acc: 0.42613636363636365 
test loss: 1.470696210861206, train loss: 1.4768278923901645
Epoch [5/100], Test Accuracy: 0.09877622377622379, train acc: 0.5227272727272727 
test loss: 1.43265962600708, train loss: 1.4162266471169211
Epoch [6/100], Test Accuracy: 0.10751748251748251, train acc: 0.5511363636363636 
test loss: 1.235213279724121, train loss: 1.353129354390231
Epoch [7/100], Test Accuracy: 0.11145104895104894, train acc: 0.5511363636363636 
test loss: 1.192201852798462, train loss: 1.2927166115153

In [122]:
max(test_accuracies)

0.22027972027972031