In [20]:
import time
from PIL import Image
import os
from concurrent.futures import ThreadPoolExecutor
import json
import cv2
from tqdm import tqdm


def crop_image(image_path, centers, window_size, demo="", data_type=""):
    """
    image_path: 图片路径
    centers: 一系列中心坐标 (x, y) 的列表
    window_size: 切割图块的大小（正方形的边长）
    """

    # 检查图像路径是否存在
    image = cv2.imread(image_path)
    if image is None:
        raise ValueError(f"Failed to load image: {image_path}")

    height, width, _ = image.shape

    half_window = window_size // 2  # 窗口的一半

    # 记录开始的时间
    start_time = time.time()

    # 记录裁剪的图块的地址
    cropped_image_paths = []

    # 获取当前工作目录
    current_directory = os.getcwd()

    # 提取文件名（不带扩展名）
    filename_without_ext = os.path.splitext(os.path.basename(image_path))[0]

    result_dir = f'{current_directory}/croped_result_{demo}/{data_type}/{filename_without_ext}'

    # 检查目录是否存在，如果不存在则创建
    if not os.path.exists(result_dir):
        os.makedirs(result_dir)
    for i, (x, y) in enumerate(centers):
        # 裁剪图块
        cropped_image = crop_and_save(
            image, x, y, half_window, width, height, i, result_dir)
        cropped_image_paths.append(cropped_image)

    # 记录结束的时间
    end_time = time.time()
    # print(f"Time elapsed, crop img: {end_time - start_time} seconds")

    return cropped_image_paths


def crop_and_save(image, x, y, half_window, width, height, i, result_dir):

    # 确定裁剪区域的左上角和右下角
    left = max(0, x - half_window)
    top = max(0, y - half_window)
    right = min(width, x + half_window)
    bottom = min(height, y + half_window)

    left = round(left)
    top = round(top)
    right = round(right)
    bottom = round(bottom)

    # 确保裁剪区域在图像边界内
    if left < 0:
        left = 0
    if top < 0:
        top = 0
    if right > width:
        right = width
    if bottom > height:
        bottom = height

    # 裁剪图块
    cropped_image = image[top:bottom, left:right]

    # 保存裁剪的图块的坐标信息到json文件
    json_path = os.path.join(result_dir, f"cropped_image_coordinate.json")
    res = {}
    res["coordinates"] = []
    res["coordinates"].append({
        "cx": x,
        "cy": y,
        "left": left,
        "top": top,
        "right": right,
        "bottom": bottom
    })
    save = {}
    save["coordinates"] = []

    if os.path.exists(json_path):
        # 删除已经存在的json文件
        # os.remove(json_path)
        with open(json_path, 'r') as json_file:
            data = json.load(json_file)
    else:
        data = {}
        data['coordinates'] = []
    save['coordinates'] = data['coordinates'] + res['coordinates']
    with open(json_path, 'w') as json_file:
        json.dump(save, json_file, indent=4)

    # 保存裁剪的图块到指定目录
    save_path = os.path.join(result_dir, f"cropped_image_{i}.png")
    cv2.imwrite(save_path, cropped_image)

    return save_path

In [31]:
from argparse import Namespace
import torch
import numpy as np
import cv2
import rawpy
import os

from model.loftr_src.loftr.utils.cvpr_ds_config import default_cfg
from model.full_model import GeoFormer as GeoFormer_

from eval_tool.immatch.utils.data_io import load_gray_scale_tensor_cv
from model.geo_config import default_cfg as geoformer_cfg


