In [None]:
# 1. 라이브러리 임포트
import os
import json
import cv2
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
from tqdm import tqdm
import matplotlib.patches as patches

# 2. Dataset locations and loader definition
# Root folder scanned by load_dataset(); each sub-directory is treated as a class.
DATASET_PATH = 'Google_Recaptcha_V2_Images_Dataset/images/'
# JSON file written by the labeling step and read back by the later cells.
ANNOTATION_FILE = 'annotations.json'

def load_dataset(dataset_path):
    """Collect class names and image paths from a class-per-folder dataset.

    Args:
        dataset_path: Directory whose immediate sub-directories are the
            classes; each sub-directory holds the images of that class.

    Returns:
        A ``(class_names, image_paths)`` tuple. ``class_names`` is the sorted
        list of sub-directory names. ``image_paths`` lists every file with a
        recognized image extension, grouped by class and sorted by file name.

    Raises:
        OSError: If ``dataset_path`` cannot be listed.
    """
    image_extensions = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')

    # Only directories are classes. Stray files (e.g. .DS_Store) must not end
    # up in class_names, because the list index is used as the class id.
    class_names = sorted(
        entry for entry in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, entry))
    )

    image_paths = []
    for class_name in class_names:
        class_dir = os.path.join(dataset_path, class_name)
        # Sorted so the resulting order does not depend on the filesystem.
        for img_name in sorted(os.listdir(class_dir)):
            img_path = os.path.join(class_dir, img_name)
            if os.path.isfile(img_path) and img_name.lower().endswith(image_extensions):
                image_paths.append(img_path)

    return class_names, image_paths

# 3. 라벨링 함수 정의
def label_images(image_paths, class_names):
    """Interactively annotate images with bounding boxes and class labels.

    For each readable image an OpenCV window is shown. The user drags a
    rectangle, then types the class index on the console; this repeats until
    the selection is cancelled (an all-zero ROI). The collected annotations
    are written to ``ANNOTATION_FILE`` as JSON, keyed by image path, with
    boxes stored as ``[x, y, w, h]`` in pixels of the original image.

    Args:
        image_paths: Paths of the images to label.
        class_names: Class names; the index typed by the user selects one.

    Returns:
        None. The result is written to ``ANNOTATION_FILE``.
    """
    annotations = {}

    for img_path in tqdm(image_paths, desc="Labeling Images"):
        img = cv2.imread(img_path)
        if img is None:
            print(f"이미지를 불러올 수 없습니다: {img_path}")
            continue

        # OpenCV windows interpret pixel data as BGR, which is what imread
        # returns. Converting to RGB first displayed swapped red/blue channels.
        canvas = img.copy()
        bboxes = []
        classes = []

        while True:
            roi = cv2.selectROI("Image", canvas, fromCenter=False, showCrosshair=True)
            # A cancelled selection is reported as (0, 0, 0, 0).
            if not any(roi):
                break
            # Plain ints keep the annotation JSON-serializable.
            x, y, w, h = (int(value) for value in roi)
            cv2.rectangle(canvas, (x, y), (x + w, y + h), (255, 0, 0), 2)
            cv2.imshow("Image", canvas)
            # imshow only paints once the GUI event loop runs.
            cv2.waitKey(1)

            while True:
                try:
                    class_idx = int(input(f"클래스 선택 (0-{len(class_names)-1}) for selected region: "))
                except ValueError:
                    print("숫자를 입력해주세요.")
                    continue
                if 0 <= class_idx < len(class_names):
                    break
                print("유효하지 않은 클래스 번호입니다.")

            bboxes.append([x, y, w, h])
            classes.append(class_names[class_idx])

        annotations[img_path] = {
            "bboxes": bboxes,
            "classes": classes
        }
        cv2.destroyAllWindows()

    with open(ANNOTATION_FILE, 'w') as f:
        json.dump(annotations, f, indent=4)
    print(f"라벨 정보가 {ANNOTATION_FILE}에 저장되었습니다.")

