In [None]:
import os
import cv2
import torch
import pickle
import xml.etree.ElementTree as ET
from tqdm import tqdm
from torchvision import transforms
from torchvision.models.detection import ssdlite320_mobilenet_v3_large
import matplotlib.pyplot as plt

# PyTorch 얼굴 인식 모델 초기화 (SSD Mobilenet V3 모델 사용)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ssdlite320_mobilenet_v3_large(pretrained=True)
model.eval()
model.to(device)

print("얼굴 인식 모델 초기화 완료")


In [None]:
def get_face_descriptors_and_boxes(image_path):
    image = cv2.imread(image_path)  # RGB로 읽기
    if image is None:
        print(f"Could not read image: {image_path}")
        return [], []

    image_tensor = transforms.functional.to_tensor(image).unsqueeze(0).to(device)
    with torch.no_grad():
        detections = model(image_tensor)[0]
    
    descriptors = []
    boxes = []
    h, w, _ = image.shape
    
    for i in range(len(detections['scores'])):
        score = detections['scores'][i].cpu().numpy()
        if score > 0.5:
            bbox = detections['boxes'][i].cpu().numpy()
            startX = int(bbox[0] * w)
            startY = int(bbox[1] * h)
            endX = int(bbox[2] * w)
            endY = int(bbox[3] * h)
            face = image[startY:endY, startX:endX]
            if face.shape[0] > 0 and face.shape[1] > 0:
                face_blob = cv2.dnn.blobFromImage(face, 1.0/255, (96, 96), (0, 0, 0), swapRB=True, crop=False)
                descriptors.append(face_blob.flatten())
                boxes.append((startX, startY, endX, endY))
    return descriptors, boxes

print("얼굴 인식 함수 정의 완료")


In [None]:
# 얼굴 특징 벡터를 저장할 파일 경로
face_descriptors_file = "face_descriptors_3.pkl"

# P2L_faces 디렉토리에서 얼굴 특징 벡터 추출
face_descriptors = {}
base_path_faces = "./P2L_faces"
for session in tqdm(os.listdir(base_path_faces), desc="Sessions"):
    session_path = os.path.join(base_path_faces, session)
    if not os.path.isdir(session_path):
        print(f"Skipping non-directory session: {session_path}")
        continue
    print(f"Processing session: {session_path}")
    for person_id in tqdm(os.listdir(session_path), desc=f"Processing session {session}", leave=False):
        person_path = os.path.join(session_path, person_id)
        if not os.path.isdir(person_path):
            print(f"Skipping non-directory person ID: {person_path}")
            continue
        print(f"Processing person ID: {person_id} in session: {session}")
        face_descriptors[person_id] = []
        for face_image in os.listdir(person_path):
            face_image_path = os.path.join(person_path, face_image)
            print(f"Processing face image: {face_image_path}")
            descriptors, _ = get_face_descriptors_and_boxes(face_image_path)
            face_descriptors[person_id].extend(descriptors)

# 얼굴 특징 벡터를 파일로 저장
with open(face_descriptors_file, 'wb') as f:
    pickle.dump(face_descriptors, f)

print("얼굴 특징 벡터 추출 및 저장 완료")


In [None]:
# 얼굴 특징 벡터 파일 로드
face_descriptors_file = "face_descriptors_3.pkl"
with open(face_descriptors_file, 'rb') as f:
    face_descriptors = pickle.load(f)

print("얼굴 특징 벡터 로드 완료")


In [None]:
def match_faces_in_frame(image_path, face_descriptors):
    image = cv2.imread(image_path)
    if image is None:
        print(f"Could not read image: {image_path}")
        return []
    
    image_tensor = transforms.functional.to_tensor(image).unsqueeze(0).to(device)
    with torch.no_grad():
        detections = model(image_tensor)[0]
    
    matches = []
    h, w, _ = image.shape
    
    for i in range(len(detections['scores'])):
        score = detections['scores'][i].cpu().numpy()
        if score > 0.5:
            bbox = detections['boxes'][i].cpu().numpy()
            startX = int(bbox[0] * w)
            startY = int(bbox[1] * h)
            endX = int(bbox[2] * w)
            endY = int(bbox[3] * h)
            face = image[startY:endY, startX:endX]
            if face.shape[0] > 0 and face.shape[1] > 0:
                face_blob = cv2.dnn.blobFromImage(face, 1.0/255, (96, 96), (0, 0, 0), swapRB=True, crop=False)
                face_descriptor = face_blob.flatten()
                
                for person_id, descriptors in face_descriptors.items():
                    for descriptor in descriptors:
                        dist = np.linalg.norm(face_descriptor - descriptor)
                        if dist < 0.6:  # 거리 임계값을 설정하여 매칭 판단
                            matches.append((person_id, (startX, startY, endX, endY)))
                            break
    return matches

