In [None]:
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader
from Dataset.NSRVideoDataset import NSRVideoDataset
from easydict import EasyDict
from tqdm import tqdm
import pandas as pd
import numpy as np
import random
import torch
import time
import os

In [None]:
def seed_everything(seed):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and CUDA), and
    configures cuDNN so that repeated runs select the same kernels.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # Only affects Python processes started after this point; the hash seed of
    # the already-running interpreter cannot be changed at runtime.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark mode auto-tunes convolution algorithms per run and can pick
    # different kernels each time, which defeats the deterministic flag above.
    torch.backends.cudnn.benchmark = False

In [None]:
# Experiment configuration shared by the cells below.
args = EasyDict(dict(
    # Previous dataset root (used when tuning the voting threshold):
    # dataset_path=r"D:\Video-Dataset\2022-NSR-voting-before-preprocess",
    dataset_path=r"D:\Video-Dataset\2023-NSR-original",  # new dataset (for test all_segmentation)
    split=(0.8, 0.1),  # split ratios forwarded to NSRVideoDataset (original note: train/validation, validation/train)
    dataset_type=("Train", "Validation", "Test"),
    batch_size=16,
    epochs=300,
    learning_rate=1e-3,
    model_name="Densenet201",
    desc='',  # filled in below from `feature` / `use_frame_df`
    is_transform=True,
    feature='diag',  # one of: diag, hist, frame_diff_hist
    use_frame_df=False,  # True, False
    is_voting=True,  # additional voting step used for whole-segment splitting
    is_preprocess=False,
))

# Run description per feature type: (without frame difference, with frame difference).
# Unknown feature names leave `desc` as the empty string set above.
desc_by_feature = {
    "hist": ('histogram_log', 'histogram_difference_log'),
    "diag": ('diagnal_log', 'diagnal_difference_log'),
    "frame_diff_hist": ('frame_difference_histogram', 'frame_difference_histogram'),
}
if args.feature in desc_by_feature:
    plain_desc, difference_desc = desc_by_feature[args.feature]
    args.desc = difference_desc if args.use_frame_df else plain_desc

In [None]:
# Build the test split; arguments are passed positionally in the order the
# NSRVideoDataset constructor expects.
test_split_name = args.dataset_type[2]
test_dataset = NSRVideoDataset(
    args.dataset_path,
    args.split,
    test_split_name,
    args.feature,
    args.use_frame_df,
    args.is_transform,
    args.is_voting,
    args.is_preprocess,
)

# shuffle=False matters: later cells match predictions to
# test_dataset.data_paths purely by position.
test_dataloader = DataLoader(
    test_dataset,
    batch_size=args.batch_size,
    shuffle=False,
    pin_memory=True,
    num_workers=1,
    drop_last=False,
)

In [None]:
from Models.Densenet201 import Densenet201

# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

model = Densenet201()
model = model.to(device, non_blocking=True)
# torchsummary.summary(model, (3, 224, 224))

criterion = torch.nn.BCEWithLogitsLoss()
criterion = criterion.to(device, non_blocking=True)

# The optimizer's state is restored from the checkpoint in a later cell;
# none of the cells shown here performs an optimisation step.
optimizer = torch.optim.Adam(model.parameters(), lr=args.learning_rate)

In [None]:
# Directory containing the checkpoint to evaluate, and the checkpoint file itself.
model_path = r'C:\Users\VIP444\Documents\hist\VideoClassification\runs\2023-07-17-090248-Densenet201-diagnal_log\models'
load_path = os.path.join(model_path, 'checkpoint.pth.tar')


def load_checkpoint(checkpoint, model, optimizer):
    """Restore model and optimizer state from an already-loaded checkpoint.

    Args:
        checkpoint: Mapping with a 'state_dict' entry (model state) and an
            'optimizer' entry (optimizer state).
        model: Object whose ``load_state_dict`` receives ``checkpoint['state_dict']``.
        optimizer: Object whose ``load_state_dict`` receives ``checkpoint['optimizer']``.

    Raises:
        KeyError: If either entry is missing from ``checkpoint``.
    """
    print("=> Loading checkpoint")
    # Model first, then optimizer.
    for target, entry in ((model, 'state_dict'), (optimizer, 'optimizer')):
        target.load_state_dict(checkpoint[entry])

# map_location remaps tensors stored from a GPU run onto the device chosen
# earlier, so the checkpoint also loads on a machine without CUDA.
load_checkpoint(torch.load(load_path, map_location=device), model, optimizer)

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

outputs_results = []  # per-sample binary predictions, in dataloader order
labels_results = []   # per-sample ground-truth labels, same order
running_loss = 0.0
running_corrects = 0.0
running_corrects_test = 0.0

# Put the network in inference mode: without this, any BatchNorm/Dropout layers
# keep their training behaviour (batch statistics, random dropping) and
# BatchNorm running statistics are modified by the test data.
model.eval()

with torch.no_grad():
    for datas, labels in tqdm(test_dataloader, total=len(test_dataloader), desc="Test Progress"):
        labels = labels.to(device, non_blocking=True)
        datas = datas.float().to(device, non_blocking=True)

        outputs = model(datas)
        loss = criterion(outputs, labels)

        # The criterion returns a batch mean; weight by batch size so the sum
        # can later be divided by the number of samples.
        running_loss += loss.item() * datas.size(0)

        # NOTE(review): the criterion is BCEWithLogitsLoss, which suggests
        # `outputs` are raw logits; a 0.5 cut on logits is not a 0.5 cut on
        # probabilities (that would be logit 0). Left unchanged to stay
        # consistent with the trained pipeline; confirm against the model.
        predicted = outputs > 0.5
        running_corrects += (predicted == labels).float().sum().item()
        outputs = predicted.float().cpu().numpy()

        outputs_results.extend(outputs)
        # Store labels on the host as well, so the result list does not keep
        # device tensors alive.
        labels_results.extend(labels.cpu().numpy())
            

