In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
import cv2
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tqdm import tqdm

# 랜덤 시드 설정
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# 셀 2: Waymo Open Dataset 다운로드 함수
def download_waymo_dataset():
    """
    Waymo Open Dataset을 다운로드하고 접근합니다.
    라이선스 등록이 완료된 상태여야 합니다.
    """
    print("Waymo Open Dataset을 다운로드하는 중...")
    
    try:
        # Waymo Open Dataset v1.2 (기본 구성) 접근
        dataset = tfds.load(
            'waymo_open_dataset/v1.2'
            # 다른 경로가 필요하면 아래 주석을 해제하고 경로를 수정하세요
            # data_dir='gs://waymo_open_dataset_v_1_2_0_individual_files/tensorflow_datasets'
        )
        
        print("데이터셋 로드 성공!")
        return dataset
    
    except Exception as e:
        print(f"데이터셋 로드 중 오류 발생: {e}")
        
        # 대체 방법: v1.0 버전으로 시도
        try:
            print("v1.0 버전으로 다시 시도합니다...")
            dataset = tfds.load(
                'waymo_open_dataset/v1.0',
                data_dir='gs://waymo_open_dataset_v_1_0_0_individual_files/tensorflow_datasets'
            )
            print("v1.0 데이터셋 로드 성공!")
            return dataset
        except Exception as e2:
            print(f"v1.0 데이터셋 로드 중 오류 발생: {e2}")
            print("라이선스 등록 상태를 다시 확인하거나 GCS 접근 권한을 확인하세요.")
            return None

In [None]:
# 셀 3: 데이터셋 탐색 함수
def explore_dataset(dataset):
    """
    데이터셋 구조를 탐색합니다.
    """
    if dataset is None:
        return
    
    print("\n데이터셋 구조 탐색:")
    print("사용 가능한 분할:", dataset.keys())
    
    # 훈련 데이터셋의 첫 번째 샘플 확인
    train_dataset = dataset['train']
    
    for example in train_dataset.take(1):
        print("\n데이터셋 키:", list(example.keys()))
        
        # 카메라 이미지 정보 확인
        for camera in ['camera_FRONT', 'camera_FRONT_LEFT', 'camera_FRONT_RIGHT', 
                       'camera_SIDE_LEFT', 'camera_SIDE_RIGHT']:
            if camera in example:
                print(f"\n{camera} 이미지 형태:", example[camera]['image'].shape)
                labels = example[camera]['labels']
                if 'type' in labels:
                    print(f"{camera} 레이블 개수:", len(labels['type']))
                    # 레이블이 있는 경우 레이블 값 확인
                    if len(labels['type']) > 0:
                        print(f"{camera} 첫 번째 레이블 타입:", labels['type'][0].numpy())

In [None]:
# 셀 4: 샘플 이미지 시각화 함수
def visualize_sample(dataset):
    """
    데이터셋의 샘플 이미지를 시각화합니다.
    """
    if dataset is None:
        return
    
    train_dataset = dataset['train']
    
    for example in train_dataset.take(1):
        # 전방 카메라 이미지 시각화
        plt.figure(figsize=(15, 10))
        
        cameras = ['camera_FRONT', 'camera_FRONT_LEFT', 'camera_FRONT_RIGHT', 
                   'camera_SIDE_LEFT', 'camera_SIDE_RIGHT']
        
        for i, camera in enumerate(cameras):
            if camera in example:
                plt.subplot(2, 3, i+1)
                plt.imshow(example[camera]['image'])
                plt.title(camera)
                
                # 바운딩 박스 추가
                if len(example[camera]['labels']['bbox']) > 0:
                    for bbox in example[camera]['labels']['bbox']:
                        y1, x1, y2, x2 = bbox.numpy()
                        height, width = example[camera]['image'].shape[0:2]
                        
                        # 정규화된 좌표를 실제 픽셀 좌표로 변환
                        x1 = int(x1 * width)
                        y1 = int(y1 * height)
                        x2 = int(x2 * width)
                        y2 = int(y2 * height)
                        
                        # 바운딩 박스 그리기
                        plt.gca().add_patch(plt.Rectangle((x1, y1), x2-x1, y2-y1, 
                                                         fill=False, edgecolor='red', linewidth=2))
        
        plt.tight_layout()
        plt.show()

