<a href="https://colab.research.google.com/github/01star01ek/01star01ek/blob/main/%EC%9D%B4%EB%AF%B8%EC%A7%80swimswap320.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SimSwap
Reference: https://github.com/neuralchen/SimSwap

In [None]:
%cd /content
!git clone https://github.com/woctezuma/SimSwap.git
%cd /content/SimSwap/
!git checkout upgrade-insightface

In [None]:
# PyTorch + TorchVision + NumPy 안정 조합 설치
!pip install torch==2.0.1 torchvision==0.15.2 torchaudio==2.0.2 --index-url https://download.pytorch.org/whl/cu118
!pip install numpy==1.24.4


In [None]:
!pip install insightface==0.7.3 onnxruntime moviepy imageio==2.34.0 > /dev/null

In [None]:
!unzip ./checkpoints.zip  -d ./checkpoints

!unzip 512.zip -d ./checkpoints

!unzip antelope.zip -d ./insightface_func/models/

In [None]:
!sed -i 's/det_size=(640,640)/det_size=(320,320)/g' /content/SimSwap/test_wholeimage_swapsingle.py

In [None]:
import os
import cv2
import numpy as np
from PIL import Image
import insightface
import subprocess
import sys
from pathlib import Path
import shutil
import time

os.makedirs('/content/swapped_frames', exist_ok=True)