# 4. Load the dataset and run the interactive labeling
class_names, image_paths = load_dataset(DATASET_PATH)
print(f"클래스 목록: {class_names}")
print(f"총 이미지 수: {len(image_paths)}")

# NOTE(review): the original note said this call must be uncommented before
# running, but it is already active: running this cell starts labeling and
# rewrites ANNOTATION_FILE.
label_images(image_paths, class_names)  # starts the interactive labeling session

# 5. 라벨 데이터 로드
def load_annotations(annotation_file):
    """Read an annotation JSON file and return its decoded contents.

    Args:
        annotation_file: Path of the JSON file to read.

    Returns:
        The object decoded from the file's JSON content.
    """
    with open(annotation_file, 'r') as handle:
        return json.load(handle)

# Read the saved labels back so the preprocessing step can use them.
annotations = load_annotations(ANNOTATION_FILE)
print(f"라벨링된 이미지 수: {len(annotations)}")

# 6. 데이터 전처리 함수 정의
def preprocess_data(image_paths, annotations, class_names, num_boxes=10, num_classes=12, input_shape=(224, 224)):
    """Turn annotated images into network inputs and a flat target matrix.

    Each readable image is converted to RGB, resized to ``input_shape`` and
    scaled to ``[0, 1]``. Its annotation is encoded into one target row laid
    out as ``[boxes | classes | confidences]``:

    * boxes: ``num_boxes * 4`` values, ``x_min, y_min, x_max, y_max`` per box,
      as fractions of the original image width/height;
    * classes: ``num_boxes * num_classes`` values, one one-hot vector per box;
    * confidences: ``num_boxes`` values, 1.0 for a real box, 0.0 for padding.

    Annotations beyond ``num_boxes`` are dropped; missing slots are
    zero-padded.

    Args:
        image_paths: Paths of the images to process.
        annotations: Mapping of image path to ``{"bboxes": [[x, y, w, h], ...],
            "classes": [name, ...]}`` with boxes in original-image pixels.
        class_names: Class names; a name's position is its one-hot index.
        num_boxes: Number of box slots per image.
        num_classes: Length of each one-hot class vector.
        input_shape: ``(width, height)`` passed to ``cv2.resize``.

    Returns:
        ``(X, y_true)`` as float32 arrays. ``X`` holds one resized image per
        readable input; ``y_true`` has
        ``num_boxes * 4 + num_boxes * num_classes + num_boxes`` columns.
        Both have zero rows when no image could be read.

    Raises:
        ValueError: If an annotated class name is not in ``class_names``.
        IndexError: If a class index is not below ``num_classes``.
    """
    images = []
    y_bboxes = []
    y_classes = []
    y_confidences = []

    for img_path in image_paths:
        img = cv2.imread(img_path)
        if img is None:
            print(f"이미지를 불러올 수 없습니다: {img_path}")
            continue

        # Boxes were drawn on the original image, so they are scaled by its
        # size; dividing by the resized input size gave wrong fractions for
        # any image that is not already input_shape-sized.
        orig_height, orig_width = img.shape[:2]

        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img_resized = cv2.resize(img_rgb, input_shape)
        images.append(img_resized / 255.0)

        ann = annotations.get(img_path, {})
        bboxes = ann.get('bboxes', [])[:num_boxes]
        classes = ann.get('classes', [])[:num_boxes]

        bboxes_normalized = []
        for x_min, y_min, w, h in bboxes:
            bboxes_normalized.extend([
                x_min / orig_width,
                y_min / orig_height,
                (x_min + w) / orig_width,
                (y_min + h) / orig_height,
            ])
        # Zero-pad the unused box slots.
        bboxes_normalized.extend([0.0] * (num_boxes * 4 - len(bboxes_normalized)))
        y_bboxes.append(bboxes_normalized)

        classes_one_hot = []
        for cls in classes:
            one_hot = [0] * num_classes
            one_hot[class_names.index(cls)] = 1
            classes_one_hot.extend(one_hot)
        classes_one_hot.extend([0] * (num_boxes * num_classes - len(classes_one_hot)))
        y_classes.append(classes_one_hot)

        confidences = [1.0] * len(bboxes)
        confidences.extend([0.0] * (num_boxes - len(confidences)))
        y_confidences.append(confidences)

    n_samples = len(images)
    if images:
        X = np.array(images, dtype=np.float32)
    else:
        # Keep a well-formed (0, height, width, 3) array instead of shape (0,).
        X = np.zeros((0, input_shape[1], input_shape[0], 3), dtype=np.float32)

    # Explicit reshapes keep the arrays 2-D even with zero samples, so the
    # concatenation below cannot fail on an empty dataset.
    y_bboxes = np.array(y_bboxes, dtype=np.float32).reshape(n_samples, num_boxes * 4)
    y_classes = np.array(y_classes, dtype=np.float32).reshape(n_samples, num_boxes * num_classes)
    y_confidences = np.array(y_confidences, dtype=np.float32).reshape(n_samples, num_boxes)

    y_true = np.concatenate([y_bboxes, y_classes, y_confidences], axis=1)

    return X, y_true