In [None]:
# 셀 5: 3D CNN 모델을 위한 데이터 전처리 함수
# 3D CNN 모델의 입력 형태 정의
INPUT_DEPTH = 16    # 시간축 길이 (연속된 프레임 수)
INPUT_HEIGHT = 32   # 이미지 높이
INPUT_WIDTH = 32    # 이미지 너비
NUM_CLASSES = 5     # Waymo 데이터셋의 클래스 수

def preprocess_waymo_data(dataset, max_samples=1000, save_dir='processed_data'):
    """
    Waymo 데이터셋을 3D CNN 모델에 맞게 전처리합니다.
    
    Args:
        dataset: Waymo 데이터셋
        max_samples: 처리할 최대 샘플 수
        save_dir: 처리된 데이터를 저장할 디렉토리
    
    Returns:
        X_train, y_train, X_val, y_val, X_test, y_test
    """
    os.makedirs(save_dir, exist_ok=True)
    
    # 데이터 저장 경로
    X_path = os.path.join(save_dir, 'X_data.npy')
    y_path = os.path.join(save_dir, 'y_data.npy')
    
    # 이미 처리된 데이터가 있는지 확인
    if os.path.exists(X_path) and os.path.exists(y_path):
        print("로드된 전처리 데이터를 사용합니다.")
        X_data = np.load(X_path)
        y_data = np.load(y_path)
    else:
        print("데이터 전처리를 시작합니다...")
        X_data = []
        y_data = []
        
        train_dataset = dataset['train']
        count = 0
        
        for example in tqdm(train_dataset):
            if count >= max_samples:
                break
                
            # 전방 카메라 이미지와 레이블 사용
            front_camera = example['camera_FRONT']
            images = front_camera['image']
            labels = front_camera['labels']['type']
            
            # 레이블이 있는 경우만 처리
            if len(labels) > 0:
                # 첫 번째 객체 레이블 사용 (단순화를 위해)
                label = labels[0].numpy()
                
                # 이미지 전처리
                img = images.numpy()
                img_gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
                img_resized = cv2.resize(img_gray, (INPUT_WIDTH, INPUT_HEIGHT))
                
                # 샘플 추가
                X_data.append(img_resized)
                y_data.append(label)
                
                count += 1
        
        # 3D 데이터 생성 (가상의 시간 차원 생성)
        # 실제 시나리오에서는 연속된 프레임을 사용해야 합니다
        print("3D 데이터 생성 중...")
        X_3d = []
        
        # 충분한 샘플이 있는지 확인
        if len(X_data) >= INPUT_DEPTH:
            for i in range(len(X_data) - INPUT_DEPTH + 1):
                # INPUT_DEPTH 개수의 연속된 프레임을 하나의 3D 샘플로 구성
                sequence = np.array(X_data[i:i+INPUT_DEPTH])
                sequence = sequence.reshape(INPUT_DEPTH, INPUT_HEIGHT, INPUT_WIDTH, 1)
                X_3d.append(sequence)
            
            # 레이블 조정 (첫 번째 프레임의 레이블을 시퀀스 레이블로 사용)
            y_3d = y_data[:len(X_3d)]
            
            # numpy 배열로 변환
            X_data = np.array(X_3d)
            y_data = np.array(y_3d)
            
            # 저장
            np.save(X_path, X_data)
            np.save(y_path, y_data)
        else:
            print(f"경고: 샘플이 충분하지 않습니다. 필요: {INPUT_DEPTH}, 실제: {len(X_data)}")
            return None, None, None, None, None, None
    
    # 데이터 정보 출력
    print(f"데이터 형태: {X_data.shape}")
    print(f"레이블 형태: {y_data.shape}")
    print(f"고유 클래스: {np.unique(y_data)}")
    
    # 데이터 분할 (훈련/검증/테스트)
    # 먼저 훈련+검증 데이터와 테스트 데이터 분할
    X_temp, X_test, y_temp, y_test = train_test_split(
        X_data, y_data, test_size=0.2, random_state=42, stratify=y_data
    )
    
    # 훈련 데이터와 검증 데이터 분할
    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp, test_size=0.25, random_state=42, stratify=y_temp
    )
    
    # 레이블을 one-hot 인코딩으로 변환
    y_train = to_categorical(y_train, NUM_CLASSES)
    y_val = to_categorical(y_val, NUM_CLASSES)
    y_test = to_categorical(y_test, NUM_CLASSES)
    
    print(f"훈련 데이터: {X_train.shape}, {y_train.shape}")
    print(f"검증 데이터: {X_val.shape}, {y_val.shape}")
    print(f"테스트 데이터: {X_test.shape}, {y_test.shape}")
    
    return X_train, y_train, X_val, y_val, X_test, y_test

