In [None]:
import os
import shutil
import random
from pathlib import Path
from typing import Dict, List, Set, Tuple
from collections import defaultdict

def get_class_files(root_dir: str) -> Dict[str, Dict[str, Set[str]]]:
    """
    각 클래스별로 EO와 SAR에 공통으로 존재하는 파일들을 수집합니다.
    
    Returns:
        Dict[class_name, Dict['eo_files'|'sar_files', Set[filenames]]]
    """
    class_files = defaultdict(lambda: {'eo_files': set(), 'sar_files': set()})
    
    # EO 파일 수집
    eo_path = os.path.join(root_dir, 'EO_Train')
    for class_name in os.listdir(eo_path):
        class_path = os.path.join(eo_path, class_name)
        if os.path.isdir(class_path):
            class_files[class_name]['eo_files'] = set(os.listdir(class_path))
    
    # SAR 파일 수집
    sar_path = os.path.join(root_dir, 'SAR_Train')
    for class_name in os.listdir(sar_path):
        class_path = os.path.join(sar_path, class_name)
        if os.path.isdir(class_path):
            class_files[class_name]['sar_files'] = set(os.listdir(class_path))
    
    return class_files

def create_directory_structure(output_dir: str, class_names: List[str]):
    """지정된 디렉토리 구조를 생성합니다."""
    for split in ['train', 'val']:
        for domain in ['EO_Train', 'SAR_Train']:
            for class_name in class_names:
                os.makedirs(os.path.join(output_dir, split, domain, class_name), exist_ok=True)

def copy_files(
    root_dir: str,
    output_dir: str,
    class_name: str,
    files: List[str],
    split: str
):
    """파일들을 지정된 위치로 복사합니다."""
    for file_name in files:
        # EO 파일 복사
        shutil.copy2(
            os.path.join(root_dir, 'EO_Train', class_name, file_name),
            os.path.join(output_dir, split, 'EO_Train', class_name, file_name)
        )
        
        # SAR 파일 복사
        shutil.copy2(
            os.path.join(root_dir, 'SAR_Train', class_name, file_name),
            os.path.join(output_dir, split, 'SAR_Train', class_name, file_name)
        )

def create_balanced_split_dataset(
    root_dir: str,
    output_dir: str,
    samples_per_class: int = 350,  # semi truck with trailer 기준
    train_ratio: float = 0.8,
    seed: int = 42
):
    """
    Balanced dataset을 생성하고 train/val로 분할합니다.
    
    Args:
        root_dir: 원본 데이터셋 경로
        output_dir: 생성될 데이터셋 경로
        samples_per_class: 각 클래스당 선택할 총 샘플 수
        train_ratio: 학습 데이터 비율 (0과 1 사이)
        seed: 랜덤 시드
    """
    random.seed(seed)
    
    # 클래스별 파일 수집
    class_files = get_class_files(root_dir)
    
    # 디렉토리 구조 생성
    create_directory_structure(output_dir, list(class_files.keys()))
    
    print("\n=== 클래스별 샘플링 및 분할 통계 ===")
    print(f"{'클래스명':<25} {'원본 파일 수':>12} {'선택된 총 샘플 수':>15} {'학습':>8} {'검증':>8}")
    print("-" * 75)
    
    total_stats = {'original': 0, 'selected': 0, 'train': 0, 'val': 0}
    
    # 각 클래스별로 처리
    for class_name, files in class_files.items():
        # EO와 SAR에 모두 존재하는 파일만 선택
        common_files = files['eo_files'] & files['sar_files']
        
        # 요청된 수만큼 랜덤 샘플링
        n_samples = min(samples_per_class, len(common_files))
        selected_files = random.sample(list(common_files), n_samples)
        
        # train/val 분할
        n_train = int(n_samples * train_ratio)
        train_files = selected_files[:n_train]
        val_files = selected_files[n_train:]
        
        # 파일 복사
        copy_files(root_dir, output_dir, class_name, train_files, 'train')
        copy_files(root_dir, output_dir, class_name, val_files, 'val')
        
        # 통계 출력
        print(f"{class_name:<25} {len(common_files):>12} {n_samples:>15} {len(train_files):>8} {len(val_files):>8}")
        
        # 전체 통계 업데이트
        total_stats['original'] += len(common_files)
        total_stats['selected'] += n_samples
        total_stats['train'] += len(train_files)
        total_stats['val'] += len(val_files)
    
    # 전체 통계 출력
    print("-" * 75)
    print(f"{'전체':<25} {total_stats['original']:>12} {total_stats['selected']:>15} {total_stats['train']:>8} {total_stats['val']:>8}")