class GeoFormer():
    def __init__(self, imsize, match_threshold, no_match_upscale=False, ckpt=None, device='cuda'):

        self.device = device
        self.imsize = imsize
        self.match_threshold = match_threshold
        self.no_match_upscale = no_match_upscale

        # Load model
        conf = dict(default_cfg)
        conf['match_coarse']['thr'] = self.match_threshold
        geoformer_cfg['coarse_thr'] = self.match_threshold
        self.model = GeoFormer_(conf)
        ckpt_dict = torch.load(ckpt, map_location=torch.device('cpu'))
        if 'state_dict' in ckpt_dict:
            ckpt_dict = ckpt_dict['state_dict']
        self.model.load_state_dict(ckpt_dict, strict=False)
        self.model = self.model.eval().to(self.device)

        # Name the method
        self.ckpt_name = ckpt.split('/')[-1].split('.')[0]
        self.name = f'GeoFormer_{self.ckpt_name}'
        if self.no_match_upscale:
            self.name += '_noms'
        print(f'Initialize {self.name}')

    def change_deivce(self, device):
        self.device = device
        self.model.to(device)

    def load_im(self, im_path, enhanced=False):
        return load_gray_scale_tensor_cv(
            im_path, self.device, imsize=self.imsize, dfactor=8, enhanced=enhanced, value_to_scale=min
        )

    def match_inputs_(self, gray1, gray2, is_draw=False):

        batch = {'image0': gray1, 'image1': gray2}
        with torch.no_grad():
            batch = self.model(batch)
        kpts1 = batch['mkpts0_f'].cpu().numpy()
        kpts2 = batch['mkpts1_f'].cpu().numpy()

        def draw():
            import matplotlib.pyplot as plt
            import cv2
            import numpy as np
            plt.figure(dpi=500)
            kp0 = kpts1
            kp1 = kpts2
            # if len(kp0) > 0:
            kp0 = [cv2.KeyPoint(int(k[0]), int(k[1]), 30) for k in kp0]
            kp1 = [cv2.KeyPoint(int(k[0]), int(k[1]), 30) for k in kp1]
            matches = [cv2.DMatch(_trainIdx=i, _queryIdx=i, _distance=1, _imgIdx=-1) for i in
                       range(len(kp0))]

            show = cv2.drawMatches((gray1.cpu()[0][0].numpy() * 255).astype(np.uint8), kp0,
                                   (gray2.cpu()[0][0].numpy() *
                                    255).astype(np.uint8), kp1, matches,
                                   None)
            plt.imshow(show)
            plt.axis('off')
            plt.show()
        if is_draw:
            draw()
        scores = batch['mconf'].cpu().numpy()
        matches = np.concatenate([kpts1, kpts2], axis=1)
        return matches, kpts1, kpts2, scores

    def match_pairs(self, im1_path, im2_path, cpu=False, is_draw=False):
        torch.cuda.empty_cache()
        tmp_device = self.device
        if cpu:
            self.change_deivce('cpu')

        gray1, sc1 = self.load_im(im1_path)
        gray2, sc2 = self.load_im(im2_path)

        upscale = np.array([sc1 + sc2])
        matches, kpts1, kpts2, scores = self.match_inputs_(
            gray1, gray2, is_draw)

        if self.no_match_upscale:
            return matches, kpts1, kpts2, scores, upscale.squeeze(0)

        # Upscale matches &  kpts
        matches = upscale * matches
        kpts1 = sc1 * kpts1
        kpts2 = sc2 * kpts2

        if cpu:
            self.change_deivce(tmp_device)

        return matches, kpts1, kpts2, scores


g = GeoFormer(640, 0.5, no_match_upscale=False,
              ckpt='saved_ckpt/geoformer.ckpt', device='cuda')

Initialize GeoFormer_geoformer


  ckpt_dict = torch.load(ckpt, map_location=torch.device('cpu'))


In [3]:
import matplotlib.pyplot as plt
import cv2
import numpy as np