In [None]:
# 셀 6: 시간적 정보가 있는 경우의 대체 전처리 방법
def preprocess_temporal_data(dataset, max_samples=1000, temporal_window=16, save_dir='processed_temporal_data'):
    """
    시간적 정보가 있는 경우 Waymo 데이터셋을 3D CNN 모델에 맞게 전처리합니다.
    
    Args:
        dataset: Waymo 데이터셋
        max_samples: 처리할 최대 샘플 수
        temporal_window: 시간 창 크기
        save_dir: 처리된 데이터를 저장할 디렉토리
    """
    os.makedirs(save_dir, exist_ok=True)
    
    X_path = os.path.join(save_dir, 'X_temporal_data.npy')
    y_path = os.path.join(save_dir, 'y_temporal_data.npy')
    
    if os.path.exists(X_path) and os.path.exists(y_path):
        print("로드된 시간적 데이터를 사용합니다.")
        X_data = np.load(X_path)
        y_data = np.load(y_path)
        return X_data, y_data
    
    # 시간순으로 정렬된 프레임 저장을 위한 사전
    temporal_frames = {}
    
    train_dataset = dataset['train']
    count = 0
    
    print("시간 정보를 사용하여 데이터 수집 중...")
    for example in tqdm(train_dataset):
        if count >= max_samples:
            break
            
        # 타임스탬프 추출
        timestamp = example['timestamp_micros'].numpy()
        context_name = example['context']['name'].numpy().decode('utf-8')
        
        # 컨텍스트별로 프레임 그룹화
        if context_name not in temporal_frames:
            temporal_frames[context_name] = []
        
        # 전방 카메라 이미지와 레이블 사용
        front_camera = example['camera_FRONT']
        image = front_camera['image'].numpy()
        
        # 레이블이 있는 경우만 처리
        if len(front_camera['labels']['type']) > 0:
            label = front_camera['labels']['type'][0].numpy()
            
            # 이미지 전처리
            img_gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
            img_resized = cv2.resize(img_gray, (INPUT_WIDTH, INPUT_HEIGHT))
            
            # 프레임 정보 저장
            temporal_frames[context_name].append({
                'timestamp': timestamp,
                'image': img_resized,
                'label': label
            })
            
            count += 1
    
    print(f"수집된 컨텍스트 수: {len(temporal_frames)}")
    
    # 3D 데이터 생성
    X_3d = []
    y_3d = []
    
    for context, frames in temporal_frames.items():
        # 타임스탬프로 정렬
        frames.sort(key=lambda x: x['timestamp'])
        
        # 충분한 프레임이 있는 경우만 처리
        if len(frames) >= temporal_window:
            for i in range(len(frames) - temporal_window + 1):
                # temporal_window 크기의 연속된 프레임을 하나의 3D 샘플로 구성
                sequence = np.array([frame['image'] for frame in frames[i:i+temporal_window]])
                sequence = sequence.reshape(temporal_window, INPUT_HEIGHT, INPUT_WIDTH, 1)
                
                # 시퀀스의 마지막 프레임 레이블 사용
                label = frames[i+temporal_window-1]['label']
                
                X_3d.append(sequence)
                y_3d.append(label)
    
    # numpy 배열로 변환
    if X_3d:
        X_data = np.array(X_3d)
        y_data = np.array(y_3d)
        
        # 저장
        np.save(X_path, X_data)
        np.save(y_path, y_data)
        
        print(f"시간적 데이터 형태: {X_data.shape}")
        print(f"시간적 레이블 형태: {y_data.shape}")
        
        return X_data, y_data
    else:
        print("충분한 시간적 데이터를 생성할 수 없습니다.")
        return None, None