print("CCTV 프레임에서 얼굴을 매칭하는 함수 정의 완료")


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, average_precision_score

def compute_iou(boxA, boxB):
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
    boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
    boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
    iou = interArea / float(boxAArea + boxBArea - interArea)
    return iou

print("성능 평가를 위한 함수 정의 완료")


In [None]:
import time
import xml.etree.ElementTree as ET
from tqdm import tqdm

results = []
ground_truth = []
ious = []
start_time = time.time()

base_path_cctv = "./P2L"
groundtruth_dir = "./groundtruth"
for xml_file in tqdm(os.listdir(groundtruth_dir), desc="XML Files"):
    if xml_file.endswith('.xml'):
        xml_path = os.path.join(groundtruth_dir, xml_file)
        
        # XML 파일 파싱
        try:
            tree = ET.parse(xml_path)
            root = tree.getroot()
        except Exception as e:
            print(f"Error parsing {xml_file}: {e}")
            continue
        
        # 프레임별로 처리
        for frame in tqdm(root.findall('frame'), desc=f"Processing {xml_file}", leave=False):
            frame_number = frame.get('number')
            person = frame.find('person')
            if person is not None:
                person_id = person.get('id')
                ground_truth.append(person_id)
                
                left_eye = person.find('leftEye')
                right_eye = person.find('rightEye')
                if left_eye is not None and right_eye is not None:
                    lx, ly = int(left_eye.get('x')), int(left_eye.get('y'))
                    rx, ry = int(right_eye.get('x')), int(right_eye.get('y'))
                    gt_box = [lx, ly, rx, ry]
                    
                    # 매치되는 P2L 디렉토리의 서브디렉토리 탐색
                    subdir_path = os.path.join(base_path_cctv, xml_file.replace('.xml', ''))
                    if os.path.isdir(subdir_path):
                        for file in os.listdir(subdir_path):
                            if file.startswith(frame_number) and file.endswith('.jpg'):
                                image_path = os.path.join(subdir_path, file)
                                
                                # 얼굴 인식 수행
                                try:
                                    matches = match_faces_in_frame(image_path, face_descriptors)
                                except Exception as e:
                                    print(f"Error processing {image_path}: {e}")
                                    continue

                                # 매칭된 얼굴의 바운딩 박스와 Ground Truth 비교
                                for match, pred_box in matches:
                                    pred_person_id = match
                                    (startX, startY, endX, endY) = pred_box

                                    # IoU 계산
                                    iou = compute_iou(gt_box, [startX, startY, endX, endY])
                                    ious.append(iou)

                                    # 결과 저장
                                    results.append(pred_person_id)

# 성능 지표 계산
end_time = time.time()
total_time = end_time - start_time
mean_iou = np.mean(ious)
average_precision = average_precision_score(ground_truth, results, average='weighted')

precision = precision_score(ground_truth, results, average='weighted')
recall = recall_score(ground_truth, results, average='weighted')
f1 = f1_score(ground_truth, results, average='weighted')

print(f"Total Inference Time: {total_time:.2f} seconds")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1:.2f}")
print(f"Mean IoU: {mean_iou:.2f}")
print(f"mAP: {average_precision:.2f}")

# 결과를 파일에 저장
with open("recognition_results.txt", "w") as f:
    for gt, match in zip(ground_truth, results):
        f.write(f"Ground Truth: {gt}, Match: {match}\n")

print("성능 평가 및 결과 저장 완료")


위의 total inference time을 한 장당 평균적인 inference time으로 수정하자.

---

xml 파일 하나에 대해서만

In [7]:
import time
import xml.etree.ElementTree as ET
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, classification_report, accuracy_score
import numpy as np

results = []
ground_truth = []
ious = []

base_path_cctv = "./P2L"
groundtruth_dir = "./groundtruth"


In [8]:
def calculate_bounding_box(left_eye, right_eye):
    center_x = (left_eye[0] + right_eye[0]) // 2
    center_y = (left_eye[1] + right_eye[1]) // 2
    eye_distance = ((right_eye[0] - left_eye[0]) ** 2 + (right_eye[1] - left_eye[1]) ** 2) ** 0.5
    scale_factor = 2.0
    box_size = int(eye_distance * scale_factor)
    startX = center_x - box_size // 2
    startY = center_y - box_size // 2
    endX = center_x + box_size // 2
    endY = center_y + box_size // 2
    return [startX, startY, endX, endY]