class VideoFaceSwapProcessor:
    def __init__(self, base_path="/content"):
        self.base_path = Path(base_path)
        self.simswap_path = self.base_path / "SimSwap"
        self.source_face_path = self.base_path / "source.png"  # 합성할 얼굴 이미지
        self.input_video_path = self.base_path / "target.mp4"  # 입력 영상
        self.output_path = self.base_path / "output"
        self.frames_path = self.base_path / "frames"
        self.swapped_frames_path = self.base_path / "swapped_frames"

        # 얼굴 검출기는 한 번만 초기화
        self.face_detector = None

    def _init_face_detector(self):
        """얼굴 검출기 초기화 (lazy loading)"""
        if self.face_detector is None:
            self.face_detector = insightface.app.FaceAnalysis(name='buffalo_l')
            self.face_detector.prepare(ctx_id=0, det_size=(320, 320))

    def setup_environment(self):
        """환경 설정 및 의존성 설치"""
        print("🔧 환경 설정 중...")

        if self.simswap_path.exists():
            print("✅ SimSwap 이미 설치됨")
            return

        os.chdir(self.base_path)
        subprocess.run([
            "git", "clone", "https://github.com/woctezuma/SimSwap.git"
        ], check=True, stdout=subprocess.DEVNULL)

        os.chdir(self.simswap_path)
        subprocess.run([
            "git", "checkout", "upgrade-insightface"
        ], check=True, stdout=subprocess.DEVNULL)

        print("✅ 환경 설정 완료")

    def install_dependencies(self):
        """의존성 설치"""
        print("📦 의존성 설치 중...")

        packages = [
            "torch==2.0.1", "torchvision==0.15.2", "torchaudio==2.0.2",
            "--index-url", "https://download.pytorch.org/whl/cu118",
            "numpy==1.24.4", "insightface==0.7.3", "onnxruntime",
            "moviepy", "imageio==2.34.0", "opencv-python"
        ]

        subprocess.run([sys.executable, "-m", "pip", "install"] + packages,
                      check=True, stdout=subprocess.DEVNULL)
        print("✅ 의존성 설치 완료")

    def download_models(self):
        """필요한 모델 파일 다운로드"""
        print("📥 모델 파일 확인 중...")

        os.chdir(self.simswap_path)

        required_files = [
            "./checkpoints/people",
            "./arcface_model/arcface_checkpoint.tar",
            "./parsing_model/checkpoint/79999_iter.pth",
            "./insightface_func/models/antelope"
        ]

        missing_files = [f for f in required_files if not Path(f).exists()]

        if not missing_files:
            print("✅ 모든 모델 파일 존재")
            return

        print("📥 누락된 모델 다운로드 중...")

        (self.simswap_path / "arcface_model").mkdir(exist_ok=True)
        (self.simswap_path / "parsing_model" / "checkpoint").mkdir(parents=True, exist_ok=True)

        downloads = [
            ("https://github.com/woctezuma/SimSwap-colab/releases/download/antelope/antelope.zip", "antelope.zip"),
            ("https://github.com/woctezuma/SimSwap-colab/releases/download/1.0/arcface_checkpoint.tar", "arcface_model/arcface_checkpoint.tar"),
            ("https://github.com/neuralchen/SimSwap/releases/download/1.0/checkpoints.zip", "checkpoints.zip"),
            ("https://github.com/neuralchen/SimSwap/releases/download/1.0/79999_iter.pth", "parsing_model/checkpoint/79999_iter.pth"),
            ("https://github.com/neuralchen/SimSwap/releases/download/512_beta/512.zip", "512.zip")
        ]

        for url, local_path in downloads:
            if not Path(local_path).exists():
                subprocess.run(["wget", "-q", url, "-O", local_path], check=True)

        if not Path("./checkpoints/people").exists():
            subprocess.run(["unzip", "-q", "checkpoints.zip", "-d", "checkpoints"], check=True)

        if not Path("./checkpoints/512_beta").exists():
            subprocess.run(["unzip", "-q", "512.zip", "-d", "checkpoints"], check=True)

        if not Path("./insightface_func/models/antelope").exists():
            subprocess.run(["unzip", "-q", "antelope.zip", "-d", "insightface_func/models/"], check=True)

        print("✅ 모델 다운로드 완료")

    def extract_frames_from_video(self, frame_interval=5, max_frames=10):
        """영상에서 지정된 간격으로 프레임 추출 - 최대 프레임 수 제한 추가"""
        print(f"🎬 영상에서 {frame_interval}프레임 간격으로 추출 중... (최대 {max_frames}개)")

        if not self.input_video_path.exists():
            print(f"❌ 입력 영상이 없습니다: {self.input_video_path}")
            return [], 30, 1920, 1080

        # 프레임 저장 디렉토리 생성
        self.frames_path.mkdir(exist_ok=True)

        # 기존 프레임 삭제
        for frame_file in self.frames_path.glob("*.jpg"):
            frame_file.unlink()

        cap = cv2.VideoCapture(str(self.input_video_path))

        if not cap.isOpened():
            print("❌ 영상을 열 수 없습니다")
            return [], 30, 1920, 1080

        # 영상 정보 가져오기
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        print(f"📹 영상 정보: {width}x{height}, {fps:.1f}fps, {total_frames}프레임")

        frame_count = 0
        extracted_count = 0
        extracted_frames = []

        while extracted_count < max_frames:  # 최대 프레임 수 제한
            ret, frame = cap.read()
            if not ret:
                break

            # 지정된 간격으로 프레임 저장
            if frame_count % frame_interval == 0:
                frame_filename = self.frames_path / f"frame_{extracted_count:06d}.jpg"
                cv2.imwrite(str(frame_filename), frame)
                extracted_frames.append(frame_filename)
                extracted_count += 1

            frame_count += 1

        cap.release()

        print(f"✅ {extracted_count}개 프레임 추출 완료")
        return extracted_frames, fps, width, height

    def preprocess_source_image(self, max_size=1024):
        """소스 얼굴 이미지 전처리"""
        print("🖼️ 소스 이미지 전처리 중...")

        if not self.source_face_path.exists():
            print(f"❌ 소스 이미지가 없습니다: {self.source_face_path}")
            return False

        jpg_path = self.source_face_path.with_suffix('.jpg')

        with Image.open(self.source_face_path) as img:
            if max(img.size) > max_size:
                ratio = max_size / max(img.size)
                new_size = tuple(int(ratio * sz) for sz in img.size)
                img = img.resize(new_size, Image.Resampling.LANCZOS)

            if img.mode != 'RGB':
                img = img.convert('RGB')

            img.save(jpg_path, 'JPEG', quality=95, optimize=True)

        self.source_jpg = jpg_path
        print(f"✅ 소스 이미지 전처리 완료: {jpg_path}")
        return True

    def detect_faces(self, img_path):
        """얼굴 검출"""
        self._init_face_detector()

        img = cv2.imread(str(img_path))
        if img is None:
            return []

        return self.face_detector.get(img)

    def validate_source_face(self):
        """소스 얼굴 이미지 유효성 검사"""
        print("👤 소스 얼굴 검증 중...")

        if not self.source_jpg.exists():
            print("❌ 소스 이미지 파일이 없습니다")
            return False

        faces = self.detect_faces(self.source_jpg)
        if not faces:
            print("❌ 소스 이미지에서 얼굴을 찾을 수 없습니다")
            return False

        print(f"✅ 소스에서 {len(faces)}개 얼굴 발견")
        return True

    def debug_simswap_output(self, frame_file):
        """SimSwap 출력 디버깅용 함수"""
        print(f"🔍 디버깅: {frame_file.name}")

        # 처리 전 디렉토리 상태
        before_files = set(self.swapped_frames_path.glob("*"))
        print(f"처리 전 파일 수: {len(before_files)}")

        cmd = [
            "python", "test_wholeimage_swapsingle.py",
            "--no_simswaplogo",
            "--use_mask",
            "--crop_size", "224",  # 더 작은 크기로 빠르게 테스트
            "--name", "people",
            "--Arc_path", "arcface_model/arcface_checkpoint.tar",
            "--pic_a_path", str(self.source_jpg),
            "--pic_b_path", str(frame_file),
            "--output_path", str(self.swapped_frames_path),
            "--gpu_ids", "0"
        ]

        print(f"실행 명령: {' '.join(cmd)}")

        # 타임아웃 설정으로 실행
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        except subprocess.TimeoutExpired:
            print("⏰ 타임아웃 발생 (60초)")
            return set()

        print(f"Return code: {result.returncode}")
        if result.stdout:
            print(f"Stdout: {result.stdout}")
        if result.stderr:
            print(f"Stderr: {result.stderr}")

        # 처리 후 디렉토리 상태
        after_files = set(self.swapped_frames_path.glob("*"))
        new_files = after_files - before_files

        print(f"처리 후 파일 수: {len(after_files)}")
        print(f"새로 생성된 파일: {[f.name for f in new_files]}")

        return new_files

    def swap_faces_batch(self, frame_files, timeout_per_frame=120):
        """배치로 얼굴 교체 실행 - 타임아웃 및 최적화 추가"""
        print(f"🔄 {len(frame_files)}개 프레임 얼굴 교체 중...")

        os.chdir(self.simswap_path)

        # 출력 디렉토리 생성
        self.swapped_frames_path.mkdir(exist_ok=True)

        # 기존 합성 이미지 삭제
        for swapped_file in self.swapped_frames_path.glob("*.jpg"):
            swapped_file.unlink()

        success_count = 0

        for i, frame_file in enumerate(frame_files):
            start_time = time.time()
            print(f"🔄 처리 중: {i+1}/{len(frame_files)} - {frame_file.name}")

            # 개별 프레임에 얼굴 교체
            output_file = self.swapped_frames_path / f"swapped_{i:06d}.jpg"

            cmd = [
                "python", "test_wholeimage_swapsingle.py",
                "--no_simswaplogo",
                "--use_mask",
                "--crop_size", "224",  # 더 작은 크기로 빠르게 처리
                "--name", "people",
                "--Arc_path", "arcface_model/arcface_checkpoint.tar",
                "--pic_a_path", str(self.source_jpg),
                "--pic_b_path", str(frame_file),
                "--output_path", str(self.swapped_frames_path),
                "--gpu_ids", "0"
            ]

            try:
                # 타임아웃 설정
                result = subprocess.run(cmd, capture_output=True, text=True,
                                      check=False, timeout=timeout_per_frame)

                elapsed_time = time.time() - start_time
                print(f"⏱️ 처리 시간: {elapsed_time:.1f}초")

                if result.returncode != 0:
                    print(f"❌ SimSwap 실행 실패: {frame_file.name}")
                    if result.stderr:
                        print(f"Error: {result.stderr[:200]}...")  # 에러 메시지 축약
                    continue

                # SimSwap 출력 파일 찾기
                result_files = list(self.swapped_frames_path.glob("*.jpg"))
                if result_files:
                    # 가장 최근에 생성된 파일 선택
                    latest_file = max(result_files, key=lambda f: f.stat().st_mtime)
                    if latest_file != output_file:
                        latest_file.rename(output_file)
                    success_count += 1
                    print(f"✅ 완료: {output_file.name}")
                else:
                    print(f"⚠️ 결과 파일을 찾을 수 없음: {frame_file.name}")

            except subprocess.TimeoutExpired:
                print(f"⏰ 타임아웃 ({timeout_per_frame}초): {frame_file.name}")
                continue
            except Exception as e:
                print(f"❌ 실패: {frame_file.name} - {str(e)}")
                continue

        print(f"✅ 얼굴 교체 완료: {success_count}/{len(frame_files)}개 성공")
        return success_count > 0

    def create_video_from_frames(self, fps=30.0, width=1920, height=1080):
        """합성된 프레임들로 영상 생성"""
        print("🎬 합성 이미지들로 영상 생성 중...")

        # 합성된 프레임 파일들 가져오기
        swapped_files = sorted(self.swapped_frames_path.glob("swapped_*.jpg"))

        if not swapped_files:
            print("❌ 합성된 프레임이 없습니다")
            return False

        # 출력 영상 경로
        output_video = self.output_path / "face_swapped_video.mp4"
        self.output_path.mkdir(exist_ok=True)

        # VideoWriter 설정
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(str(output_video), fourcc, fps, (width, height))

        if not out.isOpened():
            print("❌ VideoWriter를 열 수 없습니다")
            return False

        for i, frame_file in enumerate(swapped_files):
            img = cv2.imread(str(frame_file))
            if img is None:
                print(f"⚠️ 이미지를 읽을 수 없음: {frame_file}")
                continue

            # 크기 조정 (필요한 경우)
            if img.shape[:2] != (height, width):
                img = cv2.resize(img, (width, height))

            out.write(img)

            if (i + 1) % 5 == 0:
                print(f"📹 진행: {i+1}/{len(swapped_files)} 프레임 처리됨")

        out.release()

        print(f"🎉 영상 생성 완료: {output_video}")
        return True

    def cleanup_temp_files(self):
        """임시 파일 정리"""
        print("🧹 임시 파일 정리 중...")

        # SimSwap 압축 파일들
        temp_files = ["checkpoints.zip", "512.zip", "antelope.zip"]
        for temp_file in temp_files:
            temp_path = self.simswap_path / temp_file
            if temp_path.exists():
                temp_path.unlink()

        print("✅ 정리 완료")

    def process_video(self, frame_interval=5, max_frames=10, install_deps=False,
                     download_models=True, cleanup=True, debug_mode=False):
        """전체 영상 처리 과정 - 최적화 및 디버깅 옵션 추가"""
        try:
            print("🚀 영상 얼굴 교체 시작!")

            # 환경 설정
            if not self.simswap_path.exists():
                self.setup_environment()

            if install_deps:
                self.install_dependencies()

            if download_models:
                self.download_models()

            # 1. 소스 이미지 전처리
            if not self.preprocess_source_image():
                return False

            # 2. 소스 얼굴 검증
            if not self.validate_source_face():
                return False

            # 3. 영상에서 프레임 추출 (제한된 수량)
            frame_files, fps, width, height = self.extract_frames_from_video(
                frame_interval, max_frames)
            if not frame_files:
                return False

            # 디버깅 모드인 경우 첫 번째 프레임만 테스트
            if debug_mode and frame_files:
                print("🔍 디버깅 모드: 첫 번째 프레임만 테스트")
                new_files = self.debug_simswap_output(frame_files[0])
                return len(new_files) > 0

            # 4. 배치 얼굴 교체
            if not self.swap_faces_batch(frame_files):
                return False

            # 5. 새 영상 생성
            if not self.create_video_from_frames(fps, width, height):
                return False

            # 6. 정리
            if cleanup:
                self.cleanup_temp_files()

            print("🎉 모든 과정 완료!")
            print(f"📁 결과 영상: {self.output_path}/face_swapped_video.mp4")
            return True

        except Exception as e:
            print(f"❌ 오류 발생: {e}")
            import traceback
            traceback.print_exc()
            return False

