# Environment Setup


In [None]:
# Install the packages this notebook imports. Each `!pip` line runs in a shell.

# Replace any preinstalled PyTorch packages with fixed versions.
!pip uninstall -y torch torchvision torchaudio
!pip install torch==2.0.0 torchvision==0.15.1 torchaudio==2.0.0
# Replace the installed NumPy with a 1.x release.
# NOTE(review): the installs further down are mostly unpinned and may resolve a
# different NumPy again — confirm the final version with `pip list` after this cell.
!pip uninstall -y numpy
!pip install "numpy<2"
!pip install onnxruntime
# Pin MediaPipe (its FaceMesh solution is used by the landmark-generation cell).
!pip uninstall -y mediapipe
!pip install mediapipe==0.10.5
# Imported by the optional face-swap section (face_recognition, cv2).
!pip install face_recognition opencv-python dlib
# basicsr is imported by the GFPGAN cell; facexlib is presumably a GFPGAN dependency — TODO confirm.
!pip install basicsr facexlib
# Imported by the GFPGAN cell.
!pip install insightface


# Converts Video into Frames


In [None]:
import cv2
import os

# Create output folder
frames_dir = "/content/frames"
os.makedirs(frames_dir, exist_ok=True)

# Open the video
video_path = "/content/video.mp4"  # <-- Change to your video path
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Without this check a wrong path silently produces zero frames.
    raise FileNotFoundError(f"Could not open video: {video_path}")

frame_idx = 0
try:
    # Check rotation: landscape input (width > height) is rotated 90° clockwise
    # so that every saved frame is in portrait orientation.
    rotate_code = None
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    if width > height:
        rotate_code = cv2.ROTATE_90_CLOCKWISE

    while True:
        ret, frame = cap.read()
        if not ret:
            break  # end of stream (or a decode failure)

        # Fix rotation if needed
        if rotate_code is not None:
            frame = cv2.rotate(frame, rotate_code)

        # Save the frame as a JPG image; imwrite reports failure via its return value.
        frame_filename = os.path.join(frames_dir, f"frame_{frame_idx:04d}.jpg")
        if not cv2.imwrite(frame_filename, frame):
            raise IOError(f"Could not write frame: {frame_filename}")

        frame_idx += 1
finally:
    # Release the decoder even when reading or writing fails part-way through.
    cap.release()

print(frame_idx)
print("Done splitting video into frames.")


# Generate Landmarks (txt format: one `x,y` pixel coordinate per line)

In [None]:
import cv2
import mediapipe as mp
import os

# Paths
frames_folder = "/content/frames"  # ← change to your frames folder
# The later cells read landmarks from /content/landmarks, and the GFPGAN cell looks for
# "<frame>_face0_landmarks.txt", so the files are written with that folder and name.
landmarks_folder = "/content/landmarks"  # ← output folder for .txt files
os.makedirs(landmarks_folder, exist_ok=True)

# MediaPipe setup
mp_face_mesh = mp.solutions.face_mesh

# The context manager closes the FaceMesh graph when the loop finishes or fails.
with mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, refine_landmarks=True) as face_mesh:
    # Loop over each frame
    for filename in sorted(os.listdir(frames_folder)):
        if not filename.lower().endswith((".jpg", ".png")):
            continue

        img_path = os.path.join(frames_folder, filename)
        image = cv2.imread(img_path)
        if image is None:
            print(f"Failed to read image: {filename}")
            continue

        # Convert BGR to RGB
        rgb_img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(rgb_img)

        if not results.multi_face_landmarks:
            print(f"No face detected in: {filename}")
            continue

        face_landmarks = results.multi_face_landmarks[0]
        h, w = image.shape[:2]

        # Create a .txt file for each frame
        txt_filename = os.path.splitext(filename)[0] + "_face0_landmarks.txt"
        txt_path = os.path.join(landmarks_folder, txt_filename)

        # One "x,y" line per landmark; the normalized MediaPipe values are scaled
        # by the frame size and truncated to integer pixel coordinates.
        with open(txt_path, "w") as f:
            for landmark in face_landmarks.landmark:
                x = int(landmark.x * w)
                y = int(landmark.y * h)
                f.write(f"{x},{y}\n")

        print(f"Saved landmarks for: {filename}")