# 7. Run preprocessing
# Detection-head sizes shared by preprocessing, the model and the loss. They
# were used below (and in later cells) without ever being assigned, which
# raised NameError.
# NOTE(review): if another cell defines these, keep the values in sync.
num_boxes = 10                  # same as the default of preprocess_data / the model builder
num_classes = len(class_names)  # one one-hot slot per dataset class

train_image_paths = list(annotations.keys())
train_annotations = annotations

X_train, y_train = preprocess_data(
    train_image_paths,
    train_annotations,
    class_names,
    num_boxes=num_boxes,
    num_classes=num_classes,
    input_shape=(224, 224)
)

print(f"훈련 데이터 크기: {X_train.shape}")
print(f"훈련 라벨 크기: {y_train.shape}")

# 8. Residual Block 및 Attention Layer 정의
class ResidualBlock(layers.Layer):
    """Residual block: two 3x3 conv + batch-norm stages plus a skip connection.

    The skip path is the identity when the input already has ``filters``
    channels and ``stride`` is 1. Otherwise a 1x1 conv + batch-norm projection
    reshapes the input so it can be added to the main path.

    Args:
        filters: Number of output channels.
        stride: Stride of the first convolution (and of the projection).
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, filters, stride=1, **kwargs):
        super(ResidualBlock, self).__init__(**kwargs)
        # Kept so get_config() can recreate the layer when a model is loaded.
        self.filters = filters
        self.stride = stride
        self.conv1 = layers.Conv2D(filters, kernel_size=3, strides=stride, padding='same', use_bias=False)
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.ReLU()
        self.conv2 = layers.Conv2D(filters, kernel_size=3, strides=1, padding='same', use_bias=False)
        self.bn2 = layers.BatchNormalization()
        # One reusable merge layer instead of creating a new one on every call.
        self.merge = layers.Add()
        self.downsample = None
        if stride != 1:
            self.downsample = self._make_projection()

    def _make_projection(self):
        """Return the 1x1 conv + batch-norm used on the skip path."""
        return models.Sequential([
            layers.Conv2D(self.filters, kernel_size=1, strides=self.stride, padding='same', use_bias=False),
            layers.BatchNormalization()
        ])

    def build(self, input_shape):
        """Add the skip projection when the channel count changes at stride 1."""
        in_channels = input_shape[-1]
        # Without this, a stride-1 block that changes the channel count tried
        # to add tensors of different depth and failed with a shape error.
        if self.downsample is None and in_channels is not None and in_channels != self.filters:
            self.downsample = self._make_projection()
        super(ResidualBlock, self).build(input_shape)

    def call(self, inputs, training=False):
        """Apply the block.

        Args:
            inputs: Feature map tensor (channels last).
            training: Forwarded to the batch-normalization layers.

        Returns:
            The activated sum of the main path and the skip path.
        """
        identity = inputs
        out = self.conv1(inputs)
        out = self.bn1(out, training=training)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out, training=training)

        if self.downsample is not None:
            identity = self.downsample(inputs, training=training)

        out = self.merge([out, identity])
        out = self.relu(out)
        return out

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super(ResidualBlock, self).get_config()
        config.update({"filters": self.filters, "stride": self.stride})
        return config

class AttentionLayerWrapper(layers.Layer):
    """Self-attention over all spatial positions of a feature map.

    Query, key and value are 1x1 convolutions of the input. Every position
    attends to every other position, so the score matrix holds
    ``(height * width) ** 2`` entries per sample and memory grows
    quadratically with the feature-map area.

    Args:
        filters: Channel count of the query/key/value projections and of the
            output.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, filters, **kwargs):
        super(AttentionLayerWrapper, self).__init__(**kwargs)
        # Kept for the static output shape and for get_config().
        self.filters = filters
        self.query = layers.Conv2D(filters, kernel_size=1)
        self.key = layers.Conv2D(filters, kernel_size=1)
        self.value = layers.Conv2D(filters, kernel_size=1)
        self.softmax = layers.Softmax(axis=-1)

    def call(self, inputs):
        """Apply attention.

        Args:
            inputs: Tensor of shape (batch, height, width, channels).

        Returns:
            Tensor of shape (batch, height, width, filters).
        """
        # Use statically known sizes where available. Reshaping with dynamic
        # sizes and -1 for the channels erased the static output shape, which
        # the Conv2D / Flatten / Dense layers applied afterwards need.
        static_shape = inputs.shape
        dynamic_shape = tf.shape(inputs)
        height = static_shape[1] if static_shape[1] is not None else dynamic_shape[1]
        width = static_shape[2] if static_shape[2] is not None else dynamic_shape[2]
        num_positions = height * width

        # Flatten the spatial grid: (batch, height*width, filters)
        query = tf.reshape(self.query(inputs), [-1, num_positions, self.filters])
        key = tf.reshape(self.key(inputs), [-1, num_positions, self.filters])
        value = tf.reshape(self.value(inputs), [-1, num_positions, self.filters])

        # Pairwise scores between positions: (batch, height*width, height*width)
        attention_scores = tf.matmul(query, key, transpose_b=True)
        attention_weights = self.softmax(attention_scores)

        # Weighted sum of values, restored to the spatial layout.
        out = tf.matmul(attention_weights, value)                   # (batch, height*width, filters)
        out = tf.reshape(out, [-1, height, width, self.filters])    # (batch, height, width, filters)

        return out

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super(AttentionLayerWrapper, self).get_config()
        config.update({"filters": self.filters})
        return config