In [None]:
# 셀 7: 메인 실행 함수
def main():
    # Waymo 데이터셋 가져오기
    dataset = download_waymo_dataset()
    
    if dataset is None:
        print("데이터셋을 불러올 수 없습니다.")
        return
        
    # 데이터셋 구조 탐색
    explore_dataset(dataset)
    
    # 샘플 시각화
    visualize_sample(dataset)
    
    # 기본 전처리 방법
    X_train, y_train, X_val, y_val, X_test, y_test = preprocess_waymo_data(
        dataset, max_samples=1000
    )
    
    # X_train이 None이면 데이터 처리에 실패한 것
    if X_train is None:
        print("데이터 전처리 실패. 프로그램을 종료합니다.")
        return
    
    # 시간적 정보를 활용한 전처리 방법 (선택 사항)
    # X_temporal, y_temporal = preprocess_temporal_data(dataset)
    
    return X_train, y_train, X_val, y_val, X_test, y_test

In [None]:
class MCDropout3DCNN(tf.keras.Model):
    def __init__(self, input_shape, num_classes=5, dropout_rate=0.3):
        super(MCDropout3DCNN, self).__init__()
        
        # 첫 번째 3D 합성곱 블록
        self.conv1_1 = layers.Conv3D(32, kernel_size=3, activation='relu', padding='same')
        self.bn1_1 = layers.BatchNormalization()
        self.conv1_2 = layers.Conv3D(32, kernel_size=3, activation='relu', padding='same')
        self.bn1_2 = layers.BatchNormalization()
        self.pool1 = layers.MaxPool3D(pool_size=2)
        self.drop1 = layers.Dropout(dropout_rate)
        
        # 두 번째 3D 합성곱 블록
        self.conv2_1 = layers.Conv3D(64, kernel_size=3, activation='relu', padding='same')
        self.bn2_1 = layers.BatchNormalization()
        self.conv2_2 = layers.Conv3D(64, kernel_size=3, activation='relu', padding='same')
        self.bn2_2 = layers.BatchNormalization()
        self.pool2 = layers.MaxPool3D(pool_size=2)
        self.drop2 = layers.Dropout(dropout_rate)
        
        # 세 번째 3D 합성곱 블록
        self.conv3_1 = layers.Conv3D(128, kernel_size=3, activation='relu', padding='same',
                           kernel_regularizer=regularizers.l2(1e-4))
        self.bn3_1 = layers.BatchNormalization()
        self.conv3_2 = layers.Conv3D(128, kernel_size=3, activation='relu', padding='same',
                           kernel_regularizer=regularizers.l2(1e-4))
        self.bn3_2 = layers.BatchNormalization()
        self.pool3 = layers.MaxPool3D(pool_size=2)
        self.drop3 = layers.Dropout(dropout_rate)
        
        # 완전 연결 계층
        self.flatten = layers.Flatten()
        self.dense1 = layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(1e-4))
        self.bn4 = layers.BatchNormalization()
        self.drop4 = layers.Dropout(dropout_rate)
        self.dense2 = layers.Dense(num_classes, activation='softmax')
    
    def call(self, inputs, training=False):
        # 첫 번째 블록
        x = self.conv1_1(inputs)
        x = self.bn1_1(x, training=training)
        x = self.conv1_2(x)
        x = self.bn1_2(x, training=training)
        x = self.pool1(x)
        x = self.drop1(x, training=training)
        
        # 두 번째 블록
        x = self.conv2_1(x)
        x = self.bn2_1(x, training=training)
        x = self.conv2_2(x)
        x = self.bn2_2(x, training=training)
        x = self.pool2(x)
        x = self.drop2(x, training=training)
        
        # 세 번째 블록
        x = self.conv3_1(x)
        x = self.bn3_1(x, training=training)
        x = self.conv3_2(x)
        x = self.bn3_2(x, training=training)
        x = self.pool3(x)
        x = self.drop3(x, training=training)
        
        # 완전 연결 계층
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.bn4(x, training=training)
        x = self.drop4(x, training=training)
        x = self.dense2(x)
        
        return x
    
    def build_graph(self):
        x = tf.keras.Input(shape=(16, 32, 32, 1))
        return tf.keras.Model(inputs=[x], outputs=self.call(x))

In [None]:
input_shape = (16, 32, 32, 1)  # (depth, height, width, channels)
mc_dropout_model = MCDropout3DCNN(input_shape)
mc_dropout_graph = mc_dropout_model.build_graph()
print("MC Dropout 모델 요약:")
mc_dropout_graph.summary()
    

In [None]:
mc_dropout_model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

