In [None]:
import os
import sys
import numpy as np
from skimage import morphology
import mediapipe as mp
import warnings
import torch
import cv2
from torchvision.transforms import Compose
from pathlib import Path
lama_path = Path(__file__).resolve().parent.parent.parent / "AnyEdit_Collection/other_modules"
sys.path.insert(0, str(lama_path))
from AnyEdit_Collection.other_modules.depth_anything_v2.dpt import DepthAnythingV2
import matplotlib
import glob
from AnyEdit_Collection.other_modules.DPT.util import io
from AnyEdit_Collection.other_modules.DPT.dpt.transforms import Resize, NormalizeImage, PrepareForNet
from termcolor import cprint
from AnyEdit_Collection.other_modules.uniformer.mmseg.datasets.pipelines import Compose
from AnyEdit_Collection.other_modules.uniformer.mmseg.apis import init_segmentor, inference_segmentor, show_result_pyplot
from AnyEdit_Collection.other_modules.uniformer.mmseg.core.evaluation import get_palette
from AnyEdit_Collection.other_modules.HED import HEDdetector

warnings.filterwarnings("ignore")


def img2sketch(image_path, output_path):
    """Write a Canny edge sketch of an image to disk.

    Args:
        image_path: path of the source image, read with ``cv2.imread``.
        output_path: path the single-channel sketch is written to.
    """
    grayscale = cv2.cvtColor(cv2.imread(image_path), cv2.COLOR_BGR2GRAY)
    # A light 3x3 blur before edge detection.
    smoothed = cv2.GaussianBlur(grayscale, (3, 3), 0)
    # Canny marks edges as white on black; invert so the strokes come out
    # dark on a white background.
    sketch = cv2.bitwise_not(cv2.Canny(smoothed, 50, 150))
    cv2.imwrite(output_path, sketch)

