In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Copy the output_scenes folder from Drive into /content
!cp -r /content/drive/MyDrive/capstone_code/output_scenes /content

# List /content to confirm the copy
!ls /content

In [None]:
# Install third-party packages. deep_sort_realtime provides the DeepSort tracker
# imported by the pipeline cells; face_recognition is not imported by any cell
# visible in this notebook.
!pip install face_recognition
!pip install deep_sort_realtime

In [None]:
# Install DeepFace from source as an editable package.
# Note: %cd changes the working directory for every later cell as well.
!git clone https://github.com/serengil/deepface.git
%cd deepface
!pip install -e .

# 씬 분할 순서 정렬
- 씬 분할 이후 파일을 불러올 때 순서를 정렬하기 위한 코드

In [4]:
import glob, re

# Natural ("human") ordering of the scene clips: each path is split into digit
# and non-digit runs so that Scene_2 sorts before Scene_10.
_digit_runs = re.compile(r'(\d+)')
mp4_paths = glob.glob("/content/output_scenes/Scene_*.mp4")
mp4_paths.sort(
    key=lambda path: [
        int(piece) if piece.isdigit() else piece.lower()
        for piece in _digit_runs.split(path)
    ]
)

- 주인공에 대한 얼굴 임베딩을 사전에 생성하는 코드
- 현재 실행 코드에서는 주인공의 얼굴 임베딩을 사전 생성하지 않고, 처음 본 객체에 새롭게 ID를 부여하도록 구현함.
- 현재는 필요 없지만, 해당 코드를 먼저 실행하지 않으면 'KerasHistory' object has no attribute 'layer' 오류가 발생함.
- 해당 문제가 발생하는 이유를 찾아 수정할 수 있다면 하는 것이 좋아보임.
- 주인공에 집중해서 해설을 하고 싶다면, 아래 코드의 `db_path` 폴더 안에 배우 이름으로 된 하위 폴더를 만들고 그 안에 주인공과 관련된 이미지 파일을 넣으면 됨.


In [None]:
import os
import numpy as np
from deepface import DeepFace

