In [2]:
import os
import shutil
import keras
import keras_hub
import numpy as np
import tensorflow as tf
from keras import ops
from sklearn.cluster import KMeans
from tqdm import tqdm
from tensorflow.keras.applications.efficientnet import EfficientNetB4, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from PIL import Image

os.environ["KERAS_BACKEND"] = "jax"
# 이미지 경로 설정

IMAGE_DIR = "/Users/aohus/Workspaces/github/image-cluster/2차"
# 이미지 목록 로딩
image_paths = [
    os.path.join(IMAGE_DIR, fname)
    for fname in os.listdir(IMAGE_DIR)
    if fname.lower().endswith(("png", "jpg", "jpeg"))
]


  from .autonotebook import tqdm as notebook_tqdm


In [26]:
from tensorflow.keras.applications.efficientnet import EfficientNetB4, preprocess_input
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
import keras_hub

# 분류 모델 로딩
model = keras_hub.models.DeepLabV3ImageSegmenter.from_preset(
    "deeplab_v3_plus_resnet50_pascalvoc"
)
image_converter = keras_hub.layers.DeepLabV3ImageConverter(
    image_size=(512, 512),
    interpolation="bilinear",
)
preprocessor = keras_hub.models.DeepLabV3ImageSegmenterPreprocessor(image_converter)


# CNN 모델 로딩
effnet_model = EfficientNetB4(weights="imagenet", include_top=False, pooling="avg")
resnet_model = ResNet50(weights="imagenet", include_top=False, pooling="avg")
effnetv2_model = keras_hub.models.Backbone.from_preset(
    "efficientnet_b5_sw_ft_imagenet",
)
resnetv2_model = keras_hub.models.Backbone.from_preset(
    "resnet_vd_200_imagenet",
)

  saveable.load_own_variables(weights_store.get(inner_path))


Downloading from https://www.kaggle.com/api/v1/models/keras/resnet_vd/keras/resnet_vd_200_imagenet/2/download/config.json...


100%|██████████| 910/910 [00:00<00:00, 2.09MB/s]


Downloading from https://www.kaggle.com/api/v1/models/keras/resnet_vd/keras/resnet_vd_200_imagenet/2/download/model.weights.h5...


100%|██████████| 280M/280M [00:25<00:00, 11.5MB/s] 


In [15]:
def masking_human(image_path):
    image = keras.utils.load_img(image_path)
    image = np.array(image)


    image = preprocessor(image)
    image = keras.ops.expand_dims(np.array(image), axis=0)
    seg = ops.argmax(model.predict(image, verbose=0), axis=-1)
    human_mask = ops.expand_dims(seg, axis=-1)[0] == 15  # numpy로 마스크 생성

    human_mask_expanded = tf.expand_dims(human_mask, axis=0)
    human_mask_expanded = tf.broadcast_to(human_mask, tf.shape(image))

    # 마스크 위치를 0으로 만든 새 텐서 생성
    masked_image = tf.where(human_mask_expanded, tf.zeros_like(image), image)
    return masked_image

# 수정된 함수 (마스킹된 이미지를 특징 벡터로 변환)
def batch_extract_background_features(img_paths, cnn_model, batch_size=16):
    features = []

    for i in tqdm(range(0, len(img_paths), batch_size)):
        batch_paths = img_paths[i : i + batch_size]

        batch_imgs = []
        for p in batch_paths:
            preds = masking_human(p)  # 사람 마스킹 이미지 반환 (Tensor 또는 np.array)
            preds = np.squeeze(preds)
            # preds가 TensorFlow tensor면 numpy로 변환
            if hasattr(preds, 'numpy'):
                preds = preds.numpy()

            # PIL 이미지 변환 및 크기 조정 (224x224)
            img_pil = Image.fromarray(preds.astype('uint8')).resize((224, 224))
            img_array = img_to_array(img_pil)

            batch_imgs.append(img_array)

        # CNN 입력 형태로 변환 (전처리)
        batch_imgs_array = preprocess_input(np.array(batch_imgs))

        # EfficientNet으로 특징 벡터 추출
        batch_features = cnn_model.predict(batch_imgs_array, verbose=0)
        if len(batch_features.shape) != 2:
            batch_features = batch_features.reshape(batch_features.shape[0], -1)  # (batch_size, feature_dim)
        features.extend(batch_features)

    return np.array(features)


from tensorflow.keras.preprocessing import image
# 배치로 특징 추출 (속도 최적화)
def batch_extract_features(img_paths, cnn_model, batch_size=32):
    features = []
    num_images = len(img_paths)

    for i in tqdm(range(0, num_images, batch_size)):
        batch_paths = img_paths[i:i+batch_size]
        
        # 배치 단위로 이미지 로드 및 전처리
        imgs = [image.img_to_array(image.load_img(p, target_size=(224,224))) for p in batch_paths]
        imgs_array = np.array(imgs)
        imgs_array = preprocess_input(imgs_array)

        # 배치 예측 (한 번에 처리)
        batch_features = cnn_model.predict(imgs_array, verbose=0)
        if len(batch_features.shape) != 2:
            batch_features = batch_features.reshape(batch_features.shape[0], -1)  # (batch_size, feature_dim)
        features.extend(batch_features)
    return np.array(features)


