In [None]:
# Install the third-party dependencies (uncomment when running on a fresh environment)
# !pip install -q timm
# !pip install -q albumentations
# !pip install -q ultralytics

In [None]:
import os
import cv2
import sys
import random
import numpy as np
import pandas as pd
from tqdm import tqdm
from glob import glob
from copy import deepcopy
from einops import rearrange

import timm
import torch
import torch.nn as nn
import albumentations as A
import torch.optim as optim
from ultralytics import YOLO
import torch.nn.functional as F
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from albumentations.pytorch.transforms import ToTensorV2

from sklearn.model_selection import train_test_split
from skimage.feature import graycomatrix

In [None]:
def seed_everything(seed):
    """Seed every random number generator used by the notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED`` and switches cuDNN to deterministic mode.

    Args:
        seed: Integer seed shared by all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU, not only the current one; silently ignored
    # when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick kernels per run, which can
    # change results between runs and so contradicts deterministic=True.
    torch.backends.cudnn.benchmark = False

In [None]:
def get_co_occur_mat(img):
    """Compute per-channel gray-level co-occurrence matrices of a colour image.

    Args:
        img: H x W x 3 array whose channels are indexed as B, G, R (OpenCV
            order). Presumably an unsigned integer image, as required by
            ``graycomatrix`` — verify against the caller.

    Returns:
        Array of shape (levels, levels, 12): the four angle matrices
        (0, 45, 90, 135 degrees at distance 1) of the R channel, followed by
        those of G and then B. ``levels`` is left to ``graycomatrix``'s default.
    """
    distances = [1]
    angles = [0, np.pi/4, np.pi/2, 3*np.pi/4]

    # Channel indices 2, 1, 0 pick R, G, B so the output keeps the R-G-B order.
    per_channel = [
        graycomatrix(img[:, :, idx], distances=distances, angles=angles, symmetric=True, normed=True)
        for idx in (2, 1, 0)
    ]

    # Each matrix is (levels, levels, 1, 4); join along the angle axis and
    # drop the singleton distance axis.
    combined = np.concatenate(per_channel, axis=-1)
    return combined.squeeze(2)

In [None]:
class CustomModel(nn.Module):
    """EfficientNet-B0 backbone adapted to 15-channel input with an MLP head.

    The backbone's stem is replaced by a 15-channel convolution, its ``bn2``
    by a 1x1 convolution reducing 1280 to 64 channels, its pooling by an
    identity and its classifier by a flatten, so the backbone returns the
    flattened 64-channel feature map. ``fc`` maps those 4096 features to
    ``num_classes`` outputs.

    Args:
        num_classes: Number of output units (1 for binary real/fake).
        pretrained: Whether timm should load pretrained backbone weights.
            Pass ``False`` when the weights are restored from a checkpoint
            right afterwards.
    """

    def __init__(self, num_classes=1, pretrained=True):
        super().__init__()
        self.effi = timm.create_model("efficientnet_b0", pretrained=pretrained)
        # 15 input channels instead of the default 3 (this stem is trained from scratch).
        self.effi.conv_stem = nn.Conv2d(15, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        # Replace the final norm layer by a 1x1 conv: 1280 -> 64 channels.
        self.effi.bn2 = nn.Conv2d(1280, 64, kernel_size=1, stride=1)
        # NOTE(review): `drop` and `act` only have an effect if the installed
        # timm version's EfficientNet uses attributes with these names — confirm.
        self.effi.drop = nn.Identity()
        self.effi.act = nn.Identity()
        # Keep the spatial map and flatten it instead of pooling + classifying.
        self.effi.global_pool = nn.Identity()
        self.effi.classifier = nn.Flatten()
        self.fc = nn.Sequential(
            # 4096 presumably equals 64 channels x 8 x 8 for 256x256 inputs — TODO confirm.
            nn.Linear(4096, 1000),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.3),
            nn.Linear(1000, num_classes)
        )

    def forward(self, x):
        """Return per-sample probabilities of shape (batch, num_classes)."""
        x = self.effi(x)
        # The backbone is expected to have applied its (identity) pooling and
        # (flatten) classifier already; flattening again is a no-op on 2-D
        # input and guarantees the shape `fc` needs.
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return torch.sigmoid(x)

In [None]:
class CustomDataset(Dataset):
    """Video dataset yielding person-cropped frames plus texture channels.

    Each item is a tensor of shape (n, 15, 256, 256): the 3 normalised image
    channels of every sampled frame followed by 12 gray-level co-occurrence
    maps (3 distances x 4 angles, 256 gray levels). In every mode except
    ``'test'`` the label at the same index is returned alongside.

    Args:
        video_paths: Paths of the videos.
        labels: Labels aligned with ``video_paths``; unused in ``'test'`` mode.
        mode: One of ``'train'``, ``'test'``, ``'validation'``, ``'val'``.
            Only ``'train'`` adds a random horizontal flip.
        n: Number of frames sampled per video.
        randomness: If true, seek to a random position before every read.
    """

    # YOLOv5 detector shared by all instances so that the train/val/test
    # datasets do not each load their own copy of the weights.
    _shared_yolo = None

    def __init__(self, video_paths: list, labels: list=None, mode="train", n=1, randomness=False):
        assert mode in ['train', 'test', 'validation','val']
        self.video_paths = video_paths
        self.labels = labels
        self.mode = mode
        self.n = n
        self.randomness = randomness
        if CustomDataset._shared_yolo is None:
            CustomDataset._shared_yolo = torch.hub.load('ultralytics/yolov5', 'yolov5s')
        self.yolo = CustomDataset._shared_yolo

        # Pipeline for person crops; the flip is the only train-time augmentation.
        steps = [A.Resize(480,480),
                 A.CenterCrop(256,256,p=1)]
        if mode == 'train':
            steps.append(A.HorizontalFlip(p=0.5))
        steps += [A.Normalize(0.5,0.5),
                  ToTensorV2()]
        self.transform = A.Compose(steps)
        # Fallback pipeline used when no person could be detected.
        # NOTE(review): CenterCrop(180, 180) presumably requires frames of at
        # least 180x180 pixels — confirm for the dataset.
        self.crop = A.Compose([A.CenterCrop(180,180,p=1),
                               A.Resize(256,256),
                               A.Normalize(0.5,0.5),
                               ToTensorV2()])

    def __len__(self):
        return len(self.video_paths)

    @staticmethod
    def _to_uint8(gray):
        """Map a tensor normalised with mean=0.5/std=0.5 back to uint8 [0, 255]."""
        # Undo Normalize(0.5, 0.5): [-1, 1] -> [0, 255]. Scaling the normalised
        # values directly would push negatives through the uint8 cast.
        pixels = (gray * 0.5 + 0.5) * 255.0
        return np.clip(np.rint(pixels.numpy()), 0, 255).astype(np.uint8)

    def _get_co_occurrence_matrix(self, frames):
        """Return the GLCM channels, shape (n, 12, 256, 256), for (n, 3, H, W) frames."""
        to_gray = transforms.Grayscale()
        distances = [1, 2, 3]  # pixel offsets
        angles = [0, np.pi/4, np.pi/2, 3*np.pi/4]  # directions

        per_frame = []
        for frame in frames:
            # Grayscale keeps a leading channel axis of size 1; drop it.
            gray = self._to_uint8(to_gray(frame))[0]
            glcm = graycomatrix(gray, distances=distances, angles=angles, levels=256, symmetric=True, normed=True)
            # (256, 256, 3, 4) -> (12, 256, 256): one map per (distance, angle) pair.
            per_frame.append(rearrange(torch.from_numpy(glcm), 'w h x y -> (x y) w h'))
        return torch.stack(per_frame)

    def _seek_random(self, cap, num_frames):
        """Move ``cap`` to a random frame, leaving a margin at the end of the video."""
        # Clamp the upper bound so short videos do not make randint raise.
        upper = max(1, num_frames - 10*self.n)
        cap.set(cv2.CAP_PROP_POS_FRAMES, random.randint(1, upper))

    def _crop_person(self, frame):
        """Return ``frame`` cropped to the first detected person, or None."""
        # NOTE(review): YOLOv5's hub wrapper documents RGB order for numpy
        # input while cv2 frames are BGR — confirm whether a flip is wanted.
        results = self.yolo([frame])
        df = results.pandas().xyxy[0]
        df = df[df['name']=='person']
        if len(df) == 0:
            return None
        # The first four columns are the box corners.
        xmin, ymin, xmax, ymax = (max(0, int(v)) for v in df.iloc[0].iloc[:4])
        if xmax <= xmin or ymax <= ymin:
            # Degenerate box after integer truncation: treat as "no detection".
            return None
        return frame[ymin:ymax,xmin:xmax,:]

    def _read_fallback_frame(self, cap):
        """Read one frame without detection and apply the centre-crop pipeline."""
        ret, frame = cap.read()
        if not ret:
            # The random position was unreadable; retry from the first frame.
            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            ret, frame = cap.read()
        if not ret:
            raise RuntimeError("Could not decode any frame from the video")
        return self.crop(image=frame)['image']

    def _get_video_frames(self, cap):
        """Sample ``self.n`` frames from ``cap`` and append their GLCM channels.

        Raises:
            RuntimeError: If no frame can be decoded from the video.
        """
        num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frames = []

        trial = 0
        while len(frames) < self.n:
            if self.randomness:
                self._seek_random(cap, num_frames)
            ret, frame = cap.read()
            person = self._crop_person(frame) if ret else None
            if person is not None:
                frames.append(self.transform(image=person)['image'])
                continue

            # Either the frame could not be read or YOLO found no person:
            # jump elsewhere, and after three misses accept an undetected frame.
            trial += 1
            self._seek_random(cap, num_frames)
            if trial == 3:
                trial = 0
                frames.append(self._read_fallback_frame(cap))

        frames = torch.stack(frames) # (n,3,256,256)
        co = self._get_co_occurrence_matrix(frames)
        frames = torch.cat([frames, co], dim=1)
        return frames.detach().clone()

    def __getitem__(self, index):
        video_path = self.video_paths[index]
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                raise RuntimeError(f"Cannot open video: {video_path}")
            frames = self._get_video_frames(cap)
        finally:
            # Release the capture even when sampling fails.
            cap.release()
        if self.mode == 'test':
            return frames
        return frames, self.labels[index]

In [None]:
class Trainer:
    """Train, validate and run inference for the real/fake video classifier.

    Call ``setup()`` once, then ``train()`` and finally ``test()``. All paths
    and hyper-parameters are attributes set in ``__init__``.
    """

    def __init__(self):
        self.train_path = '/mnt/elice/dataset/train'
        self.test_path = '/mnt/elice/dataset/test'
        self.submission_csv = "./sample_submission_v0.csv"
        self.save_path = "./best_effi3.pth"
        self.EPOCHS = 6
        self.LR = 0.001
        self.BATCH_SIZE = 16
        self.MAX_NORM = 5
        self.NUM_WORKERS = 0
        self.NUM_CLASSES = 1
        self.DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
        self.SEED = 777

    def setup(self):
        """Seed the RNGs and build the datasets, dataloaders, model and loss."""
        seed_everything(self.SEED)
        # Sort for reproducibility: glob order is filesystem dependent.
        train_fakes = sorted(glob(f"{self.train_path}/fake/*"))
        train_reals = sorted(glob(f"{self.train_path}/real/*"))
        self.submit = pd.read_csv(self.submission_csv)
        x_test = [os.path.join(self.test_path, path) for path in self.submit["path"].values]
        # Label fake videos 1 and real videos 0.
        train_video_paths = train_fakes + train_reals
        labels = [1] * len(train_fakes) + [0] * len(train_reals)
        x_train, x_val, y_train, y_val = train_test_split(
            train_video_paths,
            labels,
            test_size=0.2,
            random_state=self.SEED
        )

        train_dataset = CustomDataset(video_paths=x_train, labels=y_train, mode="train")
        val_dataset = CustomDataset(video_paths=x_val, labels=y_val, mode="val")
        test_dataset = CustomDataset(video_paths=x_test, labels=None, mode="test")

        self.train_dataloader = DataLoader(
            dataset=train_dataset,
            batch_size=self.BATCH_SIZE,
            shuffle=True,
            num_workers=self.NUM_WORKERS,
            pin_memory=True
        )

        self.val_dataloader = DataLoader(
            dataset=val_dataset,
            batch_size=self.BATCH_SIZE,
            shuffle=False
        )

        self.test_dataloader = DataLoader(
            dataset=test_dataset,
            batch_size=self.BATCH_SIZE,
            shuffle=False
        )

        # Same constructor argument as in test(), so the checkpoint always fits.
        self.model = CustomModel(self.NUM_CLASSES)
        self.loss_fn = nn.BCELoss()

    def train(self):
        """Train for ``EPOCHS`` epochs and checkpoint the best validation accuracy.

        The state dict is written to ``save_path`` whenever the validation
        accuracy is at least as good as the best one seen so far.
        """
        self.model.to(self.DEVICE)
        optimizer = optim.AdamW(params=self.model.parameters(), lr=self.LR, weight_decay=1e-3)
        lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer=optimizer,
            mode='max',
            factor=0.5,
            patience=3,
            cooldown=5,
            min_lr=1e-9,
            threshold_mode='abs',
        )

        best_val_acc = 0
        # Initialised up front so the `else` branch below can never read it
        # before assignment (e.g. when the validation accuracy is NaN).
        early_stop = 0

        for epoch in range(1, self.EPOCHS+1):
            self.model.train()
            train_losses = []
            for imgs, labels in tqdm(self.train_dataloader):
                # The dataset yields (n=1, C, H, W) per video; drop the frame axis.
                imgs = torch.squeeze(imgs, 1)
                imgs = imgs.float().to(self.DEVICE) # (b,15,h,w)
                labels = labels.float().to(self.DEVICE)

                optimizer.zero_grad()
                output = self.model(imgs)
                output = output.squeeze(-1)
                loss = self.loss_fn(output, labels)

                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.MAX_NORM)
                optimizer.step()

                train_losses.append(loss.item())

            val_loss, val_acc = self._valid()
            train_loss = np.mean(train_losses)
            print(f"EPOCH: {epoch}, TRAIN LOSS: {train_loss:.4f}, VAL LOSS: {val_loss:.4f}, VAL ACC: {val_acc:.4f}")

            lr_scheduler.step(val_acc)

            if best_val_acc <= val_acc:
                best_val_acc = val_acc
                # Persist the best weights; test() reloads them from disk.
                torch.save(self.model.state_dict(), self.save_path)
                early_stop = 0
            else:
                early_stop += 1

            if early_stop > 7:
                break

    def _valid(self):
        """Return ``(mean loss, accuracy)`` over the validation set.

        Both values are averaged per sample, so a smaller last batch does not
        get the same weight as a full one. Returns NaN for both when the
        validation loader is empty.
        """
        self.model.eval()
        loss_sum = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for imgs, labels in tqdm(self.val_dataloader):
                imgs = torch.squeeze(imgs, 1)
                imgs = imgs.float().to(self.DEVICE)
                labels = labels.float().to(self.DEVICE)

                probs = self.model(imgs)
                probs = probs.squeeze(-1)
                loss = self.loss_fn(probs, labels)

                batch = labels.numel()
                preds = (probs > 0.5).float()
                correct += int((preds == labels).sum().item())
                # BCELoss returns the batch mean; re-weight by the batch size.
                loss_sum += loss.item() * batch
                total += batch

        if total == 0:
            return float('nan'), float('nan')
        return loss_sum / total, correct / total

    @staticmethod
    def _binarize(probs, threshold):
        """Return a list with 1 where ``probs`` exceeds ``threshold``, else 0."""
        return (np.asarray(probs) > threshold).astype(int).tolist()

    def test(self, threshold=0.5):
        """Predict every test video and write the two submission CSV files.

        ``sample_submissionv1.csv`` receives the "fake"/"real" labels and
        ``sample_submission_logitv1.csv`` the raw probabilities.

        Args:
            threshold: Probability above which a video is labelled "fake".
        """
        # Reload the best checkpoint; map_location lets a GPU-trained
        # checkpoint be read on a CPU-only machine.
        model = CustomModel(self.NUM_CLASSES).to(self.DEVICE)
        model.load_state_dict(torch.load(self.save_path, map_location=self.DEVICE))
        model.eval()

        chunks = []
        with torch.no_grad():
            for imgs in tqdm(self.test_dataloader):
                imgs = torch.squeeze(imgs, 1)
                imgs = imgs.float().to(self.DEVICE)

                probs = model(imgs)
                probs = probs.squeeze(-1)
                # Keep one probability per sample of the batch.
                chunks.append(probs.cpu().numpy())

        all_probs = np.concatenate(chunks) if chunks else np.empty(0, dtype=np.float32)
        answer_lst = self._binarize(all_probs, threshold)

        # Copy before the label column of self.submit is overwritten.
        logit_df = self.submit.copy()

        self.submit["label"] = ["fake" if ans else "real" for ans in answer_lst]
        self.submit.to_csv("sample_submissionv1.csv", index=False)

        logit_df['label'] = all_probs
        logit_df.to_csv("sample_submission_logitv1.csv", index=False)

In [None]:
# Create the trainer and build the datasets, dataloaders, model and loss.
trainer = Trainer()
trainer.setup()

In [None]:
# Run the training loop; the best weights are saved to trainer.save_path.
trainer.train()

In [None]:
# Predict on the test videos and write the submission CSV files.
trainer.test()