<a href="https://colab.research.google.com/github/AndriyF-Git/Deep_Learning_IPZ43/blob/main/%D0%9A%D0%BE%D0%BF%D1%96%D1%8F_%D0%B7%D0%B0%D0%BF%D0%B8%D1%81%D0%BD%D0%B8%D0%BA%D0%B0_%22DeepLearn_ipynb%22.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# !pip -q install -U kaggle
# !kaggle --version

# import os
# os.environ["KAGGLE_API_TOKEN"] = ""

# # quick test that the Kaggle CLI is authenticated
# !kaggle competitions list | head


In [None]:
# %%bash
# COMP="image-classification-real-or-ai-generated-photo"
# mkdir -p /content/$COMP
# kaggle competitions download -c $COMP -p /content/$COMP
# unzip -q /content/$COMP/*.zip -d /content/$COMP
# ls -lah /content/$COMP


In [None]:
# !pip -q install -U "numpy==2.0.2" "opencv-python-headless==4.12.0.88" timm scikit-learn "albumentations>=1.4.20"

In [None]:
# print(train_df[img_col].head(20).tolist())
# print(train_df[img_col].dtype)


In [None]:
# # look at 10 random values
# print(train_df[img_col].sample(10, random_state=42).tolist())


In [None]:
import os, random
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

import cv2
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import autocast, GradScaler

import timm
import albumentations as A
from albumentations.pytorch import ToTensorV2

import matplotlib.pyplot as plt
import numpy as np
import torch

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score


# ========== CONFIG ==========
# Every tunable of the run lives here; later cells read these module-level names.
SEED = 42
IMG_SIZE = 224
BATCH_SIZE = 16
EPOCHS = 5
LR = 1e-4
WD = 1e-2
MODEL_NAME = "tf_efficientnetv2_s"   # try later: "efficientnetv2_s"
NUM_WORKERS = 0
FOLDS_TO_TRAIN = 3  # how many of the 5 CV folds to actually train; raise to 5 for a full run

# Use the GPU when one is visible, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device:", device)