In [None]:
# 9. 모델 구축 함수 정의
def build_object_detection_model(num_classes, num_boxes=10, input_shape=(224, 224, 3)):
    """Build the fixed-slot object detection network.

    A small convolutional backbone (conv, residual block, self-attention,
    two more convs) feeds a dense layer and three heads that predict
    ``num_boxes`` box slots per image.

    Args:
        num_classes: Number of classes predicted per box slot.
        num_boxes: Number of box slots per image.
        input_shape: Shape of one input image, ``(height, width, channels)``.

    Returns:
        An uncompiled Keras model with outputs, in order:
        ``bbox`` (``num_boxes * 4`` sigmoid values), ``class``
        (``num_boxes * num_classes`` probabilities, normalized per box) and
        ``confidence`` (``num_boxes`` sigmoid values).
    """
    input_layer = layers.Input(shape=input_shape)

    # Backbone: simple CNN stem. Shape comments assume the default input size.
    x = layers.Conv2D(64, kernel_size=3, strides=1, padding='same', activation='relu')(input_layer)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)  # (112, 112, 64)

    # Residual block
    # NOTE(review): this block goes from 64 to 128 channels at stride 1, so
    # its skip path has to project the input for the addition to be valid.
    x = ResidualBlock(128, stride=1)(x)  # (112, 112, 128)

    # Attention layer
    # NOTE(review): attention over 112*112 positions needs a very large score
    # matrix per sample — confirm this fits in memory.
    x = AttentionLayerWrapper(128)(x)  # (112, 112, 128)

    # Additional convolution
    x = layers.Conv2D(128, kernel_size=3, padding='same', activation='relu')(x)
    x = layers.BatchNormalization()(x)

    # Feature map extraction
    x = layers.Conv2D(256, kernel_size=3, padding='same', activation='relu')(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)  # (56, 56, 256)

    # Flatten and dense layers
    x = layers.Flatten()(x)
    x = layers.Dense(1024, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.5)(x)

    # Detection heads
    bbox_output = layers.Dense(num_boxes * 4, activation='sigmoid', name='bbox')(x)

    # The class target is one one-hot vector per box, so the softmax has to
    # normalize each box's scores on its own. A single softmax over all
    # num_boxes * num_classes units made the boxes compete with each other.
    class_logits = layers.Dense(num_boxes * num_classes)(x)
    class_per_box = layers.Reshape((num_boxes, num_classes))(class_logits)
    class_per_box = layers.Softmax(axis=-1)(class_per_box)
    # Flattened again so the output keeps its original shape and name.
    class_output = layers.Reshape((num_boxes * num_classes,), name='class')(class_per_box)

    confidence_output = layers.Dense(num_boxes, activation='sigmoid', name='confidence')(x)

    model = models.Model(inputs=input_layer, outputs=[bbox_output, class_output, confidence_output])
    return model


