# Pipeline for locally accessing Inpainting API by stability AI.

## 0. Install Required Libraries

In [None]:
!pip install -q pillow requests ipython

## 1. Import Required Libraries

In [7]:
import os
from PIL import Image
import requests
import time

import getpass
STABILITY_KEY = getpass.getpass('Enter your API Key: ')
# STABILITY_KEY = os.environ['STABILITY_KEY']

## 2. Define Helper Functions

In [8]:
def send_generation_request(host, params):
    headers = {
        "Accept": "image/*",
        "Authorization": f"Bearer {STABILITY_KEY}"
    }

    # Encode parameters
    files = {}
    image = params.pop("image", None)
    mask = params.pop("mask", None)
    if image:
        files["image"] = open(image, 'rb')
    if mask:
        files["mask"] = open(mask, 'rb')
    if not files:
        files["none"] = ''

    # Send request
    print(f"Sending REST request to {host}...")
    response = requests.post(
        host,
        headers=headers,
        files=files,
        data=params
    )
    if not response.ok:
        raise Exception(f"HTTP {response.status_code}: {response.text}")

    return response

def send_async_generation_request(host, params):
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {STABILITY_KEY}"
    }

    # Encode parameters
    files = {}
    if "image" in params:
        image = params.pop("image")
        files = {"image": open(image, 'rb')}

    # Send request
    print(f"Sending REST request to {host}...")
    response = requests.post(
        host,
        headers=headers,
        files=files,
        data=params
    )
    if not response.ok:
        raise Exception(f"HTTP {response.status_code}: {response.text}")

    # Process async response
    response_dict = response.json()
    generation_id = response_dict.get("id")
    if not generation_id:
        raise Exception("Expected 'id' in response")

    # Loop until result or timeout
    timeout = int(os.getenv("WORKER_TIMEOUT", 500))
    start = time.time()
    status_code = 202
    while status_code == 202:
        response = requests.get(
            f"{host}/result/{generation_id}",
            headers={
                **headers,
                "Accept": "image/*"
            },
        )

        if not response.ok:
            raise Exception(f"HTTP {response.status_code}: {response.text}")
        status_code = response.status_code
        time.sleep(10)
        if time.time() - start > timeout:
            raise Exception(f"Timeout after {timeout} seconds")

    return response


## 3. Perform Batch Inpainting

In [10]:
# Resize all masks to fit the corresponding images
def resize_masks_to_images(image_folder, mask_folder):
    """
    Resize all masks in the mask_folder to match the corresponding image dimensions in image_folder.
    The resized masks will overwrite the original masks.
    """
    for image_file in os.listdir(image_folder):
        image_path = os.path.join(image_folder, image_file)
        mask_name = os.path.splitext(image_file)[0] + "_mask.png"
        mask_path = os.path.join(mask_folder, mask_name)
        
        # Skip if mask doesn't exist
        if not os.path.exists(mask_path):
            print(f"Mask not found for {image_file}, skipping...")
            continue
        
        # Open image and mask
        with Image.open(image_path) as img, Image.open(mask_path) as mask:
            if img.size != mask.size:
                # Resize mask to match the image size
                mask_resized = mask.resize(img.size, Image.Resampling.NEAREST)
                mask_resized.save(mask_path)
                print(f"Resized mask for {image_file} to {img.size}")
            else:
                print(f"Mask for {image_file} already matches the image size.")

# Example usage
image_folder = r"D:\Data Downloads\Korean_Festival\original_images"
mask_folder = r"D:\Data Downloads\Korean_Festival\masks"

resize_masks_to_images(image_folder, mask_folder)