def run_depth(input_image, output_image, model):
    """Compute a depth map for one image with ``model`` and write it to disk.

    Args:
        input_image (str): path of the image, loaded with ``io.read_image``.
        output_image (str): destination handed to ``io.write_depth``.
        model: depth network. It is put in eval mode and moved to the
            selected device; on CUDA it is also converted to channels-last
            memory format and half precision.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    on_cuda = device == torch.device("cuda")

    side = 384
    # NOTE(review): `Compose` is imported twice at the top of this file
    # (torchvision first, mmseg second); the later import is the one bound
    # here - confirm that is the intended implementation.
    preprocess = Compose([
        Resize(
            side,
            side,
            resize_target=None,
            keep_aspect_ratio=True,
            ensure_multiple_of=32,
            resize_method="minimal",
            image_interpolation_method=cv2.INTER_CUBIC,
        ),
        NormalizeImage(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        PrepareForNet(),
    ])

    model.eval()
    if on_cuda:
        model = model.to(memory_format=torch.channels_last).half()
    model.to(device)

    image = io.read_image(input_image)
    net_input = preprocess({"image": image})["image"]

    with torch.no_grad():
        batch = torch.from_numpy(net_input).to(device).unsqueeze(0)
        if on_cuda:
            # The input must match the model's memory format and dtype.
            batch = batch.to(memory_format=torch.channels_last).half()

        raw = model.forward(batch)
        # Resample the network output back to the input image's height/width.
        upsampled = torch.nn.functional.interpolate(
            raw.unsqueeze(1),
            size=image.shape[:2],
            mode="bicubic",
            align_corners=False,
        )
        prediction = upsampled.squeeze().cpu().numpy()

    cprint(output_image, 'red')
    io.write_depth(output_image, prediction, bits=2, absolute_depth=False)


def img2depth(image_path, output_path, depth_anything = None):
    """Colorize Depth Anything V2 predictions and write them to disk.

    Args:
        image_path: a single image file, a ``txt`` file listing one image
            path per line, or a directory that is searched recursively.
        output_path: when ``image_path`` resolves to exactly one input this
            is the output file. When it resolves to several inputs this is
            treated as a directory and each result is stored in it as
            ``<input stem>.png`` (previously every result overwrote the
            same file, so only the last one survived).
        depth_anything: model exposing ``infer_image(bgr_image)``. Required.

    Raises:
        ValueError: if ``depth_anything`` is not supplied.
        FileNotFoundError: if the single input image cannot be read.
    """
    if depth_anything is None:
        raise ValueError('img2depth requires a loaded depth_anything model')

    if os.path.isfile(image_path):
        if image_path.endswith('txt'):
            with open(image_path, 'r') as f:
                filenames = f.read().splitlines()
        else:
            filenames = [image_path]
    else:
        filenames = glob.glob(os.path.join(image_path, '**/*'), recursive=True)

    batch_mode = len(filenames) > 1
    if batch_mode:
        os.makedirs(output_path, exist_ok=True)

    cmap = matplotlib.colormaps.get_cmap('Spectral_r')

    for filename in filenames:
        raw_image = cv2.imread(filename)
        if raw_image is None:
            # cv2.imread signals failure by returning None (sub-directories
            # matched by the glob, blank list lines, non-image files).
            if batch_mode:
                print(f'{filename} can not be read as an image, skipped')
                continue
            raise FileNotFoundError(f'can not read image: {filename}')

        depth = depth_anything.infer_image(raw_image)
        # Stretch to 0..255; a constant map has zero range and would
        # otherwise divide by zero.
        low = depth.min()
        span = depth.max() - low
        if span > 0:
            depth = (depth - low) / span * 255.0
        else:
            depth = np.zeros_like(depth)
        depth = depth.astype(np.uint8)
        # The colormap yields RGBA floats in 0..1: drop alpha, rescale, and
        # reverse the channel order to the BGR layout cv2.imwrite expects.
        depth = (cmap(depth)[:, :, :3] * 255)[:, :, ::-1].astype(np.uint8)

        if batch_mode:
            stem = os.path.splitext(os.path.basename(filename))[0]
            target = os.path.join(output_path, stem + '.png')
        else:
            target = output_path
        cv2.imwrite(target, depth)


def img2seg(image_path, output_path, model):
    """Segment an image with ``model`` and save the rendered result.

    Args:
        image_path: path of the image, read with ``cv2.imread``.
        output_path: path the rendered segmentation is written to.
        model: segmentor accepted by ``inference_segmentor``.
    """
    source = cv2.imread(image_path)
    segmentation = inference_segmentor(model, source)
    # Rendered with the 'ade' palette at opacity=1.
    # NOTE(review): presumably opacity=1 hides the input image completely -
    # confirm against show_result_pyplot.
    rendered = show_result_pyplot(
        model, source, segmentation, get_palette('ade'), opacity=1
    )
    cv2.imwrite(output_path, rendered)




# Visual Segmentation

In [None]:
# Build the segmentor that the segmentation cell below passes to img2seg.
# NOTE(review): `config` points at a directory rather than a config file -
# confirm init_segmentor accepts that, or point it at the concrete config
# that matches the upernet_global_small checkpoint.
seg_model = init_segmentor(config='./AnyEdit_Collection/other_modules/uniformer', device='cuda',
                               checkpoint='./checkpoints/visual_models/annotator/ckpts/upernet_global_small.pth')
    

In [None]:
import json
from AnyEdit_Collection.adaptive_editing_pipelines.tools.tool import prepare_output_dir
from tqdm import tqdm
import random

# Build segmentation-map references for the edited images of one action.
task = 'segment'
action = 'remove'

# Edited images produced by the `action` pipeline (inputs of this cell).
sketch_root = f'./AnyEdit/data/{action}/edited_img'

# Segmentation maps and the result list are stored here.
output_dir = f'./AnyEdit/data/visual_{task}'
prepare_output_dir(output_dir)
result_path = os.path.join(output_dir, 'edit_result.json')

# Instructions whose edit succeeded.
json_path = f'./AnyEdit/data/{action}/edit_success_-1_-1.json'
with open(json_path, 'r') as f:
    instructions = json.load(f)

# Continue from an earlier result file when one exists.
final_instructions = []
if os.path.exists(result_path):
    with open(result_path, 'r') as f:
        final_instructions = json.load(f)
data = {}
iter = 0

prompt_openers = ['Follow ', 'Refer to ', 'Watch ']
for iter, ins in enumerate(tqdm(instructions), start=1):
    # NOTE(review): this cell produces segmentation maps but labels the
    # instruction as 'sketch' - confirm the wording is intended.
    ins['edit_type'] = 'sketch'
    ins['edit'] = random.choice(prompt_openers) + f'the given sketch [V*] to {action} {ins["edited object"]} '
    image_path = os.path.join(sketch_root, ins['image_file'])

    if os.path.exists(image_path):
        output_path = os.path.join(output_dir, 'edited_img', ins['image_file'])
        img2seg(image_path=image_path, output_path=output_path, model=seg_model)
        final_instructions.append(ins)
        # Checkpoint the result list every 1000 instructions.
        if iter % 1000 == 0:
            with open(result_path, 'w') as f:
                json.dump(final_instructions, f)
    else:
        print(f'{image_path} not exists')

with open(result_path, 'w') as f:
    json.dump(final_instructions, f)

# Visual Scribble

In [None]:
# Load the HED detector once; the scribble cell below calls it as
# hed_model(image_path, output_path).
hed_model = HEDdetector(path='./checkpoints/visual_models/ControlNetHED.pth')

In [None]:
import json
from AnyEdit_Collection.adaptive_editing_pipelines.tools.tool import prepare_output_dir
from tqdm import tqdm
import random

# Build scribble references for the edited images of one action.
task = 'scribble'
action = 'remove'

# Edited images produced by the `action` pipeline (inputs of this cell).
sketch_root = f'./AnyEdit/data/{action}/edited_img'

# Scribbles and the result list are stored here.
output_dir = f'./AnyEdit/data/visual_{task}'
prepare_output_dir(output_dir)
result_path = os.path.join(output_dir, 'edit_result.json')

# Instructions whose edit succeeded.
json_path = f'./AnyEdit/data/{action}/edit_success_-1_-1.json'
with open(json_path, 'r') as f:
    instructions = json.load(f)

# Continue from an earlier result file when one exists.
final_instructions = []
if os.path.exists(result_path):
    with open(result_path, 'r') as f:
        final_instructions = json.load(f)
data = {}
iter = 0

prompt_openers = ['Follow ', 'Refer to ', 'Watch ']
for iter, ins in enumerate(tqdm(instructions), start=1):
    ins['edit_type'] = 'sketch'
    ins['edit'] = random.choice(prompt_openers) + f'the given sketch [V*] to {action} {ins["edited object"]} '
    image_path = os.path.join(sketch_root, ins['image_file'])

    if not os.path.exists(image_path):
        # Fall back to the "<name>_0.png" variant of the edited image.
        image_path = image_path.replace('.jpg', '_0.png')
        if not os.path.exists(image_path):
            print(f'{image_path} not exists')
            continue

    output_path = os.path.join(output_dir, 'edited_img', ins['image_file'])

    hed_model(image_path, output_path)
    final_instructions.append(ins)

with open(result_path, 'w') as f:
    json.dump(final_instructions, f)

# Visual Sketch

In [None]:
import json
from AnyEdit_Collection.adaptive_editing_pipelines.tools.tool import prepare_output_dir
from tqdm import tqdm
import random

# Build Canny-sketch references for the edited images of one action.
task = 'sketch'
action = 'remove'

# Edited images produced by the `action` pipeline (inputs of this cell).
sketch_root = f'./AnyEdit/data/{action}/edited_img'

# Sketches and the result list are stored here.
output_dir = f'./AnyEdit/data/visual_{task}'
prepare_output_dir(output_dir)
result_path = os.path.join(output_dir, 'edit_result.json')

# Instructions whose edit succeeded.
json_path = f'./AnyEdit/data/{action}/edit_success_-1_-1.json'
with open(json_path, 'r') as f:
    instructions = json.load(f)

# Continue from an earlier result file when one exists.
final_instructions = []
if os.path.exists(result_path):
    with open(result_path, 'r') as f:
        final_instructions = json.load(f)
data = {}
iter = 0

prompt_openers = ['Follow ', 'Refer to ', 'Watch ']
for iter, ins in enumerate(tqdm(instructions), start=1):
    ins['edit_type'] = 'sketch'
    ins['edit'] = random.choice(prompt_openers) + f'the given sketch [V*] to {action} {ins["edited object"]} '
    image_path = os.path.join(sketch_root, ins['image_file'])

    if not os.path.exists(image_path):
        # Fall back to the "<name>_0.png" variant of the edited image.
        image_path = image_path.replace('.jpg', '_0.png')
        if not os.path.exists(image_path):
            print(f'{image_path} not exists')
            continue

    # Sketches are stored as png.
    output_path = os.path.join(output_dir, 'edited_img', ins['image_file']).replace('jpg', 'png')
    img2sketch(image_path=image_path, output_path=output_path)
    final_instructions.append(ins)
    # Checkpoint the result list every 1000 instructions.
    if iter % 1000 == 0:
        with open(result_path, 'w') as f:
            json.dump(final_instructions, f)

with open(result_path, 'w') as f:
    json.dump(final_instructions, f)

# Visual Depth

In [None]:
# Depth Anything V2 (ViT-L encoder) used by the depth cell below.
DEVICE = 'cuda'
depth_anything = DepthAnythingV2(
    encoder='vitl',
    features=256,
    out_channels=[256, 512, 1024, 1024],
)
# Deserialize the weights on the CPU first; the model is moved afterwards.
depth_anything.load_state_dict(
    torch.load(
        './checkpoints/visual_models/depth_anything_v2/depth_anything_v2_vitl.pth',
        map_location='cpu',
    )
)
depth_anything = depth_anything.to(DEVICE)
depth_anything = depth_anything.eval()

In [None]:
import json
from AnyEdit_Collection.adaptive_editing_pipelines.tools.tool import prepare_output_dir
from tqdm import tqdm
import random

# Build depth-image references for the edited images of one action.
task = 'depth'
action = 'remove'

# place to hold edited images
depth_root = f'./AnyEdit/data/{action}/edited_img'

# place to hold depth
output_dir = f'./AnyEdit/data/visual_{task}'
prepare_output_dir(output_dir)

# successful instructions
json_path = f'./AnyEdit/data/{action}/edit_success_-1_-1.json'
with open(json_path, 'r') as f:
    instructions = json.load(f)

# final results (continue from an earlier result file when one exists)
final_instructions = []
if os.path.exists(os.path.join(output_dir, 'edit_result.json')):
    with open(os.path.join(output_dir, 'edit_result.json'), 'r') as f:
        final_instructions = json.load(f)
data = {}
iter = 0

for ins in tqdm(instructions):

    iter += 1
    ins['edit_type'] = 'depth image'
    ins['edit'] = random.choice(['Follow ', 'Refer to ', 'Watch ']) + f'the given depth image [V*] to {action} {ins["edited object"]} '
    # Read from this cell's own root. The previous code used `sketch_root`,
    # which only exists if one of the earlier cells has been run.
    image_path = os.path.join(depth_root, ins['image_file'])

    if not os.path.exists(image_path):
        # fall back to the "<name>_0.png" variant of the edited image
        image_path = image_path.replace('.jpg', '_0.png')
    if not os.path.exists(image_path):
        print(f'{image_path} not exists')
        continue

    output_path = os.path.join(output_dir, 'edited_img', ins['image_file'])
    output_path = output_path.replace('jpg', 'png')
    img2depth(image_path=image_path,
              output_path=output_path, depth_anything=depth_anything)
    final_instructions.append(ins)
    # checkpoint the result list every 1000 instructions
    if iter % 1000 == 0:
        with open(os.path.join(output_dir, 'edit_result.json'), 'w') as f:
            json.dump(final_instructions, f)
with open(os.path.join(output_dir, 'edit_result.json'), 'w') as f:
    json.dump(final_instructions, f)

# Bounding Box Region

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = '0'
import torch
from PIL import Image
import random
import GroundingDINO.groundingdino.datasets.transforms as T
from GroundingDINO.groundingdino.models import build_model
from GroundingDINO.groundingdino.util.slconfig import SLConfig
from GroundingDINO.groundingdino.util.utils import clean_state_dict, get_phrases_from_posmap
from segment_anything import build_sam, SamPredictor
import cv2
import numpy as np
import warnings
import json
from tqdm import tqdm
import argparse
import spacy
# from tool import is_human_variant, return_parameters, maskgeneration
from termcolor import cprint  
from AnyEdit_Collection.adaptive_editing_pipelines.tools.tool import maskgeneration, get_bbox_from_mask     

In [None]:
def load_tool_model():
    """Load the GroundingDINO detector and the SAM predictor.

    Returns:
        tuple: ``(det_model, sam_model)`` - the model returned by
        ``load_model`` and a ``SamPredictor`` wrapping the SAM checkpoint.
    """
    device = 'cuda'
    dino_config = './GroundingDINO/groundingdino/config/GroundingDINO_SwinB_cfg.py'
    dino_weights = './checkpoints/foundation_models/groundingDINO/groundingdino_swinb_cogcoor.pth'
    sam_weights = './checkpoints/foundation_models/sam_vit_h_4b8939.pth'

    detector = load_model(dino_config, dino_weights, device=device)
    sam_backbone = build_sam(checkpoint=sam_weights).to(device)
    return detector, SamPredictor(sam_backbone)
def load_model(model_config_path, model_checkpoint_path, device):
    """Build a GroundingDINO model from its config and load its weights.

    Args:
        model_config_path: path of the config read by ``SLConfig.fromfile``.
        model_checkpoint_path: checkpoint whose ``"model"`` entry holds the
            state dict.
        device: stored on the config as ``args.device`` before the model is
            built.

    Returns:
        The model in eval mode.
    """
    args = SLConfig.fromfile(model_config_path)
    args.device = device
    model = build_model(args)
    # Deserialize on the CPU: the previous hard-coded map_location="cuda"
    # ignored `device` and needed a GPU just to read the file.
    # load_state_dict copies the values into the model's own parameters.
    checkpoint = torch.load(model_checkpoint_path, map_location="cpu")
    # strict=False tolerates keys that do not match between checkpoint and model.
    model.load_state_dict(clean_state_dict(checkpoint["model"]), strict=False)
    model.eval()
    return model

def draw_bbox(image, output_path, bbox):
    """Draw a rectangle on an image and save the result.

    NOTE(review): this cell defines ``draw_bbox`` twice; the later
    definition is the one bound at runtime.

    Args:
        image: an image array (assumed to be RGB, as produced from a PIL
            image in ``img2bbox`` - confirm for other callers) or the path
            of an image file. An array is drawn on in place.
        output_path: path the annotated image is written to.
        bbox: ``[x1, y1, x2, y2]`` corner coordinates.

    Raises:
        FileNotFoundError: if ``image`` is a path that cannot be read.
    """
    if isinstance(image, str):
        # The previous code read an undefined `image_path` here.
        loaded = cv2.imread(image)
        if loaded is None:
            raise FileNotFoundError(f'can not read image: {image}')
        # cv2 loads BGR; normalise to RGB so the shared conversion before
        # saving restores the right channel order.
        image = cv2.cvtColor(loaded, cv2.COLOR_BGR2RGB)

    # cv2.rectangle needs plain integer coordinates.
    x1, y1, x2, y2 = (int(v) for v in bbox[:4])
    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 0, 255), 2)

    # This conversion just swaps the channel order: RGB -> BGR for imwrite.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    cv2.imwrite(output_path, image)
    
def img2bbox(input_path, output_path, target_obj, det_model, sam_model):
    """Locate ``target_obj`` in an image and save the image with its box drawn.

    NOTE(review): this cell defines ``img2bbox`` twice; the later
    definition is the one bound at runtime.

    Args:
        input_path: path of the image handed to ``maskgeneration``.
        output_path: path the annotated image is written to.
        target_obj: description of the object to locate.
        det_model: detection model passed to ``maskgeneration``.
        sam_model: SAM predictor passed to ``maskgeneration``.

    Returns:
        tuple: ``(True, mask)`` on success, ``(False, None)`` when no mask
        was produced for ``target_obj``.
    """
    # The image used for drawing comes from maskgeneration; the separate
    # cv2.imread the previous code did here was never used.
    mask_pil, image_pil, _bbox_pil, _union = maskgeneration(det_model, sam_model, input_path, target_obj)
    if mask_pil is None:
        print(f'Can not find {target_obj} in {input_path}')
        return False, None
    # get_bbox_from_mask returns rows first; draw_bbox wants x1, y1, x2, y2.
    y1, y2, x1, x2 = get_bbox_from_mask(np.array(mask_pil))
    draw_bbox(np.array(image_pil), output_path, [x1, y1, x2, y2])

    return True, mask_pil

def draw_bbox(image, output_path, bbox):
    """Draw a rectangle on an image and save the result.

    NOTE(review): this cell defines ``draw_bbox`` twice; this later
    definition is the one bound at runtime.

    Args:
        image: an image array (assumed to be RGB, as produced from a PIL
            image in ``img2bbox`` - confirm for other callers) or the path
            of an image file. An array is drawn on in place.
        output_path: path the annotated image is written to.
        bbox: ``[x1, y1, x2, y2]`` corner coordinates.

    Raises:
        FileNotFoundError: if ``image`` is a path that cannot be read.
    """
    if isinstance(image, str):
        # The previous code read an undefined `image_path` here.
        loaded = cv2.imread(image)
        if loaded is None:
            raise FileNotFoundError(f'can not read image: {image}')
        # cv2 loads BGR; normalise to RGB so the shared conversion before
        # saving restores the right channel order.
        image = cv2.cvtColor(loaded, cv2.COLOR_BGR2RGB)

    # cv2.rectangle needs plain integer coordinates.
    x1, y1, x2, y2 = (int(v) for v in bbox[:4])
    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 0, 255), 2)

    # This conversion just swaps the channel order: RGB -> BGR for imwrite.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    cv2.imwrite(output_path, image)
    
def img2bbox(input_path, output_path, target_obj, det_model, sam_model):
    """Locate ``target_obj`` in an image and save the image with its box drawn.

    NOTE(review): this cell defines ``img2bbox`` twice; this later
    definition is the one bound at runtime.

    Args:
        input_path: path of the image handed to ``maskgeneration``.
        output_path: path the annotated image is written to.
        target_obj: description of the object to locate.
        det_model: detection model passed to ``maskgeneration``.
        sam_model: SAM predictor passed to ``maskgeneration``.

    Returns:
        tuple: ``(True, mask)`` on success, ``(False, None)`` when no mask
        was produced for ``target_obj``.
    """
    # The image used for drawing comes from maskgeneration; the separate
    # cv2.imread the previous code did here was never used.
    mask_pil, image_pil, _bbox_pil, _union = maskgeneration(det_model, sam_model, input_path, target_obj)
    if mask_pil is None:
        print(f'Can not find {target_obj} in {input_path}')
        return False, None
    # get_bbox_from_mask returns rows first; draw_bbox wants x1, y1, x2, y2.
    y1, y2, x1, x2 = get_bbox_from_mask(np.array(mask_pil))
    draw_bbox(np.array(image_pil), output_path, [x1, y1, x2, y2])

    return True, mask_pil

In [None]:
# Load the detector and the SAM predictor once; the bounding-box cell below
# passes both to img2bbox.
det_model, sam_model = load_tool_model()

In [None]:
import json
from AnyEdit_Collection.adaptive_editing_pipelines.tools.tool import prepare_output_dir
from tqdm import tqdm
import random

# Build bounding-box references for the edited images of one action.
task = 'boundingbox'
action = 'remove'

# place to hold edited images
boundingbox_root = f'./AnyEdit/data/{action}/edited_img'

# place to hold boundingbox
output_dir = f'./AnyEdit/data/visual_{task}'
prepare_output_dir(output_dir)

# successful instructions
json_path = f'./AnyEdit/data/{action}/edit_success_-1_-1.json'
with open(json_path, 'r') as f:
    instructions = json.load(f)

# final results (continue from an earlier result file when one exists)
final_instructions = []
if os.path.exists(os.path.join(output_dir, 'edit_result.json')):
    with open(os.path.join(output_dir, 'edit_result.json'), 'r') as f:
        final_instructions = json.load(f)
# number of instructions already handled by an earlier run
pre_state = 0
if os.path.exists(os.path.join(output_dir, 'state.json')):
    with open(os.path.join(output_dir, 'state.json'), 'r') as f:
        data = json.load(f)
        pre_state = data['iter']
data = {}
# Start the counter here. Previously this cell relied on whatever value an
# earlier cell had left in `iter` (or on the builtin in a fresh kernel).
iter = 0

for ins in tqdm(instructions):
    iter += 1
    # The state is saved after instruction `pre_state` has been processed,
    # so that instruction must be skipped too or it would be appended twice.
    if iter <= pre_state:
        continue

    ins['edit_type'] = 'visual_boundingbox'
    ins['edit'] = random.choice(['Follow ', 'Refer to ', 'Watch ']) + f'the given boundingbox [V*] to change {ins["edited object"]} '
    # `root` was undefined; the edited images live under boundingbox_root.
    image_path = os.path.join(boundingbox_root, ins['image_file'])
    image_path = image_path.replace('jpg', 'png')
    output_path = os.path.join(output_dir, 'edited_img', ins['image_file'])
    output_path = output_path.replace('jpg', 'png')
    success, mask = img2bbox(input_path=image_path,
                             output_path=output_path, target_obj=ins['ref_object'],
                             det_model=det_model, sam_model=sam_model)
    if success:
        final_instructions.append(ins)
    else:
        print(f'Can not find {ins["edited object"]} in {image_path}')
    # checkpoint the results and the progress counter every 50 instructions
    if iter % 50 == 0:
        with open(os.path.join(output_dir, 'edit_result.json'), 'w') as f:
            json.dump(final_instructions, f)
        with open(os.path.join(output_dir, 'state.json'), 'w') as f:
            json.dump({'iter': iter}, f)


with open(os.path.join(output_dir, 'edit_result.json'), 'w') as f:
    json.dump(final_instructions, f)
# Persist the final position too, so a re-run does not reprocess the
# instructions handled after the last periodic checkpoint.
with open(os.path.join(output_dir, 'state.json'), 'w') as f:
    json.dump({'iter': max(iter, pre_state)}, f)