In [None]:
# 10. 커스텀 손실 함수 정의
def custom_object_detection_loss(y_true, y_pred, num_boxes, num_classes):
    """Combined box, class and confidence loss for the detection heads.

    Args:
        y_true: Tensor of shape (batch, total) laid out as
            ``[boxes (num_boxes*4) | classes (num_boxes*num_classes) |
            confidences (num_boxes)]``.
        y_pred: Sequence of the three head outputs, in the order
            ``(bbox, class, confidence)``, each of shape (batch, width of
            that head).
        num_boxes: Number of box slots per image.
        num_classes: Number of classes per box slot.

    Returns:
        Scalar tensor: box MSE + per-box categorical cross-entropy (summed
        over the boxes, averaged over the batch) + confidence binary
        cross-entropy.

    Note:
        This expects all heads at once. A loss callable handed to
        ``model.compile`` for a multi-output model is called once per output
        instead — TODO confirm how this function is wired in before relying
        on it there.
    """
    bbox_end = num_boxes * 4
    class_end = bbox_end + num_boxes * num_classes

    true_bbox = y_true[:, :bbox_end]
    true_class = y_true[:, bbox_end:class_end]
    true_confidence = y_true[:, class_end:]

    pred_bbox = y_pred[0]
    pred_class = y_pred[1]
    pred_confidence = y_pred[2]

    bbox_loss = tf.reduce_mean(tf.square(true_bbox - pred_bbox))

    # Each box carries its own one-hot label, so cross-entropy is taken per
    # box. Over the flat vector the target was several one-hots concatenated,
    # which is not a probability distribution.
    true_class = tf.reshape(true_class, [-1, num_boxes, num_classes])
    pred_class = tf.reshape(pred_class, [-1, num_boxes, num_classes])
    per_box_ce = tf.keras.losses.categorical_crossentropy(true_class, pred_class)  # (batch, num_boxes)
    # Padded boxes have an all-zero label and therefore contribute 0.
    class_loss = tf.reduce_mean(tf.reduce_sum(per_box_ce, axis=-1))

    confidence_loss = tf.reduce_mean(
        tf.keras.losses.binary_crossentropy(true_confidence, pred_confidence)
    )

    total_loss = bbox_loss + class_loss + confidence_loss
    return total_loss