Resized mask for Korea_Festival_1.jpg to (960, 640)
Resized mask for Korea_Festival_10.jpg to (408, 612)
Resized mask for Korea_Festival_11.jpg to (387, 640)
Resized mask for Korea_Festival_12.jpg to (1242, 1554)
Resized mask for Korea_Festival_13.jpg to (408, 431)
Resized mask for Korea_Festival_14.jpg to (512, 339)
Resized mask for Korea_Festival_15.JPG to (293, 261)
Resized mask for Korea_Festival_16.jpg to (612, 408)
Resized mask for Korea_Festival_17.jpg to (465, 419)
Resized mask for Korea_Festival_18.jpg to (193, 309)
Resized mask for Korea_Festival_19.jpg to (406, 537)
Resized mask for Korea_Festival_2.jpg to (400, 600)
Resized mask for Korea_Festival_20.jpg to (500, 500)
Resized mask for Korea_Festival_21.jpg to (495, 246)
Resized mask for Korea_Festival_22.jpg to (208, 545)
Resized mask for Korea_Festival_23.jpg to (245, 407)
Resized mask for Korea_Festival_24.jpg to (300, 450)
Resized mask for Korea_Festival_25.jpg to (512, 341)
Resized mask for Korea_Festival_26.jpg to (640

### Important: Change the naming scheme for output images (i.e. at the bottom of this code, change race name)

In [11]:
def process_images(input_folder, mask_folder, output_folder, prompt, negative_prompt="", seed=0, output_format="png", target_race = "Undefined"):
    # Maximum allowed dimensions
    MAX_PIXELS = 9437184  # 3072 x 3072 pixels

    # Ensure output folder exists
    os.makedirs(output_folder, exist_ok=True)

    # Iterate through all files in the input folder
    for image_file in os.listdir(input_folder):
        try:
            # Construct the paths for the input image and the corresponding mask
            image_path = os.path.join(input_folder, image_file)
            mask_path = os.path.join(mask_folder, os.path.splitext(image_file)[0] + "_mask.png")

            # Prepare the output file name to check if it already exists
            filename_list = image_file.split('_')
            # Change file name here please.
            filename_list.insert(1, target_race)
            edited_filename = '_'.join(filename_list)
            edited_path = os.path.join(output_folder, edited_filename)

            # Skip if output image already exists
            if os.path.exists(edited_path):
                print(f"Output already exists for {image_file}, skipping...")
                continue

            # Skip if mask doesn't exist
            if not os.path.exists(mask_path):
                print(f"Mask not found for {image_file}, skipping...")
                continue

            # Open the image to check dimensions
            with Image.open(image_path) as img:
                width, height = img.size
                if width * height > MAX_PIXELS:
                    scale_factor = (MAX_PIXELS / (width * height)) ** 0.5
                    new_width = int(width * scale_factor)
                    new_height = int(height * scale_factor)
                    img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
                    resized_image_path = os.path.join(output_folder, f"resized_{image_file}")
                    img.save(resized_image_path)
                    print(f"Resized {image_file} to {new_width}x{new_height}")
                else:
                    resized_image_path = image_path

            # Parameters for the API
            params = {
                "image": resized_image_path,
                "mask": mask_path,
                "negative_prompt": negative_prompt,
                "seed": seed,
                "mode": "mask",
                "output_format": output_format,
                "prompt": prompt,
                "denoising_strength": 0.6
            }

            # Call the API
            response = send_generation_request("https://api.stability.ai/v2beta/stable-image/edit/inpaint", params)
            if not response or response.status_code != 200:
                print(f"API request failed for {image_file} with status code {response.status_code if response else 'No Response'}")
                continue

            # Decode response
            output_image = response.content
            finish_reason = response.headers.get("finish-reason")

            if finish_reason == 'CONTENT_FILTERED':
                print(f"Generation for {image_file} failed due to NSFW classification.")
                continue

            # Save the result image
            with open(edited_path, "wb") as f:
                f.write(output_image)

            print(f"Saved image {edited_filename} to {edited_path}")

        except Exception as e:
            print(f"Error processing {image_file}: {e}")
            continue

In [25]:
import random
# input_folder = "../images/Myanmar_Festivals/original_images"
# mask_folder = "../images/Myanmar_Festivals/masks"
# output_folder = "../images/Myanmar_Festivals/synthesized_images/White"
input_folder = r"D:\Data Downloads\Korean_Festival\original_images"
mask_folder = r"D:\Data Downloads\Korean_Festival\masks"
output_folder = r"D:\Data Downloads\Korean_Festival\synthesized_images\Indian"
prompt = "Indian Person(s) wearing the clothes"
target_race = "Indian"
seed = random.randint(0, 1000000)
negative_prompt = "blurry, grey, monochrome, low quality, low detail, deformed, washed out, masks"
output_format = "png"

process_images(input_folder, mask_folder, output_folder, prompt, negative_prompt, seed, output_format, target_race)

Sending REST request to https://api.stability.ai/v2beta/stable-image/edit/inpaint...
Saved image Korea_Indian_Festival_1.jpg to D:\Data Downloads\Korean_Festival\synthesized_images\Indian\Korea_Indian_Festival_1.jpg
Sending REST request to https://api.stability.ai/v2beta/stable-image/edit/inpaint...
Saved image Korea_Indian_Festival_10.jpg to D:\Data Downloads\Korean_Festival\synthesized_images\Indian\Korea_Indian_Festival_10.jpg
Sending REST request to https://api.stability.ai/v2beta/stable-image/edit/inpaint...
Saved image Korea_Indian_Festival_11.jpg to D:\Data Downloads\Korean_Festival\synthesized_images\Indian\Korea_Indian_Festival_11.jpg
Sending REST request to https://api.stability.ai/v2beta/stable-image/edit/inpaint...
Saved image Korea_Indian_Festival_12.jpg to D:\Data Downloads\Korean_Festival\synthesized_images\Indian\Korea_Indian_Festival_12.jpg
Sending REST request to https://api.stability.ai/v2beta/stable-image/edit/inpaint...
Saved image Korea_Indian_Festival_13.jpg to D

## Individual Inpainting

In [24]:
import os

input_folder = r"D:\Data Downloads\Korean_Festival\original_images"
mask_folder = r"D:\Data Downloads\Korean_Festival\masks"
output_folder = r"D:\Data Downloads\Korean_Festival\synthesized_images\White"
prompt = "White person(s) wearing the clothes"

negative_prompt = "blurry, grey, monochrome, low quality, low detail, deformed, washed out, covered"
output_format = "png"
seed = random.randint(0, 1000000)
image_file = "Korea_Festival_29.jpg"
MAX_PIXELS = 9437184  # 3072 x 3072 pixels
# seed = 50
# Construct the paths for the input image and the corresponding mask
image_path = os.path.join(input_folder, image_file)
mask_path = os.path.join(mask_folder, os.path.splitext(image_file)[0] + "_mask.png")  # Assuming mask has same name with "_mask" suffix

# # Skip if mask doesn't exist
# if not os.path.exists(mask_path):
#     print(f"Mask not found for {image_file}, skipping...")
#     continue

# Open the image to check dimensions
with Image.open(image_path) as img:
    width, height = img.size
    if width * height > MAX_PIXELS:
        scale_factor = (MAX_PIXELS / (width * height)) ** 0.5
        new_width = int(width * scale_factor)
        new_height = int(height * scale_factor)
        img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)

        # Save the resized image temporarily
        resized_image_path = os.path.join(output_folder, f"resized_{image_file}")
        img.save(resized_image_path)
        print(f"Resized {image_file} to {new_width}x{new_height}")
    else:
        resized_image_path = image_path  # No resizing needed