# 사용 예시
if __name__ == "__main__":
    processor = VideoFaceSwapProcessor()

    # 디버깅 모드로 먼저 테스트
    print("=== 디버깅 모드 테스트 ===")
    processor.process_video(debug_mode=True)

    # 정상 모드 (소량 프레임으로 테스트)
    print("\n=== 소량 프레임 테스트 ===")
    processor.process_video(
        frame_interval=5,       # 10프레임 간격으로 추출
        max_frames=300,           # 최대 5개 프레임만
        install_deps=False,     # 최초 실행시만 True
        download_models=True,   # 모델 다운로드 여부
        cleanup=True           # 임시 파일 정리 여부
    )

## Prepare code

In [None]:
%cd /content/SimSwap

## Prepare models

In [None]:
!wget https://github.com/woctezuma/SimSwap-colab/releases/download/antelope/antelope.zip
!wget -P ./arcface_model https://github.com/woctezuma/SimSwap-colab/releases/download/1.0/arcface_checkpoint.tar
!wget https://github.com/neuralchen/SimSwap/releases/download/1.0/checkpoints.zip
!wget -P ./parsing_model/checkpoint https://github.com/neuralchen/SimSwap/releases/download/1.0/79999_iter.pth
!wget https://github.com/neuralchen/SimSwap/releases/download/512_beta/512.zip

