In [1]:
import torch
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.optim import lr_scheduler


import numpy as np
import matplotlib.pyplot as plt
import cv2
import random
import seaborn as sns


from sklearn.metrics import precision_score, recall_score,f1_score
from tqdm import tqdm
from utils.config import CFG
from utils.Custom_Dataset import CustomDataset
from models.RNN import CNNRNN
from models.LSTM import *
from sklearn.metrics import (confusion_matrix,recall_score, f1_score)


from utils.feature_extraction import *

import warnings
warnings.filterwarnings("ignore")


seed = 4
deterministic = True
random.seed(seed)

torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

print(torch.device("cuda" if torch.cuda.is_available() else 'cpu'))
print(torch.__version__)
print(cv2.__version__)


  from .autonotebook import tqdm as notebook_tqdm
  from scipy.stats import gaussian_kde


cuda
1.13.1+cu116
4.10.0


테스트데이터 불러오기

In [2]:
valid_data=np.load('./data/image_valid.npy')
valid_labels=np.load('./data/label_valid.npy')
print("Valid_data: ", valid_data.shape)
print("Valid_label: ", valid_labels.shape)
valid_dataset=CustomDataset(valid_data,valid_labels,mode='Valid')
valid_dataloader = DataLoader(valid_dataset, batch_size=1, shuffle=False)

Valid_data:  (492, 60, 224, 224, 3)
Valid_label:  (492,)


In [4]:
def evaluate_model(model, valid_loader, criterion, device='cpu'):
    error_batch=[]
    error_labels=[]
    index=[]
    model.eval()
    
    total_loss = 0.0
    correct_predictions = 0
    y_true=[]
    y_pred=[]
    losses=[]
    with torch.cuda.amp.autocast():
        with torch.no_grad(): 
            for i,(images, labels) in enumerate(tqdm(valid_loader)):
                images = images.to(device)
                labels = labels.to(device).long()
            
                
                outputs,_ = model(images)
               
                

                loss = criterion(outputs, labels) 
                
                total_loss += loss 
                
                predicted = torch.argmax(outputs, dim=1)

              

                y_pred.append(predicted.item())
                y_true.append(labels.item())
                losses.append(outputs)

                
                
                if predicted!=labels:
                    error_batch.append(images) # 올바르게 예측하지 못한 이미지 
                    error_labels.append(labels)# 올바르게 예측하지 못한 라벨
                    index.append(i)# 올바르게 예측하지 못한 인덱스
                correct_predictions += (predicted == labels).sum().item()
    avg_loss = total_loss / len(valid_loader)  # 평균 손실 계산
    accuracy = correct_predictions / len(valid_dataset)  # 정확도 계산
    
    return avg_loss, accuracy,y_pred,y_true,error_batch,error_labels,index,losses

모델 불러오기

In [5]:
model=torch.load('F:/VSC/opensg/6_LSTM_1.0_best_model.pt').to(CFG['device'])

평가

In [None]:
criterion = nn.CrossEntropyLoss()
test_loss, test_accuracy,y_pred,y_true,error_batch,error_labels,index,losses = evaluate_model(model, valid_dataloader, criterion, CFG['device'])
precision = precision_score(y_true, y_pred, pos_label=1)
print(f'Precision (정밀도): {precision:.4f}')

# Recall (재현율) 계산
recall = recall_score(y_true, y_pred, pos_label=1)
print(f'Recall (재현율): {recall:.4f}')

f1 = f1_score(y_true, y_pred, pos_label=1)
print(f'F1 스코어: {f1:.4f}')

print(f'ACCURACY: {test_accuracy}')

혼동행렬

In [None]:
conf_matrix = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(6,4))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=['Normal', 'Deadlock'], yticklabels=['Normal', 'Deadlock'])
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()

+ add_image
    
    영상에서 1초마다 프레임을 Buffer에 저장
+ clear_buffer

    clear_buffer를 수행시 두번째 이미지 부터 buffer에 저장