In [None]:
# 11. Build and compile the model
model = build_object_detection_model(num_classes=num_classes, num_boxes=num_boxes, input_shape=(224, 224, 3))
model.summary()


def loss_function(y_true, y_pred):
    """Combined detection loss with num_boxes / num_classes bound in.

    Expects the concatenated target matrix and the list of all three head
    outputs. It is kept for code that evaluates the heads together;
    ``model.compile`` below does not use it.
    """
    return custom_object_detection_loss(y_true, y_pred, num_boxes, num_classes)


# A single loss callable is applied by Keras to each output on its own, so
# loss_function would be called with one head's target and prediction rather
# than with the concatenated targets and the list of heads it expects.
# Each head therefore gets its own loss, keyed by output name; Keras adds
# them with equal weight, matching the sum in custom_object_detection_loss.
model.compile(
    optimizer='adam',
    loss={
        'bbox': 'mse',
        'class': 'categorical_crossentropy',
        'confidence': 'binary_crossentropy',
    },
    # Accuracy is only meaningful for the box-present / box-absent head.
    metrics={'confidence': ['accuracy']},
)


In [None]:
# 12. Train the model
# y_train is split back into one target array per output head, using the same
# column layout preprocess_data used when it concatenated them:
# boxes, then classes, then confidences.
model.fit(
    X_train, 
    [
        y_train[:, :num_boxes * 4], 
        y_train[:, num_boxes * 4:num_boxes * 4 + num_boxes * num_classes], 
        y_train[:, num_boxes * 4 + num_boxes * num_classes:]
    ],
    epochs=20,
    batch_size=16,
    validation_split=0.1
)


In [None]:
# Run prediction with NMS applied and visualize the result
# NOTE(review): predict_and_visualize_with_nms is not defined in the cells
# shown here — confirm it is defined before this cell is run.
sample_image_path = train_image_paths[0]  # sample image path
predict_and_visualize_with_nms(
    model, 
    sample_image_path, 
    class_names, 
    num_boxes=num_boxes, 
    num_classes=num_classes, 
    input_shape=(224, 224), 
    max_output_size=10, 
    iou_threshold=0.5
)


In [None]:
# Save the model
model.save('object_detection_model.keras')
# The message now names the file that is actually written (.keras, not .h5).
print("모델이 'object_detection_model.keras'에 저장되었습니다.")


In [None]:
# Load the model
# custom_objects maps the saved class names to the classes defined in this
# notebook so the custom layers can be recreated.
# NOTE(review): if the saved model was compiled with a custom loss function,
# that function presumably has to be listed here as well (or the model loaded
# with compile=False) — confirm.
model = tf.keras.models.load_model('object_detection_model.keras', custom_objects={
    'ResidualBlock': ResidualBlock,
    'AttentionLayerWrapper': AttentionLayerWrapper
})
print("모델이 성공적으로 로드되었습니다.")


In [6]:
import numpy as np
import cv2