def build_actor_embeddings(actor_db_path, model_name="Facenet512", detector_backend="mtcnn"):
    """Build one averaged face embedding per actor.

    Every non-hidden sub-directory of `actor_db_path` is treated as one actor
    (the directory name is the actor name). Each .jpg/.jpeg/.png file inside it
    is passed to DeepFace.represent and the embedding of the first returned
    face is collected; the per-actor result is the element-wise mean.

    Args:
        actor_db_path: directory containing one sub-directory per actor.
        model_name: DeepFace recognition model name.
        detector_backend: DeepFace face detector backend.

    Returns:
        dict mapping actor name -> mean embedding (numpy array). Actors with
        no usable image are reported on stdout and left out.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png')
    actor_embeddings = {}

    for actor in os.listdir(actor_db_path):
        actor_folder = os.path.join(actor_db_path, actor)
        # Skip hidden entries (e.g. .ipynb_checkpoints) and plain files.
        if actor.startswith('.') or not os.path.isdir(actor_folder):
            continue

        embs = []
        image_names = [
            name for name in os.listdir(actor_folder)
            if name.lower().endswith(image_suffixes)
        ]
        for name in image_names:
            rep = DeepFace.represent(
                os.path.join(actor_folder, name),
                model_name=model_name,
                detector_backend=detector_backend,
                enforce_detection=False
            )
            if rep and isinstance(rep, list) and 'embedding' in rep[0]:
                embs.append(np.array(rep[0]['embedding']))

        if not embs:
            print(f"  ⚠️ {actor}: 유효한 얼굴 이미지 없음")
            continue

        actor_embeddings[actor] = np.mean(embs, axis=0)
        print(f"  ▶ {actor}: {len(embs)}장 이미지로 임베딩 생성")

    return actor_embeddings

# Example: run once and cache the result
if __name__ == "__main__":
    db_path = "/content/drive/MyDrive/image"
    actor_embeddings = build_actor_embeddings(db_path)
    # Save as .npy for later reuse. The value is a dict, so NumPy stores it as a
    # pickled object array; reload with np.load(path, allow_pickle=True).item().
    np.save("/content/actor_embeddings.npy", actor_embeddings)


# 최종 구현단계 (앞에서 구현한 코드 + 앞,뒤 프레임까지 비교)

## YOLO 객체 검출 후
- YOLO의 객체 검출을 이용해 frames의 파일을 생성
- 해당 객체의 바운딩 박스를 JSON 파일의 형태로 저장

In [None]:
# torchreid supplies the body Re-ID backbone built in the pipeline cells below.
!pip install torchreid

## 기존 알고리즘에 얼굴 정렬 알고리즘 추가함(모든 프레임 분석)
- 기존 알고리즘보다 얼굴을 인식하는 정확도는 상대적으로 오른 것을 확인할 수 있었음.
- 성능 확인을 위해 모든 프레임을 분석
- 너무 세세한 분석에 영상적인 의미를 잃어버림.

In [None]:
import os
import json
import cv2
import numpy as np
import torch
import torchreid
from mtcnn import MTCNN
from deepface import DeepFace
from tqdm import tqdm
from deep_sort_realtime.deepsort_tracker import DeepSort

# --- Settings ---
FRAMES_DIR       = '/content/drive/MyDrive/capstone_code/frames_and_detections_allframes/frames'
DETECTIONS_JSON  = '/content/drive/MyDrive/capstone_code/frames_and_detections_allframes/content/detections.json'
OUTPUT_DIR       = '/content/output_pipeline_DeepSORT'
OUTPUT_FACE_DIR  = '/content/output_image'
FACE_THRESH      = 0.6   # min cosine similarity to reuse a gallery ID from a face embedding
BODY_THRESH      = 0.7   # min cosine similarity to reuse a gallery ID from a body embedding
DEVICE           = 'cuda' if torch.cuda.is_available() else 'cpu'

# --- Create output directories ---
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(OUTPUT_FACE_DIR, exist_ok=True)

# --- Models and tracker ---
face_detector    = MTCNN()
face_model_name  = 'ArcFace'
body_model       = torchreid.models.build_model(
    name='resnet50_ibn_a', num_classes=1000, loss='softmax', pretrained=True
)
body_model.to(DEVICE).eval()
tracker = DeepSort(max_age=3, n_init=3, max_iou_distance=0.3)

# --- Kalman filter time step (dt) ---
# The constant-velocity motion matrix advances position by dt * velocity per
# update. This cell runs on the all-frames dataset (every frame is processed),
# so dt must be 1; the value 10 belongs to the 10-frame-stride dataset only.
internal_tracker = tracker.tracker
kf = internal_tracker.kf
frame_interval = 1
for i in range(4):
    kf._motion_mat[i, i+4] = frame_interval

# --- Gallery and ID bookkeeping ---
person_gallery = {}          # pid -> {'face': embedding or None, 'body': embedding or None}
next_person_id = 1           # counter used to mint "person_<n>" IDs
final_id       = {}          # track_id -> pid for tracks currently reported by the tracker
pid_to_canonical_tid = {}    # pid -> first track_id that was assigned this pid

# --- Face alignment template ---
# Target positions inside a 112x112 crop for the five landmarks, in the order
# left eye, right eye, nose, left mouth corner, right mouth corner.
# Presumably the common ArcFace alignment template — TODO confirm.
TEMPLATE_5PTS = np.array([
    [38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366],
    [41.5493, 92.3655], [70.7299, 92.2041]
], dtype=np.float32)

def align_face(img, kpts, output_size=(112,112)):
    """Warp a face crop so its five landmarks land on TEMPLATE_5PTS.

    Args:
        img: BGR face crop.
        kpts: (5, 2) float32 landmark coordinates relative to `img`, ordered
            like TEMPLATE_5PTS.
        output_size: (width, height) of the result. TEMPLATE_5PTS is defined
            for 112x112, so other sizes will not be aligned to the template.

    Returns:
        The aligned crop. If no similarity transform can be estimated (the
        estimator returns None for degenerate landmarks), the crop is simply
        resized to `output_size` instead of raising inside warpAffine.
    """
    M, _ = cv2.estimateAffinePartial2D(kpts, TEMPLATE_5PTS, method=cv2.LMEDS)
    if M is None:
        return cv2.resize(img, output_size)
    return cv2.warpAffine(img, M, output_size, borderValue=0)

# --- 임베딩 헬퍼 ---
def extract_face_emb(img):
    """Return the L2-normalised face embedding of an aligned BGR face crop.

    DeepFace treats numpy input as BGR (OpenCV convention), so the crop is
    passed through unchanged; converting to RGB first would swap the colour
    channels seen by the model. The crop has already been detected and aligned,
    so DeepFace's own detector is skipped to embed exactly this image.

    Returns:
        1-D float32 unit vector, or None when DeepFace returns nothing or the
        embedding has zero norm.
    """
    rep = DeepFace.represent(
        img,
        model_name=face_model_name,
        detector_backend='skip',
        enforce_detection=False,
    )
    if not rep:
        return None
    emb = np.array(rep[0]['embedding'], dtype=np.float32)
    norm = np.linalg.norm(emb)
    if norm == 0:
        return None
    return emb / norm

def extract_body_emb(img):
    """Return the L2-normalised body (Re-ID) embedding of a BGR person crop.

    The crop is resized to 128x256 (width x height) and normalised with the
    ImageNet mean/std, matching torchreid's reference pre-processing for its
    pretrained backbones. A fixed size also keeps very small crops from
    collapsing inside the network. BODY_THRESH may need re-tuning after this.

    Returns:
        1-D numpy unit vector (returned un-normalised if its norm is zero).
    """
    rgb = cv2.cvtColor(cv2.resize(img, (128, 256)), cv2.COLOR_BGR2RGB)
    t = torch.from_numpy(rgb).permute(2,0,1).unsqueeze(0).float().to(DEVICE)/255.0
    mean = torch.tensor([0.485, 0.456, 0.406], device=t.device).view(1, 3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=t.device).view(1, 3, 1, 1)
    t = (t - mean) / std
    with torch.no_grad():
        feat = body_model(t)
    emb = feat.squeeze(0).cpu().numpy()
    norm = np.linalg.norm(emb)
    return emb / norm if norm > 0 else emb

def cosine(a, b):
    """Cosine similarity of two 1-D vectors as a Python float.

    Returns 0.0 when either vector has zero norm, instead of NaN plus a
    divide-by-zero RuntimeWarning.
    """
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0:
        return 0.0
    return float(np.dot(a, b) / denom)

def assign_person_id(emb, mode):
    """Register a new person in the gallery and return its ID.

    Args:
        emb: embedding to store for the new person.
        mode: gallery slot to fill ('face' or 'body'); the other slot stays None.

    Returns:
        The new ID, "person_<n>", where n is the running counter
        `next_person_id` (incremented afterwards).
    """
    global next_person_id
    pid = f"person_{next_person_id}"
    slots = {'face': None, 'body': None}
    slots[mode] = emb
    person_gallery[pid] = slots
    next_person_id = next_person_id + 1
    return pid

# --- 매칭 헬퍼 ---
def match_face(emb, alpha=0.6):
    """Resolve a face embedding to a gallery person ID, creating one if needed.

    Every gallery entry with a stored face embedding is scored by cosine
    similarity against `emb`. If the best score reaches FACE_THRESH, the stored
    embedding is blended with `emb` (weight `alpha` on the new one),
    re-normalised and written back; otherwise a new person is registered.

    Returns:
        (pid, similarity); similarity is 0.0 for a newly created person.
    """
    top_pid, top_score = 'unknown', 0.0
    for candidate, record in person_gallery.items():
        stored = record.get('face')
        if stored is not None:
            score = cosine(emb, stored)
            if score > top_score:
                top_pid, top_score = candidate, score

    if top_score < FACE_THRESH:
        return assign_person_id(emb, 'face'), 0.0

    # Moving average keeps the reference close to the most recent appearance.
    blended = (1-alpha)*person_gallery[top_pid]['face'] + alpha*emb
    person_gallery[top_pid]['face'] = blended / np.linalg.norm(blended)
    return top_pid, top_score

def match_body(emb, alpha=0.7):
    """Resolve a body embedding to a gallery person ID, creating one if needed.

    Every gallery entry with a stored body embedding is scored by cosine
    similarity against `emb`. If the best score reaches BODY_THRESH, the stored
    embedding is blended with `emb` (weight `alpha` on the new one),
    re-normalised and written back; otherwise a new person is registered.

    Returns:
        (pid, similarity); similarity is 0.0 for a newly created person.
    """
    top_pid, top_score = 'unknown', 0.0
    for candidate, record in person_gallery.items():
        stored = record.get('body')
        if stored is not None:
            score = cosine(emb, stored)
            if score > top_score:
                top_pid, top_score = candidate, score

    if top_score < BODY_THRESH:
        return assign_person_id(emb, 'body'), 0.0

    # Moving average keeps the reference close to the most recent appearance.
    blended = (1-alpha)*person_gallery[top_pid]['body'] + alpha*emb
    person_gallery[top_pid]['body'] = blended / np.linalg.norm(blended)
    return top_pid, top_score

# --- Load detection results ---
# NOTE(review): assumed to map frame file name -> list of [x1, y1, x2, y2, conf]
# rows, as implied by the unpacking in the loop below — confirm against the export.
with open(DETECTIONS_JSON, 'r') as f:
    dets = json.load(f)

# --- Main pipeline ---
print('Starting pipeline with dt=', frame_interval)
for idx, fname in enumerate(tqdm(sorted(os.listdir(FRAMES_DIR)), desc='Pipeline')):
    if not fname.lower().endswith('.jpg'): continue
    frame = cv2.imread(os.path.join(FRAMES_DIR, fname))
    if frame is None: continue
    frame_h, frame_w = frame.shape[:2]
    # Annotate a copy: crops for face/body embeddings are taken from the untouched
    # `frame`, so boxes and labels drawn for any track never leak into them.
    canvas = frame.copy()

    print(f"\n--- Frame {idx+1}: {fname} ---")
    boxes = dets.get(fname, [])
    # DeepSort takes ([left, top, width, height], confidence, class) tuples.
    dets_list = [([x1, y1, x2-x1, y2-y1], conf, 'person') for x1,y1,x2,y2,conf in boxes]
    print(f"Detections: {len(dets_list)}")

    tracks = tracker.update_tracks(dets_list, frame=frame)
    print(f"Updated tracks: {[t.track_id for t in tracks]}")

    # Drop the track -> person mapping of tracks the tracker no longer reports.
    curr_ids = {t.track_id for t in tracks}
    for old in list(final_id):
        if old not in curr_ids:
            print(f"Removing track_{old} from final_id mapping")
            final_id.pop(old)

    for t in tracks:
        raw_tid = t.track_id
        l, t_top, r, b = t.to_ltrb()
        x1, y1, x2, y2 = map(int, (l, t_top, r, b))

        # Raw tracker box and track ID (blue)
        cv2.rectangle(canvas, (x1, y1), (x2, y2), (255, 0, 0), 2)
        cv2.putText(canvas, f"track_{raw_tid}", (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)

        if raw_tid in final_id:
            pid = final_id[raw_tid]
            print(f"track_{raw_tid} already assigned -> {pid}")
        else:
            # Tracker boxes can extend past the image; negative indices would
            # silently wrap around, so clamp to the frame before cropping.
            cx1, cy1 = max(0, x1), max(0, y1)
            cx2, cy2 = min(frame_w, x2), min(frame_h, y2)
            if cx2 <= cx1 or cy2 <= cy1:
                print(f"  track_{raw_tid}: box lies outside the frame, skipped")
                continue
            roi = frame[cy1:cy2, cx1:cx2]

            pid = None
            # MTCNN is documented for RGB input; OpenCV frames are BGR.
            faces = face_detector.detect_faces(cv2.cvtColor(roi, cv2.COLOR_BGR2RGB))
            print(f"  track_{raw_tid}, faces={len(faces)}")
            if faces:
                x_f, y_f, w_f, h_f = faces[0]['box']
                # The face box can start at a negative offset or overrun the ROI.
                fx1, fy1 = max(0, x_f), max(0, y_f)
                fx2, fy2 = min(roi.shape[1], x_f + w_f), min(roi.shape[0], y_f + h_f)
                raw_face = roi[fy1:fy2, fx1:fx2]
                aligned = None
                if raw_face.size > 0:
                    kpts_roi = np.array([faces[0]['keypoints'][k] for k in ['left_eye','right_eye','nose','mouth_left','mouth_right']], dtype=np.float32)
                    # Landmarks are relative to the ROI; shift them into the face crop.
                    kpts_raw = kpts_roi - np.array([fx1, fy1], dtype=np.float32)
                    try:
                        aligned = align_face(raw_face, kpts_raw)
                    except cv2.error:
                        # Alignment failed (e.g. degenerate landmarks): use the body instead.
                        aligned = None

                if aligned is not None:
                    cv2.imwrite(os.path.join(OUTPUT_FACE_DIR, f"raw_face_{raw_tid}_{fname}"), raw_face)
                    cv2.imwrite(os.path.join(OUTPUT_FACE_DIR, f"aligned_{raw_tid}_{fname}"), aligned)

                    emb_f = extract_face_emb(aligned)
                    if emb_f is not None:
                        pid, sim = match_face(emb_f)
                        print(f"Assigned FACE: track_{raw_tid} -> {pid}, sim={sim:.4f}")

            if pid is None:
                # No usable face: identify the person from the whole-body crop.
                emb_b = extract_body_emb(roi)
                pid, sim = match_body(emb_b)
                print(f"Assigned BODY: track_{raw_tid} -> {pid}, sim={sim:.4f}")

            if pid not in pid_to_canonical_tid:
                pid_to_canonical_tid[pid] = raw_tid
            final_id[raw_tid] = pid

        # Person box and ID (green), drawn on the first-assignment frame too.
        cv2.rectangle(canvas, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(canvas, pid, (x1, y1 - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

    out_path = os.path.join(OUTPUT_DIR, fname)
    cv2.imwrite(out_path, canvas)
    print(f"Saved {out_path}")

print('Pipeline completed.')

## 기존 알고리즘에 얼굴 정렬 알고리즘 추가(5프레임 단위 분석)
- 결과 중 가장 높은 성능을 보여줌.
- 가장 자연스러우면서, 인물에 대한 어느 정도의 정확도도 보이고 있음.
- 현재 ReID모델로 resnet50_ibn_a 모델을 사용하고 있는데, 재식별에 대한 성능은 그렇게 좋아보이진 않음.
- 하지만 얼굴이 없는 객체에 대해 이전 프레임에서 분석을 했다면, 이에 대해선 재식별 성능이 어느 정도 나오는 것을 확인할 수 있었음.

In [None]:
import os
import json
import cv2
import numpy as np
import torch
import torchreid
from mtcnn import MTCNN
from deepface import DeepFace
from tqdm import tqdm
from deep_sort_realtime.deepsort_tracker import DeepSort

# --- Settings ---
FRAMES_DIR       = '/content/drive/MyDrive/capstone_code/frames_and_detections_5frames/frames'
DETECTIONS_JSON  = '/content/drive/MyDrive/capstone_code/frames_and_detections_5frames/content/detections.json'
OUTPUT_DIR       = '/content/output_pipeline_DeepSORT'
OUTPUT_FACE_DIR  = '/content/output_image'
FACE_THRESH      = 0.6   # min cosine similarity to reuse a gallery ID from a face embedding
BODY_THRESH      = 0.7   # min cosine similarity to reuse a gallery ID from a body embedding
DEVICE           = 'cuda' if torch.cuda.is_available() else 'cpu'

# --- Create output directories ---
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(OUTPUT_FACE_DIR, exist_ok=True)

# --- Models and tracker ---
face_detector    = MTCNN()
face_model_name  = 'ArcFace'
body_model       = torchreid.models.build_model(
    name='resnet50_ibn_a', num_classes=1000, loss='softmax', pretrained=True
)
body_model.to(DEVICE).eval()
tracker = DeepSort(max_age=3, n_init=3, max_iou_distance=0.3)

# --- Kalman filter time step (dt) ---
# The constant-velocity motion matrix advances position by dt * velocity per
# update. This cell runs on the 5-frame-stride dataset, so dt must be 5; the
# value 10 belongs to the 10-frame-stride dataset only.
internal_tracker = tracker.tracker
kf = internal_tracker.kf
frame_interval = 5
for i in range(4):
    kf._motion_mat[i, i+4] = frame_interval

# --- Gallery and ID bookkeeping ---
person_gallery = {}          # pid -> {'face': embedding or None, 'body': embedding or None}
next_person_id = 1           # counter used to mint "person_<n>" IDs
final_id       = {}          # track_id -> pid for tracks currently reported by the tracker
pid_to_canonical_tid = {}    # pid -> first track_id that was assigned this pid

# --- Face alignment template ---
# Target positions inside a 112x112 crop for the five landmarks, in the order
# left eye, right eye, nose, left mouth corner, right mouth corner.
# Presumably the common ArcFace alignment template — TODO confirm.
TEMPLATE_5PTS = np.array([
    [38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366],
    [41.5493, 92.3655], [70.7299, 92.2041]
], dtype=np.float32)

def align_face(img, kpts, output_size=(112,112)):
    """Warp a face crop so its five landmarks land on TEMPLATE_5PTS.

    Args:
        img: BGR face crop.
        kpts: (5, 2) float32 landmark coordinates relative to `img`, ordered
            like TEMPLATE_5PTS.
        output_size: (width, height) of the result. TEMPLATE_5PTS is defined
            for 112x112, so other sizes will not be aligned to the template.

    Returns:
        The aligned crop. If no similarity transform can be estimated (the
        estimator returns None for degenerate landmarks), the crop is simply
        resized to `output_size` instead of raising inside warpAffine.
    """
    M, _ = cv2.estimateAffinePartial2D(kpts, TEMPLATE_5PTS, method=cv2.LMEDS)
    if M is None:
        return cv2.resize(img, output_size)
    return cv2.warpAffine(img, M, output_size, borderValue=0)

# --- 임베딩 헬퍼 ---
def extract_face_emb(img):
    """Return the L2-normalised face embedding of an aligned BGR face crop.

    DeepFace treats numpy input as BGR (OpenCV convention), so the crop is
    passed through unchanged; converting to RGB first would swap the colour
    channels seen by the model. The crop has already been detected and aligned,
    so DeepFace's own detector is skipped to embed exactly this image.

    Returns:
        1-D float32 unit vector, or None when DeepFace returns nothing or the
        embedding has zero norm.
    """
    rep = DeepFace.represent(
        img,
        model_name=face_model_name,
        detector_backend='skip',
        enforce_detection=False,
    )
    if not rep:
        return None
    emb = np.array(rep[0]['embedding'], dtype=np.float32)
    norm = np.linalg.norm(emb)
    if norm == 0:
        return None
    return emb / norm

def extract_body_emb(img):
    """Return the L2-normalised body (Re-ID) embedding of a BGR person crop.

    The crop is resized to 128x256 (width x height) and normalised with the
    ImageNet mean/std, matching torchreid's reference pre-processing for its
    pretrained backbones. A fixed size also keeps very small crops from
    collapsing inside the network. BODY_THRESH may need re-tuning after this.

    Returns:
        1-D numpy unit vector (returned un-normalised if its norm is zero).
    """
    rgb = cv2.cvtColor(cv2.resize(img, (128, 256)), cv2.COLOR_BGR2RGB)
    t = torch.from_numpy(rgb).permute(2,0,1).unsqueeze(0).float().to(DEVICE)/255.0
    mean = torch.tensor([0.485, 0.456, 0.406], device=t.device).view(1, 3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=t.device).view(1, 3, 1, 1)
    t = (t - mean) / std
    with torch.no_grad():
        feat = body_model(t)
    emb = feat.squeeze(0).cpu().numpy()
    norm = np.linalg.norm(emb)
    return emb / norm if norm > 0 else emb

def cosine(a, b):
    """Cosine similarity of two 1-D vectors as a Python float.

    Returns 0.0 when either vector has zero norm, instead of NaN plus a
    divide-by-zero RuntimeWarning.
    """
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0:
        return 0.0
    return float(np.dot(a, b) / denom)

def assign_person_id(emb, mode):
    """Register a new person in the gallery and return its ID.

    Args:
        emb: embedding to store for the new person.
        mode: gallery slot to fill ('face' or 'body'); the other slot stays None.

    Returns:
        The new ID, "person_<n>", where n is the running counter
        `next_person_id` (incremented afterwards).
    """
    global next_person_id
    pid = f"person_{next_person_id}"
    slots = {'face': None, 'body': None}
    slots[mode] = emb
    person_gallery[pid] = slots
    next_person_id = next_person_id + 1
    return pid

# --- 매칭 헬퍼 ---
def match_face(emb, alpha=0.6):
    """Resolve a face embedding to a gallery person ID, creating one if needed.

    Every gallery entry with a stored face embedding is scored by cosine
    similarity against `emb`. If the best score reaches FACE_THRESH, the stored
    embedding is blended with `emb` (weight `alpha` on the new one),
    re-normalised and written back; otherwise a new person is registered.

    Returns:
        (pid, similarity); similarity is 0.0 for a newly created person.
    """
    top_pid, top_score = 'unknown', 0.0
    for candidate, record in person_gallery.items():
        stored = record.get('face')
        if stored is not None:
            score = cosine(emb, stored)
            if score > top_score:
                top_pid, top_score = candidate, score

    if top_score < FACE_THRESH:
        return assign_person_id(emb, 'face'), 0.0

    # Moving average keeps the reference close to the most recent appearance.
    blended = (1-alpha)*person_gallery[top_pid]['face'] + alpha*emb
    person_gallery[top_pid]['face'] = blended / np.linalg.norm(blended)
    return top_pid, top_score

def match_body(emb, alpha=0.7):
    """Resolve a body embedding to a gallery person ID, creating one if needed.

    Every gallery entry with a stored body embedding is scored by cosine
    similarity against `emb`. If the best score reaches BODY_THRESH, the stored
    embedding is blended with `emb` (weight `alpha` on the new one),
    re-normalised and written back; otherwise a new person is registered.

    Returns:
        (pid, similarity); similarity is 0.0 for a newly created person.
    """
    top_pid, top_score = 'unknown', 0.0
    for candidate, record in person_gallery.items():
        stored = record.get('body')
        if stored is not None:
            score = cosine(emb, stored)
            if score > top_score:
                top_pid, top_score = candidate, score

    if top_score < BODY_THRESH:
        return assign_person_id(emb, 'body'), 0.0

    # Moving average keeps the reference close to the most recent appearance.
    blended = (1-alpha)*person_gallery[top_pid]['body'] + alpha*emb
    person_gallery[top_pid]['body'] = blended / np.linalg.norm(blended)
    return top_pid, top_score

# --- Load detection results ---
# NOTE(review): assumed to map frame file name -> list of [x1, y1, x2, y2, conf]
# rows, as implied by the unpacking in the loop below — confirm against the export.
with open(DETECTIONS_JSON, 'r') as f:
    dets = json.load(f)

# --- Main pipeline ---
print('Starting pipeline with dt=', frame_interval)
for idx, fname in enumerate(tqdm(sorted(os.listdir(FRAMES_DIR)), desc='Pipeline')):
    if not fname.lower().endswith('.jpg'): continue
    frame = cv2.imread(os.path.join(FRAMES_DIR, fname))
    if frame is None: continue
    frame_h, frame_w = frame.shape[:2]
    # Annotate a copy: crops for face/body embeddings are taken from the untouched
    # `frame`, so boxes and labels drawn for any track never leak into them.
    canvas = frame.copy()

    print(f"\n--- Frame {idx+1}: {fname} ---")
    boxes = dets.get(fname, [])
    # DeepSort takes ([left, top, width, height], confidence, class) tuples.
    dets_list = [([x1, y1, x2-x1, y2-y1], conf, 'person') for x1,y1,x2,y2,conf in boxes]
    print(f"Detections: {len(dets_list)}")

    tracks = tracker.update_tracks(dets_list, frame=frame)
    print(f"Updated tracks: {[t.track_id for t in tracks]}")

    # Drop the track -> person mapping of tracks the tracker no longer reports.
    curr_ids = {t.track_id for t in tracks}
    for old in list(final_id):
        if old not in curr_ids:
            print(f"Removing track_{old} from final_id mapping")
            final_id.pop(old)

    for t in tracks:
        raw_tid = t.track_id
        l, t_top, r, b = t.to_ltrb()
        x1, y1, x2, y2 = map(int, (l, t_top, r, b))

        # Raw tracker box and track ID (blue)
        cv2.rectangle(canvas, (x1, y1), (x2, y2), (255, 0, 0), 2)
        cv2.putText(canvas, f"track_{raw_tid}", (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)

        if raw_tid in final_id:
            pid = final_id[raw_tid]
            print(f"track_{raw_tid} already assigned -> {pid}")
        else:
            # Tracker boxes can extend past the image; negative indices would
            # silently wrap around, so clamp to the frame before cropping.
            cx1, cy1 = max(0, x1), max(0, y1)
            cx2, cy2 = min(frame_w, x2), min(frame_h, y2)
            if cx2 <= cx1 or cy2 <= cy1:
                print(f"  track_{raw_tid}: box lies outside the frame, skipped")
                continue
            roi = frame[cy1:cy2, cx1:cx2]

            pid = None
            # MTCNN is documented for RGB input; OpenCV frames are BGR.
            faces = face_detector.detect_faces(cv2.cvtColor(roi, cv2.COLOR_BGR2RGB))
            print(f"  track_{raw_tid}, faces={len(faces)}")
            if faces:
                x_f, y_f, w_f, h_f = faces[0]['box']
                # The face box can start at a negative offset or overrun the ROI.
                fx1, fy1 = max(0, x_f), max(0, y_f)
                fx2, fy2 = min(roi.shape[1], x_f + w_f), min(roi.shape[0], y_f + h_f)
                raw_face = roi[fy1:fy2, fx1:fx2]
                aligned = None
                if raw_face.size > 0:
                    kpts_roi = np.array([faces[0]['keypoints'][k] for k in ['left_eye','right_eye','nose','mouth_left','mouth_right']], dtype=np.float32)
                    # Landmarks are relative to the ROI; shift them into the face crop.
                    kpts_raw = kpts_roi - np.array([fx1, fy1], dtype=np.float32)
                    try:
                        aligned = align_face(raw_face, kpts_raw)
                    except cv2.error:
                        # Alignment failed (e.g. degenerate landmarks): use the body instead.
                        aligned = None

                if aligned is not None:
                    cv2.imwrite(os.path.join(OUTPUT_FACE_DIR, f"raw_face_{raw_tid}_{fname}"), raw_face)
                    cv2.imwrite(os.path.join(OUTPUT_FACE_DIR, f"aligned_{raw_tid}_{fname}"), aligned)

                    emb_f = extract_face_emb(aligned)
                    if emb_f is not None:
                        pid, sim = match_face(emb_f)
                        print(f"Assigned FACE: track_{raw_tid} -> {pid}, sim={sim:.4f}")

            if pid is None:
                # No usable face: identify the person from the whole-body crop.
                emb_b = extract_body_emb(roi)
                pid, sim = match_body(emb_b)
                print(f"Assigned BODY: track_{raw_tid} -> {pid}, sim={sim:.4f}")

            if pid not in pid_to_canonical_tid:
                pid_to_canonical_tid[pid] = raw_tid
            final_id[raw_tid] = pid

        # Person box and ID (green), drawn on the first-assignment frame too.
        cv2.rectangle(canvas, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(canvas, pid, (x1, y1 - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

    out_path = os.path.join(OUTPUT_DIR, fname)
    cv2.imwrite(out_path, canvas)
    print(f"Saved {out_path}")

print('Pipeline completed.')

## 기존 알고리즘에 얼굴 정렬 알고리즘 추가(10프레임 단위 분석)
- 빠르게 성능 확인을 위해 10프레임 단위로 사람 객체를 검출한 버전으로 진행.

In [None]:
import os
import json
import cv2
import numpy as np
import torch
import torchreid
from mtcnn import MTCNN
from deepface import DeepFace
from tqdm import tqdm
from deep_sort_realtime.deepsort_tracker import DeepSort

# --- Settings ---
FRAMES_DIR       = "/content/drive/MyDrive/capstone_code/frames_and_detections/frames"
DETECTIONS_JSON  = "/content/drive/MyDrive/capstone_code/frames_and_detections/content/detections.json"
OUTPUT_DIR       = '/content/output_pipeline_DeepSORT'
OUTPUT_FACE_DIR  = '/content/output_image'
FACE_THRESH      = 0.6  # min cosine similarity to reuse a gallery ID from a face embedding
BODY_THRESH      = 0.7  # min cosine similarity to reuse a gallery ID from a body embedding
DEVICE           = 'cuda' if torch.cuda.is_available() else 'cpu'

# --- Create output directories ---
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(OUTPUT_FACE_DIR, exist_ok=True)

# --- Initialise models and tracker ---
face_detector    = MTCNN()
face_model_name  = 'ArcFace'
body_model       = torchreid.models.build_model(
    name='resnet50_ibn_a', num_classes=1000, loss='softmax', pretrained=True
)
body_model.to(DEVICE).eval()
tracker = DeepSort(max_age=3, n_init=3, max_iou_distance=0.3)

# --- Kalman filter dt correction ---
# Overwrites the position<-velocity terms of the tracker's motion matrix so one
# update step advances by `frame_interval` frames. This reaches into the
# tracker's private `_motion_mat`, so it depends on the installed
# deep_sort_realtime version.
internal_tracker = tracker.tracker
kf = internal_tracker.kf
frame_interval = 10
for i in range(4):
    kf._motion_mat[i, i+4] = frame_interval

# --- Gallery and ID bookkeeping ---
person_gallery = {}  # pid -> {'face': embedding or None, 'body': embedding or None}
next_person_id = 1  # counter used to mint "person_<n>" IDs
final_id       = {}  # track_id -> pid for tracks currently reported by the tracker
pid_to_canonical_tid = {}  # pid -> first track_id that was assigned this pid

# --- Face alignment template and function ---
# Target positions inside a 112x112 crop for the five landmarks, in the order
# left eye, right eye, nose, left mouth corner, right mouth corner.
# Presumably the common ArcFace alignment template — TODO confirm.
TEMPLATE_5PTS = np.array([
    [38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366],
    [41.5493, 92.3655], [70.7299, 92.2041]
], dtype=np.float32)

def align_face(img, kpts, output_size=(112,112)):
    """Warp a face crop so its five landmarks land on TEMPLATE_5PTS.

    Args:
        img: BGR face crop.
        kpts: (5, 2) float32 landmark coordinates relative to `img`, ordered
            like TEMPLATE_5PTS.
        output_size: (width, height) of the result. TEMPLATE_5PTS is defined
            for 112x112, so other sizes will not be aligned to the template.

    Returns:
        The aligned crop. If no similarity transform can be estimated (the
        estimator returns None for degenerate landmarks), the crop is simply
        resized to `output_size` instead of raising inside warpAffine.
    """
    M, _ = cv2.estimateAffinePartial2D(kpts, TEMPLATE_5PTS, method=cv2.LMEDS)
    if M is None:
        return cv2.resize(img, output_size)
    return cv2.warpAffine(img, M, output_size, borderValue=0)

# --- 임베딩 헬퍼 ---
def extract_face_emb(img):
    """Return the L2-normalised face embedding of an aligned BGR face crop.

    DeepFace treats numpy input as BGR (OpenCV convention), so the crop is
    passed through unchanged; converting to RGB first would swap the colour
    channels seen by the model. The crop has already been detected and aligned,
    so DeepFace's own detector is skipped to embed exactly this image.

    Returns:
        1-D float32 unit vector, or None when DeepFace returns nothing or the
        embedding has zero norm.
    """
    rep = DeepFace.represent(
        img,
        model_name=face_model_name,
        detector_backend='skip',
        enforce_detection=False,
    )
    if not rep:
        return None
    emb = np.array(rep[0]['embedding'], dtype=np.float32)
    norm = np.linalg.norm(emb)
    if norm == 0:
        return None
    return emb / norm

def extract_body_emb(img):
    """Return the L2-normalised body (Re-ID) embedding of a BGR person crop.

    The crop is resized to 128x256 (width x height) and normalised with the
    ImageNet mean/std, matching torchreid's reference pre-processing for its
    pretrained backbones. A fixed size also keeps very small crops from
    collapsing inside the network. BODY_THRESH may need re-tuning after this.

    Returns:
        1-D numpy unit vector (returned un-normalised if its norm is zero).
    """
    rgb = cv2.cvtColor(cv2.resize(img, (128, 256)), cv2.COLOR_BGR2RGB)
    t = torch.from_numpy(rgb).permute(2,0,1).unsqueeze(0).float().to(DEVICE)/255.0
    mean = torch.tensor([0.485, 0.456, 0.406], device=t.device).view(1, 3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=t.device).view(1, 3, 1, 1)
    t = (t - mean) / std
    with torch.no_grad():
        feat = body_model(t)
    emb = feat.squeeze(0).cpu().numpy()
    norm = np.linalg.norm(emb)
    return emb / norm if norm > 0 else emb

def cosine(a, b):
    """Cosine similarity of two 1-D vectors as a Python float.

    Returns 0.0 when either vector has zero norm, instead of NaN plus a
    divide-by-zero RuntimeWarning.
    """
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0:
        return 0.0
    return float(np.dot(a, b) / denom)

def assign_person_id(emb, mode):
    """Register a new person in the gallery and return its ID.

    Args:
        emb: embedding to store for the new person.
        mode: gallery slot to fill ('face' or 'body'); the other slot stays None.

    Returns:
        The new ID, "person_<n>", where n is the running counter
        `next_person_id` (incremented afterwards).
    """
    global next_person_id
    pid = f"person_{next_person_id}"
    slots = {'face': None, 'body': None}
    slots[mode] = emb
    person_gallery[pid] = slots
    next_person_id = next_person_id + 1
    return pid

# --- 매칭 헬퍼 ---
def match_face(emb, alpha=0.6):
    """Resolve a face embedding to a gallery person ID, creating one if needed.

    Every gallery entry with a stored face embedding is scored by cosine
    similarity against `emb`. If the best score reaches FACE_THRESH, the stored
    embedding is blended with `emb` (weight `alpha` on the new one),
    re-normalised and written back; otherwise a new person is registered.

    Returns:
        (pid, similarity); similarity is 0.0 for a newly created person.
    """
    top_pid, top_score = 'unknown', 0.0
    for candidate, record in person_gallery.items():
        stored = record.get('face')
        if stored is not None:
            score = cosine(emb, stored)
            if score > top_score:
                top_pid, top_score = candidate, score

    if top_score < FACE_THRESH:
        return assign_person_id(emb, 'face'), 0.0

    # Moving average keeps the reference close to the most recent appearance.
    blended = (1-alpha)*person_gallery[top_pid]['face'] + alpha*emb
    person_gallery[top_pid]['face'] = blended / np.linalg.norm(blended)
    return top_pid, top_score

def match_body(emb, alpha=0.7):
    """Resolve a body embedding to a gallery person ID, creating one if needed.

    Every gallery entry with a stored body embedding is scored by cosine
    similarity against `emb`. If the best score reaches BODY_THRESH, the stored
    embedding is blended with `emb` (weight `alpha` on the new one),
    re-normalised and written back; otherwise a new person is registered.

    Returns:
        (pid, similarity); similarity is 0.0 for a newly created person.
    """
    top_pid, top_score = 'unknown', 0.0
    for candidate, record in person_gallery.items():
        stored = record.get('body')
        if stored is not None:
            score = cosine(emb, stored)
            if score > top_score:
                top_pid, top_score = candidate, score

    if top_score < BODY_THRESH:
        return assign_person_id(emb, 'body'), 0.0

    # Moving average keeps the reference close to the most recent appearance.
    blended = (1-alpha)*person_gallery[top_pid]['body'] + alpha*emb
    person_gallery[top_pid]['body'] = blended / np.linalg.norm(blended)
    return top_pid, top_score

# --- Load detection results ---
# NOTE(review): assumed to map frame file name -> list of [x1, y1, x2, y2, conf]
# rows, as implied by the unpacking in the loop below — confirm against the export.
with open(DETECTIONS_JSON, 'r') as f:
    dets = json.load(f)

# --- Main pipeline ---
print('Starting pipeline with dt=', frame_interval)
for idx, fname in enumerate(tqdm(sorted(os.listdir(FRAMES_DIR)), desc='Pipeline')):
    if not fname.lower().endswith('.jpg'): continue
    frame = cv2.imread(os.path.join(FRAMES_DIR, fname))
    if frame is None: continue
    frame_h, frame_w = frame.shape[:2]
    # Annotate a copy: crops for face/body embeddings are taken from the untouched
    # `frame`, so boxes and labels drawn for any track never leak into them.
    canvas = frame.copy()

    print(f"\n--- Frame {idx+1}: {fname} ---")
    boxes = dets.get(fname, [])
    # DeepSort takes ([left, top, width, height], confidence, class) tuples.
    dets_list = [([x1, y1, x2-x1, y2-y1], conf, 'person') for x1,y1,x2,y2,conf in boxes]
    print(f"Detections: {len(dets_list)}")

    tracks = tracker.update_tracks(dets_list, frame=frame)
    print(f"Updated tracks: {[t.track_id for t in tracks]}")

    # Drop the track -> person mapping of tracks the tracker no longer reports.
    curr_ids = {t.track_id for t in tracks}
    for old in list(final_id):
        if old not in curr_ids:
            print(f"Removing track_{old} from final_id mapping")
            final_id.pop(old)

    for t in tracks:
        raw_tid = t.track_id
        l, t_top, r, b = t.to_ltrb()
        x1, y1, x2, y2 = map(int, (l, t_top, r, b))

        # Raw tracker box and track ID (blue)
        cv2.rectangle(canvas, (x1, y1), (x2, y2), (255, 0, 0), 2)
        cv2.putText(canvas, f"track_{raw_tid}", (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)

        if raw_tid in final_id:
            pid = final_id[raw_tid]
            print(f"track_{raw_tid} already assigned -> {pid}")
        else:
            # Tracker boxes can extend past the image; negative indices would
            # silently wrap around, so clamp to the frame before cropping.
            cx1, cy1 = max(0, x1), max(0, y1)
            cx2, cy2 = min(frame_w, x2), min(frame_h, y2)
            if cx2 <= cx1 or cy2 <= cy1:
                print(f"  track_{raw_tid}: box lies outside the frame, skipped")
                continue
            roi = frame[cy1:cy2, cx1:cx2]

            pid = None
            # MTCNN is documented for RGB input; OpenCV frames are BGR.
            faces = face_detector.detect_faces(cv2.cvtColor(roi, cv2.COLOR_BGR2RGB))
            print(f"  track_{raw_tid}, faces={len(faces)}")
            if faces:
                x_f, y_f, w_f, h_f = faces[0]['box']
                # The face box can start at a negative offset or overrun the ROI.
                fx1, fy1 = max(0, x_f), max(0, y_f)
                fx2, fy2 = min(roi.shape[1], x_f + w_f), min(roi.shape[0], y_f + h_f)
                raw_face = roi[fy1:fy2, fx1:fx2]
                aligned = None
                if raw_face.size > 0:
                    kpts_roi = np.array([faces[0]['keypoints'][k] for k in ['left_eye','right_eye','nose','mouth_left','mouth_right']], dtype=np.float32)
                    # Landmarks are relative to the ROI; shift them into the face crop.
                    kpts_raw = kpts_roi - np.array([fx1, fy1], dtype=np.float32)
                    try:
                        aligned = align_face(raw_face, kpts_raw)
                    except cv2.error:
                        # Alignment failed (e.g. degenerate landmarks): use the body instead.
                        aligned = None

                if aligned is not None:
                    cv2.imwrite(os.path.join(OUTPUT_FACE_DIR, f"raw_face_{raw_tid}_{fname}"), raw_face)
                    cv2.imwrite(os.path.join(OUTPUT_FACE_DIR, f"aligned_{raw_tid}_{fname}"), aligned)

                    emb_f = extract_face_emb(aligned)
                    if emb_f is not None:
                        pid, sim = match_face(emb_f)
                        print(f"Assigned FACE: track_{raw_tid} -> {pid}, sim={sim:.4f}")

            if pid is None:
                # No usable face: identify the person from the whole-body crop.
                emb_b = extract_body_emb(roi)
                pid, sim = match_body(emb_b)
                print(f"Assigned BODY: track_{raw_tid} -> {pid}, sim={sim:.4f}")

            if pid not in pid_to_canonical_tid:
                pid_to_canonical_tid[pid] = raw_tid
            final_id[raw_tid] = pid

        # Person box and ID (green), drawn on the first-assignment frame too.
        cv2.rectangle(canvas, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(canvas, pid, (x1, y1 - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

    out_path = os.path.join(OUTPUT_DIR, fname)
    cv2.imwrite(out_path, canvas)
    print(f"Saved {out_path}")

print('Pipeline completed.')

## 성능을 높일 수 있는 방법
- 임베딩 관리 전략의 수정, 얼굴 인식의 정확도를 높이기 위해선 객체가 정면을 보고 있어야 함.
- MTCNN의 경우에는 검출한 얼굴에 대한 보정을 진행하는 기능은 없음.
- 트래킹 하이퍼파라미터의 조정
- 갤러리의 구조 개선(갤러리에서 유사도가 높은 다른 ID와 병합하는 방식을 통해 구조 개선)

## ReID 모델에 사용할 수 있는 모델
| 모델 이름             | 특징                                                 |
| ----------------- | -------------------------------------------------- |
| `osnet_ain_x1_0`  | Instance Normalization(IN) 레이어를 추가한 OSNet 변형(OSNet-AIN)으로, 색상·조명 변화에 더 강건 |
| `osnet_x1_0`      | 오리지널 OSNet (IBN 없이), IBN 버전보다 살짝 성능 차이가 있을 수 있음    |
| `resnet50_ibn_a`  | ResNet-50 + IBN-A, 대용량 백본으로 강력한 표현력 제공             |
| `resnet101_ibn_a` | ResNet-101 + IBN-A, 더 깊은 네트워크로 추가 성능 향상 가능         |
| `mgn`             | Multi-Granularity Network, global+part 기반 복합 특징 추출 |
| `pcb`             | Part-based Convolutional Baseline, 파트별 세분화된 임베딩    |


## 추가 수정본

In [15]:
import os
import json
import cv2
import numpy as np
import torch
import torchreid
from mtcnn import MTCNN
from deepface import DeepFace
from tqdm import tqdm
from deep_sort_realtime.deepsort_tracker import DeepSort

# --- Settings ---
FRAMES_DIR       = '/content/drive/MyDrive/capstone_code/frames_and_detections/frames'
DETECTIONS_JSON  = '/content/drive/MyDrive/capstone_code/frames_and_detections/content/detections.json'
OUTPUT_DIR       = '/content/output_pipeline_DeepSORT'
OUTPUT_FACE_DIR  = '/content/output_image'
FACE_THRESH      = 0.6  # min cosine similarity to reuse a gallery ID from a face embedding
BODY_THRESH      = 0.7  # min cosine similarity to reuse a gallery ID from a body embedding
DEVICE           = 'cuda' if torch.cuda.is_available() else 'cpu'

# --- Create output directories ---
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(OUTPUT_FACE_DIR, exist_ok=True)

# --- Initialise models and tracker ---
face_detector    = MTCNN()
face_model_name  = 'ArcFace'
body_model       = torchreid.models.build_model(
    name='resnet50_ibn_a', num_classes=1000, loss='softmax', pretrained=True
)
body_model.to(DEVICE).eval()
tracker = DeepSort(max_age=3, n_init=3, max_iou_distance=0.3)

# --- Kalman filter dt correction ---
# Overwrites the position<-velocity terms of the tracker's motion matrix so one
# update step advances by `frame_interval` frames. This reaches into the
# tracker's private `_motion_mat`, so it depends on the installed
# deep_sort_realtime version.
internal_tracker = tracker.tracker
kf = internal_tracker.kf
frame_interval = 10
for i in range(4):
    kf._motion_mat[i, i+4] = frame_interval

# --- Gallery and ID bookkeeping ---
person_gallery = {}  # pid -> {'face': embedding or None, 'body': embedding or None}
next_person_id = 1  # counter used to mint "person_<n>" IDs
final_id       = {}  # track_id -> pid for tracks currently reported by the tracker
pid_to_canonical_tid = {}  # pid -> first track_id that was assigned this pid

# --- Face alignment template and function ---
# Target positions inside a 112x112 crop for the five landmarks, in the order
# left eye, right eye, nose, left mouth corner, right mouth corner.
# Presumably the common ArcFace alignment template — TODO confirm.
TEMPLATE_5PTS = np.array([
    [38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366],
    [41.5493, 92.3655], [70.7299, 92.2041]
], dtype=np.float32)

def align_face(img, kpts, output_size=(112,112)):
    """Warp a face crop so its five landmarks land on TEMPLATE_5PTS.

    Args:
        img: BGR face crop.
        kpts: (5, 2) float32 landmark coordinates relative to `img`, ordered
            like TEMPLATE_5PTS.
        output_size: (width, height) of the result. TEMPLATE_5PTS is defined
            for 112x112, so other sizes will not be aligned to the template.

    Returns:
        The aligned crop. If no similarity transform can be estimated (the
        estimator returns None for degenerate landmarks), the crop is simply
        resized to `output_size` instead of raising inside warpAffine.
    """
    M, _ = cv2.estimateAffinePartial2D(kpts, TEMPLATE_5PTS, method=cv2.LMEDS)
    if M is None:
        return cv2.resize(img, output_size)
    return cv2.warpAffine(img, M, output_size, borderValue=0)

# --- 임베딩 헬퍼 ---
def extract_face_emb(img):
    """Return the L2-normalised face embedding of an aligned BGR face crop.

    DeepFace treats numpy input as BGR (OpenCV convention), so the crop is
    passed through unchanged; converting to RGB first would swap the colour
    channels seen by the model. The crop has already been detected and aligned,
    so DeepFace's own detector is skipped to embed exactly this image.

    Returns:
        1-D float32 unit vector, or None when DeepFace returns nothing or the
        embedding has zero norm.
    """
    rep = DeepFace.represent(
        img,
        model_name=face_model_name,
        detector_backend='skip',
        enforce_detection=False,
    )
    if not rep:
        return None
    emb = np.array(rep[0]['embedding'], dtype=np.float32)
    norm = np.linalg.norm(emb)
    if norm == 0:
        return None
    return emb / norm

def extract_body_emb(img):
    """Return the L2-normalised body (Re-ID) embedding of a BGR person crop.

    The crop is resized to 128x256 (width x height) and normalised with the
    ImageNet mean/std, matching torchreid's reference pre-processing for its
    pretrained backbones. A fixed size also keeps very small crops from
    collapsing inside the network. BODY_THRESH may need re-tuning after this.

    Returns:
        1-D numpy unit vector (returned un-normalised if its norm is zero).
    """
    rgb = cv2.cvtColor(cv2.resize(img, (128, 256)), cv2.COLOR_BGR2RGB)
    t = torch.from_numpy(rgb).permute(2,0,1).unsqueeze(0).float().to(DEVICE)/255.0
    mean = torch.tensor([0.485, 0.456, 0.406], device=t.device).view(1, 3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=t.device).view(1, 3, 1, 1)
    t = (t - mean) / std
    with torch.no_grad():
        feat = body_model(t)
    emb = feat.squeeze(0).cpu().numpy()
    norm = np.linalg.norm(emb)
    return emb / norm if norm > 0 else emb

def cosine(a, b):
    """Cosine similarity of two 1-D vectors as a Python float.

    Returns 0.0 when either vector has zero norm, instead of NaN plus a
    divide-by-zero RuntimeWarning.
    """
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0:
        return 0.0
    return float(np.dot(a, b) / denom)

def assign_person_id(emb, mode):
    """Register a new person in the gallery and return its ID.

    Args:
        emb: embedding to store for the new person.
        mode: gallery slot to fill ('face' or 'body'); the other slot stays None.

    Returns:
        The new ID, "person_<n>", where n is the running counter
        `next_person_id` (incremented afterwards).
    """
    global next_person_id
    pid = f"person_{next_person_id}"
    slots = {'face': None, 'body': None}
    slots[mode] = emb
    person_gallery[pid] = slots
    next_person_id = next_person_id + 1
    return pid

# --- 매칭 헬퍼 ---
def match_face(emb, alpha=0.6):
    """Resolve a face embedding to a gallery person ID, creating one if needed.

    Every gallery entry with a stored face embedding is scored by cosine
    similarity against `emb`. If the best score reaches FACE_THRESH, the stored
    embedding is blended with `emb` (weight `alpha` on the new one),
    re-normalised and written back; otherwise a new person is registered.

    Returns:
        (pid, similarity); similarity is 0.0 for a newly created person.
    """
    top_pid, top_score = 'unknown', 0.0
    for candidate, record in person_gallery.items():
        stored = record.get('face')
        if stored is not None:
            score = cosine(emb, stored)
            if score > top_score:
                top_pid, top_score = candidate, score

    if top_score < FACE_THRESH:
        return assign_person_id(emb, 'face'), 0.0

    # Moving average keeps the reference close to the most recent appearance.
    blended = (1-alpha)*person_gallery[top_pid]['face'] + alpha*emb
    person_gallery[top_pid]['face'] = blended / np.linalg.norm(blended)
    return top_pid, top_score

def match_body(emb, alpha=0.7):
    """Resolve a body embedding to a gallery person ID, creating one if needed.

    Every gallery entry with a stored body embedding is scored by cosine
    similarity against `emb`. If the best score reaches BODY_THRESH, the stored
    embedding is blended with `emb` (weight `alpha` on the new one),
    re-normalised and written back; otherwise a new person is registered.

    Returns:
        (pid, similarity); similarity is 0.0 for a newly created person.
    """
    top_pid, top_score = 'unknown', 0.0
    for candidate, record in person_gallery.items():
        stored = record.get('body')
        if stored is not None:
            score = cosine(emb, stored)
            if score > top_score:
                top_pid, top_score = candidate, score

    if top_score < BODY_THRESH:
        return assign_person_id(emb, 'body'), 0.0

    # Moving average keeps the reference close to the most recent appearance.
    blended = (1-alpha)*person_gallery[top_pid]['body'] + alpha*emb
    person_gallery[top_pid]['body'] = blended / np.linalg.norm(blended)
    return top_pid, top_score

# --- Load detection results ---
# NOTE(review): assumed to map frame file name -> list of [x1, y1, x2, y2, conf]
# rows, as implied by the unpacking in the loop below — confirm against the export.
with open(DETECTIONS_JSON, 'r') as f:
    dets = json.load(f)

# --- Main pipeline ---
print('Starting pipeline with dt=', frame_interval)
for idx, fname in enumerate(tqdm(sorted(os.listdir(FRAMES_DIR)), desc='Pipeline')):
    if not fname.lower().endswith('.jpg'): continue
    frame = cv2.imread(os.path.join(FRAMES_DIR, fname))
    if frame is None: continue
    frame_h, frame_w = frame.shape[:2]
    # Annotate a copy: crops for face/body embeddings are taken from the untouched
    # `frame`, so boxes and labels drawn for any track never leak into them.
    canvas = frame.copy()

    print(f"\n--- Frame {idx+1}: {fname} ---")
    boxes = dets.get(fname, [])
    # DeepSort takes ([left, top, width, height], confidence, class) tuples.
    dets_list = [([x1, y1, x2-x1, y2-y1], conf, 'person') for x1,y1,x2,y2,conf in boxes]
    print(f"Detections: {len(dets_list)}")

    tracks = tracker.update_tracks(dets_list, frame=frame)
    print(f"Updated tracks: {[t.track_id for t in tracks]}")

    # Drop the track -> person mapping of tracks the tracker no longer reports.
    curr_ids = {t.track_id for t in tracks}
    for old in list(final_id):
        if old not in curr_ids:
            print(f"Removing track_{old} from final_id mapping")
            final_id.pop(old)

    for t in tracks:
        raw_tid = t.track_id
        l, t_top, r, b = t.to_ltrb()
        x1, y1, x2, y2 = map(int, (l, t_top, r, b))

        # Raw tracker box and track ID (blue)
        cv2.rectangle(canvas, (x1, y1), (x2, y2), (255, 0, 0), 2)
        cv2.putText(canvas, f"track_{raw_tid}", (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)

        if raw_tid in final_id:
            pid = final_id[raw_tid]
            print(f"track_{raw_tid} already assigned -> {pid}")
        else:
            # Tracker boxes can extend past the image; negative indices would
            # silently wrap around, so clamp to the frame before cropping.
            cx1, cy1 = max(0, x1), max(0, y1)
            cx2, cy2 = min(frame_w, x2), min(frame_h, y2)
            if cx2 <= cx1 or cy2 <= cy1:
                print(f"  track_{raw_tid}: box lies outside the frame, skipped")
                continue
            roi = frame[cy1:cy2, cx1:cx2]

            pid = None
            # MTCNN is documented for RGB input; OpenCV frames are BGR.
            faces = face_detector.detect_faces(cv2.cvtColor(roi, cv2.COLOR_BGR2RGB))
            print(f"  track_{raw_tid}, faces={len(faces)}")
            if faces:
                x_f, y_f, w_f, h_f = faces[0]['box']
                # The face box can start at a negative offset or overrun the ROI.
                fx1, fy1 = max(0, x_f), max(0, y_f)
                fx2, fy2 = min(roi.shape[1], x_f + w_f), min(roi.shape[0], y_f + h_f)
                raw_face = roi[fy1:fy2, fx1:fx2]
                aligned = None
                if raw_face.size > 0:
                    kpts_roi = np.array([faces[0]['keypoints'][k] for k in ['left_eye','right_eye','nose','mouth_left','mouth_right']], dtype=np.float32)
                    # Landmarks are relative to the ROI; shift them into the face crop.
                    kpts_raw = kpts_roi - np.array([fx1, fy1], dtype=np.float32)
                    try:
                        aligned = align_face(raw_face, kpts_raw)
                    except cv2.error:
                        # Alignment failed (e.g. degenerate landmarks): use the body instead.
                        aligned = None

                if aligned is not None:
                    cv2.imwrite(os.path.join(OUTPUT_FACE_DIR, f"raw_face_{raw_tid}_{fname}"), raw_face)
                    cv2.imwrite(os.path.join(OUTPUT_FACE_DIR, f"aligned_{raw_tid}_{fname}"), aligned)

                    emb_f = extract_face_emb(aligned)
                    if emb_f is not None:
                        pid, sim = match_face(emb_f)
                        print(f"Assigned FACE: track_{raw_tid} -> {pid}, sim={sim:.4f}")

            if pid is None:
                # No usable face: identify the person from the whole-body crop.
                emb_b = extract_body_emb(roi)
                pid, sim = match_body(emb_b)
                print(f"Assigned BODY: track_{raw_tid} -> {pid}, sim={sim:.4f}")

            if pid not in pid_to_canonical_tid:
                pid_to_canonical_tid[pid] = raw_tid
            final_id[raw_tid] = pid

        # Person box and ID (green), drawn on the first-assignment frame too.
        cv2.rectangle(canvas, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(canvas, pid, (x1, y1 - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

    out_path = os.path.join(OUTPUT_DIR, fname)
    cv2.imwrite(out_path, canvas)
    print(f"Saved {out_path}")

print('Pipeline completed.')

Starting pipeline with dt= 10


Pipeline: 100%|██████████| 303/303 [05:04<00:00,  1.00s/it]

Pipeline completed.





# 최종 결과를 영상으로 제작

In [8]:
import cv2
import os

FRAMES_DIR = '/content/output_pipeline_DeepSORT'  # folder holding the annotated Re-ID frames
VIDEO_PATH = '/content/result_DeepSORT.mp4'   # output video path
# Playback frame rate. NOTE(review): the frames were sampled from the source
# video, so real-time playback presumably needs source_fps / sampling stride
# rather than the source fps itself — confirm.
FPS = 10

# Sort frame file names. The match is case-insensitive, like the pipeline's own
# '.jpg' filter, so every frame the pipeline wrote is picked up.
frame_files = sorted(f for f in os.listdir(FRAMES_DIR) if f.lower().endswith('.jpg'))
if not frame_files:
    raise RuntimeError("재식별된 프레임 이미지가 없습니다.")

# Take the output resolution from the first frame.
first = cv2.imread(os.path.join(FRAMES_DIR, frame_files[0]))
if first is None:
    raise RuntimeError(f"Could not read first frame: {frame_files[0]}")
h, w = first.shape[:2]

# Initialise the VideoWriter ('mp4v' is MPEG-4 Part 2, not H.264).
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(VIDEO_PATH, fourcc, FPS, (w, h))
if not writer.isOpened():
    raise RuntimeError(f"Could not open video writer for {VIDEO_PATH}")

# Write every frame in order; always release the writer so the file is finalised.
try:
    for fn in frame_files:
        img = cv2.imread(os.path.join(FRAMES_DIR, fn))
        if img is None:
            print(f"Skipping unreadable frame: {fn}")
            continue
        # VideoWriter drops frames whose size differs from the one it was opened with.
        if img.shape[:2] != (h, w):
            img = cv2.resize(img, (w, h))
        writer.write(img)
finally:
    writer.release()
print(f"✅ 비디오 저장 완료: {VIDEO_PATH}")


✅ 비디오 저장 완료: /content/result_DeepSORT.mp4
