In [6]:
from pathlib import Path
from PIL import Image, UnidentifiedImageError
from sklearn.model_selection import train_test_split
import csv
from PIL import Image
import numpy as np

from PIL import Image, UnidentifiedImageError

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch

import torch.nn as nn
import torch.nn.functional as F

from torch.optim import AdamW

from sklearn.metrics import accuracy_score, precision_score, recall_score

# УКАЖИ СВОИ ПУТИ
AI_DIR   = "/Users/davidmarshalov/Desktop/Мага/ML/архитектура НС/archive/AiArtData/AiArtData"
REAL_DIR = "/Users/davidmarshalov/Desktop/Мага/ML/архитектура НС/archive/RealArt/RealArt"

IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}

В этом файле реализован полный пайплайн классификации изображений (AI-арт vs Real): 

-загрузка и разбиение данных 

-аугментации

-PyTorch Dataset/DataLoader, обучение собственной компактной CNN 

-оценка качества по Accuracy, Precision, Recall

1 - ПОДГОТОВКА ДАТАСЕТА

In [21]:
#rglob - ищет все подряд идухие папки соответсвтующие шаблонуну (все элементы)
ai_all   = [p for p in Path(AI_DIR).rglob("*")   if p.is_file() and p.suffix.lower() in IMG_EXTS]
real_all = [p for p in Path(REAL_DIR).rglob("*") if p.is_file() and p.suffix.lower() in IMG_EXTS]

print("Найдено AI:", len(ai_all))
print("Найдено Real:", len(real_all))

Найдено AI: 539
Найдено Real: 434


In [22]:
#проверка на читаемость
ai_ok = []
for p in ai_all:
    try:
        Image.open(p).convert("RGB")  
        ai_ok.append(p)
    except (UnidentifiedImageError, OSError):
        pass  

real_ok = []
for p in real_all:
    try:
        Image.open(p).convert("RGB")
        real_ok.append(p)
    except (UnidentifiedImageError, OSError):
        pass

print("Годных AI:", len(ai_ok))
print("Годных Real:", len(real_ok))




Годных AI: 539
Годных Real: 434


In [23]:
X = ai_ok + real_ok
y = [1]*len(ai_ok) + [0]*len(real_ok)

In [24]:
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

In [25]:
#цеполчка из трансформаций
val_tfms = transforms.Compose([
    transforms.Resize((224, 224)), #безопаснео кодирование без потери смысла
    transforms.ToTensor(),

])
#Аугментации
train_tfms = transforms.Compose([
    transforms.Resize((224, 224)),                    # выравниваем масштаб
    transforms.RandomResizedCrop(224, scale=(0.9, 1.0)),  # лёгкое кадрирование без потери смысла
    transforms.RandomHorizontalFlip(p=0.5),           # зеркалим по горизонтали
    transforms.RandomRotation(degrees=10),            # небольшой поворот (камерный «наклон»)
    transforms.ToTensor(),
])


In [33]:
#dataset и dataloader 

#dataset хранит список мутей и меток и знает как читьь и-тую картинку и какие преобразования применитб
#dataloader берет из dataset батчи перемешавает и может параллельно  читаь

from PIL import Image, UnidentifiedImageError

class ImagePathsDataset(Dataset):

    """
    paths  — список путей (X_train / X_val)
    labels — список меток (y_train / y_val), AI=1, Real=0
    tfms   — цепочка трансформаций (train_tfms или val_tfms)
    """

    def __init__(self, paths, labels, tfms):
        self.paths = paths
        self.labels = labels
        self.tfms = tfms

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        p = self.paths[idx]
        y = self.labels[idx]
        try:
            with Image.open(p) as im:
                im = im.convert("RGB")
        except (UnidentifiedImageError, OSError) as e:
            print(f"[WARN] Bad image: {p} ({e}) — skipping")
            return self.__getitem__((idx + 1) % len(self))
        img = self.tfms(im)
        return img, y

    
train_ds = ImagePathsDataset(X_train, y_train, train_tfms)
val_ds   = ImagePathsDataset(X_val,   y_val,   val_tfms)


In [34]:
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True,  num_workers=2, pin_memory=True)
val_loader   = DataLoader(val_ds,   batch_size=32, shuffle=False, num_workers=2, pin_memory=True)

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True,  num_workers=0, pin_memory=False)
val_loader   = DataLoader(val_ds,   batch_size=32, shuffle=False, num_workers=0, pin_memory=False)


#shuffle - перемешивает порядок в каждую этоху
#num_workers - параллельно читает файлы

In [35]:
#Реализация неболшой cnn модели с 4 сверточными блоками 

# ВХОД размером Размер батча * 3* 224 *224

