# Импорт библиотек

In [1]:
from PIL import Image
from tqdm import tqdm
import yaml
import os
import cv2
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from sklearn.utils.class_weight import compute_class_weight
import pandas as pd
import numpy as np
import rarfile
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.transforms import v2
from torch.utils.data import DataLoader, Dataset
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torchvision.io import read_image


KeyboardInterrupt



выбор девайса для обучения

In [None]:
if torch.cuda.is_available(): device = 'cuda'
else: device = 'cpu'
print(f"Using {device} device")

## данные для обучения

просмотр мета данных

In [None]:
with open('data.yaml') as file:
    options = yaml.safe_load(file)
    file.close()

In [None]:
options

загрузка тренировочных и валидационных данных в датафрейм

In [None]:
dir_path = 'img_data/'

In [None]:
train_df = pd.DataFrame()
path = '/train/yamls'

for file in tqdm(os.listdir(dir_path + path)):

    if file.endswith('.txt'):
        data = np.loadtxt(os.path.join(dir_path + path, file))

        if data.shape[-1] == 0:
            continue
        try:
            data = data.reshape((-1, 5))
        except ValueError:
            continue

        data = pd.DataFrame({'img' : [file[:-3]+'jpg'], 'labels' : [data[:, 0].astype(np.int64)], 'coords' : [data[:, 1:]]})
        train_df = pd.concat([train_df, data], ignore_index=True)

In [None]:
train_df

In [None]:
val_df = pd.DataFrame()
path = '/valid/yamls'

for file in tqdm(os.listdir(dir_path + path)):

    if file.endswith('.txt'):
        data = np.loadtxt(os.path.join(dir_path + path, file))

        if data.shape[-1] == 0:
            continue
        try:
            data = data.reshape((-1, 5))
        except ValueError:
            continue

        data = pd.DataFrame({'img' : [file[:-3]+'jpg'], 'labels' : [data[:, 0].astype(np.int64)], 'coords' : [data[:, 1:]]})
        val_df = pd.concat([val_df, data], ignore_index=True)

In [None]:
val_df

## анализ данных

In [None]:
train_df.info()

проверка на пропуски

In [None]:
train_df.isna().sum().sum()

проверка на дубликаты

In [None]:
train_df.img.duplicated().sum()

In [None]:
train_df.coords.duplicated().sum()

## проверка на наличие изображений не относящихся к тематике датасета

задаем трансформ

In [None]:
transform = v2.Compose([
    v2.Resize((256,256)),
    v2.ToTensor(),
])

определение автоэнкодера

In [None]:
# Определение модели автоэнкодера
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, 3, stride=3, padding=1),  # (batch_size, 16, 10, 10)
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),  # (batch_size, 16, 5, 5)
            nn.Conv2d(16, 8, 3, stride=2, padding=1),  # (batch_size, 8, 3, 3)
            nn.ReLU(),
            nn.MaxPool2d(2, stride=1)  # (batch_size, 8, 2, 2)
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(8, 16, 3, stride=2),  # (batch_size, 16, 5, 5)
            nn.ReLU(),
            nn.ConvTranspose2d(16, 8, 5, stride=3, padding=1),  # (batch_size, 8, 15, 15)
            nn.ReLU(),
            nn.ConvTranspose2d(8, 3, 2, stride=2, padding=1),  # (batch_size, 3, 32, 32)
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [None]:
ae_model = Autoencoder().to(device)

In [None]:
criterion = nn.MSELoss()
optimizer = optim.Adam(ae_model.parameters(), lr=3e-4)

создание датасета для обучения автоэнкодера

In [None]:
class Q_Dataset(Dataset):
    def __init__(self, img_names, dir_path = '', transform=None):
        super().__init__()
        self.names = img_names
        self.transform = transform
        self.dir_path = dir_path

    def __len__(self):
        return len(self.names)

    def __getitem__(self, ind):

        img = read_image(self.dir_path + self.names[ind])

        img = self.transform(img / 255)

        return img

In [None]:
val_dir_path = 'img-data/valid/images/'

ae_valset = Q_Dataset(
    img_names = val_df['img'].tolist(),
    dir_path = val_dir_path,
    transform = transform
)

In [None]:
ae_valset[0].shape

In [None]:
ae_valloader = DataLoader(ae_valset, batch_size=1)

обучение автоэнкодера

In [None]:
num_epochs = 25
ae_model.train()

for epoch in range(num_epochs):
    print('Epoch', epoch+1)

    for img in tqdm(ae_valloader):
        img = img.to(device)

        output = ae_model(img)

        loss = criterion(output, img)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}\n")

создание датасета для поиска аномалий

