In [1]:
import cv2
import numpy as np
import mediapipe as mp
import os
import math
from typing import Tuple, List, Optional, Dict

# Initialise the MediaPipe Face Mesh model once at module level; the shared
# `face_mesh` instance is reused by get_face_landmarks() for every image.
# It is configured for still images (static_image_mode=True), at most one
# face per image (max_num_faces=1) and refined landmarks (refine_landmarks=True).
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, refine_landmarks=True)

def get_face_landmarks(image: np.ndarray) -> Optional[Dict[int, Tuple[int, int]]]:
    """Return the pixel coordinates of every landmark of the first detected face.

    The image is converted from BGR to RGB and passed to the module-level
    ``face_mesh`` model.

    :param image: BGR image array; its first two dimensions are used as
        (height, width).
    :return: ``None`` when no face is reported, otherwise a dict mapping the
        landmark index to an ``(x, y)`` tuple. Each coordinate is the
        landmark's ``x``/``y`` multiplied by the image width/height and
        truncated with ``int()``.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    detection = face_mesh.process(rgb_frame)
    detected_faces = detection.multi_face_landmarks
    if not detected_faces:
        return None

    height, width = image.shape[:2]
    pixel_points = {}
    # Only the first face is used (the model is created with max_num_faces=1).
    for index, point in enumerate(detected_faces[0].landmark):
        pixel_points[index] = (int(point.x * width), int(point.y * height))
    return pixel_points

def calculate_rotation_angle(point1: Tuple[int, int], point2: Tuple[int, int]) -> float:
    """Return the negated direction angle, in degrees, of the line point1 -> point2.

    The angle is ``atan2(dy, dx)`` of the vector from ``point1`` to
    ``point2``; the sign is flipped before returning so the value describes
    the opposite rotation.

    :param point1: ``(x, y)`` start point.
    :param point2: ``(x, y)`` end point.
    :return: ``-degrees(atan2(dy, dx))``.
    """
    delta_x, delta_y = (point2[axis] - point1[axis] for axis in (0, 1))
    direction_rad = math.atan2(delta_y, delta_x)
    return -math.degrees(direction_rad)

def apply_clothing_overlay_with_rotation(
    face_img: np.ndarray,
    clothing_img: np.ndarray,
    clothing_center_point: Tuple[int, int],
    face_landmarks: Dict[int, Tuple[int, int]],
    ref_distance: float = 300.0,
) -> np.ndarray:
    """Scale, rotate and alpha-blend a clothing template onto a face image.

    The template is scaled by the distance between landmarks 58 and 288
    divided by ``ref_distance``, rotated around its (scaled) pivot by the
    angle of the 58 -> 288 line, and pasted so that the pivot lands on
    landmark 152. Parts of the template that fall outside the face image
    are cropped.

    :param face_img: Target image (grayscale, BGR or BGRA). Not modified.
    :param clothing_img: Clothing template (grayscale, BGR or BGRA). An
        image without an alpha channel is converted to BGRA.
    :param clothing_center_point: ``(x, y)`` pivot inside the unscaled
        template that is aligned with landmark 152.
    :param face_landmarks: Landmark index -> ``(x, y)`` pixel coordinates;
        must contain indices 152, 58 and 288.
    :param ref_distance: Landmark 58-288 distance, in pixels, at which the
        template is used at its native size.
    :return: A new 3-channel BGR image with the clothing blended in.
    :raises ValueError: If a required landmark is missing, ``ref_distance``
        is not positive, or the scaled template would be smaller than one
        pixel.
    """
    # NOTE(review): 152 is presumably the chin and 58 / 288 the two jaw-side
    # points of the MediaPipe Face Mesh topology - confirm against the
    # landmark map.
    anchor = face_landmarks.get(152)
    ref_a = face_landmarks.get(58)
    ref_b = face_landmarks.get(288)
    if anchor is None or ref_a is None or ref_b is None:
        raise ValueError("缺少必要的人脸关键点")

    if ref_distance <= 0:
        raise ValueError("ref_distance 必须为正数")

    face_distance = math.hypot(ref_b[0] - ref_a[0], ref_b[1] - ref_a[1])
    scale = face_distance / ref_distance

    clothing_h, clothing_w = clothing_img.shape[:2]
    new_width = int(clothing_w * scale)
    new_height = int(clothing_h * scale)
    # Fail with a clear message instead of letting cv2.resize reject a
    # zero-sized target when the two reference landmarks (almost) coincide.
    if new_width < 1 or new_height < 1:
        raise ValueError("人脸参考点距离过小，无法缩放衣服图片")

    angle = calculate_rotation_angle(ref_a, ref_b)

    # Make sure the template carries an alpha channel.
    if clothing_img.ndim == 2:
        clothing_img = cv2.cvtColor(clothing_img, cv2.COLOR_GRAY2BGRA)
    elif clothing_img.shape[2] == 3:
        clothing_img = cv2.cvtColor(clothing_img, cv2.COLOR_BGR2BGRA)

    resized_clothing = cv2.resize(
        clothing_img, (new_width, new_height), interpolation=cv2.INTER_LINEAR)

    center_x = int(clothing_center_point[0] * scale)
    center_y = int(clothing_center_point[1] * scale)

    # The rotation keeps the canvas size, so template corners that leave the
    # canvas are clipped and the uncovered area is fully transparent.
    rotation_matrix = cv2.getRotationMatrix2D((center_x, center_y), angle, 1)
    rotated_clothing = cv2.warpAffine(
        resized_clothing, rotation_matrix, (new_width, new_height),
        flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0, 0))

    # The pivot is the fixed point of the rotation, so it can be used as-is.
    # Re-deriving it through the matrix and truncating with int() could land
    # one pixel short because of floating-point error.
    start_x = anchor[0] - center_x
    start_y = anchor[1] - center_y

    # Work on a 3-channel copy so the caller's image is left untouched.
    if face_img.ndim == 2:
        result = cv2.cvtColor(face_img, cv2.COLOR_GRAY2BGR)
    else:
        result = face_img[:, :, :3].copy()
    h, w = result.shape[:2]
    rot_h, rot_w = rotated_clothing.shape[:2]

    # Portion of the rotated template that overlaps the face image.
    src_x_start = max(0, -start_x)
    src_y_start = max(0, -start_y)
    src_x_end = min(rot_w, w - start_x)
    src_y_end = min(rot_h, h - start_y)
    if src_x_end <= src_x_start or src_y_end <= src_y_start:
        return result  # Template lies completely outside the image.

    dst_x_start = start_x + src_x_start
    dst_y_start = start_y + src_y_start
    dst_x_end = dst_x_start + (src_x_end - src_x_start)
    dst_y_end = dst_y_start + (src_y_end - src_y_start)

    # Blend only the overlapping region; every other pixel would be blended
    # with alpha 0 and therefore stay unchanged.
    patch = rotated_clothing[src_y_start:src_y_end, src_x_start:src_x_end]
    alpha = patch[:, :, 3:4] / 255.0
    region = result[dst_y_start:dst_y_end, dst_x_start:dst_x_end]
    blended = (1 - alpha) * region + alpha * patch[:, :, :3]
    result[dst_y_start:dst_y_end, dst_x_start:dst_x_end] = blended.astype(result.dtype)

    return result

def process_all_images(
    main_dir: str,
    output_base_dir: str,
    clothing_path: str,
    clothing_center_point: Tuple[int, int] = (525, 52),
    subdir_filter: str = '2',
) -> None:
    """Apply the clothing overlay to every image in the matching sub-folders.

    Each direct sub-folder of ``main_dir`` whose name contains
    ``subdir_filter`` is mirrored under ``output_base_dir``; every supported
    image inside it is processed and written under the same file name.
    Progress and errors are reported with ``print``; a failure on one image
    does not stop the batch.

    :param main_dir: Root directory containing the input sub-folders.
    :param output_base_dir: Root directory for the results (created on demand).
    :param clothing_path: Path of the clothing template image.
    :param clothing_center_point: ``(x, y)`` pivot inside the template that is
        aligned with the face anchor landmark.
    :param subdir_filter: Substring a sub-folder name must contain to be
        processed; an empty string selects every sub-folder.
    """
    # Report a missing input root the same way as any other setup problem
    # instead of letting os.listdir raise.
    if not os.path.isdir(main_dir):
        print(f"主目录不存在: {main_dir}")
        return

    try:
        clothing_img = cv2.imread(clothing_path, cv2.IMREAD_UNCHANGED)
    except Exception as e:
        print(f"加载衣服图片错误: {str(e)}")
        return
    if clothing_img is None:
        print(f"加载衣服图片错误: 无法读取衣服图片: {clothing_path}")
        return

    supported_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff')

    # Sorted so that the processing order (and the log) is reproducible;
    # os.listdir returns entries in arbitrary order.
    subdirs = sorted(
        d for d in os.listdir(main_dir)
        if subdir_filter in d and os.path.isdir(os.path.join(main_dir, d))
    )

    if not subdirs:
        print(f"主目录中没有找到符合条件的子文件夹: {main_dir}")
        return

    total_processed = 0
    total_images = 0

    for subdir in subdirs:
        input_dir = os.path.join(main_dir, subdir)
        output_dir = os.path.join(output_base_dir, subdir)
        os.makedirs(output_dir, exist_ok=True)

        # Every file in the sub-folder with a supported image extension.
        image_files = sorted(
            f for f in os.listdir(input_dir)
            if f.lower().endswith(supported_extensions)
        )

        if not image_files:
            print(f"子文件夹中没有图片文件: {input_dir}")
            continue

        processed_count = 0
        print(f"\n正在处理文件夹: {subdir}")

        for filename in image_files:
            input_path = os.path.join(input_dir, filename)
            output_path = os.path.join(output_dir, filename)

            # Batch boundary: any failure on a single image is reported and
            # the loop moves on to the next file.
            try:
                face_img = cv2.imread(input_path)
                if face_img is None:
                    print(f"  无法读取图片: {filename}")
                    continue

                face_landmarks = get_face_landmarks(face_img)
                if not face_landmarks:
                    print(f"  未检测到人脸: {filename}")
                    continue

                result = apply_clothing_overlay_with_rotation(
                    face_img, clothing_img, clothing_center_point, face_landmarks
                )

                if not cv2.imwrite(output_path, result):
                    print(f"  无法保存结果: {filename}")
                    continue

                processed_count += 1
                print(f"  已处理: {filename}")

            except Exception as e:
                print(f"  处理图片 {filename} 时出错: {str(e)}")

        total_processed += processed_count
        total_images += len(image_files)
        print(f"文件夹 {subdir} 处理完成: {processed_count}/{len(image_files)}")

    print(f"\n全部处理完成! 总计处理 {total_processed}/{total_images} 张图片")

if __name__ == "__main__":
    # Default locations, given as paths relative to the current working directory.
    main_directory = "features_blur"  # root folder whose sub-folders hold the input images
    output_directory = "clothing"  # root folder that receives the processed images
    clothing_image_path = "clothing.png"  # clothing template image

    print("开始批量处理所有子文件夹中的图片...")
    process_all_images(main_directory, output_directory, clothing_image_path)

开始批量处理所有子文件夹中的图片...

正在处理文件夹: 2019
  已处理: 01.jpg
文件夹 2019 处理完成: 1/1

全部处理完成! 总计处理 1/1 张图片