def get_homography_res(img1_path, img2_path, kpts1, kpts2, matches, is_draw=False):
    if len(matches) < 4:
        return None, None

    img1_color = cv2.imread(img1_path)
    img2_color = cv2.imread(img2_path)

    src_pts = np.float32(matches[:, :2]).reshape(-1, 1, 2)
    dst_pts = np.float32(matches[:, 2:]).reshape(-1, 1, 2)

    matrix, inliers = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)

    height, width = img2_color.shape[:2]

    aligned_img1_color = cv2.warpPerspective(
        img1_color, matrix, (width, height))

    if is_draw:
        plt.figure(dpi=500)

        # 示例关键点和图像
        kp0 = [cv2.KeyPoint(int(k[0]), int(k[1]), 30) for k in kpts2]
        kp1 = [cv2.KeyPoint(int(k[0]), int(k[1]), 30) for k in kpts2]
        matches = [cv2.DMatch(_trainIdx=i, _queryIdx=i,
                              _distance=1, _imgIdx=-1) for i in range(len(kp0))]

        # 转换图像为 RGB 格式
        aligned_img1_rgb = cv2.cvtColor(aligned_img1_color, cv2.COLOR_BGR2RGB)
        img2_rgb = cv2.cvtColor(img2_color, cv2.COLOR_BGR2RGB)

        # 创建拼接图像
        h1, w1 = aligned_img1_rgb.shape[:2]
        h2, w2 = img2_rgb.shape[:2]
        canvas = np.zeros((max(h1, h2), w1 + w2, 3), dtype=np.uint8)
        canvas[:h1, :w1] = aligned_img1_rgb
        canvas[:h2, w1:w1 + w2] = img2_rgb

        # 绘制匹配点和随机颜色连线
        for match in matches:
            pt1 = (int(kp0[match.queryIdx].pt[0]),
                   int(kp0[match.queryIdx].pt[1]))
            pt2 = (int(kp1[match.trainIdx].pt[0]) +
                   w1, int(kp1[match.trainIdx].pt[1]))
            random_color = tuple(np.random.randint(0, 256, 3).tolist())
            cv2.line(canvas, pt1, pt2, color=random_color, thickness=2)
            cv2.circle(canvas, pt1, radius=10,
                       color=random_color, thickness=-1)
            cv2.circle(canvas, pt2, radius=10,
                       color=random_color, thickness=-1)

        # 显示结果
        print("仿射变换后的匹配结果：")
        plt.imshow(canvas)
        plt.axis('off')
        plt.show()

    return cv2.cvtColor(aligned_img1_color, cv2.COLOR_BGR2RGB), matrix

In [18]:
import numpy as np
import cv2
import json

def transform_coord(demo, i, matrix):

    # 读取 attention.json 文件
    with open(f'/home/hechunjiang/gradio/src/result/attention_area/{i}.json', 'r') as f:
        attention_data = json.load(f)

    # 提取关注区域坐标
    points = attention_data['points']

    # 变换区域坐标
    transformed_points = []
    for point in points:
        # 获取原始区域的四个角坐标
        x1, y1, x2, y2 = point['x1'], point['y1'], point['x2'], point['y2']

        # 使用变换矩阵将区域的四个角坐标变换到新图像坐标系
        corners = np.array([[x1, y1], [x2, y1], [x2, y2], [x1, y2]], dtype='float32').reshape(-1, 1, 2)
        transformed_corners = cv2.perspectiveTransform(corners, matrix)

        # 获取变换后的矩形区域的坐标
        # transformed_corners 是一个 4x1x2 的数组，需要提取并计算新的 x1, y1, x2, y2
        x_min = float(min(transformed_corners[0][0][0], transformed_corners[1][0][0], transformed_corners[2][0][0], transformed_corners[3][0][0]))
        y_min = float(min(transformed_corners[0][0][1], transformed_corners[1][0][1], transformed_corners[2][0][1], transformed_corners[3][0][1]))
        x_max = float(max(transformed_corners[0][0][0], transformed_corners[1][0][0], transformed_corners[2][0][0], transformed_corners[3][0][0]))
        y_max = float(max(transformed_corners[0][0][1], transformed_corners[1][0][1], transformed_corners[2][0][1], transformed_corners[3][0][1]))

        # 更新为变换后的矩形区域
        transformed_points.append({
            'x1': x_min,
            'y1': y_min,
            'x2': x_max,
            'y2': y_max
        })

    # 将变换后的结果保存回 JSON 文件
    attention_data['points'] = transformed_points

    if not os.path.exists(f'/home/hechunjiang/gradio/src/result/attention_area/{demo}'):
        os.makedirs(f'/home/hechunjiang/gradio/src/result/attention_area/{demo}')

    with open(f'/home/hechunjiang/gradio/src/result/attention_area/{demo}/transformed_attention_{i}.json', 'w') as f:
        json.dump(attention_data, f, indent=4)

    print(f"Transformed attention area {demo} {i} and saved to {demo}/transformed_attention_{i}.json")


In [30]:
import shutil

# 删除/home/hechunjiang/gradio/GeoFormer/croped_result_KTC/finetune_dst/19
demo_list = ["LG", "SONY", "AMAZON", "SKYWORTH", "KTC", "REDMAGIC", "HISENSE"]

