In [None]:
%pip install -U insightface onnxruntime-gpu  numpy fastai pillow

In [None]:
%pip install -U gdown moviepy ffmpeg insightface natsort

In [None]:
# -y answers the confirmation prompt automatically; a notebook cell cannot answer it interactively.
!sudo apt install -y ffmpeg

In [None]:
import gdown
import cv2
import os
from moviepy.editor import VideoFileClip,AudioFileClip
import glob
import os.path as osp
import shutil
import insightface
from insightface.app import FaceAnalysis
from insightface.data import get_image as ins_get_image

In [None]:

# Define variables shared by the cells below
drive_file_id = '1ILta1DA26-gdNK9opGtZa6NaT2RuDj1_'  # Google Drive file ID of the source video; replace with your own file's ID
video_output = 'my_video.mp4'  # local path the downloaded source video is written to
frame_folder = '/notebooks/frame_folder'  # frames extracted from the video, saved as <index>.jpg
swapped_folder = '/notebooks/swapped'  # frames written back after face swapping
character_folder = '/notebooks/character'  # per-cluster / per-character face crops and embeddings


In [None]:
# Remove the frame and swapped-frame folders left over from a previous run.
# The existence check keeps a fresh run (folders not created yet) from raising
# FileNotFoundError, matching the guarded clean-up cells further down.
for stale_folder in (frame_folder, swapped_folder):
    if os.path.exists(stale_folder):
        shutil.rmtree(stale_folder)

In [None]:
import re

# This notebook requires insightface >= 0.7. Compare (major, minor) numerically:
# a plain string comparison is lexicographic and would rank '0.10' below '0.7'.
_insightface_version = tuple(int(part) for part in re.findall(r'\d+', insightface.__version__)[:2])
assert _insightface_version >= (0, 7), (
    f"insightface >= 0.7 is required, found {insightface.__version__}"
)

# Create the frame folder and the swapped-frame folder if they don't exist
os.makedirs(frame_folder, exist_ok=True)
os.makedirs(swapped_folder, exist_ok=True)

# Face detection / embedding application and the face-swapping model
app = FaceAnalysis(name='buffalo_l')
app.prepare(ctx_id=0, det_size=(640, 640))
swapper = insightface.model_zoo.get_model('/notebooks/inswapper_128.onnx')

In [None]:


import numpy as np

def get_first_face(image):
    """Return the left-most face detected in ``image``.

    Parameters
    ----------
    image : str or numpy.ndarray
        Either a path to an image file (loaded with ``cv2.imread``) or an
        already-loaded image array.

    Returns
    -------
    The face returned by ``app.get`` with the smallest ``bbox[0]``.

    Raises
    ------
    AssertionError
        If the input is not a numpy array after loading (wrong type, or
        ``cv2.imread`` could not read the path), or if no face is detected.
    """
    # Remember the path so error messages can name the file instead of dumping pixels.
    source = image if isinstance(image, str) else None
    if source is not None:
        image = cv2.imread(source)  # cv2.imread returns None when the file cannot be read

    if not isinstance(image, np.ndarray):
        described = f"image file {source!r}" if source is not None else f"object of type {type(image).__name__}"
        raise AssertionError(f"get_first_face: could not load a valid image from {described}")

    source_faces = app.get(image)
    if len(source_faces) == 0:
        described = repr(source) if source is not None else f"array of shape {image.shape}"
        raise AssertionError(f"get_first_face: no face detected in {described}")

    # min() returns the first minimal element, the same face sorted(...)[0] would give.
    return min(source_faces, key=lambda face: face.bbox[0])


In [None]:
import requests

# Alternative to the Google Drive download: fetch the video from a direct URL.
# Skip this cell if the video comes from Google Drive.
url = "http://example.com/path_to_your_file"  # replace with your file's URL

# Stream the body to disk in chunks instead of holding the whole video in memory.
# raise_for_status() stops an HTTP error page from being saved as the video file,
# and the timeout keeps a stalled connection from hanging the kernel.
with requests.get(url, stream=True, timeout=60) as response:
    response.raise_for_status()
    with open(video_output, 'wb') as f:  # replace with the path where you want to save the file
        for chunk in response.iter_content(chunk_size=1 << 20):
            f.write(chunk)

In [None]:
# Download the source video from Google Drive (file ID `drive_file_id`) and save it as `video_output`
url = f'https://drive.google.com/uc?id={drive_file_id}'
gdown.download(url, video_output, quiet=False)

In [None]:
# Remove the character folder (and everything in it) if it exists from a previous run
if  os.path.exists(character_folder):
    shutil.rmtree(character_folder)