In [None]:
t_transform = v2.Compose([
    v2.Resize((64, 64)),
])

In [None]:
train_dir_path = 'img-data/train/images/'

ae_trainset = Q_Dataset(
    img_names = train_df['img'].tolist(),
    dir_path = train_dir_path,
    transform = t_transform
)

In [None]:
ae_trainset[0].shape

In [None]:
ae_trainloader = DataLoader(ae_trainset, batch_size=1)

поиск аномалий

In [None]:
count = 100
embeds = []
ae_model.eval()

with torch.no_grad():
    for images in tqdm(ae_trainloader):
        images = images.to(device)

        epoch_embeds = ae_model(images).cpu()

        embeds.append(epoch_embeds)
        count += 1
        if count == 100:
            break

In [None]:
embeds = torch.concat(embeds, dim=0)

In [None]:
mean_embed = embeds.mean(dim = 0)

In [None]:
embeds.shape[0]

In [None]:
embeds[4096].shape

In [None]:
embeds[4095].shape

In [None]:
mean_embed.shape

In [None]:
cos_sims = []

for i in range(embeds.shape[0]):
    cos_sims.append(torch.nn.CosineSimilarity(dim=0)(mean_embed, embeds[i])) #.item())

In [None]:
cos_sims = torch.stack(cos_sims)

In [None]:
cos_sims = torch.tensor(cos_sims)

In [None]:
torch.topk(cos_sims * (-1), k=10).indices[0]

In [None]:
ae_trainset[torch.topk(cos_sims * (-1), k=10).indices[0]]

In [None]:
import matplotlib.pyplot as plt
plt.imshow(dataset[torch.topk(cos_sims * (-1), k=10).indices[9]].permute(1,2,0))

In [None]:
#top_anomaly_indices = np.argsort(anomaly_scores)[-10:]
#print("Top anomaly indices:", top_anomaly_indices)

In [None]:
img1 = plt.imread(train_dir_path + train_df.img[top_anomaly_indices[0]])

for i in top_anomaly_indices[1:]:
    img2 = plt.imread(train_dir_path + train_df.img[i])
    img1 = np.concatenate((img1, img2), axis=1)
plt.imshow(img1)

## удаление нерелевантных данных

## проверка на присутствие несуществующих классов и их удаление

создание списка со всеми классами

In [None]:
lst = []
for i in tqdm(train_df.labels):
    for j in i:
        lst.append(j)
len(lst)

In [None]:
sns.barplot(data=Counter(lst))
Counter(lst)

в данных присутсвуют несуществующие класссы, удалим их

## удаление данных с несуществующими классами

In [None]:
train_df = train_df[train_df.labels.apply(lambda x: (x > 5).sum() == 0)]

In [None]:
train_df.shape

In [None]:
lst = []
for i in tqdm(train_df.labels):
    for j in i:
        lst.append(j)
len(lst)

In [None]:
set(lst)

несуществующий класс был удален

## визуальный анализ соотношения классов

In [None]:
sns.barplot(data=Counter(lst))
Counter(lst)

## подсчет весов классов

In [None]:
weight = compute_class_weight(class_weight='balanced', classes=np.unique(lst), y=lst)
weight = torch.FloatTensor(weight).to(device)
weight

## аугментация

In [None]:
transform = A.Compose([
    A.Resize(256, 256),

    A.RandomBrightnessContrast(p=0.5),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),

    A.Normalize(mean=[0.407, 0.457, 0.485], std=[1,1,1]),
])

создание датасета с найденными и удаленными выбросами и аугментацией

In [None]:
class Dataset(Dataset):
    def __init__(self, img_names, labels, coords, dir_path = '', transform=None):
        super().__init__()
        self.names = img_names
        self.coords = coords
        self.labels = labels
        self.dir_path = dir_path
        self.transform = transform

    def __len__(self):
        return len(self.names)

    def __getitem__(self, ind):

        #img = read_image(self.dir_path + self.names[ind])
        img = np.array(Image.open(self.dir_path + self.names[ind]))

        if self.transform:
            augmentations = self.transform(image=img)
            img = augmentations["image"]

        img = np.transpose(img, (2, 0, 1))
        img = torch.from_numpy(img)
        #img /= 255

        return img, self.labels[ind], self.coords[ind]

In [None]:
train_dir_path = 'vcs-data/train+val/train/images/'

trainset = Dataset(
    img_names = train_df['img'].tolist(),
    labels = train_df['labels'].tolist(),
    coords = train_df['coords'].tolist(),
    dir_path = train_dir_path,
    transform = transform
)

In [None]:
trainset[0]

In [None]:
trainset[0][0].shape

создание даталодера

In [None]:
trainloader = DataLoader(trainset, batch_size=1)