# Parameters for the API
params = {
    "image": resized_image_path,
    "mask": mask_path,
    "grow_mask":20,
    "seed": seed,
    "steps":60,
    "negative_prompt": negative_prompt,
    "mode": "mask",
    "output_format": output_format,
    "prompt": prompt
}

# Call the API
response = send_generation_request("https://api.stability.ai/v2beta/stable-image/edit/inpaint", params)

# Decode response
output_image = response.content
finish_reason = response.headers.get("finish-reason")
seed = response.headers.get("seed")

# # Check for NSFW classification
# if finish_reason == 'CONTENT_FILTERED':
#     print(f"Generation for {image_file} failed due to NSFW classification.")
#     continue

# Save the result image
filename, _ = os.path.splitext(image_file)
edited_filename = f"{filename}_revised.{output_format}"
edited_path = os.path.join(output_folder, edited_filename)

with open(edited_path, "wb") as f:
    f.write(output_image)

print(f"Saved image to {edited_path}")

Sending REST request to https://api.stability.ai/v2beta/stable-image/edit/inpaint...
Saved image to D:\Data Downloads\Korean_Festival\synthesized_images\White\Korea_Festival_29_revised.png


## Converting all images to PNG files in case they were not

In [None]:
import os
from PIL import Image

file_path = r"D:\Data Downloads\Relevant"

def convert_images_to_png(root_folder):
    # Define supported image extensions
    image_extensions = {".jpg", ".jpeg", ".bmp", ".gif", ".tiff", ".webp", ".PNG", ".JPG"}

    # Walk through the directory and its subdirectories
    for dirpath, dirnames, filenames in os.walk(root_folder):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            file_root, file_ext = os.path.splitext(filename)
            
            # Skip if it's already a PNG or not an image
            if file_ext.lower() == ".png" or file_ext.lower() not in image_extensions:
                continue
            
            try:
                with Image.open(file_path) as img:
                    # Convert and save as PNG
                    png_path = os.path.join(dirpath, f"{file_root}.png")
                    img.save(png_path, "PNG")
                    print(f"Converted: {file_path} -> {png_path}")
                    
                    # Optionally, remove the original file
                    os.remove(file_path)
                    print(f"Removed original file: {file_path}")
            except Exception as e:
                print(f"Failed to convert {file_path}: {e}")



convert_images_to_png(file_path)


Converted: D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_1.jpg -> D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_1.png
Removed original file: D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_1.jpg
Converted: D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_10.jpg -> D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_10.png
Removed original file: D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_10.jpg
Converted: D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_11.jpg -> D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_11.png
Removed original file: D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_11.jpg
Converted: D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_12.jpg -> D:\Data Downloads\Relevant\Korean_Clothes\original_images\Korea_Clothes_12.png
Removed o