In [None]:
import cv2
import numpy as np
import insightface
from insightface.app import FaceAnalysis
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
import pickle

# Create the output folders if they don't exist. cv2.imwrite does not create
# missing directories, so the frame folder is ensured here as well.
os.makedirs(character_folder, exist_ok=True)
os.makedirs(frame_folder, exist_ok=True)

# Initialize the FaceAnalysis application
app = FaceAnalysis(providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
app.prepare(ctx_id=0, det_size=(640, 640))  # Use GPU device 0 and input image size as (640, 640)

# Open the video file
cap = cv2.VideoCapture(video_output)
# One entry per detected face: embeddings, bounding box and cropped face image
face_data = []
count = 0
try:
    # Loop through the video file frame by frame
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        path = os.path.join(frame_folder, f"{count}.jpg")
        cv2.imwrite(path, frame)  # save the frame that was just read as a JPEG file
        count += 1
        # Use the FaceAnalysis application to detect faces in the frame
        faces = app.get(frame)

        for face in faces:
            bbox = face.bbox.astype(int)
            # Clamp to the frame origin: negative coordinates would be interpreted
            # by numpy as offsets from the end and yield an empty or wrong crop.
            x1, y1, x2, y2 = np.maximum(bbox[:4], 0)
            # Copy the crop so the list does not keep every full frame alive in memory.
            cropped_face = frame[y1:y2, x1:x2].copy()
            if cropped_face.size > 0:
                face_data.append({
                    'face_embedding': face.embedding,
                    'normed_embedding': face.normed_embedding,
                    'bbox': bbox,
                    'image': cropped_face,
                })
finally:
    # Always release the capture handle, even if detection fails midway.
    cap.release()

# Save the face_data to disk
with open('face_data.pkl', 'wb') as f:
    pickle.dump(face_data, f)


In [None]:

def is_similar(face1, face2, threshold=0.5):
    """Decide whether two faces are similar according to their embeddings.

    The dot product of ``face1.normed_embedding`` and ``face2.normed_embedding``
    is used as the similarity score (this equals the cosine similarity provided
    the embeddings are unit-length, as their name suggests).

    Returns a truthy value only when the score is strictly greater than
    ``threshold``.
    """
    score = np.dot(face1.normed_embedding, face2.normed_embedding)
    return score > threshold

In [None]:
from sklearn.cluster import DBSCAN
import pickle
# Make sure the root folder for the per-cluster output is present
if not os.path.exists(character_folder):
    os.makedirs(character_folder)

# Reload the face records written to face_data.pkl
with open('face_data.pkl', 'rb') as f:
    face_data = pickle.load(f)

# Cluster the normalised embeddings using cosine distance
embeddings = [record['normed_embedding'] for record in face_data]
clustering = DBSCAN(eps=0.45, metric="cosine", min_samples=4).fit(embeddings)

# One sub-folder per cluster (each representing a character); label -1 is noise and gets none
labels = set(clustering.labels_)
for label in labels:
    if label != -1:
        directory_path = os.path.join(character_folder, str(label))
        if not os.path.exists(directory_path):
            os.makedirs(directory_path)

# Write each clustered face crop and its embedding into the folder of its cluster;
# files are named after the face's index in face_data
for i, (label, record) in enumerate(zip(clustering.labels_, face_data)):
    if label != -1:
        directory_path = os.path.join(character_folder, str(label))
        cv2.imwrite(os.path.join(directory_path, f"{i}.jpg"), record['image'])
        np.save(os.path.join(directory_path, f"{i}.npy"), record['normed_embedding'])


In [None]:
import ipywidgets as widgets
from IPython.display import display, clear_output
from fastai.vision.all import *
from pathlib import Path
from PIL import Image as PilImage

# Root folder holding one sub-folder per face cluster
path = Path(character_folder)

# Keep only sub-folders whose name is numeric
folders = [entry for entry in path.iterdir() if entry.is_dir() and entry.name.isnumeric()]

# Folder names double as the cluster labels
labels = [folder.name for folder in folders]

# Maps old labels (folder names) to the new labels (character names) entered by the user
label_dict = dict()

# Define a generator to process one folder at a time
def process_folders():
    """Generator driving the interactive naming of face clusters.

    For every label in the module-level ``labels`` list, the images of that
    cluster are shown (largest resolution first, four per row) together with a
    text box and a "Confirm" button; the generator then yields. Clicking
    "Confirm" records the name in ``label_dict``, renames the cluster folder
    (or merges it into an existing folder of that name) and advances the
    module-level ``process`` generator to the next cluster.

    Clusters without any ``.jpg`` file are reported and skipped.
    """
    for label in labels:
        # Get a list of all image files in the cluster
        image_paths = list((path / label).glob('*.jpg'))
        if not image_paths:
            print(f"No images found in cluster: {label}")
            continue
        print(f"Current cluster: {label}")

        # Resolution (width * height) of each image
        image_resolution = {}
        for image_path in image_paths:
            with PilImage.open(image_path) as img:
                width, height = img.size
                image_resolution[image_path] = width * height

        # Sort images by resolution in descending order
        sorted_image_paths = sorted(image_resolution, key=image_resolution.get, reverse=True)

        # Group images into rows for display, with four images per row
        rows = []
        for i in range(0, len(sorted_image_paths), 4):
            row = [
                # The files are JPEGs, so declare them as such to the widget.
                widgets.Image(value=image_path.read_bytes(), format='jpeg', width=200, height=200)
                for image_path in sorted_image_paths[i:i+4]
            ]
            rows.append(widgets.HBox(row))

        # Display the gallery
        display(widgets.VBox(rows))

        # Ask the user to input the name of the character
        name = widgets.Text(value='', placeholder='Enter character name', description='Name:')
        button = widgets.Button(description='Confirm')
        display(name, button)

        def on_button_clicked(b):
            character_name = name.value.strip()
            # An empty name would resolve to the root character folder itself and
            # merge this cluster into it; ask again instead.
            if not character_name:
                print('Please enter a character name before confirming.')
                return

            # Update the label dictionary
            label_dict[label] = character_name
            print(f'Character name confirmed: {character_name}')

            old_name = path / label
            new_name = path / character_name
            # Nothing to move when the name equals the current folder name
            # (copying files onto themselves would raise SameFileError).
            if new_name != old_name:
                if new_name.exists():
                    # Merge: copy images (.jpg) and embeddings (.npy) into the
                    # existing folder, then delete the current folder.
                    for pattern in ('*.jpg', '*.npy'):
                        for file in old_name.glob(pattern):
                            shutil.copy(file, new_name / file.name)
                    shutil.rmtree(old_name)
                else:
                    # Target folder does not exist yet: simply rename
                    old_name.rename(new_name)

            # Proceed to the next cluster; the sentinel default avoids an
            # unhandled StopIteration after the last one.
            clear_output(wait=True)
            finished = object()
            if next(process, finished) is finished:
                print('All clusters have been labelled.')

        button.on_click(on_button_clicked)

        # Yield control and wait for the next call
        yield

# Start the process
process = process_folders()
# Display the first cluster. The default value keeps the cell from raising
# StopIteration when the generator ends without yielding (no cluster to show).
next(process, None)



In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import insightface
from insightface.app import FaceAnalysis
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import LabelEncoder

# Collect the saved embeddings (.npy files); the label of each one is the name
# of the folder it is stored in.
data = []
for label in os.listdir(character_folder):
    label_dir = os.path.join(character_folder, label)
    if not os.path.isdir(label_dir):
        continue  # stray files in the root folder would make os.listdir(label_dir) fail
    for filename in os.listdir(label_dir):
        if filename.endswith('.npy'):  # embeddings only; the .jpg crops are ignored
            embedding = np.load(os.path.join(label_dir, filename))
            data.append({'embedding': embedding, 'label': label})

if not data:
    raise ValueError(f"No embeddings (.npy files) found under {character_folder!r}")

# create a dataframe
df = pd.DataFrame(data)

# Separate features and target
X = np.array(df['embedding'].to_list()) # Convert list of embeddings back to numpy array
y = df['label'].values

# Encode labels
le = LabelEncoder()
y_encoded = le.fit_transform(y)

# Split data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y_encoded, test_size=0.2, random_state=42)

# Define the model
knn = KNeighborsClassifier(n_neighbors=3)

# Train the model
knn.fit(X_train, y_train)

# Validate the model
accuracy = knn.score(X_val, y_val)
print(f'Validation accuracy: {accuracy}')

In [None]:
""" import pickle

# Open the pickle file and load the face data
with open('face_data.pkl', 'rb') as f:
    face_data = pickle.load(f)

# Now you can use the loaded face data. For example, you can extract all embeddings:
embeddings = np.array([data['face_embedding'] for data in face_data])


# Determine the optimal number of clusters, you might want to adjust this
n_clusters = 20

# Perform KMeans clustering
kmeans = KMeans(n_clusters=n_clusters).fit(embeddings)

# For each cluster center, find the closest face (in terms of Euclidean distance in the embedding space)
# and save it as a representative of the cluster (i.e., the character)
character_index= 0
previous = []
for i, center in enumerate(kmeans.cluster_centers_):
    distances = np.linalg.norm(embeddings - center, axis=1)
    closest_face_index = np.argmin(distances)
    bbox = face_data[closest_face_index]["bbox"]
    cropped_face = face_data[closest_face_index]["image"]
    found = False
    for index,j in enumerate(previous):
        similarity = np.dot(face_data[closest_face_index]["normed_embedding"], j)
        print(i, index,similarity)
        if similarity > 0.35:
            print("found same face")
            found = True
            break
    if not found:
        cv2.imwrite(f'{character_folder}/character_{character_index}.jpg',cropped_face)   
        previous.append(face_data[closest_face_index]["normed_embedding"])
        character_index += 1
from sklearn.neighbors import NearestNeighbors

# The input for NearestNeighbors will be the cluster centers.
# These act as the "labels" for each cluster.
nn_model = NearestNeighbors(n_neighbors=3, metric='cosine')
nn_model.fit(kmeans.cluster_centers_) """
def classify_face(embedding):
    """Return the label predicted for a single face embedding.

    The embedding is wrapped in a one-element batch for the module-level
    ``knn`` classifier, and the predicted class is mapped back to its original
    label with the module-level ``le`` encoder.
    """
    encoded_prediction = knn.predict([embedding])
    return le.inverse_transform(encoded_prediction)[0]


In [None]:

# Extract frames from video
# cv2.imwrite does not create missing directories, so make sure the folder exists.
os.makedirs(frame_folder, exist_ok=True)

vidcap = cv2.VideoCapture(video_output)
count = 0
target_imgs = []  # a list of file paths to images of the target faces
try:
    success, image = vidcap.read()
    while success:
        path = os.path.join(frame_folder, f"{count}.jpg")
        target_imgs.append(path)
        cv2.imwrite(path, image)  # save frame as JPEG file
        success, image = vidcap.read()
        count += 1
finally:
    # Release the capture handle even if writing a frame fails.
    vidcap.release()

In [None]:
# Remove the swapped folder (and everything in it) if it exists from a previous run
if  os.path.exists(swapped_folder):
    shutil.rmtree(swapped_folder)

In [None]:
# Create the swapped folder if it doesn't exist
if not os.path.exists(swapped_folder):
    os.makedirs(swapped_folder)

In [None]:
import random
import natsort

# Replacement faces, keyed by a short alias. Each entry is the left-most face
# found in the given image.
swappings ={
   'm':get_first_face('/notebooks/1629085462605.jpeg'),
   'b':get_first_face('/notebooks/bcp.png'),
   'd':get_first_face('/notebooks/DSCAAZZEA.png'),
   'l2':get_first_face('/notebooks/2023-07-03 22.46.08.jpg'),
   'la':get_first_face('/notebooks/2023-07-03 22.47.10.jpg'),
   'jo':get_first_face('/notebooks/jocp.png'),
   'g':get_first_face('/notebooks/gacp.png'),
   'je':get_first_face('/notebooks/2023-07-03 22.47.24.jpg')
}
# Optional fixed assignments: character label (as returned by classify_face)
# -> alias in `swappings`, or None to leave that character's face untouched.
targets={

}

# NOTE(review): `source_face` is not referenced by any visible cell; confirm before removing.
source_face = get_first_face('/notebooks/DSC06729.JPG')

# Fail early (as the original list.remove() did) when a target names an unknown alias.
unknown_aliases = {value for value in targets.values() if value is not None and value not in swappings}
if unknown_aliases:
    raise ValueError(f"targets refer to aliases missing from swappings: {unknown_aliases}")


def _available_swap_keys():
    """Return the aliases of `swappings` that are not reserved by `targets`.

    Falls back to every alias when all of them are reserved, so a random
    assignment is always possible.
    """
    reserved = {value for value in targets.values() if value is not None}
    free_keys = [key for key in swappings if key not in reserved]
    return free_keys or list(swappings)


# All jpg frames of the frame folder, in natural (numeric) order
image_files = natsort.natsorted(glob.glob(os.path.join(frame_folder, '*.jpg')))
print(len(image_files))

# Character label -> alias randomly attributed to that character
attribution = {}
unused_keys = _available_swap_keys()

# Process each image and save the result
for img_file in image_files:
    img = cv2.imread(img_file)
    faces = app.get(img)
    faces = sorted(faces, key=lambda x: x.bbox[0])
    res = img.copy()
    for face in faces:
        index = classify_face(face.normed_embedding)

        # 1) Character with a fixed assignment: swap it (or leave it alone when the
        #    assignment is None), then move on to the next face. `continue`, not
        #    `break`, so the remaining faces of the frame are still processed.
        if index in targets:
            value = targets[index]
            if value is not None:
                res = swapper.get(res, face, swappings[value], paste_back=True)
            continue

        # 2) Character that already received a random alias: reuse it.
        if index in attribution:
            res = swapper.get(res, face, swappings[attribution[index]], paste_back=True)
            continue

        # 3) New character: attribute a random alias that is still unused.
        print("index not found :" ,index)
        if len(unused_keys) == 0:  # If all faces have been used, reset the list
            unused_keys = _available_swap_keys()
        random_key = random.choice(unused_keys)
        unused_keys.remove(random_key)
        attribution[index] = random_key
        print("index ",index," attribute to ",random_key, " frame ", osp.basename(img_file))
        res = swapper.get(res, face, swappings[random_key], paste_back=True)
    cv2.imwrite(osp.join(swapped_folder, osp.basename(img_file)), res)



In [None]:
import subprocess

output_video = "/notebooks/output.mp4"

# Reuse the frame rate of the source video for the re-encoded one
video = cv2.VideoCapture(video_output)
fps = video.get(cv2.CAP_PROP_FPS)
video.release()

# Get the processed images
processed_images = glob.glob(os.path.join(swapped_folder, '*.jpg'))

# Sort the processed images (only the first one is read, to obtain the frame size)
processed_images.sort()
print(len(processed_images))
if not processed_images:
    raise FileNotFoundError(f"No swapped frames (*.jpg) found in {swapped_folder!r}")

# Frame size, taken from the first processed image
height, width, _ = cv2.imread(processed_images[0]).shape
print(height, width)

# Define the command as an argument list: no shell is involved, so paths
# containing spaces or shell metacharacters are passed through unchanged.
command = [
    'ffmpeg', '-y',
    '-r', str(fps),
    '-s', f'{width}x{height}',
    '-i', f'{swapped_folder}/%01d.jpg',
    '-vcodec', 'libx264',
    '-crf', '25',
    '-pix_fmt', 'yuv420p',
    output_video,
]

# Execute the command; check=True raises CalledProcessError when ffmpeg fails
# instead of silently continuing without an output file.
subprocess.run(command, check=True)

In [None]:
import subprocess


temp_audio_file = '/notebooks/temp_audio.aac'

# Remove temporary audio file if it exists
if os.path.exists(temp_audio_file):
    os.remove(temp_audio_file)

# Remove output video with audio file if it exists
output_with_audio_file = '/notebooks/output_with_audio.mp4'
if os.path.exists(output_with_audio_file):
    os.remove(output_with_audio_file)

# Commands are argument lists run without a shell, so paths containing spaces
# are safe; check=True raises CalledProcessError when ffmpeg fails instead of
# letting the next step run on a missing file.

# Extract audio from original video and save it as a temporary audio file
audio_extraction_command = [
    'ffmpeg', '-y',
    '-i', video_output,
    '-vn',
    '-acodec', 'aac',
    '-strict', '-2',
    temp_audio_file,
]
subprocess.run(audio_extraction_command, check=True)

# Combine swapped video with original audio (both streams are copied, not re-encoded)
video_combination_command = [
    'ffmpeg', '-y',
    '-i', output_video,
    '-i', temp_audio_file,
    '-c:v', 'copy',
    '-c:a', 'copy',
    '-map', '0:v:0',
    '-map', '1:a:0',
    output_with_audio_file,
]
subprocess.run(video_combination_command, check=True)

In [None]:
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip

# path to the video file (the swapped video with the original audio)
video_path = output_with_audio_file

# specify the start and end times; both are converted to seconds below
# start time as (minutes, seconds)
start_time_min_sec = (0, 0)  # 0 minutes 0 seconds (start of the video)
start_time = start_time_min_sec[0]*60 + start_time_min_sec[1]

# end time as (minutes, seconds)
end_time_min_sec = (1, 40)  # 1 minute 40 seconds
end_time = end_time_min_sec[0]*60 + end_time_min_sec[1]

# output file path
output_path = "/notebooks/split.mp4"

# extract the subclip between start_time and end_time (in seconds) into output_path
ffmpeg_extract_subclip(video_path, start_time, end_time, targetname=output_path)