In [28]:
import os
import cv2
import shutil
import base64
import requests
import numpy as np
import mediapipe as mp
os.makedirs("_output", exist_ok=True)

In [29]:
def expand_polygon(ls_keypoints, scale_factor):
    # Convert keypoints to a NumPy array
    points = np.array(ls_keypoints, dtype=np.float32)
    # Calculate the centroid of the polygon
    centroid = np.mean(points, axis=0)
    # Translate points to origin (subtract centroid), scale, and translate back
    expanded_points = centroid + (points - centroid) * scale_factor
    # Convert back to integer coordinates for OpenCV
    expanded_points = expanded_points.astype(np.int32)
    return expanded_points.tolist()

def get_keypoints(img_path, img_debug_path, expand_scale_factor):
    # Read image
    img_raw = cv2.imread(img_path)
    img_rgb = cv2.cvtColor(img_raw, cv2.COLOR_BGR2RGB)
    h, w, _ = img_raw.shape
    # MediaPipe: Extract face landmarks
    with mp.solutions.face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, refine_landmarks=True) as face_mesh:
        results = face_mesh.process(img_rgb)
        if results.multi_face_landmarks:
            for facelandmarks in results.multi_face_landmarks:
                # Landmark indices
                ls_indices = [10,338,297,332,284,251,389,356,454,323,361,288,397,365,379,378,400,377,152,148,176,149,150,136,172,58,132,93,234,127,162,21,54,103,67,109]
                # Get coordinates
                ls_keypoints = []
                for idx in ls_indices:
                    ls_keypoints.append(
                        (int(facelandmarks.landmark[idx].x * w), int(facelandmarks.landmark[idx].y * h)) # A keypoint (x,y)
                    )
                # Expand polygon
                ls_keypoints_expanded = expand_polygon(ls_keypoints, expand_scale_factor)
                # Draw points
                cv2.polylines(img_raw, [np.array(ls_keypoints, dtype=np.int32)], isClosed=True, color=(0, 0, 255), thickness=5)
                cv2.polylines(img_raw, [np.array(ls_keypoints_expanded, dtype=np.int32)], isClosed=True, color=(0, 255, 0), thickness=5)
                for keypoint in ls_keypoints:
                    cv2.circle(img_raw, keypoint, 5, (0, 255, 0), -1)
                cv2.imwrite(img_debug_path, img_raw)
                # Return
                return ls_keypoints_expanded, h, w
    return None # No faces detected

def create_mask(ls_keypoints, img_h, img_w, img_mask_path):
    # Convert to numpy array (required format for OpenCV fillPoly)
    ls_keypoints = np.array(ls_keypoints, np.int32)
    ls_keypoints = ls_keypoints.reshape((-1, 1, 2)) # shape (n,1,2)
    # Create a blank black image (size depends on your image)
    img_mask = np.zeros((img_h, img_w, 3), dtype=np.uint8)
    # Fill the polygon with white
    cv2.fillPoly(img_mask, [ls_keypoints], (255, 255, 255))
    cv2.imwrite(img_mask_path, img_mask)