def main():
    root_dir = "/home/whisper2024/PBVS/Unicorn_Dataset"  # 원본 데이터셋 경로
    output_dir = "/home/whisper2024/PBVS/Unicorn_Split_Dataset"  # 생성될 데이터셋 경로
    
    create_balanced_split_dataset(
        root_dir=root_dir,
        output_dir=output_dir,
        samples_per_class=350,  # semi truck with trailer 기준
        train_ratio=0.8,  # 8:2 비율로 분할
        seed=42
    )

if __name__ == "__main__":
    main()

In [None]:
import os
import shutil
import random
import glob
from tqdm import tqdm

def extract_fixed_samples(base_dir, samples_per_class=30, seed=42):
    """
    각 클래스에서 정확히 samples_per_class 개수만큼의 샘플을 추출하여 검증 세트로 이동합니다.
    
    Args:
        base_dir (str): 데이터셋의 기본 디렉토리 경로 (train 폴더를 포함하는 경로)
        samples_per_class (int): 각 클래스에서 추출할 샘플 수
        seed (int): 랜덤 시드
    """
    # 경로 설정
    train_dir = os.path.join(base_dir, "train")
    val_dir = os.path.join(base_dir, "val")
    
    # SAR와 EO 폴더 경로
    train_sar_dir = os.path.join(train_dir, "SAR_Train")
    train_eo_dir = os.path.join(train_dir, "EO_Train")
    
    # 검증 폴더 생성
    val_sar_dir = os.path.join(val_dir, "SAR_Train")
    val_eo_dir = os.path.join(val_dir, "EO_Train")
    
    os.makedirs(val_dir, exist_ok=True)
    os.makedirs(val_sar_dir, exist_ok=True)
    os.makedirs(val_eo_dir, exist_ok=True)
    
    # 랜덤 시드 설정
    random.seed(seed)
    
    # 클래스 목록
    classes = [d for d in os.listdir(train_sar_dir) if os.path.isdir(os.path.join(train_sar_dir, d))]
    
    print(f"데이터셋 분리 시작. 각 클래스에서 {samples_per_class}개 샘플 추출")
    print(f"처리할 클래스: {classes}")
    
    # 각 클래스별 통계
    stats = {}
    
    for class_name in classes:
        print(f"\n클래스 '{class_name}' 처리 중...")
        
        # SAR 이미지 목록
        sar_images = glob.glob(os.path.join(train_sar_dir, class_name, "*"))
        
        # EO 이미지 경로
        train_eo_class_dir = os.path.join(train_eo_dir, class_name)
        
        # Paired 이미지 찾기 (SAR와 EO 모두 존재하는 이미지)
        paired_images = []
        
        for sar_path in sar_images:
            filename = os.path.basename(sar_path)
            eo_path = os.path.join(train_eo_class_dir, filename)
            
            if os.path.exists(eo_path):
                paired_images.append(filename)
        
        print(f"  - 총 이미지 수: {len(sar_images)}")
        print(f"  - Paired 이미지 수: {len(paired_images)}")
        
        # 검증 세트로 이동할 이미지 선택 (paired 이미지에서 samples_per_class개 선택)
        if len(paired_images) >= samples_per_class:
            val_images = random.sample(paired_images, samples_per_class)
            print(f"  - 검증 세트로 이동할 이미지 수: {len(val_images)}")
        else:
            val_images = paired_images
            print(f"  - 경고: 요청한 {samples_per_class}개보다 적은 {len(val_images)}개의 이미지만 가능함")
        
        # 검증 세트를 위한 폴더 생성
        val_sar_class_dir = os.path.join(val_sar_dir, class_name)
        val_eo_class_dir = os.path.join(val_eo_dir, class_name)
        
        os.makedirs(val_sar_class_dir, exist_ok=True)
        os.makedirs(val_eo_class_dir, exist_ok=True)
        
        # 검증 세트로 이미지 이동
        for filename in tqdm(val_images, desc=f"{class_name} 이동 중"):
            # SAR 이미지 이동
            src_sar_path = os.path.join(train_sar_dir, class_name, filename)
            dst_sar_path = os.path.join(val_sar_class_dir, filename)
            shutil.move(src_sar_path, dst_sar_path)
            
            # EO 이미지 이동
            src_eo_path = os.path.join(train_eo_dir, class_name, filename)
            dst_eo_path = os.path.join(val_eo_class_dir, filename)
            shutil.move(src_eo_path, dst_eo_path)
        
        # 통계 저장
        stats[class_name] = {
            "total": len(sar_images),
            "paired": len(paired_images),
            "train": len(paired_images) - len(val_images),
            "val": len(val_images)
        }
    
    # 최종 통계 출력
    print("\n======== 데이터셋 분리 완료 ========")
    print("\n클래스별 통계:")
    total_train, total_val = 0, 0
    
    for class_name, class_stats in stats.items():
        print(f"{class_name}:")
        print(f"  - 학습 세트: {class_stats['train']} 이미지")
        print(f"  - 검증 세트: {class_stats['val']} 이미지")
        total_train += class_stats['train']
        total_val += class_stats['val']
    
    print("\n전체 통계:")
    print(f"  - 총 학습 세트: {total_train} 이미지")
    print(f"  - 총 검증 세트: {total_val} 이미지")
    print(f"  - 총 이미지: {total_train + total_val} 이미지")
    print(f"  - 검증 세트 평균 크기: {total_val / len(classes):.1f} 이미지/클래스")