print("✅ Landmark extraction complete.")


# OR Upload landmarks

In [None]:
from google.colab import files
import zipfile
import os

# Upload the zip file
uploaded = files.upload()
if not uploaded:
    # The upload dialog was cancelled; fail with a clear message instead of an IndexError.
    raise RuntimeError("No file was uploaded. Re-run this cell and choose a .zip archive.")

# Extract the uploaded zip file (the first uploaded file is used)
zip_file = next(iter(uploaded))
output_dir = '/content/'  # Folder where you want to extract

if not zipfile.is_zipfile(zip_file):
    raise ValueError(f"Uploaded file is not a zip archive: {zip_file}")

with zipfile.ZipFile(zip_file, 'r') as zip_ref:
    zip_ref.extractall(output_dir)

print(f"Files extracted to {output_dir}")


# Check if landmarks are fine

In [None]:
import cv2
import numpy as np
import os

# Settings for the landmark visualisation in this cell.
# Every .txt file in folder_path is rendered to a PNG in output_path.
folder_path = '/content/landmarks'  # Replace with your folder path
output_path = '/content/output_images'  # Where to save visualizations
# Landmarks are drawn at their raw x,y values on a blank canvas of this size, so
# coordinates larger than the canvas presumably fall outside the picture — TODO confirm
# the canvas matches the frame size the landmarks were extracted from.
canvas_size = (512, 512)  # Width, Height
# Passed to cv2.circle as the radius of each filled dot.
radius = 0  # Dot radius
draw_connections = False  # Set True to draw mesh lines

# (Optional) Define connections based on MediaPipe's FaceMesh (partial list for demo)
# Each entry is a pair of landmark indices that is joined by a line when connections are drawn.
FACEMESH_CONNECTIONS = [
    (10, 338), (338, 297), (297, 332), (332, 284), (284, 251),
    # Add more connections if needed...
]