+ initial_buffer

    버퍼를 비우는 함수

+ is_ready

    buffer의 상태를 확인 buffer의 길이가 sequence_length와 같을때 True를 받음

+ buffer_len

    buffer의 길이를 반환

+ get_sequence

    버퍼에 담겨있는 이미지를 CNN-LSTM에 입력하기 위해 전처리

In [None]:
class ShiftBuffer:
    def __init__(self, sequence_length=60):
        """
        :param sequence_length: 모델에 입력할 시퀀스의 길이 (기본값: 60)
        """
        self.sequence_length = sequence_length
        self.buffer = []  # 초기 빈 리스트

    def add_image(self, image):
        """
        실시간으로 들어오는 이미지를 버퍼에 추가하며, 기존 프레임을 한 칸씩 앞으로 이동
        buffer에 sequence_length보다 더 저게 이미지가 담겨 있을경우 buffer에 이미지 저장
        """
        if len(self.buffer) < self.sequence_length:
            self.buffer.append(image)
    def clear_buffer(self):
        """
        첫번째 이미지를 제외
        """
        self.buffer = self.buffer[1:]
    def initial_buffer(self):
        """
        buffer 비우기
        """
        self.buffer=[]
    def is_ready(self):
        """
        buffer에 sequence_length만큼의 이미지가 쌓였는지 확인
        """
        return len(self.buffer) == self.sequence_length
    
    def buffer_len(self):
        return len(self.buffer)

    def get_sequence(self):
        """
        buffer에 쌓인 sequence를 반환
        :return: torch.Size([sequence_length, 3, H, W]) 형태의 시퀀스 텐서
        """
        return torch.stack(self.buffer).permute(0,3,1,2).unsqueeze(0).to(CFG['device'])

buffer = ShiftBuffer(sequence_length=60)

video_path: 영상주소

In [None]:

#video_path='./raw_data_ng/deadlock-20241015-ng-2.mp4'
video_path='./raw_data_normal/deadlock-20241015-normal-2.mp4'

cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS) #FPS 계산 현재 동영상 FPS는 30FPS
frame_count = 0
normal_accuracy='100'
deadlock_accuracy='0'
while cap.isOpened():
    ret, frame = cap.read()

    if not ret:
        break  # 동영상 끝까지 다 읽었을 때 루프 종료

    view_frame=cv2.resize(frame,(1320,640)) #시각화 동영상 크기 설정
    

    if cv2.waitKey(10) == ord('q'):
        break
    current_frame_num = cap.get(cv2.CAP_PROP_POS_FRAMES)
    if frame_count % fps == 0: #1초에 한장씩
        frame_resize=cv2.resize(frame,(224,224)) #이미지를 224x224로 조정
        frame_resize=torch.from_numpy(frame_resize).float()/255.0 # 255로 나누어주면서 0~1사이로 전처리
        buffer.add_image(frame_resize) #버퍼에 이미지 추가
        

    if buffer.is_ready(): #버퍼에 이미지 개수가 60개일때
        
        #모델 평가 수행
        with torch.no_grad():
            output=F.softmax(model(buffer.get_sequence()))
            predicted = torch.argmax(output, dim=1)
            #평가가 끝나면 버퍼에서 첫번째 이미지를 제거
            buffer.clear_buffer()

        deadlock_accuracy=str(output[0][1].item()*100)[:5]
        normal_accuracy=str(output[0][0].item()*100)[:5]
        
    cv2.putText(view_frame, "Normal :"+normal_accuracy+'%', (1000, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)  # 흰색 글씨

    cv2.putText(view_frame, "DEADLOCK: "+deadlock_accuracy+'%', (1000, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)  # 빨간 글씨
   
    
    
    
    cv2.imshow('video', view_frame)
    
    
    
    frame_count += 1


cap.release() 
cv2.destroyAllWindows() # 모든 창 닫기
buffer.initial_buffer() #버퍼 이미지 모두 제거
        
        