if __name__ == "__main__":
    # 기본 디렉토리 경로 (이 경로는 'train' 폴더를 포함하는 경로여야 함)
    base_dir = "/home/whisper2024/PBVS/resampled"
    
    # 각 클래스에서 추출할 샘플 수
    samples_per_class = 30
    
    extract_fixed_samples(base_dir, samples_per_class)

In [5]:
import os
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import cv2
from tqdm import tqdm
import shutil

def extract_features_from_sar(image_path):
    """SAR 이미지에서 특성 추출"""
    # 이미지 로드
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        return None
    
    # 이미지 리사이징 (옵션)
    img = cv2.resize(img, (128, 128))
    
    # 특성 추출 방법 1: 이미지 자체를 평탄화
    # features = img.flatten() / 255.0
    
    # 특성 추출 방법 2: 통계적 특성 + 질감 특성
    features = []
    # 통계적 특성
    features.extend([
        np.mean(img),
        np.std(img),
        np.median(img),
        np.percentile(img, 10),
        np.percentile(img, 90)
    ])
    
    # 질감 특성 (GLCM)
    try:
        from skimage.feature import graycomatrix, graycoprops
        glcm = graycomatrix(img, [1], [0, np.pi/4, np.pi/2, 3*np.pi/4], 256, symmetric=True, normed=True)
        features.extend([
            graycoprops(glcm, 'contrast').mean(),
            graycoprops(glcm, 'dissimilarity').mean(),
            graycoprops(glcm, 'homogeneity').mean(),
            graycoprops(glcm, 'energy').mean(),
            graycoprops(glcm, 'correlation').mean()
        ])
    except:
        # skimage가 없는 경우 간단한 대체 특성
        for i in range(5):
            features.append(0)
    
    return np.array(features)

