In [46]:
import os
import cv2
import numpy as np
from tensorflow.keras.applications.resnet50 import preprocess_input
import tensorflow as tf

def preprocess_images_in_folder(folder_path, target_size=(640,640)):
    # 폴더 내 모든 파일을 순회한다.
    for filename in os.listdir(folder_path):
        # 이미지 파일 경로를 생성한다.
        image_path = os.path.join(folder_path, filename)
        
        # 이미지를 읽는다.
        img = cv2.imread(image_path)
        
        if img is None:
            print(f"Image not found at {image_path}")
            continue

        # OpenCV로 이미지를 BGR에서 RGB로 변환한다. (OpenCV는 기본적으로 BGR로 이미지를 읽기때문에 다른 도구와 호환되도록 RGB로 변환해야 한다.)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # 이미지 크기를 재조정한다.
        img = cv2.resize(img, target_size, interpolation = cv2.INTER_LINEAR)

        # 이미지를 정규화하거나 추가 전처리를 수행한다.
        img = preprocess_input(img)

        # 네트워크 입력에 맞게 차원을 추가한다.
        img = np.expand_dims(img, axis=0)
        
        # 전처리가 완료된 이미지를 반환한다.
        yield img, image_path

In [2]:
folder_path = "E:\\Study\\TeamProject_2\\Dataset_CCTV\\Image_Data\\야간\\우천\\연출"

data = [img for img, _ in preprocess_images_in_folder(folder_path)]
data = np.array(data)

In [3]:
# 데이터셋의 모든 이미지를 저장할 빈 리스트를 생성합니다.
resized_images = []

# 원본 이미지 데이터셋을 반복하여 각 이미지를 리사이징합니다.
for img in data:
    img = img[0]  # 이미지를 추출합니다. 데이터셋이 5차원이므로 4차원으로 줄입니다.
    resized_img = cv2.resize(img, (640, 640))  # 이미지를 640x640 크기로 리사이징합니다.
    resized_images.append(resized_img)  # 리사이징된 이미지를 리스트에 추가합니다.

# 리스트를 넘파이 배열로 변환합니다.
resized_images = np.array(resized_images)

# 결과를 확인합니다.
print("원본 이미지 데이터 형태:", data.shape)
print("리사이징된 이미지 데이터 형태:", resized_images.shape)

원본 이미지 데이터 형태: (12, 1, 640, 640, 3)
리사이징된 이미지 데이터 형태: (12, 640, 640, 3)


In [35]:
import json

def load_annotations_from_folder(folder_path):
    annotations = []
    for json_file in os.listdir(folder_path):
        if json_file.endswith(".json"):
            with open(os.path.join(folder_path, json_file)) as file:
                annotation = json.load(file)
                for pm_annotation in annotation["annotations"]["PM"]:
                    bbox = pm_annotation["points"]
                    class_label = int(pm_annotation["PM_code"])  # Assuming class labels are integers
                    annotations.append((bbox, class_label))
    return annotations

folder_path = "E:\Study\TeamProject_2\Dataset_CCTV\Labeling_Data\CCTV\야간\우천\연출"
annotations_list = load_annotations_from_folder(folder_path)

In [40]:
annotations_array = []

for annotation in annotations_list:
    # 'points' list and 'class' into a single list
    combined = annotation[0] + [annotation[1]]
    annotations_array.append(combined)

# Convert the list to a numpy array
annotations_array = np.array(annotations_array)

In [49]:
def preprocess_data(image, labels):
    # 이미지 전처리
    image = tf.image.resize(image, (640, 640))
    image = image / 255.0  # 이미지 정규화

    # 라벨을 네트워크 출력 형태에 맞게 변환
    # 바운딩 박스 좌표
    bbox = labels[0]
    # 클래스 ID를 one-hot 벡터로 변환
    class_id = tf.one_hot(labels[1], depth=21)

    return image, tf.concat([bbox, class_id], axis=-1)

In [None]:
"""
def preprocess_data(image, labels):
    # 이미지 전처리 (이미지 크기 조정, 정규화 등)
    image = tf.image.resize(image, (640, 640))
    image = image / 255.0  # 이미지 정규화

    # 라벨을 네트워크 출력 형태에 맞게 변환
    # 예: 그리드 셀로 이미지를 분할하고, 각 셀의 클래스와 바운딩 박스를 계산
    # 이 부분은 네트워크 구조와 문제에 따라 다르게 구현해야 합니다.
    # TODO: labels를 적절한 형태로 변환하는 코드를 작성해야 합니다.

    return image, labels
"""