for demo in demo_list:
    for i in [19, 20, 21]:
        if os.path.exists(f'/home/hechunjiang/gradio/GeoFormer/croped_result_{demo}/finetune_dst/{i}'):
            shutil.rmtree(f'/home/hechunjiang/gradio/GeoFormer/croped_result_{demo}/finetune_dst/{i}')
        if os.path.exists(f'/home/hechunjiang/gradio/GeoFormer/croped_result_{demo}/finetune_ref/{i}'):
            shutil.rmtree(f'/home/hechunjiang/gradio/GeoFormer/croped_result_{demo}/finetune_ref/{i}')

In [None]:
from tqdm import tqdm

demo_list = ["LG", "SONY", "AMAZON", "SKYWORTH", "KTC", "REDMAGIC", "HISENSE"]

img2_path_list = [
    "/home/hechunjiang/gradio/样品1 LG 65UF8580/华为P50手机采集图像/样品1采集图像/",
    "/home/hechunjiang/gradio/样品2 SONY 43吋/华为P50手机采集图像/",
    "/home/hechunjiang/gradio/样品3 亚马逊 43吋/华为P50手机采集图像/",
    "/home/hechunjiang/gradio/样品4 创维 F32D80U/华为P50采集图像/",
    "/home/hechunjiang/gradio/样品5 KTC M27P20P/华为P50采集图像/",
    "/home/hechunjiang/gradio/样品6 红魔 GM001S/华为P50采集图像/",
    "/home/hechunjiang/gradio/样品7 海信 27G7K-PRO/华为P50采集图像/"
]

all_croped_image_counts = [[] for _ in range(len(demo_list))]

for idx, (demo, image2_path) in enumerate(zip(demo_list, img2_path_list)):
    for i in range(19, 22):
    # for i in [9, 10, 18]:
        img1_path = f"/home/hechunjiang/gradio/监视器采集图像/{i}.jpg"
        img2_path = f"{image2_path}{i}.jpg"

        matches, kpts1, kpts2, scores = g.match_pairs(
            img1_path, img2_path, is_draw=False)
        res = {"matches": matches.tolist(), "kpts1": kpts1.tolist(),
               "kpts2": kpts2.tolist(), "scores": scores.tolist()}

        # 将monitor向样品对齐
        aligned_img, matrix = get_homography_res(
            img1_path, img2_path, kpts1, kpts2, matches, is_draw=False)
        
        if aligned_img is None:
            print(f"{demo}下的第{i}张图 : 匹配点数过少")
            continue
        
        # transform_coord(demo, i, matrix)

        # 保存aligned_img
        save_path = f"/home/hechunjiang/gradio/aligned_imgs/{demo}/{i}.jpg"
        if not os.path.exists(os.path.dirname(save_path)):
            os.makedirs(os.path.dirname(save_path))

        cv2.imwrite(save_path, cv2.cvtColor(aligned_img, cv2.COLOR_RGB2BGR))

        croped_image_path1 = crop_image(
            save_path, res['kpts2'], 300, demo, "finetune_ref")
        croped_image_path2 = crop_image(
            img2_path, res['kpts2'], 300, demo, "finetune_dst")
        print(f"{demo} : {i}.png done, num of croped_image: {len(croped_image_path1)}")
        all_croped_image_counts[idx].append(len(croped_image_path1))

In [None]:
i = 20

image2_path = img2_path_list[4]

img1_path = f"/home/hechunjiang/gradio/监视器采集图像/{i}.jpg"
img2_path = f"{image2_path}{i}.jpg"

g = GeoFormer(640, 0.5, no_match_upscale=False,
              ckpt='saved_ckpt/geoformer.ckpt', device='cuda')
matches, kpts1, kpts2, scores = g.match_pairs(
    img1_path, img2_path, is_draw=True)
res = {"matches": matches.tolist(), "kpts1": kpts1.tolist(),
        "kpts2": kpts2.tolist(), "scores": scores.tolist()}

# 将monitor向样品对齐
aligned_img, matrix = get_homography_res(
    img1_path, img2_path, kpts1, kpts2, matches, is_draw=True)

plt.figure(dpi=500)
plt.imshow(aligned_img)
plt.axis('off')
plt.show()