## Prepare data

### Download

In [None]:
input_fname = '/content/source.png'
output_fname = '/content/target.png'

### Convert to JPG

Images should not be too large, hence the (arbitrary) limitation of 1024 length.

In [None]:
def get_new_size(img_size,
                 max_allowed_length = 1024):

  if any(max_allowed_length < sz for sz in img_size):
    long_length = max(img_size)
    ratio = max_allowed_length / long_length
  else:
    ratio = 1.0

  new_img_size = [
                  int(ratio*sz)
                  for sz in img_size
                  ]

  return tuple(new_img_size)

In [None]:
from PIL import Image

allow_resize = False

for fname in [input_fname, output_fname]:
  jpg_fname = fname.replace('.png', '.jpg')

  try:
    img = Image.open(fname)
  except FileNotFoundError:
    continue

  new_size = get_new_size(img.size, max_allowed_length = 1024)
  if allow_resize:
    print(f'Resizing from {img.size} to {new_size}')
    img.resize(new_size)

  print(f'Saving to {jpg_fname}')
  img.convert('RGB').save(jpg_fname)

jpg_input = input_fname.replace('.png', '.jpg')
jpg_output = output_fname.replace('.png', '.jpg')

In [None]:
from PIL import Image
import cv2
import numpy as np
import insightface