In [55]:
# tf.data.Dataset을 생성
folder_path_images = "E:\\Study\\TeamProject_2\\Dataset_CCTV\\Image_Data\\야간\\우천\\연출"
images = [img for img, _ in preprocess_images_in_folder(folder_path_images)]

folder_path_label = "E:\Study\TeamProject_2\Dataset_CCTV\Labeling_Data\CCTV\야간\우천\연출"
labels = load_annotations_from_folder(folder_path_label)

dataset = tf.data.Dataset.from_tensor_slices((images, annotations_array))
dataset = dataset.map(preprocess_data)

# 데이터셋 셔플, 배치 사이즈 설정, 프리페치(성능 최적화)
dataset = dataset.shuffle(1024).batch(512).prefetch(tf.data.experimental.AUTOTUNE)

TypeError: in user code:

    File "C:\Users\k10dh\AppData\Local\Temp\ipykernel_40832\205698403.py", line 10, in preprocess_data  *
        class_id = tf.one_hot(labels[1], depth=21)

    TypeError: Value passed to parameter 'indices' has DataType float64 not in list of allowed values: uint8, int8, int32, int64


In [42]:
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Conv2D, Input, TimeDistributed, Dense, Flatten, Dropout

def create_model(num_classes): # 감지하려는 클래스의 종류
    input_tensor = Input(shape=(640, 640, 3))

    # Backbone Network
    base_model = ResNet50(include_top=False, input_tensor=input_tensor)

    # RPN Layer
    rpn_layer = Conv2D(512, (3, 3), padding='same', activation='relu', 
                       kernel_initializer='normal', name='rpn_conv1')(base_model.output)
    
    # Classifier Layer
    classification_layer = TimeDistributed(Dense(num_classes, activation='softmax'), name='classification_layer')(rpn_layer)
    bbox_regression_layer = TimeDistributed(Dense(num_classes * 4, activation='linear'), name='bbox_regression_layer')(rpn_layer)
    
    return Model(inputs=input_tensor, outputs=[classification_layer, bbox_regression_layer])

In [51]:
model = create_model(21)

model.compile(optimizer='adam', 
              loss={'classification_layer': 'categorical_crossentropy', 
                    'bbox_regression_layer': 'mean_squared_error'}, 
              metrics={'classification_layer': 'accuracy',
                       'bbox_regression_layer': 'mse'})

In [52]:
model.fit(resized_images, annotations_array, epochs=5)

Epoch 1/5


ValueError: in user code:

    File "c:\Users\k10dh\anaconda3\envs\tf\lib\site-packages\keras\engine\training.py", line 1284, in train_function  *
        return step_function(self, iterator)
    File "c:\Users\k10dh\anaconda3\envs\tf\lib\site-packages\keras\engine\training.py", line 1268, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "c:\Users\k10dh\anaconda3\envs\tf\lib\site-packages\keras\engine\training.py", line 1249, in run_step  **
        outputs = model.train_step(data)
    File "c:\Users\k10dh\anaconda3\envs\tf\lib\site-packages\keras\engine\training.py", line 1051, in train_step
        loss = self.compute_loss(x, y, y_pred, sample_weight)
    File "c:\Users\k10dh\anaconda3\envs\tf\lib\site-packages\keras\engine\training.py", line 1109, in compute_loss
        return self.compiled_loss(
    File "c:\Users\k10dh\anaconda3\envs\tf\lib\site-packages\keras\engine\compile_utils.py", line 265, in __call__
        loss_value = loss_obj(y_t, y_p, sample_weight=sw)
    File "c:\Users\k10dh\anaconda3\envs\tf\lib\site-packages\keras\losses.py", line 142, in __call__
        losses = call_fn(y_true, y_pred)
    File "c:\Users\k10dh\anaconda3\envs\tf\lib\site-packages\keras\losses.py", line 268, in call  **
        return ag_fn(y_true, y_pred, **self._fn_kwargs)
    File "c:\Users\k10dh\anaconda3\envs\tf\lib\site-packages\keras\losses.py", line 1984, in categorical_crossentropy
        return backend.categorical_crossentropy(
    File "c:\Users\k10dh\anaconda3\envs\tf\lib\site-packages\keras\backend.py", line 5559, in categorical_crossentropy
        target.shape.assert_is_compatible_with(output.shape)

    ValueError: Shapes (None, 5) and (None, 20, 20, 21) are incompatible