def filter_sar_dataset(base_dir, output_dir, target_classes=['sedan', 'SUV', 'pickup_truck', 'van'],
                      eps_values=[0.3, 0.4, 0.4, 0.4], min_samples_values=[5, 5, 5, 5]):
    """
    SAR 데이터셋에서 유사 샘플 필터링
    
    Parameters:
    -----------
    base_dir : SAR 데이터셋 기본 디렉토리
    output_dir : 필터링된 데이터셋을 저장할 디렉토리
    target_classes : 필터링할 클래스 리스트
    eps_values : 각 클래스별 DBSCAN eps 값
    min_samples_values : 각 클래스별 DBSCAN min_samples 값
    """
    # 출력 디렉토리 생성
    os.makedirs(output_dir, exist_ok=True)
    
    # 각 클래스별로 처리
    for i, class_name in enumerate(target_classes):
        class_dir = os.path.join(base_dir, class_name)
        if not os.path.exists(class_dir):
            print(f"경로를 찾을 수 없음: {class_dir}")
            continue
        
        output_class_dir = os.path.join(output_dir, class_name)
        os.makedirs(output_class_dir, exist_ok=True)
        
        # 이미지 파일 목록 가져오기
        image_files = [f for f in os.listdir(class_dir) if f.endswith(('.jpg', '.png', '.tif'))]
        print(f"\n{class_name} 클래스 처리 중... 총 {len(image_files)}개 이미지")
        
        if len(image_files) == 0:
            continue
        
        # 특성 추출
        features = []
        valid_files = []
        
        for img_file in tqdm(image_files, desc=f"특성 추출 - {class_name}"):
            img_path = os.path.join(class_dir, img_file)
            feature = extract_features_from_sar(img_path)
            
            if feature is not None:
                features.append(feature)
                valid_files.append(img_file)
        
        if len(features) == 0:
            print(f"{class_name} 클래스에서 유효한 특성을 추출할 수 없음")
            continue
        
        # 특성 배열로 변환
        features = np.array(features)
        
        # 특성 정규화
        scaler = StandardScaler()
        features_scaled = scaler.fit_transform(features)
        
        # DBSCAN 파라미터 설정
        eps = eps_values[i] if i < len(eps_values) else eps_values[-1]
        min_samples = min_samples_values[i] if i < len(min_samples_values) else min_samples_values[-1]
        
        print(f"{class_name} DBSCAN 파라미터: eps={eps}, min_samples={min_samples}")
        
        # DBSCAN 적용
        dbscan = DBSCAN(eps=eps, min_samples=min_samples)
        cluster_labels = dbscan.fit_predict(features_scaled)
        
        # 클러스터 통계
        n_clusters = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)
        n_noise = list(cluster_labels).count(-1)
        print(f"{class_name} 클래스: {n_clusters}개 클러스터, {n_noise}개 노이즈 포인트")
        
        # 선택된 샘플 인덱스
        selected_indices = []
        
        # 노이즈 포인트는 모두 선택
        noise_indices = np.where(cluster_labels == -1)[0]
        selected_indices.extend(noise_indices)
        
        # 각 클러스터에서 중심에 가장 가까운 샘플 선택
        for cluster_id in set(cluster_labels):
            if cluster_id == -1:
                continue
                
            # 현재 클러스터의 포인트들
            cluster_points_indices = np.where(cluster_labels == cluster_id)[0]
            cluster_points = features_scaled[cluster_points_indices]
            
            # 클러스터 중심 계산
            centroid = np.mean(cluster_points, axis=0)
            
            # 중심에서 가장 가까운 포인트 찾기
            distances = np.linalg.norm(cluster_points - centroid, axis=1)
            closest_point_idx = cluster_points_indices[np.argmin(distances)]
            
            selected_indices.append(closest_point_idx)
        
        # 선택된 이미지 복사
        for idx in selected_indices:
            src_path = os.path.join(class_dir, valid_files[idx])
            dst_path = os.path.join(output_class_dir, valid_files[idx])
            shutil.copy2(src_path, dst_path)
        
        print(f"{class_name} 필터링 결과: {len(image_files)}개 -> {len(selected_indices)}개 ({len(selected_indices)/len(image_files)*100:.2f}%)")