# SCRFD + ArcFace 모델 로드 (더 강력함)
face_detector = insightface.app.FaceAnalysis(name='buffalo_l')  # ← 기존 'antelope'보다 좋음
face_detector.prepare(ctx_id=0, det_size=(320, 320))

img_path = '/content/source.png'
img = cv2.imread(img_path)

faces = face_detector.get(img)

if len(faces) > 0:
    face = faces[0]
    x1, y1, x2, y2 = face.bbox.astype(int)
    face_crop = img[y1:y2, x1:x2]
    cropped_path = '/content/cpc_ackboo_crop.jpg'
    cv2.imwrite(cropped_path, face_crop)
    print(f'✅ 얼굴 감지 성공! → {cropped_path}')
else:
    print('❌ 얼굴을 찾지 못했습니다.')


In [None]:
import cv2
import insightface

face_detector = insightface.app.FaceAnalysis(name='buffalo_l')
face_detector.prepare(ctx_id=0, det_size=(640, 640))

img = cv2.imread(jpg_output)  # ← 타겟 이미지
faces = face_detector.get(img)

if not faces:
    print("❌ 타겟 이미지에서 얼굴 인식 실패 → SimSwap 실패 원인")
else:
    print(f"✅ 타겟에서 얼굴 {len(faces)}개 인식됨")


In [None]:
input_fname = '/content/cpc_ackboo_crop.jpg'

## Run

### Single

In [None]:
cd /content/SimSwap

In [None]:
%cd /content/SimSwap
%mkdir -p /content/output/single/

!python test_wholeimage_swapsingle.py \
 --no_simswaplogo \
 --use_mask \
 --crop_size 512 \
 --isTrain false \
 --name people \
 --Arc_path arcface_model/arcface_checkpoint.tar \
 --pic_a_path {jpg_input} \
 --pic_b_path {jpg_output} \
 --output_path /content/output/single/ \
 --gpu_ids 0 > /dev/null


### Multi

In [None]:
%cd /content/SimSwap
%mkdir -p /content/output/multi/

!python test_wholeimage_swapmulti.py \
 --no_simswaplogo \
 --use_mask \
 --crop_size 512 \
 --isTrain false \
 --name people \
 --Arc_path arcface_model/arcface_checkpoint.tar \
 --pic_a_path {jpg_input} \
 --pic_b_path {jpg_output} \
 --output_path /content/output/multi/ \
 --gpu_ids 0 > /dev/null


In [None]:
import os
import insightface

# 현재 디렉토리 확인
print("현재 위치:", os.getcwd())

# insightface_func/models 디렉토리로 이동
os.chdir('./insightface_func/models/')
print("변경된 위치:", os.getcwd())

# buffalo_l 모델 다운로드
print("buffalo_l 모델 다운로드 중...")
app = insightface.app.FaceAnalysis(name='buffalo_l', root='./')
app.prepare(ctx_id=-1)  # CPU 모드로 준비 (다운로드만 목적)

print("✅ buffalo_l 다운로드 완료!")

# 다운로드된 파일 확인
print("\n다운로드된 파일들:")
!ls -la ./buffalo_l/

# 원래 디렉토리로 돌아가기
os.chdir('../../')
print("원래 위치로 복귀:", os.getcwd())

In [None]:
ls -la ./insightface_func/models/

In [None]:
%cd /content/SimSwap
!head -70 test_wholeimage_swapsingle.py | grep -A 10 -B 10 "Face_detect_crop\|app.get"