# 🎥 Real-Time Face Swapping for Google Colab

**Real-time face swapping from webcam** with optimized performance (15-20 FPS target)

📦 **Repository**: [SmashCodeJJ/CIS5810_FinalProject](https://github.com/SmashCodeJJ/CIS5810_FinalProject)  
🔧 **Branch**: `Youxin` (real-time implementation)

---

## ⚠️ IMPORTANT: Setup Process

1. ✅ **Enable GPU**: Runtime → Change runtime type → GPU (T4)
2. ✅ **Run Installation Cell** → Installs dependencies
3. 🔄 **Restart Runtime** → Click "Runtime → Restart runtime"
4. ▶️ **Run Real-Time Cells** → Start face swapping

**Do NOT skip the runtime restart!**


## Step 1: Installation (Run once, then restart runtime)

After this cell completes, go to: **Runtime → Restart runtime**


In [None]:
# Clone repository with real-time implementation
!git clone -b Youxin https://github.com/SmashCodeJJ/CIS5810_FinalProject.git sber-swap
%cd sber-swap

# Install dependencies
%pip install -q -r requirements.txt

# Download models (if needed)
import os
if not os.path.exists('weights/G_unet_2blocks.pth'):
    print("Downloading models...")
    !bash download_models.sh 2>/dev/null || echo "Models should be downloaded separately"

print("\n" + "="*50)
print("‚úÖ Installation complete!")
print("="*50)
print("‚ö†Ô∏è  IMPORTANT: Go to Runtime ‚Üí Restart runtime")
print("    Then skip this cell and run the cells below.")
print("="*50)


## Step 2: Verify Installation (Run after restart)


In [None]:
# Change to project directory
%cd /content/sber-swap

# Verify imports
import torch
import numpy as np
import cv2
import onnxruntime as ort
import insightface

print("="*50)
print("‚úÖ Environment Verified")
print("="*50)
print(f"PyTorch version: {torch.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"ONNX Runtime version: {ort.__version__}")
print(f"InsightFace version: {insightface.__version__}")

if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print("üöÄ GPU acceleration enabled!")
else:
    print("‚ö†Ô∏è  Running on CPU (slower)")
print("="*50)


## Step 3: Real-Time Face Swapping with Webcam

### Upload Source Face Image

First, upload the face image you want to swap TO (the face that will appear on the webcam feed)


In [None]:
from google.colab import files
# Aliased so this cell never rebinds the global name `Image`, which the webcam
# cell imports from PIL.
from IPython.display import Image as IPImage, display
import os
import shutil

# Directory that receives the uploaded source face
upload_dir = "/content/sber-swap/examples/my_images"
os.makedirs(upload_dir, exist_ok=True)

# Ask the user for the SOURCE face; `uploaded` maps file name -> file contents
print("Upload SOURCE face image (the face to swap onto the webcam):")
uploaded = files.upload()

# Only the first uploaded file is used; the upload dialog may return none.
first_upload = next(iter(uploaded), None)

if first_upload is not None:
    source_path = f"{upload_dir}/{first_upload}"
    # shutil.move instead of a shell `mv`: the file name comes from the user
    # and must not be interpreted by a shell.
    shutil.move(first_upload, source_path)
    print(f"‚úÖ Source image saved to: {source_path}")
    display(IPImage(source_path))
else:
    # Nothing uploaded: fall back to the sample image shipped with the repository
    print("‚ö†Ô∏è  Using default source image")
    source_path = "/content/sber-swap/examples/images/mark.jpg"


### JavaScript Webcam Capture Function

Colab doesn't support direct webcam access, so we use JavaScript to capture frames.


In [None]:
import base64
import io
from PIL import Image
import numpy as np
import cv2
from IPython.display import display, HTML, Javascript
from google.colab.output import eval_js

def take_photo():
    """Capture one webcam frame through the browser and return it as a BGR array.

    The frame is grabbed by JavaScript executed in the notebook front end
    (``eval_js``), transferred as a base64 JPEG data URL, decoded with PIL and
    converted to the BGR channel order used by OpenCV.

    Returns:
        numpy.ndarray: the captured frame after ``cv2.COLOR_RGB2BGR`` conversion.

    Raises:
        RuntimeError: if the browser returned something that is not a data URL.
        Any exception raised by ``eval_js`` (for example when camera access is
        refused) propagates unchanged.
    """
    # Imported locally under an alias: other cells bind the global name `Image`
    # to IPython.display.Image, which has no `open()`.
    from PIL import Image as PILImage

    js_code = '''
    async function takePhoto() {
        const stream = await navigator.mediaDevices.getUserMedia({video: true});
        try {
            const video = document.createElement('video');

            // Register the handler before attaching the stream so the
            // loadedmetadata event cannot be missed.
            const metadataLoaded = new Promise(resolve => {
                video.onloadedmetadata = () => resolve();
            });
            video.srcObject = stream;

            // Wait until playback has really started; otherwise the first
            // drawImage() can copy an empty frame.
            await video.play();
            await metadataLoaded;

            video.setAttribute('width', video.videoWidth);
            video.setAttribute('height', video.videoHeight);

            const canvas = document.createElement('canvas');
            canvas.width = video.videoWidth;
            canvas.height = video.videoHeight;
            const ctx = canvas.getContext('2d');
            ctx.drawImage(video, 0, 0);

            return canvas.toDataURL('image/jpeg', 0.95);
        } finally {
            // Always release the camera, even when capture fails midway.
            stream.getTracks().forEach(track => track.stop());
        }
    }
    '''

    # Run the JavaScript in the browser; the result is a "data:image/jpeg;base64,..." URL
    data = eval_js(js_code + 'takePhoto()')
    if not isinstance(data, str) or ',' not in data:
        raise RuntimeError("Webcam capture did not return image data")

    # Everything after the first comma is the base64 payload
    image_bytes = base64.b64decode(data.split(',', 1)[1])

    # Force three RGB channels so the colour conversion below is always valid
    image = PILImage.open(io.BytesIO(image_bytes)).convert('RGB')

    # Convert to OpenCV format (BGR channel order)
    frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

    return frame

print("‚úÖ Webcam capture function ready!")


In [None]:
import sys
import torch
import time
import numpy as np
import cv2
from IPython.display import display, Image as IPImage  # Use alias to avoid conflicts

# Add to path
sys.path.insert(0, '/content/sber-swap')

# Import real-time modules
from inference_realtime import init_models, load_source_face
from utils.realtime.face_tracker import FaceTracker
from utils.inference.realtime_processing import process_single_frame
from utils.realtime.performance_monitor import PerformanceMonitor

# Initialize models (only once)
print("Loading models... This may take a minute...")

class Args:
    """Fixed bundle of settings for the real-time demo.

    Every instance carries the same seven attributes: generator weights path,
    backbone name, block count, fast-mode switch, crop size, detection
    interval and tracker type.
    """

    def __init__(self):
        # Fill the instance namespace in one step; keyword order fixes the
        # attribute insertion order.
        vars(self).update(
            G_path='weights/G_unet_2blocks.pth',
            backbone='unet',
            num_blocks=2,
            fast_mode=True,
            crop_size=224,
            detect_interval=5,
            tracker_type='CSRT',
        )

# Settings object plus the four objects returned by init_models
args = Args()
app, G, netArc, handler = init_models(args)

# Embedding of the source face chosen in the upload cell
source_embed = load_source_face(source_path, app, netArc, args.crop_size)

# Performance monitor configured with a 30-entry window
monitor = PerformanceMonitor(window_size=30)

# Face tracker driven by `app` as its detector
tracker = FaceTracker(
    confidence_threshold=0.6,
    tracker_type=args.tracker_type,
    detect_interval=args.detect_interval,
    detector=app,
)

# Two status lines, emitted exactly as two consecutive print() calls would emit them
print(
    "\n‚úÖ Models loaded! Ready for real-time face swapping.",
    "\nüì∏ Click the cell below multiple times to capture and process frames.",
    sep="\n",
)


### Real-Time Processing (Continuous Loop) ⭐

**What this does:**
- Continuously captures frames from your webcam in a loop
- Processes each frame with face swapping in real-time
- Displays the swapped results continuously
- Runs until you stop it (Ctrl+C or Stop button)

**Note**: Colab doesn't support true continuous webcam streaming. This solution:
- Automatically captures frames in a loop (one by one)
- Processes each frame with face swapping
- Updates display continuously with clear_output()
- Shows FPS and performance metrics

**Limitation**: Each frame requires camera permission, so there's a brief pause between frames (~0.5-1 second). This is a Colab browser security limitation.


In [None]:
"""
REAL-TIME PROCESSING LOOP
=========================
This cell implements continuous face swapping from webcam.
It captures frames automatically and processes them in a loop.

How it works:
1. Captures frame from webcam (requires permission each time)
2. Detects/tracks face in frame
3. Applies face swap using source face
4. Displays result with performance stats
5. Repeats until stopped

To stop: Click Stop button or press Ctrl+C in cell output
"""
# Continuous Real-Time Processing Loop
# Captures a webcam frame, tracks the face, swaps it, and shows the annotated
# result. Repeats until `max_frames` is reached or the user interrupts
# (Ctrl+C or Stop button).
import os
from IPython.display import clear_output  # hoisted: was re-imported on every iteration

print("üîÑ Starting continuous face swapping...")
print("üì∏ This will continuously capture and process frames")
print("‚ö†Ô∏è  Click Stop (or press Ctrl+C) to stop the loop")
print("="*60)

# The latest annotated frame is written here. The directory is created up
# front because cv2.imwrite does not create missing directories and reports
# failure only through its return value.
output_path = '/content/sber-swap/examples/results/realtime_frame.jpg'
os.makedirs(os.path.dirname(output_path), exist_ok=True)

max_frames = 100  # Maximum frames to process (adjust as needed)
frame_count = 0   # defined before `try` so the handlers below can always read it

try:
    while frame_count < max_frames:
        # Capture frame from webcam
        print(f"\nüì∏ Capturing frame {frame_count + 1}... (Please allow camera access)")
        frame = take_photo()

        # Monitoring starts after the capture, so the webcam round-trip is not
        # part of the processing time recorded below.
        monitor.start_frame()

        # Update tracker; a None bbox is treated as "no face in this frame"
        bbox = tracker.update(frame)

        # Defaults used when no face was found
        det_time = 0
        gen_time = 0
        result = None

        if bbox is not None:
            result, det_time, gen_time = process_single_frame(
                frame=frame,
                source_embed=source_embed,
                netArc=netArc,
                G=G,
                app=app,
                handler=handler,
                bbox=bbox,
                crop_size=args.crop_size,
                half=True  # NOTE(review): presumably half-precision inference; confirm in process_single_frame
            )

        # Record metrics (total time in milliseconds since start_frame)
        monitor.record_detection_time(det_time)
        monitor.record_generator_time(gen_time)
        total_time = (time.time() - monitor.frame_start_time) * 1000 if monitor.frame_start_time else 0
        monitor.record_processing_time(total_time)
        monitor.end_frame()

        # One snapshot serves both the overlay and the printed summary
        stats = monitor.get_stats()

        if result is not None:
            # Draw bbox; cast because cv2.rectangle requires integer pixel coordinates
            x, y, w, h = (int(v) for v in bbox)
            cv2.rectangle(result, (x, y), (x+w, y+h), (0, 255, 0), 2)

            # Add text with stats
            cv2.putText(result, f"FPS: {stats['fps']:.1f}", (10, 30),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            cv2.putText(result, f"Latency: {stats['avg_latency_ms']:.1f}ms", (10, 60),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
            cv2.putText(result, f"Frame: {frame_count + 1}/{max_frames}", (10, 90),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

            display_frame = result
        else:
            # Annotate a copy so the captured frame itself stays untouched
            display_frame = frame.copy()
            cv2.putText(display_frame, "No face detected", (10, 30),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

        # Save; fail loudly instead of letting display() trip over a missing file
        if not cv2.imwrite(output_path, display_frame):
            raise IOError(f"Could not write frame to {output_path}")

        # Clear previous output and display new frame
        clear_output(wait=True)
        display(IPImage(output_path))

        # Print stats
        print(f"‚úÖ Frame {frame_count + 1} | FPS: {stats['fps']:.1f} | "
              f"Latency: {stats['avg_latency_ms']:.1f}ms | "
              f"Detection: {stats['avg_detection_ms']:.1f}ms | "
              f"Generator: {stats['avg_generator_ms']:.1f}ms")

        frame_count += 1

        # Small delay to avoid overwhelming the system
        time.sleep(0.1)

    print(f"\n‚úÖ Processed {frame_count} frames!")
    print("üìä Final Statistics:")
    final_stats = monitor.get_stats()
    print(f"   Average FPS: {final_stats['fps']:.2f}")
    print(f"   Average Latency: {final_stats['avg_latency_ms']:.1f}ms")

except KeyboardInterrupt:
    # Manual stop: report what was processed so far
    print("\n\n‚èπÔ∏è  Stopped by user")
    final_stats = monitor.get_stats()
    print(f"\nüìä Processed {frame_count} frames")
    print(f"   Final FPS: {final_stats['fps']:.2f}")
    print(f"   Final Latency: {final_stats['avg_latency_ms']:.1f}ms")

except Exception as e:
    # Top-level notebook boundary: show the error and the full traceback
    print(f"\n‚ùå Error: {e}")
    import traceback
    traceback.print_exc()


### Alternative: Single Frame Capture (If Loop Doesn't Work)

If the continuous loop has issues, use this cell to capture one frame at a time:


In [None]:
# Single frame capture (click this cell multiple times for multiple frames)
# Captures one webcam frame, tracks the face, swaps it, and shows the
# annotated result together with the monitor's statistics.
import os

# The annotated frame is written here. The directory is created up front
# because cv2.imwrite does not create missing directories and reports failure
# only through its return value.
output_path = '/content/sber-swap/examples/results/realtime_frame.jpg'
os.makedirs(os.path.dirname(output_path), exist_ok=True)

try:
    # Capture frame from webcam
    frame = take_photo()

    # Monitoring starts after the capture, so the webcam round-trip is not
    # part of the processing time recorded below.
    monitor.start_frame()

    # Update tracker; a None bbox is treated as "no face in this frame"
    bbox = tracker.update(frame)

    # Defaults used when no face was found
    det_time = 0
    gen_time = 0
    result = None

    if bbox is not None:
        result, det_time, gen_time = process_single_frame(
            frame=frame,
            source_embed=source_embed,
            netArc=netArc,
            G=G,
            app=app,
            handler=handler,
            bbox=bbox,
            crop_size=args.crop_size,
            half=True  # NOTE(review): presumably half-precision inference; confirm in process_single_frame
        )

    # Record metrics (total time in milliseconds since start_frame)
    monitor.record_detection_time(det_time)
    monitor.record_generator_time(gen_time)
    total_time = (time.time() - monitor.frame_start_time) * 1000 if monitor.frame_start_time else 0
    monitor.record_processing_time(total_time)
    monitor.end_frame()

    # One snapshot serves both the overlay and the printed summary
    stats = monitor.get_stats()

    if result is not None:
        # Draw bbox; cast because cv2.rectangle requires integer pixel coordinates
        x, y, w, h = (int(v) for v in bbox)
        cv2.rectangle(result, (x, y), (x+w, y+h), (0, 255, 0), 2)

        # Add text with stats
        cv2.putText(result, f"FPS: {stats['fps']:.1f}", (10, 30),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(result, f"Latency: {stats['avg_latency_ms']:.1f}ms", (10, 60),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

        display_frame = result
    else:
        # Annotate a copy so the captured frame itself stays untouched
        display_frame = frame.copy()
        cv2.putText(display_frame, "No face detected", (10, 30),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    # Save; fail loudly instead of letting display() trip over a missing file
    if not cv2.imwrite(output_path, display_frame):
        raise IOError(f"Could not write frame to {output_path}")

    # Display result
    display(IPImage(output_path))

    # Print stats
    print(f"‚úÖ Frame processed! FPS: {stats['fps']:.1f} | "
          f"Latency: {stats['avg_latency_ms']:.1f}ms | "
          f"Detection: {stats['avg_detection_ms']:.1f}ms | "
          f"Generator: {stats['avg_generator_ms']:.1f}ms")

    print("\nüì∏ Click this cell again to capture another frame!")

except Exception as e:
    # Top-level notebook boundary: show the error and the full traceback
    print(f"‚ùå Error: {e}")
    import traceback
    traceback.print_exc()


---

## Video Processing (Upload and Process Video File)

**What this does:**
- Upload a pre-recorded video file to Colab
- Process the entire video with face swapping
- Download the result video
- Better quality and faster than frame-by-frame processing

**Advantages over real-time:**
- ✅ Only one upload (no repeated permissions)
- ✅ Batch processing (faster overall)
- ✅ Higher quality output
- ✅ Can process longer videos


In [None]:
"""
VIDEO PROCESSING SECTION
=======================
This section allows you to upload a video file and process it with face swapping.

Process:
1. Upload video file (MP4, AVI, MOV, etc.)
2. Process entire video with face swapping (batch processing)
3. Save result video
4. Download or display result

This is better than real-time for:
- Higher quality processing
- No permission delays
- Batch processing (faster overall)
- Longer videos
"""

from google.colab import files
from IPython.display import display, Image as IPImage, Video, HTML
import os

# Create directory for videos
!mkdir -p /content/sber-swap/examples/my_videos

print("="*60)
print("üìπ Video Processing - Face Swapping")
print("="*60)
print("This will process a video file with face swapping.")
print("Upload your video file below.")
print("="*60)

# Upload video file
uploaded = files.upload()

video_file = None
for filename in uploaded.keys():
    video_file = f"/content/sber-swap/examples/my_videos/{filename}"
    # Move uploaded file
    import shutil
    shutil.move(filename, video_file)
    print(f"\n‚úÖ Video uploaded: {video_file}")
    print(f"üìä File size: {os.path.getsize(video_file) / (1024*1024):.2f} MB")
    break

if video_file is None:
    print("\n‚ö†Ô∏è  No video file uploaded. Please upload a video file.")
else:
    print(f"\nüîÑ Processing video: {video_file}")
    print("This may take a few minutes depending on video length...")
    
    # Process video with face swapping
    output_video = '/content/sber-swap/examples/my_videos/swapped_result.mp4'
    
    !python inference.py \
      --target_video "{video_file}" \
      --source_paths {source_path} \
      --out_video_name {output_video}
    
    # Check if processing succeeded
    if os.path.exists(output_video):
        print("\n" + "="*60)
        print("‚úÖ Video processing complete!")
        print("="*60)
        
        # Display video
        print("\nüìπ Processed Video:")
        display(Video(output_video, width=640))
        
        # Download option
        print("\nüíæ Download processed video:")
        files.download(output_video)
        
        print(f"\nüìÅ Result saved to: {output_video}")
    else:
        print("\n‚ùå Video processing failed. Check for errors above.")


### Video Processing with Custom Settings

Process video with different settings (quality vs speed trade-offs):


In [None]:
"""
VIDEO PROCESSING WITH CUSTOM SETTINGS
=====================================
Process video with optimized settings for speed or quality.

Settings:
- num_blocks: 1 (faster) or 2 (better quality)
- batch_size: Higher = faster but uses more GPU memory
- use_sr: True for super resolution (better quality, slower)
"""

# Video processing with custom parameters
video_path = '/content/sber-swap/examples/my_videos/your_video.mp4'  # Change to your video path
output_path = '/content/sber-swap/examples/my_videos/swapped_custom.mp4'

# Custom settings
num_blocks = 2  # 1=faster, 2=better quality, 3=best quality
batch_size = 40  # Higher = faster but more GPU memory
use_sr = False  # True for super resolution (slower but better quality)

print("="*60)
print("üìπ Processing video with custom settings")
print("="*60)
print(f"Input: {video_path}")
print(f"Output: {output_path}")
print(f"Generator blocks: {num_blocks}")
print(f"Batch size: {batch_size}")
print(f"Super resolution: {use_sr}")
print("="*60)

# Check if video exists
if not os.path.exists(video_path):
    print(f"\n‚ö†Ô∏è  Video not found: {video_path}")
    print("Please upload a video first using the cell above, or update video_path.")
else:
    # Run inference with custom settings
    !python inference.py \
      --target_video "{video_path}" \
      --source_paths {source_path} \
      --out_video_name {output_path} \
      --num_blocks {num_blocks} \
      --batch_size {batch_size} \
      --use_sr {use_sr}
    
    # Check result
    if os.path.exists(output_path):
        print("\n‚úÖ Processing complete!")
        display(Video(output_path, width=640))
        files.download(output_path)
    else:
        print("\n‚ùå Processing failed. Check for errors above.")


### Video Processing Comparison

Compare processing with different quality settings:


In [None]:
"""
VIDEO PROCESSING COMPARISON
===========================
Test different settings to find the best quality/speed trade-off.

This will process the same video with different configurations:
- Fast mode: 1 block, no super resolution (fastest)
- Balanced: 2 blocks, no super resolution (recommended)
- Quality mode: 2 blocks, with super resolution (slowest, best quality)
"""

import time

video_path = '/content/sber-swap/examples/my_videos/test_video.mp4'  # Update with your video

if not os.path.exists(video_path):
    print(f"‚ö†Ô∏è  Video not found: {video_path}")
    print("Upload a video first or update the path above.")
else:
    print("="*60)
    print("üß™ Testing Different Settings")
    print("="*60)
    
    configs = [
        {"name": "Fast Mode", "num_blocks": 1, "use_sr": False, "batch_size": 60},
        {"name": "Balanced (Recommended)", "num_blocks": 2, "use_sr": False, "batch_size": 40},
        {"name": "Quality Mode", "num_blocks": 2, "use_sr": True, "batch_size": 20},
    ]
    
    results = []
    
    for config in configs:
        print(f"\nüîÑ Testing: {config['name']}")
        output_path = f'/content/sber-swap/examples/my_videos/result_{config["name"].replace(" ", "_")}.mp4'
        
        start_time = time.time()
        
        !python inference.py \
          --target_video "{video_path}" \
          --source_paths {source_path} \
          --out_video_name {output_path} \
          --num_blocks {config["num_blocks"]} \
          --batch_size {config["batch_size"]} \
          --use_sr {config["use_sr"]}
        
        elapsed = time.time() - start_time
        
        if os.path.exists(output_path):
            file_size = os.path.getsize(output_path) / (1024*1024)
            results.append({
                'name': config['name'],
                'time': elapsed,
                'size_mb': file_size,
                'path': output_path
            })
            print(f"   ‚úÖ Complete in {elapsed:.1f}s | Size: {file_size:.2f}MB")
        else:
            print(f"   ‚ùå Failed")
    
    # Display comparison
    print("\n" + "="*60)
    print("üìä Comparison Results")
    print("="*60)
    print(f"{'Configuration':<25} {'Time (s)':<12} {'Size (MB)':<12}")
    print("-" * 60)
    for r in results:
        print(f"{r['name']:<25} {r['time']:<12.1f} {r['size_mb']:<12.2f}")
    print("="*60)
    
    # Display all results
    print("\nüìπ Results:")
    for r in results:
        print(f"\n{r['name']}:")
        display(Video(r['path'], width=400))