# === FUNCTIONS ===
def load_landmarks(filepath):
    """Read a landmark text file into a list of float tuples.

    Each non-empty line is expected to hold comma-separated numbers
    (e.g. ``"123,45"``). Blank or whitespace-only lines, such as a trailing
    newline at the end of the file, are skipped instead of raising.

    Args:
        filepath: Path of the text file to read.

    Returns:
        A list with one tuple of floats per non-empty line, in file order.

    Raises:
        ValueError: If a non-empty line contains a field that is not a number.
    """
    points = []
    with open(filepath, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            points.append(tuple(map(float, line.split(','))))
    return points

def draw_landmarks(landmarks, size=(512, 512), radius=1, connections=False):
    """Render landmarks as green dots (and optional blue lines) on a black canvas.

    Args:
        landmarks: Sequence of points; the first two values of each point are
            used as x and y and truncated to integers.
        size: Canvas size as ``(width, height)``.
        radius: Radius passed to ``cv2.circle`` for every dot.
        connections: ``False`` draws no lines, ``True`` draws the pairs in
            ``FACEMESH_CONNECTIONS``; a list/tuple/set of ``(i, j)`` index
            pairs draws exactly those pairs.

    Returns:
        A ``uint8`` array of shape ``(height, width, 3)``.
    """
    canvas = np.zeros((size[1], size[0], 3), dtype=np.uint8)
    for point in landmarks:
        # Only x and y are drawn; any further columns (e.g. z) are ignored.
        cv2.circle(canvas, (int(point[0]), int(point[1])), radius, (0, 255, 0), -1)

    if connections:
        if isinstance(connections, (list, tuple, set, frozenset)):
            pairs = connections
        else:
            pairs = FACEMESH_CONNECTIONS
        count = len(landmarks)
        for i, j in pairs:
            # Skip pairs that refer to landmarks this file does not contain
            # (e.g. a shorter landmark set) instead of raising IndexError.
            if not (0 <= i < count and 0 <= j < count):
                continue
            pt1 = (int(landmarks[i][0]), int(landmarks[i][1]))
            pt2 = (int(landmarks[j][0]), int(landmarks[j][1]))
            cv2.line(canvas, pt1, pt2, (255, 0, 0), 1)

    return canvas

# === MAIN ===
# Render every landmark file (in sorted name order) to a numbered PNG.
os.makedirs(output_path, exist_ok=True)
landmark_files = [name for name in os.listdir(folder_path) if name.endswith('.txt')]
landmark_files.sort()

for idx in range(len(landmark_files)):
    filename = landmark_files[idx]
    path = os.path.join(folder_path, filename)
    landmarks = load_landmarks(path)
    image = draw_landmarks(
        landmarks,
        size=canvas_size,
        radius=radius,
        connections=draw_connections,
    )

    # Output names follow the position in the sorted list, not the input file name.
    output_filename = os.path.join(output_path, f"frame_{idx:04d}.png")
    cv2.imwrite(output_filename, image)
    print(f"Saved: {output_filename}")


# Batch Reconstruction using GFPGAN

In [None]:
!git clone https://github.com/TencentARC/GFPGAN.git
%cd GFPGAN


In [None]:
import os
import cv2
import pandas as pd
import numpy as np
from gfpgan import GFPGANer
from basicsr.utils.download_util import load_file_from_url
from insightface.app import FaceAnalysis
from insightface.utils import face_align
from IPython.display import Image, display

# 1️⃣ Load GFPGAN Model
# Fetch the GFPGAN v1.3 checkpoint into gfpgan/weights; `model` holds the value
# returned by load_file_from_url and is passed on as the restorer's model_path.
model = load_file_from_url(
    'https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.3.pth',
    model_dir='gfpgan/weights',
    progress=True,
)

# Build the restorer: no upscaling (upscale=1) and no background upsampler.
restorer = GFPGANer(
    model_path=model,
    upscale=1,
    arch='clean',
    channel_multiplier=2,
    bg_upsampler=None,
)

# 2️⃣ Set Paths for Input Folders
# Frames are matched to landmark files by base name further down in this cell.
image_folder = '/content/frames'  # Folder with image frames
landmark_folder = '/content/landmarks'  # Folder with txt landmark files

# Create output folders
# NOTE(review): this creates /content/restored_images, while the processing loop in this
# cell saves to the relative folder 'restored_images' — confirm which one is intended.
os.makedirs('/content/restored_images', exist_ok=True)

# 3️⃣ Align & Crop Function (for one image)
def align_and_crop(image_path, landmarks_path):
    """Load a frame and its landmarks and crop the landmark bounding box.

    The landmark file is a header-less CSV whose first two columns are x and y.
    Both conventions used in this notebook are accepted: values normalized to
    the image size (roughly 0..1) and absolute pixel coordinates. Pixel
    coordinates are converted to normalized ones, so the returned landmarks can
    always be scaled with ``lm * [width, height]``.

    Args:
        image_path: Path of the image to read with OpenCV.
        landmarks_path: Path of the landmark text file.

    Returns:
        ``(crop, img, lm)``: the bounding-box crop with a 20-pixel margin
        (clipped to the image), the full BGR image, and the normalized
        landmarks as an ``(N, 2)`` float array.

    Raises:
        ValueError: If the image cannot be read.
    """
    img = cv2.imread(image_path)
    if img is None:
        raise ValueError(f"Could not read image: {image_path}")

    # Read the raw txt format; only the first two columns (x, y) are used.
    lm = pd.read_csv(landmarks_path, header=None).iloc[:, :2].to_numpy(dtype=float)

    h, w = img.shape[:2]

    # Normalized landmarks stay close to [0, 1] (slightly outside when the face
    # leaves the frame); anything larger is treated as pixel coordinates.
    if lm.size and np.abs(lm).max() > 2.0:
        lm = lm / [w, h]

    # Crop based on the landmark bounding box plus a 20 px margin
    pts = (lm * [w, h]).astype(int)
    x0, y0 = np.clip(pts.min(0) - 20, 0, [w, h])
    x1, y1 = np.clip(pts.max(0) + 20, 0, [w, h])
    crop = img[y0:y1, x0:x1]

    return crop, img, lm

# Process All Images in the Folder
# Results are saved relative to the current working directory. The folder is created
# here because cv2.imwrite returns False (instead of raising) when it is missing.
restored_dir = 'restored_images'
os.makedirs(restored_dir, exist_ok=True)

# Sorted so frames are processed (and displayed) in a deterministic order.
for filename in sorted(os.listdir(image_folder)):
    if not filename.endswith(('.jpg', '.png')):
        continue

    image_path = os.path.join(image_folder, filename)
    base_name = os.path.splitext(filename)[0]
    landmark_filename = f"{base_name}_face0_landmarks.txt"

    landmark_path = os.path.join(landmark_folder, landmark_filename)

    if not os.path.exists(landmark_path):
        print(f"Warning: No landmarks found for {filename}. Skipping.")
        continue

    # Align and crop face (the crop itself is not used below)
    crop, full_img, lm = align_and_crop(image_path, landmark_path)

    # GFPGAN Enhancement; the return value has two or three items depending on the version
    out = restorer.enhance(full_img, has_aligned=False, only_center_face=True)
    if len(out) == 3:
        _, faces, restored_img = out
    else:
        faces, restored_img = out

    # Save restored image
    restored_filename = os.path.join(restored_dir, f"restored_{filename}")
    if not cv2.imwrite(restored_filename, restored_img):
        raise IOError(f"Could not write restored image: {restored_filename}")

    # Map the landmarks onto the restored image. Values close to [0, 1] are normalized
    # and scaled by the restored size; larger values are pixel coordinates of the input
    # frame and are only rescaled if the restored image has a different size.
    h, w = restored_img.shape[:2]
    src_h, src_w = full_img.shape[:2]
    lm = np.asarray(lm, dtype=float)
    if lm.size and np.abs(lm).max() <= 2.0:
        scale = [w, h]
    else:
        scale = [w / src_w, h / src_h]
    pts = (lm * scale).astype(int)

    # Draw landmarks on restored image (done after saving, so the clean copy stays unmarked)
    for (x, y) in pts:
        cv2.circle(restored_img, (int(x), int(y)), 1, (0, 255, 0), -1)  # Green dots

    # Save and display the image with landmarks
    restored_with_landmarks_filename = os.path.join(restored_dir, f"restored_with_landmarks_{filename}")
    if not cv2.imwrite(restored_with_landmarks_filename, restored_img):
        raise IOError(f"Could not write image: {restored_with_landmarks_filename}")

    # Display the image (optional for debugging)
    display(Image(restored_with_landmarks_filename))

print("Batch reconstruction complete!")


# Convert Reconstructed Frames into Video

In [None]:
import cv2
import os

import re

# Set the path to the folder containing the processed frames
frames_folder = "/content/GFPGAN/restored_images"  # Path to your folder of frames
output_video_path = "/content/reconstructed_video.mp4"  # Name of the output video file


def _frame_number(name):
    """Return the integer directly before the file extension, or -1 if there is none."""
    match = re.search(r"(\d+)\.[^.]+$", name)
    return int(match.group(1)) if match else -1


# Get all the image files in the frames folder and sort them numerically.
# Files that also contain the drawn landmarks are excluded.
frame_files = sorted(
    (f for f in os.listdir(frames_folder) if f.endswith(".jpg") and "landmarks" not in f),
    key=lambda name: (_frame_number(name), name),
)
if not frame_files:
    raise FileNotFoundError(f"No .jpg frames found in {frames_folder}")

# Read the first frame to get the video frame size (width, height)
first_frame = cv2.imread(os.path.join(frames_folder, frame_files[0]))
if first_frame is None:
    raise ValueError(f"Could not read frame: {frame_files[0]}")
height, width = first_frame.shape[:2]

# Define the video writer object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # You can change 'mp4v' to another codec if needed
# NOTE(review): fixed at 30 and not taken from the source video — adjust if the
# original clip has a different frame rate.
fps = 30  # Set the frames per second of the output video

out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
if not out.isOpened():
    raise RuntimeError(f"Could not open video writer for {output_video_path}")

try:
    # Loop through all the frame files and write them to the video
    for frame_file in frame_files:
        frame_path = os.path.join(frames_folder, frame_file)
        frame = cv2.imread(frame_path)
        if frame is None:
            print(f"Skipping unreadable frame: {frame_file}")
            continue

        # The writer only accepts frames of the size it was opened with.
        if frame.shape[:2] != (height, width):
            frame = cv2.resize(frame, (width, height))

        # Write the frame to the video
        out.write(frame)
finally:
    # Release the video writer and finalize the video, even if a frame fails
    out.release()

print(f"Video successfully saved as {output_video_path}")


# Face Swap (Optional - using Delaunay triangulation and warping)



In [None]:
# Same install line as in the Environment Setup cell; presumably repeated so this
# optional section can be run without the first cell — TODO confirm it is still needed.
!pip install face_recognition opencv-python dlib

In [None]:
import cv2
import numpy as np
import face_recognition
import os

def extract_face_landmarks(image):
    """Return the landmark dict of the first face found in an OpenCV image.

    Args:
        image: Image as loaded by ``cv2.imread`` (BGR channel order).

    Returns:
        The first entry of ``face_recognition.face_landmarks`` (a dict mapping
        feature names to lists of ``(x, y)`` points), or ``None`` when no face
        is detected.
    """
    # face_recognition works on RGB arrays, while OpenCV loads images as BGR.
    # Landmark coordinates are unaffected by the conversion; only detection is.
    if image.ndim == 3 and image.shape[2] == 3:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    face_landmarks_list = face_recognition.face_landmarks(image)
    if not face_landmarks_list:
        return None
    return face_landmarks_list[0]

def create_face_mask(image, landmarks):
    """Build a single-channel mask covering the convex hull of all landmarks.

    Args:
        image: Image whose height and width define the mask size.
        landmarks: Dict mapping feature names to lists of ``(x, y)`` points.

    Returns:
        A ``uint8`` array of shape ``image.shape[:2]`` that is 255 inside the
        convex hull of every landmark point and 0 elsewhere.
    """
    # Flatten the per-feature point lists into one list, keeping their order.
    all_points = [pt for feature in landmarks.values() for pt in feature]
    hull = cv2.convexHull(np.array(all_points, dtype=np.int32))

    mask = np.zeros(image.shape[:2], dtype=np.uint8)
    cv2.fillConvexPoly(mask, hull, 255)
    return mask

def warp_face(src_face, src_points, dst_points, dst_shape):
    """Warp an image so that its landmark points line up with the destination's.

    The transform is estimated with ``cv2.estimateAffinePartial2D`` from the
    point correspondences and applied with ``cv2.warpAffine``.

    Args:
        src_face: Image (or mask) to warp.
        src_points: Landmark points in ``src_face``.
        dst_points: Corresponding landmark points in the destination image.
        dst_shape: Shape of the destination image; its first two entries
            (height, width) define the output size.

    Returns:
        The warped image with the destination's height and width.

    Raises:
        ValueError: If no transform can be estimated from the point sets.
    """
    src_points = np.array(src_points, dtype=np.float32)
    dst_points = np.array(dst_points, dtype=np.float32)

    # Compute the transform; the estimator returns None when it cannot find one.
    M, _ = cv2.estimateAffinePartial2D(src_points, dst_points)
    if M is None:
        raise ValueError("Could not estimate a transform between the source and destination landmarks.")

    # warpAffine takes the output size as (width, height).
    warped = cv2.warpAffine(
        src_face,
        M,
        (dst_shape[1], dst_shape[0]),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REFLECT_101,
    )
    return warped

def swap_face(src_img, dst_img, src_landmarks=None):
    """Paste the face of ``src_img`` onto the face found in ``dst_img``.

    Args:
        src_img: Source image (BGR) that provides the face.
        dst_img: Destination image (BGR) that receives the face.
        src_landmarks: Optional landmark dict of ``src_img`` as returned by
            ``extract_face_landmarks``. Pass it when the same source image is
            used for many frames so its face is detected only once; when
            omitted, the landmarks are detected here.

    Returns:
        A ``uint8`` image of the same size as ``dst_img``. ``dst_img`` itself
        is returned unchanged when a face is missing in either image.
    """
    if src_landmarks is None:
        src_landmarks = extract_face_landmarks(src_img)
    dst_landmarks = extract_face_landmarks(dst_img)

    if src_landmarks is None or dst_landmarks is None:
        print("Face not detected in one of the images. Skipping frame.")
        return dst_img

    # Mask of the source face; it is warped together with the face below.
    src_mask = create_face_mask(src_img, src_landmarks)

    # Get points for warp (using chin, eyes, nose, mouth approx)
    src_points = []
    dst_points = []
    for feature in ['chin', 'left_eye', 'right_eye', 'nose_bridge', 'top_lip', 'bottom_lip']:
        src_points += src_landmarks[feature]
        dst_points += dst_landmarks[feature]

    # Warp source face to destination shape
    warped_src_face = warp_face(src_img, src_points, dst_points, dst_img.shape)

    # Warp the mask with the same correspondences so it covers the warped face
    warped_mask = warp_face(src_mask, src_points, dst_points, dst_img.shape)
    warped_mask = cv2.merge([warped_mask, warped_mask, warped_mask])

    # Blend faces: the mask (scaled to 0..1) weights the warped source against the destination
    warped_mask = warped_mask / 255.0
    output = dst_img * (1 - warped_mask) + warped_src_face * warped_mask
    output = output.astype(np.uint8)

    return output

src_img_path = "/content/OG.jpg" # <-- Change to your source image path
dst_folder = "/content/frames" # <-- Change to your dst image folder
output_folder = "/content/swapped_frames"
os.makedirs(output_folder, exist_ok=True)

# Load source image once
src_img = cv2.imread(src_img_path)
if src_img is None:
    raise ValueError(f"Could not load source image: {src_img_path}")

# Extract landmarks for source image once; they are reused for every frame below
src_landmarks = extract_face_landmarks(src_img)
if src_landmarks is None:
    raise ValueError("No face detected in source image!")

# Get destination frames list
dst_files = sorted([f for f in os.listdir(dst_folder) if f.endswith(('.png', '.jpg'))])

for f in dst_files:
    dst_img_path = os.path.join(dst_folder, f)
    dst_img = cv2.imread(dst_img_path)
    if dst_img is None:
        print(f"Skipping {f}: failed to load destination image.")
        continue

    # Reuse the precomputed source landmarks instead of detecting them per frame
    swapped = swap_face(src_img, dst_img, src_landmarks)

    output_path = os.path.join(output_folder, f)
    cv2.imwrite(output_path, swapped)
    print(f"Swapped and saved frame: {f}")

print("Batch face swap with single source image complete!")