class CNN(nn.Module):
    def __init__(self, num_classes=1):
        super().__init__()
        # БЛОК 1 3×224×224 → 32×112×112
        #3 - сколько входных каналов у данных так как rgb то это 3 
        #32 - сколько фильтров на данном шаге собираемся обучить (B, 32, H_out W_out)
        #размер окна - 3*3
        #паддтнг - 1 - добавление рамки из нулей ширииной 1 чьтбы сохранить размерность и не обрезовать инфу у краев
        #параметров - 32*(3*3*3) - 864 (out_channels, in_channels, kernel_h, kernel_w)
        #bias - не добавляется смещение так как стоит batchnorm  для экономии
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1, bias=False)
        #для каждого канала считает по батчку среднее и дисперсию (по осям N,H,W) и нормирует а затем применяет обучаемые параметры( масшаб и сдвиг) по одному каналу
        self.bn1   = nn.BatchNorm2d(32)
        #берет макс в акждом непересекабщемся окне 2 на 2
        self.pool1 = nn.MaxPool2d(2)

        # БЛОК 2 32×112×112 → 64×56×56
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1, bias=False)
        self.bn2   = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(2)

        # БЛОК 3 64×56×56 → 128×28×28
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1, bias=False)
        self.bn3   = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(2)

        # БЛОК 4 128×28×28 → 256×14×14
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1, bias=False)
        self.bn4   = nn.BatchNorm2d(256)
        self.pool4 = nn.MaxPool2d(2)

        # Глобальный усреднительный пуллинг: 256×14×14 → 256×1×1
        self.gap = nn.AdaptiveAvgPool2d((1,1))

        # Классификатор (голова)    
        self.head = nn.Sequential(
            nn.Dropout(0.3), #случайно обнуляет 340 процентов компонент вектора - чтобы ен переобучиться
            nn.Linear(256, 256),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(128, 1)# логит
        )

    def forward(self, x):
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        x = self.pool4(F.relu(self.bn4(self.conv4(x))))
        x = self.gap(x)                 # усредняет каждую карту 14 на 14 в одно число итого форма (B×256×1×1)
        x = x.view(x.size(0), -1)       # B×256
        logits = self.head(x)           # B×1
        return logits.squeeze(1)        # (B,)


In [29]:
#лосс, оптимизатор
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") #выбор устройства вычислений
model = CNN().to(device) #создаем модель и переносим ее на выбранное устрйосвтво

# функция потерь для бинарной классификации. Ожидает логиты (сырые и внутри применяет сигмоиду + бинаруню кросс энтропию
#так как кассы несбалансированны - задаю штраф за ошибки по редкому классу
N_pos = 539
N_neg = 434
pos_weight = torch.tensor([N_neg / N_pos], device=device)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

optimizer = AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)


In [36]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    """
    1) модель в режим обучения
    2) по батчам: прямой проход → лосс → backward → шаг оптимизатора
    3) возвращаем средний train loss за эпоху
    """
    model.train()
    total_loss = 0.0

    for imgs, labels in loader:
        imgs = imgs.to(device)
        labels = torch.as_tensor(labels, dtype=torch.float32, device=device)

        optimizer.zero_grad()#обнуляю градиенты перед новым шагом
        logits = model(imgs)                 # прямой проход на выходе (B,) - сырые логиты
        loss = criterion(logits, labels)     # считаем лосс по батчу (среднее по батчу)
        loss.backward() #обратый проход
        optimizer.step() #обновляем параметры по накопленным градиентам

        total_loss += loss.item() * imgs.size(0)

    return total_loss / len(loader.dataset)


@torch.no_grad()
def evaluate(model, loader, criterion, device, threshold=0.5):
    """
    1) модель в режим eval (замораживает Dropout/BN статистики)
    2) считаем средний val loss
    3) собираем предсказания → переводим в вероятности через сигмоиду
    4) считаем метрики: Accuracy / Precision / Recall
    """
    model.eval()
    total_loss = 0.0
    all_logits = []
    all_labels = []

    for imgs, labels in loader:
        imgs = imgs.to(device)
        labels = torch.as_tensor(labels, dtype=torch.float32, device=device)

        logits = model(imgs)
        loss = criterion(logits, labels)
        total_loss += loss.item() * imgs.size(0)

        all_logits.append(logits.cpu())
        all_labels.append(labels.cpu())

    all_logits = torch.cat(all_logits).numpy()   # (N,)
    all_labels = torch.cat(all_labels).numpy()   # (N,)
    probs = 1 / (1 + np.exp(-all_logits))        # сигмоида
    preds = (probs >= threshold).astype(int)

    acc  = accuracy_score(all_labels, preds)
    prec = precision_score(all_labels, preds, zero_division=0)
    rec  = recall_score(all_labels, preds, zero_division=0)

    return total_loss / len(loader.dataset), acc, prec, rec


