In [1]:
import os
import time
import pickle
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

# --- 1. 경로 및 설정 ---
# 'cifar-10-batches-py' 폴더 자체의 경로를 정확하게 입력해주세요.
DATASET_PATH = '/Users/younghohuh/Downloads/cifar-10-batches-py'
OUTPUT_CSV_PATH = 'cifar10_predictions_final.csv'
NUM_TEST_SAMPLES = 1000  # 테스트할 이미지 개수

# 클래스 이름 정의 (파일 로드 순서와 일치)
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
id_to_label = {i: label for i, label in enumerate(class_names)}

# --- 2. CIFAR-10 Batch 파일 로드 함수 ---
def unpickle(file):
    """pickle 파일을 로드하는 헬퍼 함수"""
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

def load_cifar10_raw_data(dataset_path):
    """
    CIFAR-10 batch 파일들을 로드하고, 학습 및 테스트 데이터를 반환합니다.
    """
    # 학습 데이터 로드 (data_batch_1 ~ 5)
    train_images_list = []
    train_labels_list = []
    print("학습 데이터 로드 중...")
    for i in range(1, 6):
        file_path = os.path.join(dataset_path, f'data_batch_{i}')
        batch_dict = unpickle(file_path)
        train_images_list.append(batch_dict[b'data'])
        train_labels_list.extend(batch_dict[b'labels'])
    
    X_train = np.concatenate(train_images_list)
    y_train = np.array(train_labels_list)
    
    # 테스트 데이터 로드 (test_batch)
    print("테스트 데이터 로드 중...")
    test_dict = unpickle(os.path.join(dataset_path, 'test_batch'))
    X_test = test_dict[b'data']
    y_test = np.array(test_dict[b'labels']) # 실제 정답 (성능 평가용)
    
    print("데이터 로드 완료.")
    return (X_train, y_train), (X_test, y_test)

# --- 3. 메인 실행 로직 ---
# 단계 1: 데이터 로드 및 전처리
print("### 1. 데이터 로딩 및 전처리 ###")
(X_train_full, y_train_full), (X_test_full, y_test_full) = load_cifar10_raw_data(DATASET_PATH)

print("\n데이터 정규화를 진행합니다...")
X_train_full_normalized = X_train_full / 255.0
X_test_normalized = X_test_full / 255.0
print("-" * 30)

# 단계 2: 학습/검증 데이터 분할 (프레젠테이션 Idea #3)
print("\n### 2. 학습(Train) / 검증(Validation) 데이터 분할 ###")
# 전체 학습 데이터를 학습용(45,000)과 검증용(5,000)으로 분할
X_train, X_val, y_train, y_val = train_test_split(
    X_train_full_normalized, y_train_full, 
    test_size=5000, 
    random_state=42, 
    stratify=y_train_full # 클래스 비율을 유지하며 분할
)
print(f"  - 학습(Train) 데이터: {X_train.shape}")
print(f"  - 검증(Validation) 데이터: {X_val.shape}")
print("-" * 30)

# 단계 3: 최적의 k 찾기 (검증 데이터 사용)
print("\n### 3. 최적의 k 찾기 (홀수 k) ###")
print("경고: 이 과정은 각 k마다 상당한 시간이 소요될 수 있습니다.")
best_k = 0
best_accuracy = 0
k_values = range(1, 14, 2)  # 1, 3, 5, ..., 13

for k in k_values:
    print(f"\n_k = {k}_ 모델 테스트 중...")
    start_time = time.time()
    
    # n_jobs=-1: CPU 모든 코어를 사용하여 계산 속도 향상
    knn = KNeighborsClassifier(n_neighbors=k, n_jobs=-1)
    # 학습 데이터로 모델 학습
    knn.fit(X_train, y_train)
    # 검증 데이터로 성능 측정
    y_val_pred = knn.predict(X_val)
    accuracy = accuracy_score(y_val, y_val_pred)
    
    end_time = time.time()
    print(f"-> k = {k} 검증 정확도: {accuracy * 100:.2f}% (소요 시간: {end_time - start_time:.2f}초)")
    
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_k = k