In [None]:
def mc_dropout_predict(model, X, num_samples=30):
    predictions = []
    
    # 학습 모드로 설정하여 드롭아웃 활성화
    for _ in range(num_samples):
        y_pred = model(X, training=True)
        predictions.append(y_pred.numpy())
    
    # 예측값의 평균과 표준편차
    mean_prediction = np.mean(predictions, axis=0)
    std_prediction = np.std(predictions, axis=0)
    
    return mean_prediction, std_prediction

In [None]:
def plot_uncertainty(mean_pred, std_pred, sample_idx=0):
    plt.figure(figsize=(12, 6))
    
    # 각 클래스별 예측 확률과 불확실성
    plt.subplot(1, 2, 1)
    num_classes = mean_pred.shape[1]
    classes = range(num_classes)
    
    plt.bar(classes, mean_pred[sample_idx], yerr=std_pred[sample_idx], capsize=10)
    plt.xlabel('Class')
    plt.ylabel('Probability')
    plt.title('Prediction Probability with Uncertainty')
    plt.xticks(classes)
    
    # 불확실성 분포
    plt.subplot(1, 2, 2)
    plt.hist(np.mean(std_pred, axis=1), bins=30)
    plt.axvline(std_pred[sample_idx].mean(), color='r', linestyle='--', 
                label=f'Sample #{sample_idx} uncertainty')
    plt.xlabel('Mean Uncertainty (Std)')
    plt.ylabel('Frequency')
    plt.title('Uncertainty Distribution')
    plt.legend()
    
    plt.tight_layout()
    plt.show()

In [None]:
def plot_uncertainty_vs_accuracy(mean_pred, std_pred, y_true):
    # 예측 클래스와 실제 클래스
    pred_classes = np.argmax(mean_pred, axis=1)
    true_classes = np.argmax(y_true, axis=1)
    
    # 정확한 예측과 잘못된 예측 구분
    correct = pred_classes == true_classes
    
    # 각 샘플의 평균 불확실성
    mean_uncertainty = np.mean(std_pred, axis=1)
    
    plt.figure(figsize=(10, 6))
    plt.scatter(mean_uncertainty[correct], np.ones(np.sum(correct)), 
                label='Correct Predictions', alpha=0.5, color='blue')
    plt.scatter(mean_uncertainty[~correct], np.zeros(np.sum(~correct)), 
                label='Wrong Predictions', alpha=0.5, color='red')
    
    plt.xlabel('Mean Uncertainty (Std)')
    plt.ylabel('Prediction Correctness')
    plt.yticks([0, 1], ['Wrong', 'Correct'])
    plt.title('Relationship Between Uncertainty and Prediction Accuracy')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()


In [None]:
def train_model(model, X_train, y_train, X_val, y_val, epochs=50, batch_size=16):
    # 콜백 정의
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True
    )

    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1
    )
    
    # 모델 학습
    history = model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping, reduce_lr],
        verbose=1
    )
    
    return history


In [None]:
mc_dropout_model.save('mc_dropout_3dcnn_model')


In [None]:
def plot_training_history(history):
    plt.figure(figsize=(12, 4))
    
    # 정확도 시각화
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Train Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Model Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    
    # 손실 시각화
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.tight_layout()
    plt.show()

In [None]:
def evaluate_model_with_uncertainty(model, X_test, y_test):
    # MC Dropout을 사용한 베이지안 추론
    mean_pred, std_pred = mc_dropout_predict(model, X_test)
    pred_classes = np.argmax(mean_pred, axis=1)
    true_classes = np.argmax(y_test, axis=1)
    
    # 정확도 계산
    accuracy = accuracy_score(true_classes, pred_classes)
    print(f"Test Accuracy: {accuracy:.4f}")
    
    # 혼동 행렬
    conf_matrix = confusion_matrix(true_classes, pred_classes)
    plt.figure(figsize=(8, 6))
    plt.imshow(conf_matrix, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.colorbar()
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.tight_layout()
    plt.show()
    
    # 분류 보고서
    report = classification_report(true_classes, pred_classes)
    print("Classification Report:")
    print(report)
    
    # 불확실성 시각화
    plot_uncertainty(mean_pred, std_pred)
    
    # 불확실성과 정확도 관계 분석
    plot_uncertainty_vs_accuracy(mean_pred, std_pred, y_test)
    
    return mean_pred, std_pred


In [None]:
loaded_model = tf.keras.models.load_model('mc_dropout_3dcnn_model')