In [None]:
# The parent directory name of each sample path identifies its source video.
# Paths are split on backslashes, i.e. Windows-style paths are assumed.
sample_video_names = [data_path[0].split('\\')[-2] for data_path in test_dataset.data_paths]

video_name_set = set(sample_video_names)
# Sorted so the per-video order (and therefore test-res.txt, y_pred and y_true)
# is reproducible; iterating a set of strings directly is not.
video_name_dict = {video_name: [] for video_name in sorted(video_name_set)}
label_dict = {}

# Group the per-sample predictions by video. Predictions are matched to
# test_dataset.data_paths by position.
for idx, value in enumerate(outputs_results):
    video_name = sample_video_names[idx]
    video_name_dict[video_name].append(value.item())
    # The label of the first sample seen is used as the label of the video.
    label_dict.setdefault(video_name, test_dataset.data_paths[idx][1][0])

agree_label_total = 0
agree_total = 0
nonagree_label_total = 0
nonagree_total = 0
y_pred, y_true = [], []
threshold_value = 0.35

EXPERIMENT_DIR = f"./runs-voting/{time.strftime('%Y-%m-%d-%H%M%S')}-{threshold_value}-test-{args.desc}"
os.makedirs(EXPERIMENT_DIR, exist_ok=True)

# The context manager guarantees the result file is closed even if a video
# fails half-way through the loop.
with open(f'{EXPERIMENT_DIR}/test-res.txt', 'w') as txtf:
    for key in video_name_dict.keys():
        votes = np.array(video_name_dict[key])
        vote_count = len(votes)
        positive_votes = int((votes == 1).sum())
        negative_votes = int((votes == 0).sum())

        # Statistics: correctly predicted samples per ground-truth class.
        if label_dict[key] == 1:
            agree_label_total += positive_votes
            agree_total += vote_count
        elif label_dict[key] == 0:
            nonagree_label_total += negative_votes
            nonagree_total += vote_count

        # Threshold vote: the video is classified 1 when at least
        # `threshold_value` of its samples were predicted 1.
        voted_label = 1 if positive_votes >= (vote_count * threshold_value) else 0
        txtf.write(f'{voted_label} - {key}\n')
        # From here on the dict maps each video to its voted label
        # instead of its list of per-sample predictions.
        video_name_dict[key] = voted_label

        y_pred.append(voted_label)
        y_true.append(label_dict[key])

In [None]:
import shutil


# y_pred / y_true hold ONE entry per video, in video_name_dict key order, while
# test_dataset.data_paths holds one entry per sample. An index into y_pred must
# therefore be translated to a video name before it is used to select samples;
# indexing data_paths with it directly picks unrelated files.
video_names_in_vote_order = list(video_name_dict)
error_indexes = [idx for idx, (p, t) in enumerate(zip(y_pred, y_true)) if p != t]
error_videos = {video_names_in_vote_order[idx] for idx in error_indexes}

# Every sample that belongs to a misclassified video.
error_list = [
    data_path
    for data_path in test_dataset.data_paths
    if data_path[0].split('\\')[-2] in error_videos
]

# os.path.join instead of "\..." string pieces: "\e" and "\{" are invalid escape
# sequences that newer Python versions warn about.
legal_error_destination = os.path.join(EXPERIMENT_DIR, "error_ag_to_non")
os.makedirs(legal_error_destination, exist_ok=True)
illegal_error_destination = os.path.join(EXPERIMENT_DIR, "error_non_to_ag")
os.makedirs(illegal_error_destination, exist_ok=True)

for error_path, label in tqdm(error_list, total=len(error_list)):
    # "<video dir>-<file name>" keeps files from different videos apart.
    path_parts = error_path.split("\\")
    file_name = path_parts[-2] + "-" + path_parts[-1]
    # True label 1 that was voted otherwise goes to error_ag_to_non; the rest to error_non_to_ag.
    destination = legal_error_destination if label[0] == 1 else illegal_error_destination
    try:
        shutil.copy2(error_path, os.path.join(destination, file_name))
    except OSError as exc:
        # Best effort: report the file that could not be copied and carry on,
        # without swallowing unrelated errors or KeyboardInterrupt.
        print(f"{error_path}: {exc}")
        continue



In [None]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sn
import datetime

today = datetime.datetime.now().strftime("%Y-%m-%d")
# Pin the label set so the matrix is always 2x2, even when y_true / y_pred
# happen to contain a single class; otherwise cm[1][1] raises IndexError.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
# Note: this rebinds `labels`, which the inference loop used for label batches.
labels = ["Non-edited", "Edited"]

# Correct predictions sit on the diagonal.
accuracy = np.trace(cm) / cm.sum()
print(accuracy)
with open(f'{EXPERIMENT_DIR}/test-acc.txt', 'w') as txtf:
    txtf.write(f'{accuracy}\n')

df_cm = pd.DataFrame(cm, index=labels, columns=labels)
plt.figure(figsize = (12,7))

# The title doubles as the caption of the column (predicted) axis.
plt.title('Predicted Class')

sn.set(font_scale=2)
sn.heatmap(df_cm, annot=True, fmt='d', annot_kws={"weight": "bold"})

plt.ylabel('Actual Class')
# os.path.join avoids the invalid "\c" escape sequence of the original path string.
plt.savefig(os.path.join(EXPERIMENT_DIR, f'confusion_matrix-{today}-{threshold_value}.png'))