filter_sar_dataset(
    base_dir='/home/whisper2024/PBVS/Unicorn_Dataset/EO_Train',
    output_dir='/home/whisper2024/PBVS/Unicorn_Dataset/EO_Train_Filtered',
    target_classes=['box_truck',  'bus' , 'flatbed_truck' , 'motorcycle'  ,'pickup_truck' , 'pickup_truck_w_trailer'  ,'sedan'  ,'semi_w_trailer'  ,'SUV'  ,'van'],
    eps_values=[0.4, 0.4,0.4, 0.4,0.4, 0.4,0.4, 0.4,],
    min_samples_values=[5, 5,5, 5,5, 5,5, 5,5, 5]
)


box_truck 클래스 처리 중... 총 2896개 이미지


특성 추출 - box_truck:   0%|          | 0/2896 [00:00<?, ?it/s]

특성 추출 - box_truck: 100%|██████████| 2896/2896 [00:01<00:00, 1656.00it/s]


box_truck DBSCAN 파라미터: eps=0.4, min_samples=5
box_truck 클래스: 7개 클러스터, 21개 노이즈 포인트
box_truck 필터링 결과: 2896개 -> 28개 (0.97%)

bus 클래스 처리 중... 총 612개 이미지


특성 추출 - bus: 100%|██████████| 612/612 [00:00<00:00, 1581.16it/s]


bus DBSCAN 파라미터: eps=0.4, min_samples=5
bus 클래스: 3개 클러스터, 6개 노이즈 포인트
bus 필터링 결과: 612개 -> 9개 (1.47%)

flatbed_truck 클래스 처리 중... 총 898개 이미지


특성 추출 - flatbed_truck: 100%|██████████| 898/898 [00:00<00:00, 1638.72it/s]


flatbed_truck DBSCAN 파라미터: eps=0.4, min_samples=5
flatbed_truck 클래스: 3개 클러스터, 15개 노이즈 포인트
flatbed_truck 필터링 결과: 898개 -> 18개 (2.00%)

motorcycle 클래스 처리 중... 총 1441개 이미지


특성 추출 - motorcycle: 100%|██████████| 1441/1441 [00:00<00:00, 1597.73it/s]


motorcycle DBSCAN 파라미터: eps=0.4, min_samples=5
motorcycle 클래스: 11개 클러스터, 39개 노이즈 포인트
motorcycle 필터링 결과: 1441개 -> 50개 (3.47%)

pickup_truck 클래스 처리 중... 총 24158개 이미지


특성 추출 - pickup_truck: 100%|██████████| 24158/24158 [00:14<00:00, 1631.66it/s]


pickup_truck DBSCAN 파라미터: eps=0.4, min_samples=5
pickup_truck 클래스: 14개 클러스터, 144개 노이즈 포인트
pickup_truck 필터링 결과: 24158개 -> 158개 (0.65%)

pickup_truck_w_trailer 클래스 처리 중... 총 695개 이미지


특성 추출 - pickup_truck_w_trailer: 100%|██████████| 695/695 [00:00<00:00, 1604.54it/s]


pickup_truck_w_trailer DBSCAN 파라미터: eps=0.4, min_samples=5
pickup_truck_w_trailer 클래스: 4개 클러스터, 10개 노이즈 포인트
pickup_truck_w_trailer 필터링 결과: 695개 -> 14개 (2.01%)

sedan 클래스 처리 중... 총 364291개 이미지


특성 추출 - sedan:   7%|▋         | 26412/364291 [00:16<03:28, 1617.84it/s]


KeyboardInterrupt: 