# CrowdFace: Neural-Adaptive Crowd Segmentation with Contextual Pixel-Space Advertisement Integration

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/BlackBoyZeus/CrowdFace/blob/main/CrowdFace_Demo.ipynb)

This notebook demonstrates how to use CrowdFace for post-production video processing with contextual advertisement integration using GPU acceleration.

## Setup Environment

First, let's set up our environment and install the necessary dependencies.

In [None]:
# Check if running in Colab
import sys
import os
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    print("Running in Google Colab, installing dependencies...")
    !pip install torch torchvision opencv-python transformers diffusers huggingface_hub
    !git clone https://github.com/BlackBoyZeus/CrowdFace.git
    %cd CrowdFace
    
    # Set up Hugging Face access
    from google.colab import userdata
    try:
        hf_token = userdata.get('HF_TOKEN')
        if hf_token:
            print("Found Hugging Face token in Colab secrets!")
        else:
            hf_token = input("Please enter your Hugging Face token: ")
        os.environ["HUGGING_FACE_HUB_TOKEN"] = hf_token
    except Exception:
        print("To access gated models, please provide your Hugging Face token when prompted")
        hf_token = input("Please enter your Hugging Face token: ")
        os.environ["HUGGING_FACE_HUB_TOKEN"] = hf_token
else:
    print("Running locally, make sure you have the required dependencies installed.")
    # For local execution, you can set your HF token here or via environment variables
    if "HUGGING_FACE_HUB_TOKEN" not in os.environ:
        hf_token = input("Please enter your Hugging Face token: ")
        os.environ["HUGGING_FACE_HUB_TOKEN"] = hf_token

## Check GPU Availability

Let's verify that we have GPU access for accelerated processing.

In [None]:
import torch

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")
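
The later cells repeat the expression `"cuda" if torch.cuda.is_available() else "cpu"`. A small helper, sketched below under the hypothetical name `pick_device`, keeps that choice in one place. The `mps` branch is an assumption for Apple Silicon machines; the rest of this notebook only targets CUDA and CPU, and the models have not been checked on `mps` here.

```python
def pick_device():
    """Return "cuda", "mps", or "cpu" depending on what PyTorch can use."""
    try:
        import torch
    except ImportError:
        return "cpu"  # PyTorch is not installed, so there is nothing to accelerate
    if torch.cuda.is_available():
        return "cuda"
    # torch.backends.mps only exists in newer PyTorch builds, so look it up defensively
    mps = getattr(torch.backends, "mps", None)
    if mps is not None and mps.is_available():
        return "mps"
    return "cpu"


device = pick_device()
print(f"Using device: {device}")
```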

## Download SAM2 and RVM Models

CrowdFace uses SAM2 (Segment Anything Model 2) for segmentation and RVM (Robust Video Matting) for high-quality alpha matte generation.

In [None]:
# Download and set up SAM2
from transformers import SamModel, SamProcessor
from huggingface_hub import login

# Login to Hugging Face
if "HUGGING_FACE_HUB_TOKEN" in os.environ and os.environ["HUGGING_FACE_HUB_TOKEN"]:
    login(token=os.environ["HUGGING_FACE_HUB_TOKEN"])
    print("Logged in to Hugging Face Hub")
    use_auth = True
else:
    print("No Hugging Face token provided, will try to download models without authentication")
    use_auth = False

# Load SAM2 model
model_id = "facebook/sam2-large"
try:
    # `token` replaces the deprecated `use_auth_token` argument; None means anonymous access
    hf_token = os.environ["HUGGING_FACE_HUB_TOKEN"] if use_auth else None
    device = "cuda" if torch.cuda.is_available() else "cpu"
    sam_model = SamModel.from_pretrained(model_id, token=hf_token).to(device)
    sam_processor = SamProcessor.from_pretrained(model_id, token=hf_token)
    print("SAM2 model loaded successfully!")
except Exception as e:
    print(f"Error loading SAM2 model: {e}")
    print("Please make sure you have a valid Hugging Face token with access to this model.")
    print("You can get a token at https://huggingface.co/settings/tokens")

In [None]:
# Set up RVM for video matting
# Clone the official RVM repository
!git clone https://github.com/PeterL1n/RobustVideoMatting
%cd RobustVideoMatting

# Download the pre-trained model checkpoint (mobilenetv3)
!pip install -q gdown

# Try different model checkpoints if one fails
# First try the official mobilenetv3 checkpoint
# Quote the URL so the shell does not treat "?" as a glob character
!gdown "https://drive.google.com/uc?id=1rbSTGKAE-MTxBYHd-51l2hMOQPT_7EPy" -O rvm_mobilenetv3.pth

# Return to the main directory
%cd ..

# Import the model
import sys
sys.path.append('RobustVideoMatting')
from model import MattingNetwork

# Load RVM model with robust error handling
device = "cuda" if torch.cuda.is_available() else "cpu"
rvm_model = MattingNetwork('mobilenetv3').to(device)

# Try loading with strict=True first, then fall back to strict=False if needed
try:
    rvm_model.load_state_dict(torch.load("RobustVideoMatting/rvm_mobilenetv3.pth", map_location=device))
    print("RVM model loaded successfully with strict=True!")
except Exception as e:
    print(f"Error loading RVM model with strict=True: {e}")
    print("Attempting to load with strict=False to handle key mismatches...")
    try:
        rvm_model.load_state_dict(torch.load("RobustVideoMatting/rvm_mobilenetv3.pth", map_location=device), strict=False)
        print("RVM model loaded with strict=False. Some weights may not be loaded correctly.")
    except Exception as e2:
        print(f"Error loading RVM model even with strict=False: {e2}")
        print("Trying alternative model checkpoint...")
        
        # Try alternative checkpoint URL if available
        !gdown https://github.com/PeterL1n/RobustVideoMatting/releases/download/v1.0.0/rvm_mobilenetv3.pth -O RobustVideoMatting/rvm_mobilenetv3_alt.pth
        
        try:
            rvm_model.load_state_dict(torch.load("RobustVideoMatting/rvm_mobilenetv3_alt.pth", map_location=device), strict=False)
            print("RVM model loaded with alternative checkpoint and strict=False.")
        except Exception as e3:
            print(f"Failed to load RVM model with all attempts: {e3}")
            print("Proceeding with uninitialized model. Results may be poor.")

# Set model to evaluation mode
rvm_model.eval()
print("RVM model set to evaluation mode.")
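
RVM's forward pass accepts a `downsample_ratio` argument that controls the resolution of its internal coarse pass, and the cells in this notebook leave it at the default. The sketch below follows the heuristic that the RVM repository's inference script appears to use, which caps the longer side at roughly 512 pixels. Treat the 512 target and the helper itself as assumptions to tune, not as a fixed recommendation.

```python
def auto_downsample_ratio(height, width, target=512):
    """Pick a downsample ratio so the longer side is about `target` pixels.

    Frames that are already smaller than `target` are left at full resolution.
    """
    if height <= 0 or width <= 0:
        raise ValueError("height and width must be positive")
    return min(target / max(height, width), 1.0)


for h, w in [(1080, 1920), (2160, 3840), (360, 480)]:
    print(f"{w}x{h}: downsample_ratio = {auto_downsample_ratio(h, w):.3f}")
```

The resulting value could be passed to the model as `downsample_ratio=ratio` alongside the frame tensor and recurrent states.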

## Upload Test Video

Upload a video file to process with CrowdFace.

In [None]:
import cv2
import numpy as np

if IN_COLAB:
    from google.colab import files  # only available inside Colab
    print("Please upload a video file:")
    uploaded = files.upload()
    video_path = list(uploaded.keys())[0]
else:
    # For local testing, specify your video path
    video_path = "path/to/your/video.mp4"

# Display video information
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise IOError(f"Could not open video: {video_path}")
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()

print(f"Video loaded: {video_path}")
print(f"Resolution: {width}x{height}")
print(f"FPS: {fps}")
print(f"Frame count: {frame_count}")
if fps > 0:
    print(f"Duration: {frame_count/fps:.2f} seconds")
else:
    print("Duration: unknown (the container did not report a frame rate)")
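
OpenCV returns 0 for properties it cannot read, so the duration calculation is worth guarding. The helpers below are a minimal sketch of that guard using only the standard library; the names `video_duration_seconds` and `format_duration` are hypothetical.

```python
def video_duration_seconds(frame_count, fps):
    """Return the duration in seconds, or None when the metadata is unusable."""
    if fps is None or fps <= 0 or frame_count <= 0:
        return None
    return frame_count / fps


def format_duration(seconds):
    """Format seconds as MM:SS.ss, or "unknown" when no duration is available."""
    if seconds is None:
        return "unknown"
    minutes, secs = divmod(seconds, 60)
    return f"{int(minutes):02d}:{secs:05.2f}"


print(format_duration(video_duration_seconds(300, 30)))  # valid metadata
print(format_duration(video_duration_seconds(300, 0)))   # missing frame rate
```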

## Upload Advertisement Image

Upload an image to be used as an advertisement in the video.

In [None]:
if IN_COLAB:
    print("Please upload an advertisement image:")
    uploaded = files.upload()
    ad_path = list(uploaded.keys())[0]
else:
    # For local testing, specify your ad image path
    ad_path = "path/to/your/ad.png"

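# Load the ad image. IMREAD_UNCHANGED keeps an alpha channel if the file has one,
# and the array stays in OpenCV's BGR(A) order so it matches the video frames.
ad_image = cv2.imread(ad_path, cv2.IMREAD_UNCHANGED)
if ad_image is None:
    raise FileNotFoundError(f"Could not read advertisement image: {ad_path}")
if ad_image.ndim == 2:  # grayscale: expand to 3 channels for compositing
    ad_image = cv2.cvtColor(ad_image, cv2.COLOR_GRAY2BGR)

from IPython.display import display
from PIL import Image

# Convert a copy to RGB(A) for display only
to_rgb = cv2.COLOR_BGRA2RGBA if ad_image.shape[2] == 4 else cv2.COLOR_BGR2RGB
print(f"Advertisement image loaded: {ad_path}")
print(f"Image size: {ad_image.shape[1]}x{ad_image.shape[0]}")
display(Image.fromarray(cv2.cvtColor(ad_image, to_rgb)))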
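
The pipeline below resizes the ad by a fixed `scale` (0.5 by default), which can be too large for a small video or a large ad. One possible way to choose the scale from the two sizes is sketched here. The helper `fit_scale` and its `max_fraction` parameter are hypothetical and not part of CrowdFace.

```python
def fit_scale(ad_w, ad_h, frame_w, frame_h, max_fraction=0.25):
    """Largest scale (capped at 1.0) that keeps the ad within
    `max_fraction` of the frame's width and height."""
    if min(ad_w, ad_h, frame_w, frame_h) <= 0:
        raise ValueError("all dimensions must be positive")
    if not 0 < max_fraction <= 1:
        raise ValueError("max_fraction must be in (0, 1]")
    scale = min(frame_w * max_fraction / ad_w, frame_h * max_fraction / ad_h)
    return min(scale, 1.0)  # never upscale the ad


# A 1000x500 ad on a 1920x1080 frame is limited by the width constraint
print(fit_scale(1000, 500, 1920, 1080))
```

The result could then be passed as the `scale` argument of `place_ad`.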

## Implement CrowdFace Pipeline

Now let's implement the core functionality of CrowdFace for video processing.

In [None]:
class CrowdFacePipeline:
    def __init__(self, sam_model, sam_processor, rvm_model):
        self.sam_model = sam_model
        self.sam_processor = sam_processor
        self.rvm_model = rvm_model
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        
    def segment_people(self, frame):
        """Segment people in the frame using SAM2"""
        # Convert BGR to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Process the image with SAM
        inputs = self.sam_processor(rgb_frame, return_tensors="pt").to(self.device)
        
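        # Run SAM without prompts and request multiple candidate masks
        with torch.no_grad():
            outputs = self.sam_model(**inputs, multimask_output=True)
        
        # Resize the predicted masks back to the original frame size
        masks = self.sam_processor.image_processor.post_process_masks(
            outputs.pred_masks.cpu(),
            inputs["original_sizes"].cpu(),
            inputs["reshaped_input_sizes"].cpu()
        )
        scores = outputs.iou_scores.cpu()
        
        # SAM is class-agnostic, so it cannot tell which mask is a person. As a rough
        # heuristic, keep the candidate with the highest predicted IoU score.
        # Masks are shaped (prompts, candidates, H, W) per image: index image 0, prompt 0.
        best_mask_idx = int(torch.argmax(scores[0, 0]))
        best_mask = masks[0][0][best_mask_idx].numpy()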
        
        return best_mask
    
    def generate_matte(self, frame, prev_frame=None, prev_fgr=None, prev_pha=None, prev_state=None):
        """Generate alpha matte using RVM"""
        # Convert BGR to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Convert to tensor
        frame_tensor = torch.from_numpy(rgb_frame).float().permute(2, 0, 1).unsqueeze(0) / 255.0
        frame_tensor = frame_tensor.to(self.device)
        
        # Process with RVM. The model takes the frame plus four recurrent states
        # (r1..r4) and returns [fgr, pha, r1, r2, r3, r4]. prev_state carries those
        # states between calls and starts as four Nones; prev_frame, prev_fgr and
        # prev_pha are accepted for compatibility but are not fed to the model.
        rec = list(prev_state) if prev_state is not None else [None] * 4
        with torch.no_grad():
            try:
                fgr, pha, *state = self.rvm_model(frame_tensor, *rec)
            except Exception as e:
                print(f"Error in RVM inference: {e}")
                # Fall back to a fully opaque matte if RVM fails
                h, w = frame.shape[:2]
                fgr = frame_tensor  # Use original frame as foreground
                pha = torch.ones((1, 1, h, w), device=self.device)  # Full alpha
                state = None  # Reset the recurrent state on the next call
        
        # Convert back to numpy
        alpha_matte = pha[0].cpu().numpy().transpose(1, 2, 0)
        foreground = fgr[0].cpu().numpy().transpose(1, 2, 0)
        
        return alpha_matte, foreground, frame_tensor, fgr, pha, state
    
    def find_ad_placement(self, mask, min_size=1000):
        """Find suitable locations for ad placement"""
        # Convert mask to binary
        binary_mask = (mask > 0.5).astype(np.uint8)
        
        # Find contours in the mask
        contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        # Find the largest contour (assuming it's a person)
        if not contours:
            return None
        
        largest_contour = max(contours, key=cv2.contourArea)
        if cv2.contourArea(largest_contour) < min_size:
            return None
        
        # Get bounding box
        x, y, w, h = cv2.boundingRect(largest_contour)
        
        # Find a suitable location near the person
        # For simplicity, we'll place it to the right of the person
        ad_x = x + w + 20  # 20 pixels padding
        ad_y = y
        
        return (ad_x, ad_y)
    
    def place_ad(self, frame, ad_image, position, scale=0.5):
        """Place the ad in the frame at the specified position"""
        # Resize ad image
        h, w = ad_image.shape[:2]
        new_h, new_w = int(h * scale), int(w * scale)
        resized_ad = cv2.resize(ad_image, (new_w, new_h))
        
        # Create a copy of the frame
        result = frame.copy()
        
        # Get ad position
        x, y = position
        
        # Clamp the ad so it stays inside the frame; skip it if it cannot fit at all
        frame_h, frame_w = frame.shape[:2]
        if new_w > frame_w or new_h > frame_h:
            return result
        x = max(0, min(x, frame_w - new_w))
        y = max(0, min(y, frame_h - new_h))
        
        # Create a region of interest
        roi = result[y:y+new_h, x:x+new_w]
        
        # Create a mask for the ad
        if resized_ad.shape[2] == 4:  # If ad has alpha channel
            ad_mask = resized_ad[:, :, 3] / 255.0
            ad_rgb = resized_ad[:, :, :3]
            
            # Blend the ad with the frame
            for c in range(3):
                roi[:, :, c] = roi[:, :, c] * (1 - ad_mask) + ad_rgb[:, :, c] * ad_mask
        else:  # No alpha channel
            result[y:y+new_h, x:x+new_w] = resized_ad
        
        return result

    # Video-level driver: runs segmentation, ad placement, and matting frame by frame
    def process_video(self, video_path, ad_image, output_path, max_frames=None):
        """Process the entire video"""
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        if max_frames is not None:
            frame_count = min(frame_count, max_frames)
        
        # Create video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        
        # Initialize variables for RVM
        prev_frame = None
        prev_fgr = None
        prev_pha = None
        prev_state = None
        
        # Process each frame
        frame_idx = 0
        ad_position = None
        
        from tqdm.notebook import tqdm
        for _ in tqdm(range(frame_count), desc="Processing video"):
            ret, frame = cap.read()
            if not ret:
                break
                
            # Every 30 frames, re-detect people and ad placement
            if frame_idx % 30 == 0:
                # Segment people
                mask = self.segment_people(frame)
                
                # Find ad placement
                ad_position = self.find_ad_placement(mask)
            
            # If we found a position for the ad, place it
            if ad_position is not None:
                frame = self.place_ad(frame, ad_image, ad_position)
            
            # Update the RVM matte and recurrent state every 5 frames to save computation.
            # The matte is computed here but not composited into the output in this demo.
            if frame_idx % 5 == 0:
                alpha_matte, foreground, curr_frame, curr_fgr, curr_pha, curr_state = self.generate_matte(
                    frame, prev_frame, prev_fgr, prev_pha, prev_state
                )
                
                # Update previous frame variables
                prev_frame = curr_frame
                prev_fgr = curr_fgr
                prev_pha = curr_pha
                prev_state = curr_state
            
            # Write the frame
            out.write(frame)
            frame_idx += 1
        
        # Release resources
        cap.release()
        out.release()
        
        return output_path
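
`place_ad` blends the ad into the frame one channel at a time. The same compositing rule, `out = ad * alpha + background * (1 - alpha)`, can be written as a single broadcast NumPy expression. The function below is a sketch under the assumption that the ad is a 4-channel `uint8` array in BGRA order and already has the same height and width as the target region; `alpha_blend` is a hypothetical name.

```python
import numpy as np


def alpha_blend(roi, ad_bgra):
    """Blend a BGRA ad over a same-sized BGR region and return a new uint8 array."""
    if roi.shape[:2] != ad_bgra.shape[:2]:
        raise ValueError("roi and ad must have the same height and width")
    # Keep the channel axis (shape H x W x 1) so alpha broadcasts over B, G, R
    alpha = ad_bgra[:, :, 3:4].astype(np.float32) / 255.0
    ad_bgr = ad_bgra[:, :, :3].astype(np.float32)
    blended = roi.astype(np.float32) * (1.0 - alpha) + ad_bgr * alpha
    return np.clip(np.rint(blended), 0, 255).astype(np.uint8)


# Tiny example: a white ad over a black region, with one fully transparent pixel
roi = np.zeros((2, 2, 3), dtype=np.uint8)
ad = np.full((2, 2, 4), 255, dtype=np.uint8)
ad[0, 0, 3] = 0
out = alpha_blend(roi, ad)
print(out[0, 0], out[1, 1])
```

Working in `float32` and rounding once at the end avoids the silent truncation that happens when float results are assigned directly into a `uint8` array.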

## Process Video with CrowdFace

Now let's process the uploaded video with our CrowdFace pipeline.

In [None]:
# Initialize the pipeline
pipeline = CrowdFacePipeline(sam_model, sam_processor, rvm_model)

# Set output path
output_path = "output_video.mp4"

# Process a subset of frames for demonstration (adjust max_frames as needed)
processed_video = pipeline.process_video(video_path, ad_image, output_path, max_frames=300)

print(f"Video processing complete! Output saved to {output_path}")
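
`process_video` runs segmentation on every 30th frame and matting on every 5th, so the number of model passes grows much more slowly than the frame count. The arithmetic can be sketched as follows; `scheduled_calls` is a hypothetical helper, and the intervals are the ones hard-coded in `process_video`.

```python
import math


def scheduled_calls(total_frames, every):
    """Count frame indices i in range(total_frames) with i % every == 0."""
    if every <= 0:
        raise ValueError("every must be positive")
    return math.ceil(max(total_frames, 0) / every)


max_frames = 300
sam_calls = scheduled_calls(max_frames, 30)  # segmentation and ad placement
rvm_calls = scheduled_calls(max_frames, 5)   # matting
print(f"{max_frames} frames: {sam_calls} SAM2 passes, {rvm_calls} RVM passes")
```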

## Download Processed Video

Download the processed video with the advertisement integrated.

In [None]:
if IN_COLAB:
    from google.colab import files
    files.download(output_path)
else:
    print(f"Video saved locally at {output_path}")

## Display Video Preview

Let's display a preview of the processed video.

In [None]:
from IPython.display import HTML
from base64 import b64encode

def show_video(video_path):
    # Read the file inside a context manager so the handle is always closed
    with open(video_path, "rb") as video_file:
        video_b64 = b64encode(video_file.read()).decode()
    video_tag = f'<video width="640" height="360" controls><source src="data:video/mp4;base64,{video_b64}" type="video/mp4"></video>'
    return HTML(video_tag)

show_video(output_path)
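
Embedding the video as a base64 data URI stores the whole file inside the notebook output, and base64 encoding inflates it by about one third. A quick size check before embedding, sketched below, can help keep the notebook responsive. The 50 MB default limit and the helper names are assumptions, not limits imposed by Colab or Jupyter.

```python
import base64
import math


def base64_length(n_bytes):
    """Length of the base64 text for n_bytes of input (padding included)."""
    return 4 * math.ceil(n_bytes / 3)


def safe_to_embed(n_bytes, limit_bytes=50 * 1024 * 1024):
    """True if the encoded payload stays within limit_bytes."""
    return base64_length(n_bytes) <= limit_bytes


sample = b"abcd"
print(base64_length(len(sample)), len(base64.b64encode(sample)))
print(safe_to_embed(30 * 1024 * 1024), safe_to_embed(45 * 1024 * 1024))
```

For a real file, `os.path.getsize(output_path)` would supply `n_bytes`.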

## Conclusion

This notebook demonstrated how to use CrowdFace for neural-adaptive crowd segmentation with contextual pixel-space advertisement integration. The system uses state-of-the-art models:

1. **SAM2 (Segment Anything Model 2)** for precise crowd detection and segmentation
2. **RVM (Robust Video Matting)** for high-quality alpha matte generation
3. **BAGEL** (ByteDance Ad Generation and Embedding Library) concepts for intelligent ad placement (not exercised in this notebook, which places the ad with a simple bounding-box heuristic)

The pipeline uses GPU acceleration when a CUDA device is available and can be extended with additional features such as:

- Multiple advertisement placement
- Dynamic ad content based on scene context
- Improved tracking for consistent ad placement
- Custom ad styles and animations

For more information, visit the [CrowdFace GitHub repository](https://github.com/BlackBoyZeus/CrowdFace).