class BoundingBoxEnv:
    """One-step environment that rewards a predicted box by its IoU.

    Each episode shows one image. The agent submits a single box and the
    episode ends. The ground-truth box is currently a fixed placeholder,
    ``[20, 20, 40, 40]``.

    Args:
        image_paths: Paths of the images served as states, in order.
        input_shape: ``(height, width[, channels])`` every state image is
            resized to.

    Note:
        The constructor calls ``reset()``, so the first image is already
        consumed when the object is created.
    """

    def __init__(self, image_paths, input_shape=(64, 64)):
        self.image_paths = image_paths
        self.input_shape = input_shape
        self.current_image_idx = 0
        self.current_image = None
        self.done = False

        self.reset()

    def reset(self):
        """Load the next readable image and return it as the new state.

        Unreadable files are reported and skipped. The image is resized to
        ``input_shape`` so every state has the size the agent's network was
        built for.

        Returns:
            The resized image, or ``None`` once all images have been used
            (``self.done`` is then set to ``True``).
        """
        while self.current_image_idx < len(self.image_paths):
            img_path = self.image_paths[self.current_image_idx]
            self.current_image_idx += 1

            image = cv2.imread(img_path)
            if image is None:
                # imread signals failure with None; do not serve it as a state.
                print(f"이미지를 불러올 수 없습니다: {img_path}")
                continue

            height, width = self.input_shape[:2]
            # cv2.resize takes the target size as (width, height).
            self.current_image = cv2.resize(image, (width, height))
            return self.current_image

        self.done = True
        return None

    def step(self, action):
        """Score the submitted box and end the episode.

        Args:
            action: Sequence ``[x_min, y_min, x_max, y_max]``.

        Returns:
            ``(state, reward, done)``: the current image, the IoU with the
            ground-truth box, and ``True`` (one prediction per episode).

        Raises:
            TypeError: If ``action`` is not a sequence, e.g. a discrete
                action index.
            ValueError: If ``action`` does not hold exactly four values.
        """
        try:
            predicted_bbox = tuple(action)
        except TypeError:
            raise TypeError(
                "action must be a sequence [x_min, y_min, x_max, y_max], got "
                f"{type(action).__name__}; a discrete action index has to be "
                "mapped to a box before calling step()"
            ) from None
        if len(predicted_bbox) != 4:
            raise ValueError(
                "action must hold exactly 4 values [x_min, y_min, x_max, y_max], "
                f"got {len(predicted_bbox)}"
            )

        true_bbox = [20, 20, 40, 40]  # placeholder ground truth, set arbitrarily
        iou = self._iou(predicted_bbox, true_bbox)
        reward = iou  # IoU is used directly as the reward
        done = True  # the episode ends after a single prediction
        return self.current_image, reward, done

    def _iou(self, bbox1, bbox2):
        """Return the intersection-over-union of two corner-format boxes."""
        x1_min, y1_min, x1_max, y1_max = bbox1
        x2_min, y2_min, x2_max, y2_max = bbox2

        inter_x_min = max(x1_min, x2_min)
        inter_y_min = max(y1_min, y2_min)
        inter_x_max = min(x1_max, x2_max)
        inter_y_max = min(y1_max, y2_max)

        # Clamped at 0 so non-overlapping boxes give an empty intersection.
        inter_area = max(0, inter_x_max - inter_x_min) * max(0, inter_y_max - inter_y_min)
        bbox1_area = (x1_max - x1_min) * (y1_max - y1_min)
        bbox2_area = (x2_max - x2_min) * (y2_max - y2_min)
        union_area = bbox1_area + bbox2_area - inter_area
        return inter_area / union_area if union_area != 0 else 0


In [11]:
import random
from collections import deque
import tensorflow as tf
from tensorflow.keras import layers, models