def seed_everything(seed=42):
    """Seed every random number generator this notebook relies on.

    Applies the same ``seed`` to Python's ``random`` module, NumPy's global
    generator, PyTorch's CPU generator and all CUDA generators, in that order.

    Args:
        seed: integer seed shared by all generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

# Seed all generators once, right after the configuration is defined.
seed_everything(SEED)


In [None]:
# Mount Google Drive at /content/g2drive (note: not the usual /content/drive);
# cells that persist artifacts to Drive must build their paths from this mount point.
from google.colab import drive
drive.mount('/content/g2drive')

In [None]:
# Locations of the unpacked competition data.
DATA_DIR = "/content/image-classification-real-or-ai-generated-photo"
TRAIN_DIR = os.path.join(DATA_DIR, "train")
TEST_DIR = os.path.join(DATA_DIR, "test")

# The three CSV tables shipped with the competition.
train_df, test_df, sub_df = (
    pd.read_csv(os.path.join(DATA_DIR, csv_name))
    for csv_name in ("train.csv", "test.csv", "sample_submission.csv")
)

# The sample submission defines the id column (first) and the column to predict (second).
id_col = sub_df.columns[0]
target_col = sub_df.columns[1]

# Label column: prefer the submission target, then the single column that exists
# only in train.csv, then a list of well-known names.
candidate_labels = [c for c in train_df.columns if c not in test_df.columns]
if target_col in train_df.columns:
    label_col = target_col
elif len(candidate_labels) == 1:
    label_col = candidate_labels[0]
else:
    known_label_names = ("label", "target", "class", "is_ai", "generated")
    label_col = next(
        (name for name in known_label_names if name in train_df.columns),
        None,
    )
    if label_col is None:
        raise ValueError(f"Can't infer label column. train cols: {train_df.columns.tolist()}")

# Image column: the first column shared by train and test that is neither the id
# nor the label; otherwise fall back to the first non-label train column.
shared_cols = [
    c for c in train_df.columns
    if c in test_df.columns and c not in (label_col, id_col)
]
img_col = shared_cols[0] if shared_cols else [c for c in train_df.columns if c != label_col][0]

for caption, value in (
    ("id_col:", id_col),
    ("img_col:", img_col),
    ("label_col:", label_col),
    ("submit target_col:", target_col),
):
    print(caption, value)

import glob
import os

# Extensions tried, in this order, when the CSV value has no extension.
EXTS = [".jpg", ".jpeg", ".png", ".webp"]

def resolve_img_path(p, is_train: bool):
    """Turn an image reference from the CSV into a path on disk.

    Candidates are tried in this order, and the first one that is a regular
    file wins:

    1. ``p`` exactly as given;
    2. ``DATA_DIR/p`` (CSV values such as ``"train/1.jpg"``);
    3. ``<split dir>/p``;
    4. if ``p`` has no extension: ``<split dir>/p`` plus each entry of ``EXTS``;
    5. if ``p`` has no extension: a recursive search below the split dir for
       ``p`` plus each entry of ``EXTS``;
    6. a recursive search below the split dir for the basename of ``p``.

    Only regular files are accepted, so an id that happens to equal a folder
    name (for example ``"train"``) is not resolved to that folder. Names are
    escaped before globbing, so characters such as ``[`` in a file name are
    matched literally, and recursive hits are sorted so the choice among
    duplicates is stable between runs.

    Args:
        p: value from the image column; converted with ``str``.
        is_train: search ``TRAIN_DIR`` when true, ``TEST_DIR`` otherwise.

    Returns:
        The resolved path, or ``<split dir>/p`` when nothing matched, so that
        the caller's error message names the expected location.
    """
    base = TRAIN_DIR if is_train else TEST_DIR
    p = str(p)

    def _first_file_match(name):
        # Recursive lookup of `name` below `base`; None when nothing matches.
        pattern = os.path.join(glob.escape(base), "**", glob.escape(name))
        hits = sorted(h for h in glob.glob(pattern, recursive=True) if os.path.isfile(h))
        return hits[0] if hits else None

    # 1-3) the value as-is, relative to the data root, relative to the split dir
    for cand in (p, os.path.join(DATA_DIR, p), os.path.join(base, p)):
        if os.path.isfile(cand):
            return cand

    _, ext = os.path.splitext(p)
    if ext == "":
        # 4) bare id: try the known extensions directly in the split dir
        for e in EXTS:
            cand = os.path.join(base, p + e)
            if os.path.isfile(cand):
                return cand

        # 5) files may sit in sub-folders: recursive search (slow, fallback only)
        for e in EXTS:
            hit = _first_file_match(p + e)
            if hit is not None:
                return hit

    # 6) last resort: search by basename (e.g. "306.jpg" stored in a sub-folder)
    hit = _first_file_match(os.path.basename(p))
    if hit is not None:
        return hit

    # Nothing found: return the expected location so the error message is meaningful.
    return os.path.join(base, p)


# Sanity check: show where the row labelled 0 of each table resolves to on disk.
print("train sample path:", resolve_img_path(train_df.loc[0, img_col], True))
print("test sample path:", resolve_img_path(test_df.loc[0, img_col], False))


In [None]:
# Training pipeline: fit the image into an IMG_SIZE x IMG_SIZE canvas, then augment.
train_tfms = A.Compose([
    # Scale by the longest side and pad (reflected border) up to IMG_SIZE on both axes.
    A.LongestMaxSize(max_size=IMG_SIZE),
    A.PadIfNeeded(min_height=IMG_SIZE, min_width=IMG_SIZE, border_mode=cv2.BORDER_REFLECT_101),

    # Random crop covering 75-100% of the area with a near-square aspect ratio, output IMG_SIZE x IMG_SIZE.
    A.RandomResizedCrop(size=(IMG_SIZE, IMG_SIZE), scale=(0.75, 1.0), ratio=(0.9, 1.1)),

    A.HorizontalFlip(p=0.5),
    A.ColorJitter(p=0.5),
    A.GaussianBlur(blur_limit=(3, 7), p=0.1),
    # A.ImageCompression(quality_lower=60, quality_upper=100, p=0.35),
    # Normalize with the library's default mean/std, then convert to a torch tensor.
    A.Normalize(),
    ToTensorV2(),
])

# Validation/test pipeline: same geometry as training but with no random augmentation.
valid_tfms = A.Compose([
    A.LongestMaxSize(max_size=IMG_SIZE),
    A.PadIfNeeded(min_height=IMG_SIZE, min_width=IMG_SIZE, border_mode=cv2.BORDER_REFLECT_101),

    A.CenterCrop(height=IMG_SIZE, width=IMG_SIZE),

    A.Normalize(),
    ToTensorV2(),
])


In [None]:
class ImgDataset(Dataset):
    """Dataset that reads the images referenced by a DataFrame.

    Each item is ``(image, target)`` when ``is_train`` is true, where the
    target is a float32 tensor of shape ``(1,)`` built from ``label_col``;
    otherwise it is ``(image, id)`` with the id read from ``id_col``.
    ``is_train`` also selects which split directory ``resolve_img_path``
    searches.
    """

    def __init__(self, df, is_train=True, tfms=None):
        """Keep a re-indexed copy of ``df``, the split flag and the optional transform."""
        self.df = df.reset_index(drop=True)
        self.is_train = is_train
        self.tfms = tfms

    def __len__(self):
        """Number of rows, i.e. number of images."""
        return len(self.df)

    def _load_rgb(self, record):
        """Read the image of one row with OpenCV and return it in RGB channel order."""
        path = resolve_img_path(record[img_col], self.is_train)
        bgr = cv2.imread(path)
        if bgr is None:
            # cv2.imread signals failure by returning None instead of raising
            raise FileNotFoundError(f"Can't read image: {path}")
        return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

    def __getitem__(self, idx):
        """Return the (optionally transformed) image at ``idx`` with its label or id."""
        record = self.df.iloc[idx]
        image = self._load_rgb(record)

        if self.tfms is not None:
            image = self.tfms(image=image)["image"]

        if not self.is_train:
            return image, record[id_col]

        label = torch.tensor([float(record[label_col])], dtype=torch.float32)
        return image, label


In [None]:
# Stratified 5-fold split on the label; every row gets the id of the fold in which
# it is used for validation.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)

# skf.split yields POSITIONAL indices, so collect them in a positional array and
# attach it as a column afterwards. Writing them through label-based `.loc` is only
# correct while train_df still has its pristine 0..n-1 index.
fold_ids = np.full(len(train_df), -1, dtype=int)
for fold, (_, val_idx) in enumerate(skf.split(train_df, train_df[label_col].astype(int))):
    fold_ids[val_idx] = fold
train_df["fold"] = fold_ids

# Rows per fold, for a quick visual check of the split.
train_df["fold"].value_counts().sort_index()


In [None]:
def build_model():
    """Create a pretrained ``MODEL_NAME`` network from timm with a single output logit."""
    model_kwargs = {"pretrained": True, "num_classes": 1}
    return timm.create_model(MODEL_NAME, **model_kwargs)

def train_fold(fold=0, save_best_by="loss"):
    """
    Train one cross-validation fold and keep its best checkpoint.

    Rows of ``train_df`` whose ``fold`` column equals ``fold`` are used for
    validation, all other rows for training. The model is trained for
    ``EPOCHS`` epochs with AdamW, a cosine learning-rate schedule and
    BCE-with-logits loss; mixed precision is enabled on CUDA only.

    Args:
        fold: id of the fold held out for validation.
        save_best_by: "loss" (keep the epoch with the lowest valid_loss) or
            "auc" (keep the epoch with the highest valid_auc).

    Returns:
        ``(best_path, history)``: the path of the saved ``state_dict`` and a
        DataFrame with one row per epoch (columns fold, epoch, train_loss,
        valid_loss, valid_acc, valid_auc, lr).

    Notes:
        * Epoch losses are per-sample means: each batch loss is weighted by
          its batch size, so a short final batch is not over-weighted.
        * Validation probabilities are computed from float32 logits (as in
          ``predict_test``), so AUC is not coarsened by float16 rounding.
        * If the selection metric never improves (for example it is NaN in
          every epoch), the weights of the last epoch are saved, so the
          returned path always points to a checkpoint written by this call.
    """
    assert save_best_by in ["loss", "auc"]

    tr = train_df[train_df.fold != fold].copy()
    va = train_df[train_df.fold == fold].copy()

    dl_tr = DataLoader(
        ImgDataset(tr, True, train_tfms),
        batch_size=BATCH_SIZE, shuffle=True,
        num_workers=NUM_WORKERS, pin_memory=False
    )
    # is_train=True here as well: validation rows live in TRAIN_DIR and carry labels.
    dl_va = DataLoader(
        ImgDataset(va, True, valid_tfms),
        batch_size=BATCH_SIZE, shuffle=False,
        num_workers=NUM_WORKERS, pin_memory=False
    )

    model = build_model().to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WD)
    sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=EPOCHS)
    loss_fn = nn.BCEWithLogitsLoss()

    use_amp = (device.type == "cuda")
    scaler = GradScaler(enabled=use_amp)

    # best tracking
    best_loss = 1e18
    best_auc = -1e18
    best_path = f"/content/best_fold{fold}_{save_best_by}.pt"
    checkpoint_saved = False

    history = {
        "fold": [],
        "epoch": [],
        "train_loss": [],
        "valid_loss": [],
        "valid_acc": [],
        "valid_auc": [],
        "lr": [],
    }

    for ep in range(EPOCHS):
        # -------- train --------
        model.train()
        tr_loss_sum, tr_seen = 0.0, 0

        for x, y in tqdm(dl_tr, desc=f"fold{fold} ep{ep+1}/{EPOCHS} train"):
            x, y = x.to(device), y.to(device)
            opt.zero_grad(set_to_none=True)

            with autocast(enabled=use_amp):
                logits = model(x)
                loss = loss_fn(logits, y)

            if use_amp:
                scaler.scale(loss).backward()
                scaler.step(opt)
                scaler.update()
            else:
                loss.backward()
                opt.step()

            batch_n = x.size(0)
            tr_loss_sum += loss.item() * batch_n
            tr_seen += batch_n

        # -------- valid --------
        model.eval()
        va_loss_sum, va_seen = 0.0, 0
        probs_all = []
        targets_all = []

        with torch.no_grad():
            for x, y in tqdm(dl_va, desc=f"fold{fold} ep{ep+1}/{EPOCHS} valid"):
                x, y = x.to(device), y.to(device)

                with autocast(enabled=use_amp):
                    logits = model(x)
                    loss = loss_fn(logits, y)

                batch_n = x.size(0)
                va_loss_sum += loss.item() * batch_n
                va_seen += batch_n

                # Cast to float32 before the sigmoid: under autocast the logits can be
                # half precision, which creates ties among confident predictions.
                probs = torch.sigmoid(logits.float()).cpu().numpy().reshape(-1)
                targets = y.cpu().numpy().reshape(-1)

                probs_all.append(probs)
                targets_all.append(targets)

        tr_loss = tr_loss_sum / tr_seen if tr_seen else float("nan")
        va_loss = va_loss_sum / va_seen if va_seen else float("nan")

        probs_all = np.concatenate(probs_all)
        targets_all = np.concatenate(targets_all).astype(int)

        preds = (probs_all >= 0.5).astype(int)
        va_acc = float((preds == targets_all).mean())

        # roc_auc_score raises ValueError when the fold holds a single class;
        # record NaN in that case instead of aborting the run.
        try:
            va_auc = float(roc_auc_score(targets_all, probs_all))
        except ValueError:
            va_auc = float("nan")

        # Learning rate that was in effect during this epoch (read before sched.step()).
        current_lr = opt.param_groups[0]["lr"]

        history["fold"].append(fold)
        history["epoch"].append(ep + 1)
        history["train_loss"].append(tr_loss)
        history["valid_loss"].append(va_loss)
        history["valid_acc"].append(va_acc)
        history["valid_auc"].append(va_auc)
        history["lr"].append(current_lr)

        print(
            f"fold{fold} ep{ep+1}: "
            f"train_loss={tr_loss:.4f} valid_loss={va_loss:.4f} "
            f"valid_acc={va_acc:.4f} valid_auc={va_auc:.4f} lr={current_lr:.2e}"
        )

        # save best
        if save_best_by == "loss":
            if va_loss < best_loss:
                best_loss = va_loss
                torch.save(model.state_dict(), best_path)
                checkpoint_saved = True
        else:  # "auc"
            if (not np.isnan(va_auc)) and (va_auc > best_auc):
                best_auc = va_auc
                torch.save(model.state_dict(), best_path)
                checkpoint_saved = True

        sched.step()

    if not checkpoint_saved:
        # The selection metric never improved (e.g. NaN every epoch, or EPOCHS == 0).
        # Save the final weights so best_path is never missing or stale.
        print(f"fold{fold}: no epoch improved valid {save_best_by}; saving last-epoch weights")
        torch.save(model.state_dict(), best_path)

    return best_path, pd.DataFrame(history)




In [None]:
# Train the first FOLDS_TO_TRAIN folds, selecting each fold's checkpoint by validation loss.
fold_results = []
for fold in range(FOLDS_TO_TRAIN):
    best_path, df_hist = train_fold(fold, save_best_by="loss")
    fold_results.append((best_path, df_hist))

model_paths = [ckpt for ckpt, _ in fold_results]
metrics_dfs = [hist for _, hist in fold_results]

# One long table with the per-epoch history of every trained fold.
metrics = pd.concat(metrics_dfs, ignore_index=True)
metrics_csv = "/content/metrics_history.csv"
metrics.to_csv(metrics_csv, index=False)
print("Saved metrics:", metrics_csv)
model_paths


In [None]:
@torch.no_grad()
def predict_test(model_paths):
    """Predict the test set with every checkpoint and average the probabilities.

    Each checkpoint is loaded into a fresh ``build_model()`` network, run over
    ``test_df`` with the validation transforms, and its sigmoid probabilities
    are added to a running sum that is finally divided by the number of
    checkpoints.

    Args:
        model_paths: iterable of ``state_dict`` files written by ``train_fold``.

    Returns:
        DataFrame with columns ``id_col`` (ids in ``test_df`` order) and
        ``target_col`` (mean predicted probability, float64).

    Raises:
        ValueError: if ``model_paths`` is empty.
    """
    model_paths = list(model_paths)
    if not model_paths:
        raise ValueError("predict_test needs at least one checkpoint path")

    dl_te = DataLoader(
        ImgDataset(test_df, False, valid_tfms),
        batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, pin_memory=False
    )

    all_ids = None
    prob_sum = None
    use_amp = (device.type == "cuda")

    for mp in model_paths:
        model = build_model().to(device)
        model.load_state_dict(torch.load(mp, map_location=device))
        model.eval()

        fold_probs = []
        fold_ids = []
        for x, img_id in tqdm(dl_te, desc=f"infer {os.path.basename(mp)}"):
            x = x.to(device)
            with autocast(enabled=use_amp):
                logits = model(x)
            # float32 sigmoid in torch: numerically stable and the same op used in validation
            fold_probs.append(torch.sigmoid(logits.float()).cpu().numpy().reshape(-1))

            # Numeric ids are collated into a tensor; list(tensor) would produce 0-d
            # tensors that cannot be merged with the submission ids, so unwrap them.
            if torch.is_tensor(img_id):
                fold_ids.extend(img_id.tolist())
            else:
                fold_ids.extend(list(img_id))

        fold_probs = np.concatenate(fold_probs).astype(np.float64)
        if prob_sum is None:
            prob_sum = fold_probs
            all_ids = fold_ids  # the loader is not shuffled, so the order is the same for every model
        else:
            prob_sum += fold_probs

        del model  # release the weights before the next checkpoint is loaded

    prob_sum /= len(model_paths)
    return pd.DataFrame({id_col: all_ids, target_col: prob_sum})

pred_df = predict_test(model_paths)

# Left-join the predictions onto the sample submission so the output keeps its row
# order and id set; the predicted column arrives with the "_pred" suffix.
submission = sub_df[[id_col, target_col]].merge(pred_df, on=id_col, how="left", suffixes=("", "_pred"))

# Ids without a prediction get the neutral 0.5, but say so loudly: a large count
# usually means the id columns did not match (dtype or formatting).
n_missing = int(submission[f"{target_col}_pred"].isna().sum())
if n_missing:
    print(f"WARNING: {n_missing}/{len(submission)} submission ids have no prediction; filling with 0.5")

submission[target_col] = submission[f"{target_col}_pred"].fillna(0.5)
submission = submission[[id_col, target_col]]

out_path = "/content/submission.csv"
submission.to_csv(out_path, index=False)

print("Saved:", out_path)
submission.head()


In [None]:
# Per-epoch metrics averaged over all trained folds.
avg = metrics.groupby("epoch")[["train_loss","valid_loss","valid_acc","valid_auc"]].mean().reset_index()

# One figure per entry: (y-axis label, title, metric columns drawn in that figure).
curve_specs = [
    ("Loss", "Mean Loss across folds", ["train_loss", "valid_loss"]),
    ("Accuracy", "Mean Validation Accuracy across folds", ["valid_acc"]),
    ("AUC", "Mean Validation AUC across folds", ["valid_auc"]),
]

for y_label, title, metric_cols in curve_specs:
    plt.figure()
    for metric_col in metric_cols:
        plt.plot(avg["epoch"], avg[metric_col], label=f"{metric_col}_mean")
    plt.xlabel("Epoch")
    plt.ylabel(y_label)
    plt.title(title)
    plt.legend()
    plt.show()


In [None]:
import shutil

# Persist metrics and checkpoints under the Drive mount (/content/g2drive).
SAVE_DIR = "/content/g2drive/MyDrive/kaggle_real_vs_ai"
os.makedirs(SAVE_DIR, exist_ok=True)

# Derive the per-fold best-epoch table here from `metrics`, so this cell does not
# depend on a variable that is only created further down the notebook (which made
# Restart & Run All fail with NameError). drop_duplicates keeps whole rows, so every
# column comes from the same epoch: the one with the lowest valid_loss in each fold.
best_by_fold = (
    metrics.sort_values(["fold", "valid_loss"])
           .drop_duplicates(subset="fold", keep="first")
           .loc[:, ["fold","epoch","train_loss","valid_loss","valid_acc","valid_auc","lr"]]
           .reset_index(drop=True)
)

metrics.to_csv(f"{SAVE_DIR}/metrics_history.csv", index=False)
best_by_fold.to_csv(f"{SAVE_DIR}/best_by_fold.csv", index=False)

# Copy the model checkpoints as well (pure Python instead of a shell `cp`).
for p in model_paths:
    shutil.copy2(p, SAVE_DIR)
print("Saved to:", SAVE_DIR)


In [None]:
from IPython.display import display

# Best epoch of every fold = the row with the lowest valid_loss in that fold.
# drop_duplicates keeps the whole row. GroupBy.first() instead returns the first
# NON-NULL value per column, so a NaN valid_auc in the best-loss epoch would be
# silently replaced by the AUC of a different epoch.
best_by_fold = (
    metrics.sort_values(["fold", "valid_loss"])
           .drop_duplicates(subset="fold", keep="first")
           .loc[:, ["fold","epoch","train_loss","valid_loss","valid_acc","valid_auc","lr"]]
           .reset_index(drop=True)
)

display(
    best_by_fold.style.format({
        "train_loss":"{:.4f}",
        "valid_loss":"{:.4f}",
        "valid_acc":"{:.2%}",
        "valid_auc":"{:.4f}",
        "lr":"{:.2e}"
    })
)

# Mean ± sample standard deviation (ddof=1) of the best-epoch metrics across folds.
summary = pd.DataFrame({
    "valid_loss_mean±std": [f"{best_by_fold['valid_loss'].mean():.4f} ± {best_by_fold['valid_loss'].std(ddof=1):.4f}"],
    "valid_acc_mean±std":  [f"{best_by_fold['valid_acc'].mean():.2%} ± {best_by_fold['valid_acc'].std(ddof=1):.2%}"],
    "valid_auc_mean±std":  [f"{best_by_fold['valid_auc'].mean():.4f} ± {best_by_fold['valid_auc'].std(ddof=1):.4f}"],
})
display(summary)


In [None]:
import os, shutil, time

# Drive is mounted at /content/g2drive in this notebook, not at /content/drive.
# Writing under /content/drive would only create a local folder that disappears
# with the Colab VM, so build the target from the real mount and fail loudly if
# it is not there.
DRIVE_ROOT = "/content/g2drive/MyDrive"
if not os.path.isdir(DRIVE_ROOT):
    raise RuntimeError(f"Google Drive is not mounted at {DRIVE_ROOT}; run the drive.mount cell first")

SAVE_DIR = os.path.join(DRIVE_ROOT, "real_vs_ai_saved")
os.makedirs(SAVE_DIR, exist_ok=True)

# Save the checkpoints.
for p in model_paths:
    dst = os.path.join(SAVE_DIR, os.path.basename(p))
    shutil.copy2(p, dst)
    print("Saved:", dst)

# Save the metrics history, if it exists in this session.
if "metrics" in globals():
    metrics_path = os.path.join(SAVE_DIR, "metrics_history.csv")
    metrics.to_csv(metrics_path, index=False)
    print("Saved:", metrics_path)

# Save the plots, if they were written to /content under these names.
for fn in ["loss_curve.png", "accuracy_curve.png", "auc_curve.png"]:
    src = f"/content/{fn}"
    if os.path.exists(src):
        dst = os.path.join(SAVE_DIR, fn)
        shutil.copy2(src, dst)
        print("Saved:", dst)

print("\nAll saved to:", SAVE_DIR)


In [None]:
# Class balance of the training labels, shown as fractions of all rows.
train_df[label_col].value_counts(normalize=True)