print("Bounding box calculation function defined.")


Bounding box calculation function defined.


In [9]:
# 첫 번째 XML 파일만 처리
xml_files = [f for f in os.listdir(groundtruth_dir) if f.endswith('.xml')]
if xml_files:
    xml_file = xml_files[0]
    xml_path = os.path.join(groundtruth_dir, xml_file)
    
    # XML 파일 파싱
    try:
        tree = ET.parse(xml_path)
        root = tree.getroot()
        print(f"Parsed {xml_file} successfully.")
    except Exception as e:
        print(f"Error parsing {xml_file}: {e}")
else:
    print("No XML files found in the groundtruth directory.")


Parsed P1E_S1_C1.xml successfully.


In [10]:
if xml_files:
    frame_count = len(root.findall('frame'))
    start_time = time.time()
    
    for frame in tqdm(root.findall('frame'), desc=f"Processing {xml_file}"):
        frame_number = frame.get('number')
        person = frame.find('person')
        if person is not None:
            person_id = person.get('id')
            ground_truth.append(person_id)
            
            left_eye = person.find('leftEye')
            right_eye = person.find('rightEye')
            if left_eye is not None and right_eye is not None:
                lx, ly = int(left_eye.get('x')), int(left_eye.get('y'))
                rx, ry = int(right_eye.get('x')), int(right_eye.get('y'))

                # 바운딩 박스 생성
                gt_box = calculate_bounding_box((lx, ly), (rx, ry))
                
                # 매치되는 P2L 디렉토리의 서브디렉토리 탐색
                subdir_path = os.path.join(base_path_cctv, xml_file.replace('.xml', ''))
                if os.path.isdir(subdir_path):
                    print(f"Processing frame {frame_number} in {subdir_path}")
                    matched = False
                    for file in os.listdir(subdir_path):
                        if file.startswith(frame_number) and file.endswith('.jpg'):
                            image_path = os.path.join(subdir_path, file)
                            print(f"Matching faces in {image_path}")
                            
                            # 얼굴 인식 수행
                            try:
                                matches = match_faces_in_frame(image_path, face_descriptors)
                                print(f"Found matches: {matches}")
                            except Exception as e:
                                print(f"Error processing {image_path}: {e}")
                                continue

                            if matches:
                                # 매칭된 얼굴의 바운딩 박스와 Ground Truth 비교
                                for match, pred_box in matches:
                                    pred_person_id = match
                                    (startX, startY, endX, endY) = pred_box

                                    # IoU 계산
                                    iou = compute_iou(gt_box, [startX, startY, endX, endY])
                                    ious.append(iou)

                                    # 결과 저장
                                    results.append(pred_person_id)
                                    matched = True
                    if not matched:
                        results.append("Unknown")
                else:
                    print(f"Subdirectory {subdir_path} does not exist.")
                    results.append("Unknown")
            else:
                print(f"Left or right eye not found for person {person_id} in frame {frame_number}.")
                results.append("Unknown")
        else:
            print(f"No person found in frame {frame_number}.")
            results.append("Unknown")
else:
    print("No XML files to process.")


Processing P1E_S1_C1.xml: 100%|██████████| 2292/2292 [00:00<00:00, 40589.52it/s]

No person found in frame 00000000.
No person found in frame 00000001.
No person found in frame 00000002.
No person found in frame 00000003.
No person found in frame 00000004.
No person found in frame 00000005.
No person found in frame 00000006.
No person found in frame 00000007.
No person found in frame 00000008.
No person found in frame 00000009.
No person found in frame 00000010.
No person found in frame 00000011.
No person found in frame 00000012.
No person found in frame 00000013.
No person found in frame 00000014.
No person found in frame 00000015.
No person found in frame 00000016.
No person found in frame 00000017.
No person found in frame 00000018.
No person found in frame 00000019.
No person found in frame 00000020.
No person found in frame 00000021.
No person found in frame 00000022.
No person found in frame 00000023.
No person found in frame 00000024.
No person found in frame 00000025.
No person found in frame 00000026.
No person found in frame 00000027.
No person found in f




In [11]:
# results 리스트에서 "Unknown"이 아닌 요소들을 필터링하여 출력
non_unknown_results = [result for result in results if result != "Unknown"]