def imagen_head_replace(img_path, msk_path, out_path, sd_params, instance_url):

    with open(img_path, "rb") as f_img, open(msk_path, "rb") as f_msk:
        img_base64 = base64.b64encode(f_img.read()).decode('utf-8')
        msk_base64 = base64.b64encode(f_msk.read()).decode('utf-8')
    img_h, img_w, _ = cv2.imread(img_path).shape

    payload = {
        "prompt": sd_params["prompt"],
        "negative_prompt": "",
        "styles": [],
        "seed": -1,
        "subseed": -1,
        "subseed_strength": 0,
        "seed_resize_from_h": -1,
        "seed_resize_from_w": -1,
        "sampler_name": sd_params["sampler"],
        "scheduler": sd_params["scheduler"],
        "batch_size": 1,
        "n_iter": 1,
        "steps": sd_params["steps"],
        "cfg_scale": sd_params["cfg"],
        "distilled_cfg_scale": 3.5,
        "width": img_w,
        "height": img_h,
        "restore_faces": False,
        "tiling": False,
        "do_not_save_samples": False,
        "do_not_save_grid": False,
        "eta": 0,
        "denoising_strength": sd_params["denoising"],
        "s_min_uncond": 0.0,
        "s_churn": 0.0,
        "s_tmax": None,
        "s_tmin": 0.0,
        "s_noise": 1.0,
        "override_settings": {"sd_model_checkpoint": sd_params["model"], "CLIP_stop_at_last_layers": 1},
        "override_settings_restore_afterwards": False,
        "refiner_checkpoint": None,
        "refiner_switch_at": 0,
        "disable_extra_networks": False,
        "firstpass_image": None,
        "comments": {},
        "init_images": [img_base64],
        "resize_mode": 0,
        "image_cfg_scale": 1.5,
        "mask": msk_base64,
        "mask_blur_x": 4,
        "mask_blur_y": 4,
        "mask_blur": 4,
        "mask_round": True,
        "inpainting_fill": 1,
        "inpaint_full_res": 0,
        "inpaint_full_res_padding": 32,
        "inpainting_mask_invert": 0,
        "initial_noise_multiplier": 1.0,
        "latent_mask": None,
        "force_task_id": None,
        "hr_distilled_cfg": 3.5,
        "sampler_index": "Euler",
        "include_init_images": False,
        "script_name": None,
        "script_args": [],
        "send_images": True,
        "save_images": False,
        "alwayson_scripts": {},
        "infotext": None
    }

    def make_request_to_forge_api(payload, savepath, instance_url):
        try:
            with requests.post(url=instance_url, json=payload) as req:
                with open(savepath, "wb") as f:
                    f.write(base64.b64decode(req.json()["images"][0]))
        except Exception as e:
            print(f"⚠️ Error > {e}")
            print(f"{req.json()}")
    make_request_to_forge_api(payload, out_path, instance_url+"/sdapi/v1/img2img")

def upscale_image(img_path, out_path, instance_url):

    with open(img_path, "rb") as f_img:
        img_base64 = base64.b64encode(f_img.read()).decode('utf-8')
    img_h, img_w, _ = cv2.imread(img_path).shape

    payload = {
        "resize_mode": 0,
        "show_extras_results": False,
        "gfpgan_visibility": 0.5,
        "codeformer_visibility": 0,
        "codeformer_weight": 0,
        "upscaling_resize": 2,
        # "upscaling_resize_w": img_w * 4,
        # "upscaling_resize_h": img_h * 4,
        "upscaling_crop": False,
        "upscaler_1": "R-ESRGAN 4x+",
        "upscaler_2": "None",
        "extras_upscaler_2_visibility": 0,
        "upscale_first": False,
        "image": img_base64
    }

    def make_request_to_forge_api(payload, savepath, instance_url):
        try:
            with requests.post(url=instance_url, json=payload) as req:
                with open(savepath, "wb") as f:
                    f.write(base64.b64decode(req.json()["image"]))
        except Exception as e:
            print(f"⚠️ Error > {e}")
            print(f"{req.json()}")
    make_request_to_forge_api(payload, out_path, instance_url+"/sdapi/v1/extra-single-image")

def resize_image(img_path, out_path):

    img = cv2.imread(img_path)
    h, w = img.shape[:2]

    new_width = 1080
    new_height = int(h * new_width / w)

    img_resized = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
    cv2.imwrite(out_path, img_resized)

In [None]:
# Variables
img_path = "_test/img1.jpg"
instance_url = "https://2kh5pnf1qsokxx-1234.proxy.runpod.net"
# Constants
ups_path = "_output/img_1_upscaled.jpg"
rsz_path = "_output/img_2_resized.jpg"
kyp_path = "_output/img_3_keypoints.jpg"
msk_path = "_output/img_4_mask.jpg"
out_path = "_output/img_5_output.jpg"

sd_params = {
    "prompt": "best quality, masterpiece, ultra high res, photorealistic, 1girl, <lora:LORA_KoreaDL:0.5>",
    "cfg": 7,
    "denoising": 0.7,
    "steps": 20,
    "sampler": "Euler a",
    "scheduler": "Automatic",
    "model": "MJX_V07",
}

shutil.copy2(img_path, f"_output/img_0_raw{os.path.splitext(img_path)[1]}")
upscale_image(img_path, ups_path, instance_url)
resize_image(ups_path, rsz_path)
ls_keypoints, img_h, img_w = get_keypoints(rsz_path, kyp_path, expand_scale_factor=1.5)
create_mask(ls_keypoints, img_h, img_w, msk_path)
imagen_head_replace(rsz_path, msk_path, out_path, sd_params, instance_url)