import cv2
# 녹색 및 갈색 마스크 생성 함수
def extract_green_brown(img_array):
    # HSV로 변환
    hsv_img = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV)

    # 녹색 범위 설정 (HSV)
    green_lower = np.array([35, 40, 40])
    green_upper = np.array([85, 255, 255])
    green_mask = cv2.inRange(hsv_img, green_lower, green_upper)

    # 갈색 범위 설정 (HSV)
    brown_lower = np.array([10, 50, 50])
    brown_upper = np.array([30, 255, 200])
    brown_mask = cv2.inRange(hsv_img, brown_lower, brown_upper)

    # 녹색과 갈색 마스크 결합
    combined_mask = cv2.bitwise_or(green_mask, brown_mask)

    # 원본 이미지에 마스크 적용
    masked_img = cv2.bitwise_and(img_array, img_array, mask=combined_mask)

    return masked_img

# 특징 추출 함수
def batch_extract_colored_features(img_paths, cnn_model, batch_size=32):

    features = []
    num_images = len(img_paths)

    for i in tqdm(range(0, num_images, batch_size)):
        batch_paths = img_paths[i:i+batch_size]
        
        # 배치 단위로 이미지 로드 및 전처리
        imgs = [ Image.open(p).convert('RGB').resize((224,224)) for p in batch_paths ]
        gb_imgs = [ extract_green_brown(np.array(img)) for img in imgs ]
        imgs_array = np.array(gb_imgs)
        imgs_array = preprocess_input(imgs_array)

        # 배치 예측 (한 번에 처리)
        batch_features = cnn_model.predict(imgs_array, verbose=0)
        if len(batch_features.shape) != 2:
            batch_features = batch_features.reshape(batch_features.shape[0], -1)  # (batch_size, feature_dim)
        features.extend(batch_features)

    
    return np.array(features)


In [34]:
# 특징 추출 실행
print("특징 추출 중...")

# batch_extract_background_features
# batch_extract_features
# batch_extract_colored_features

cnn_model = resnetv2_model
extract_features = batch_extract_features
features = extract_features(image_paths, cnn_model)


print("특징 추출 완료:", features.shape)


특징 추출 중...


100%|██████████| 8/8 [00:29<00:00,  3.71s/it]

특징 추출 완료: (244, 100352)





In [36]:
OUTPUT_DIR = IMAGE_DIR + f"_{cnn_model.name}_{extract_features.__name__}_kmeans"

# 클러스터링 (KMeans, GPS 정보 포함)
num_clusters = 60
kmeans = KMeans(n_clusters=num_clusters, random_state=42, n_init=10)
labels = kmeans.fit_predict(features)

# 클러스터링 결과 저장
print("클러스터별 이미지 저장 중...")
os.makedirs(OUTPUT_DIR, exist_ok=True)

for idx, label in enumerate(labels):
    cluster_folder = os.path.join(OUTPUT_DIR, f"cluster_{label}")
    os.makedirs(cluster_folder, exist_ok=True)
    shutil.copy(image_paths[idx], cluster_folder)

print(f"분류가 완료되었습니다. 결과는 '{OUTPUT_DIR}'에 저장되었습니다.")

클러스터별 이미지 저장 중...
분류가 완료되었습니다. 결과는 '/Users/aohus/Workspaces/github/image-cluster/2차_res_net_backbone_9_batch_extract_features_kmeans'에 저장되었습니다.


# 코사인 유사도

In [31]:
from sklearn.metrics.pairwise import cosine_similarity
# 유사도 기반 그룹핑 함수
def group_similar_images(features, image_paths, threshold):
    similarity_matrix = cosine_similarity(features)
    used = np.zeros(len(features), dtype=bool)
    groups = []
    
    for idx in range(len(features)):
        if not used[idx]:
            # 현재 이미지와 유사도 높은 이미지 인덱스 찾기
            similar_idxs = np.where(similarity_matrix[idx] >= threshold)[0]
            groups.append(similar_idxs)
            used[similar_idxs] = True

    return groups


In [33]:
OUTPUT_DIR = IMAGE_DIR + f"_{cnn_model.name}_{extract_features.__name__}_cosine_similarity"

# 유사도 기반 그룹 생성
groups = group_similar_images(features, image_paths, threshold=0.6)

# 그룹별 폴더 저장
print("비슷한 이미지 그룹 저장 중...")
for group_id, group_idxs in enumerate(groups):
    group_folder = os.path.join(OUTPUT_DIR, f"group_{group_id}")
    os.makedirs(group_folder, exist_ok=True)
    for idx in group_idxs:
        shutil.copy(image_paths[idx], group_folder)

print(f"이미지 분류 완료, 결과 폴더: '{OUTPUT_DIR}'")

비슷한 이미지 그룹 저장 중...
이미지 분류 완료, 결과 폴더: '/Users/aohus/Workspaces/github/image-cluster/2차_efficient_net_backbone_batch_extract_features_cosine_similarity'