class DDQNAgent:
    """Double-DQN agent with an online network, a target network and replay memory.

    Actions are discrete indices in ``[0, num_actions)``: ``choose_action``
    returns an integer, so the environment's ``step`` must accept an action
    index.

    Args:
        input_shape: Shape of one state, e.g. ``(height, width, channels)``.
        num_actions: Number of discrete actions (width of the Q-value output).
        lr: Adam learning rate.
        gamma: Discount factor.
        epsilon: Initial exploration rate.
        epsilon_min: Lower bound for the exploration rate.
        epsilon_decay: Factor applied to epsilon after every training batch.
    """

    def __init__(self, input_shape, num_actions, lr=0.001, gamma=0.99, epsilon=1.0, epsilon_min=0.1, epsilon_decay=0.995):
        self.input_shape = input_shape
        self.num_actions = num_actions  # number of discrete actions
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon  # balance between exploration and exploitation
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

        self.memory = deque(maxlen=2000)
        self.batch_size = 32
        self.update_target_network_steps = 1000
        self.steps = 0  # environment steps taken over the agent's lifetime

        self.q_network = self.build_network()
        self.target_network = self.build_network()
        self.update_target_network()

    def build_network(self):
        """Create a small Conv2D-based Q-network with one output per action."""
        model = models.Sequential([
            layers.Input(shape=self.input_shape),
            layers.Conv2D(32, (3, 3), activation='relu'),
            layers.MaxPooling2D((2, 2)),
            layers.Conv2D(64, (3, 3), activation='relu'),
            layers.MaxPooling2D((2, 2)),
            layers.Flatten(),
            layers.Dense(128, activation='relu'),
            layers.Dense(self.num_actions, activation='linear'),
        ])
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.lr), loss='mse')
        return model

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_network.set_weights(self.q_network.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, state):
        """Pick an action index with an epsilon-greedy policy."""
        if np.random.rand() <= self.epsilon:
            return int(np.random.randint(0, self.num_actions))
        # verbose=0: no progress bar for every single decision.
        q_values = self.q_network.predict(np.expand_dims(state, axis=0), verbose=0)
        return int(np.argmax(q_values[0]))

    def replay(self):
        """Train the online network on one random batch from the memory.

        Does nothing until the memory holds at least ``batch_size``
        transitions.
        """
        if len(self.memory) < self.batch_size:
            return
        minibatch = random.sample(self.memory, self.batch_size)

        states, actions, rewards, next_states, dones = (
            np.array(column) for column in zip(*minibatch)
        )

        q_values = self.q_network.predict(states, verbose=0)
        next_q_online = self.q_network.predict(next_states, verbose=0)
        next_q_target = self.target_network.predict(next_states, verbose=0)

        # Double DQN: the online network selects the next action and the
        # target network evaluates it. Taking the max of the target network
        # alone is plain DQN.
        rows = np.arange(self.batch_size)
        best_next_actions = np.argmax(next_q_online, axis=1)
        bootstrap = next_q_target[rows, best_next_actions]
        # Terminal transitions get the bare reward, without a bootstrap term.
        q_values[rows, actions] = np.where(dones, rewards, rewards + self.gamma * bootstrap)

        self.q_network.train_on_batch(states, q_values)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        if self.steps % self.update_target_network_steps == 0:
            self.update_target_network()

    def train(self, env, episodes=1000):
        """Train the agent in ``env`` for up to ``episodes`` episodes.

        Stops early when ``env.reset()`` returns ``None`` (no state left).
        """
        for episode in range(episodes):
            state = env.reset()
            if state is None:
                print(f"Environment returned no state; stopping after {episode} episodes.")
                break
            total_reward = 0
            done = False
            # self.steps is deliberately not reset here: it has to keep
            # counting across episodes, otherwise short episodes never reach
            # update_target_network_steps and the target network never syncs.

            while not done:
                self.steps += 1
                action = self.choose_action(state)
                next_state, reward, done = env.step(action)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                total_reward += reward

                self.replay()

            print(f"Episode: {episode+1}/{episodes}, Total Reward: {total_reward}")


In [12]:
# Example image paths
image_paths = ['Google_Recaptcha_V2_Images_Dataset/images/Bicycle/Bicycle (1).png','Google_Recaptcha_V2_Images_Dataset/images/Bicycle/Bicycle (3).png']  # replace with real image paths


In [14]:
# Create the environment and the agent with the same state shape.
# NOTE(review): the TypeError recorded below this cell comes from an interface
# mismatch: DDQNAgent.choose_action returns an integer action index, while
# BoundingBoxEnv.step unpacks the action as four box coordinates.
env = BoundingBoxEnv(image_paths=image_paths, input_shape=(120, 120, 3))
agent = DDQNAgent(input_shape=(120, 120, 3), num_actions=4)  # x_min, y_min, x_max, y_max

# Run the agent's training loop
agent.train(env, episodes=100)

TypeError: cannot unpack non-iterable int object