print("Results (excluding 'Unknown'):")
for result in non_unknown_results:
    print(result)


Results (excluding 'Unknown'):


In [12]:
if xml_files:
    # 성능 지표 계산
    end_time = time.time()
    total_time = end_time - start_time
    mean_iou = np.mean(ious) if ious else float('nan')
    
    precision = precision_score(ground_truth, results, average='weighted', zero_division=0)
    recall = recall_score(ground_truth, results, average='weighted', zero_division=0)
    f1 = f1_score(ground_truth, results, average='weighted', zero_division=0)
    accuracy = accuracy_score(ground_truth, results)
    report = classification_report(ground_truth, results, zero_division=0)

    print(f"Total Inference Time: {total_time:.2f} seconds")
    print(f"Average Inference Time per Frame: {total_time/frame_count:.2f} seconds")
    print(f"Precision: {precision:.2f}")
    print(f"Recall: {recall:.2f}")
    print(f"F1 Score: {f1:.2f}")
    print(f"Accuracy: {accuracy:.2f}")
    print(f"Mean IoU: {mean_iou:.2f}")
    print("Classification Report:\n", report)

    # 결과를 파일에 저장
    with open("recognition_results.txt", "w") as f:
        for gt, match in zip(ground_truth, results):
            f.write(f"Ground Truth: {gt}, Match: {match}\n")
        f.write("\nClassification Report:\n")
        f.write(report)

    print("성능 평가 및 결과 저장 완료")
else:
    print("No XML files found in the groundtruth directory.")


ValueError: Found input variables with inconsistent numbers of samples: [1652, 2292]

In [None]:
import xml.etree.ElementTree as ET
import cv2
import os
import matplotlib.pyplot as plt

# 특정 프레임 번호 설정
target_frame_number = "00003801"  # 원하는 프레임 번호를 설정합니다
groundtruth_dir = "./groundtruth"
base_path_cctv = "./P2L"

# 결과를 저장할 디렉토리 생성
result_dir = "groundtruth_results"
os.makedirs(result_dir, exist_ok=True)

# 특정 프레임에 대해 Ground Truth 바운딩 박스 추가 및 시각화
for xml_file in tqdm(os.listdir(groundtruth_dir), desc="XML Files"):
    if xml_file.endswith('.xml'):
        xml_path = os.path.join(groundtruth_dir, xml_file)
        
        # XML 파일 파싱
        try:
            tree = ET.parse(xml_path)
            root = tree.getroot()
        except Exception as e:
            print(f"Error parsing {xml_file}: {e}")
            continue
        
        # 특정 프레임 처리
        for frame in root.findall('frame'):
            frame_number = frame.get('number')
            if frame_number == target_frame_number:
                person = frame.find('person')
                if person is not None:
                    person_id = person.get('id')
                    left_eye = person.find('leftEye')
                    right_eye = person.find('rightEye')
                    if left_eye is not None and right_eye is not None:
                        lx, ly = int(left_eye.get('x')), int(left_eye.get('y'))
                        rx, ry = int(right_eye.get('x')), int(right_eye.get('y'))
                        gt_box = [lx, ly, rx, ry]
                        
                        # 매치되는 P2L 디렉토리의 서브디렉토리 탐색
                        subdir_path = os.path.join(base_path_cctv, xml_file.replace('.xml', ''))
                        if os.path.isdir(subdir_path):
                            for file in os.listdir(subdir_path):
                                if file.startswith(frame_number) and file.endswith('.jpg'):
                                    image_path = os.path.join(subdir_path, file)
                                    
                                    # 이미지 읽기
                                    image = cv2.imread(image_path)
                                    if image is None:
                                        print(f"Could not read image: {image_path}")
                                        continue
                                    
                                    # 바운딩 박스 그리기
                                    startX, startY, endX, endY = gt_box[0], gt_box[1], gt_box[2], gt_box[3]
                                    cv2.rectangle(image, (startX, startY), (endX, endY), (0, 255, 0), 2)
                                    cv2.putText(image, person_id, (startX, startY - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

                                    # 결과 이미지 저장
                                    result_image_path = os.path.join(result_dir, f"{frame_number}_{file}")
                                    cv2.imwrite(result_image_path, image)
                                    print(f"Result saved to {result_image_path}")

# 결과 이미지 시각화
if os.path.exists(result_image_path):
    from IPython.display import Image, display
    display(Image(filename=result_image_path))

print("특정 프레임에 대한 Ground Truth 바운딩 박스 추가 및 결과 저장 완료")