print("-" * 30)
print(f"최적의 k를 찾았습니다: k = {best_k} (최고 검증 정확도: {best_accuracy * 100:.2f}%)")
print("-" * 30)

# 단계 4: 최종 모델 학습 및 예측 (전체 학습 데이터 사용)
print(f"\n### 4. 최종 모델 학습 및 예측 (k={best_k}) ###")
print("전체 50,000개 데이터로 최종 모델을 학습합니다... (시간이 다소 소요됩니다)")
final_knn_classifier = KNeighborsClassifier(n_neighbors=best_k, n_jobs=-1)
# ★★★ 전체 학습 데이터(Train + Validation)로 다시 학습
final_knn_classifier.fit(X_train_full_normalized, y_train_full)

print(f"\n{NUM_TEST_SAMPLES}개의 테스트 데이터에 대한 예측을 시작합니다...")
start_time = time.time()
X_test_subset = X_test_normalized[:NUM_TEST_SAMPLES]
final_predictions_numeric = final_knn_classifier.predict(X_test_subset)
end_time = time.time()

print(f"최종 예측 완료! (소요 시간: {end_time - start_time:.2f}초)")
print("-" * 30)

# (참고) 실제 1000개 테스트 데이터에 대한 정확도 출력
actual_labels = y_test_full[:NUM_TEST_SAMPLES]
final_test_accuracy = accuracy_score(actual_labels, final_predictions_numeric)
print(f"★ 최종 모델의 테스트 정확도(1000개 샘플): {final_test_accuracy * 100:.2f}%")
print("-" * 30)

# 단계 5: 결과 CSV 파일로 저장
print("\n### 5. 결과 저장 ###")
ids = np.arange(1, NUM_TEST_SAMPLES + 1) # 과제 요구사항에 맞게 ID를 1부터 생성
predicted_class_names = [id_to_label[i] for i in final_predictions_numeric]

final_df = pd.DataFrame({'id': ids, 'label': predicted_class_names})
final_df.to_csv(OUTPUT_CSV_PATH, index=False)

print(f"'{OUTPUT_CSV_PATH}' 파일로 결과가 성공적으로 저장되었습니다.")
print("\n=== 최종 결과 미리보기 ===")
print(final_df.head())

### 1. 데이터 로딩 및 전처리 ###
학습 데이터 로드 중...
테스트 데이터 로드 중...
데이터 로드 완료.

데이터 정규화를 진행합니다...
------------------------------

### 2. 학습(Train) / 검증(Validation) 데이터 분할 ###
  - 학습(Train) 데이터: (45000, 3072)
  - 검증(Validation) 데이터: (5000, 3072)
------------------------------

### 3. 최적의 k 찾기 (홀수 k) ###
경고: 이 과정은 각 k마다 상당한 시간이 소요될 수 있습니다.

_k = 1_ 모델 테스트 중...
-> k = 1 검증 정확도: 34.56% (소요 시간: 8.28초)

_k = 3_ 모델 테스트 중...
-> k = 3 검증 정확도: 33.30% (소요 시간: 8.68초)

_k = 5_ 모델 테스트 중...
-> k = 5 검증 정확도: 34.40% (소요 시간: 8.48초)

_k = 7_ 모델 테스트 중...
-> k = 7 검증 정확도: 34.34% (소요 시간: 9.35초)

_k = 9_ 모델 테스트 중...
-> k = 9 검증 정확도: 34.04% (소요 시간: 9.55초)

_k = 11_ 모델 테스트 중...
-> k = 11 검증 정확도: 33.58% (소요 시간: 9.53초)

_k = 13_ 모델 테스트 중...
-> k = 13 검증 정확도: 33.34% (소요 시간: 9.50초)
------------------------------
최적의 k를 찾았습니다: k = 1 (최고 검증 정확도: 34.56%)
------------------------------

### 4. 최종 모델 학습 및 예측 (k=1) ###
전체 50,000개 데이터로 최종 모델을 학습합니다... (시간이 다소 소요됩니다)

1000개의 테스트 데이터에 대한 예측을 시작합니다...
최종 예측 완료! (소요 시간: 2.54초)
----------