In [39]:
EPOCHS = 10  
best_val_loss = float("inf")
best_epoch = -1
best_metrics = None
best_state = None

for epoch in range(1, EPOCHS + 1):
    # === train ===
    model.train()
    total_loss = 0.0
    for imgs, labels in train_loader:
        imgs = imgs.to(device)
        labels = torch.as_tensor(labels, dtype=torch.float32, device=device)

        optimizer.zero_grad()
        logits = model(imgs)                      
        loss = criterion(logits, labels)         
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * imgs.size(0)

    train_loss = total_loss / len(train_loader.dataset)

    model.eval()
    val_total_loss = 0.0
    all_logits, all_labels = [], []
    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs = imgs.to(device)
            labels = torch.as_tensor(labels, dtype=torch.float32, device=device)

            logits = model(imgs)
            loss = criterion(logits, labels)
            val_total_loss += loss.item() * imgs.size(0)

            all_logits.append(logits.cpu())
            all_labels.append(labels.cpu())


    all_logits = torch.cat(all_logits).numpy()
    all_labels = torch.cat(all_labels).numpy()
    probs = 1 / (1 + np.exp(-all_logits))        
    preds = (probs >= 0.5).astype(int)

    val_loss = val_total_loss / len(val_loader.dataset)
    acc  = accuracy_score(all_labels, preds)
    prec = precision_score(all_labels, preds, zero_division=0)
    rec  = recall_score(all_labels, preds, zero_division=0)

    print(f"Epoch {epoch:02d} | "
          f"train_loss={train_loss:.4f} | val_loss={val_loss:.4f} | "
          f"acc={acc:.4f} | prec={prec:.4f} | rec={rec:.4f}")

    # === save best by val_loss ===
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_epoch = epoch
        best_metrics = {"acc": acc, "prec": prec, "rec": rec, "val_loss": val_loss}
        best_state = {k: v.cpu() for k, v in model.state_dict().items()}

# === restore best, save to disk, print best metrics ===
if best_state is not None:
    model.load_state_dict({k: v.to(device) for k, v in best_state.items()})
    torch.save(model.state_dict(), "cnn_best.pth")

    print("\n=== ЛУЧШАЯ МОДЕЛЬ ===")
    print(f"Epoch: {best_epoch}")
    print(f"val_loss: {best_metrics['val_loss']:.4f}")
    print(f"Accuracy: {best_metrics['acc']:.4f}")
    print(f"Precision: {best_metrics['prec']:.4f}")
    print(f"Recall: {best_metrics['rec']:.4f}")
    print("Сохранено в: cnn_best.pth")




Epoch 01 | train_loss=0.4351 | val_loss=0.5889 | acc=0.7231 | prec=0.8068 | rec=0.6574




Epoch 02 | train_loss=0.4159 | val_loss=0.5575 | acc=0.7231 | prec=0.7935 | rec=0.6759




Epoch 03 | train_loss=0.3971 | val_loss=0.5853 | acc=0.6821 | prec=0.7255 | rec=0.6852




Epoch 04 | train_loss=0.3926 | val_loss=0.6200 | acc=0.6872 | prec=0.7423 | rec=0.6667




Epoch 05 | train_loss=0.4086 | val_loss=0.6412 | acc=0.6718 | prec=0.6618 | rec=0.8333




Epoch 06 | train_loss=0.4416 | val_loss=0.5451 | acc=0.6872 | prec=0.7238 | rec=0.7037




Epoch 07 | train_loss=0.4182 | val_loss=0.5819 | acc=0.7077 | prec=0.7684 | rec=0.6759




Epoch 08 | train_loss=0.3906 | val_loss=0.7572 | acc=0.6205 | prec=0.8696 | rec=0.3704




Epoch 09 | train_loss=0.4158 | val_loss=0.5494 | acc=0.7333 | prec=0.7979 | rec=0.6944




Epoch 10 | train_loss=0.4243 | val_loss=0.6171 | acc=0.7179 | prec=0.7573 | rec=0.7222

=== ЛУЧШАЯ МОДЕЛЬ ===
Epoch: 6
val_loss: 0.5451
Accuracy: 0.6872
Precision: 0.7238
Recall: 0.7037
Сохранено в: cnn_best.pth


In [None]:
#Модель показала средний уровень качества — точность около 68–73%, при этом precision примерно 0.72 и примерно recall ≈ 0.70 и говорят, 
#что сеть относительно хорошо различает классы, но всё ещё путается в части примеров.
#
#Скорее модель видит базовые различия но требуется тонкая настройка, возможно поможет усиление аугментации (что повысит обощающую способность мб)
#добавить по больше dropout чтобы модеь не переобучалась

####################### Буду рад вашим предложениям и рекомендациям по улучшению модели. ############################################