In [1]:
import numpy as np
import pandas as pd
import os
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, GlobalAveragePooling2D
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import matplotlib.pyplot as plt
from tensorflow.keras.applications.resnet50 import preprocess_input

data_path = 'D:\galaxy_classification\data3.csv'
image_dir = 'D:\galaxy_classification\images'

data = pd.read_csv(data_path)

class_mapping = {'S': 0, 'E': 1, 'SB': 2, 'EDGE': 3}

# 각 클래스별로 15000장씩 샘플링
sampled_data = data.groupby('class').apply(lambda x: x.sample(15000, random_state=42)).reset_index(drop=True)

train_data, val_data = train_test_split(sampled_data, test_size=0.2, random_state=42)

def image_generator(data, batch_size, image_dir, class_mapping, augment=False):
    datagen = ImageDataGenerator(
        preprocessing_function=preprocess_input,  # ResNet50에 맞는 전처리 함수
        rotation_range=30,
        width_shift_range=0.3,
        height_shift_range=0.3,
        shear_range=0.3,
        zoom_range=0.3,
        horizontal_flip=True,
        fill_mode='nearest'
    ) if augment else ImageDataGenerator(preprocessing_function=preprocess_input)

    num_samples = data.shape[0]
    while True:
        for offset in range(0, num_samples, batch_size):
            batch_samples = data.iloc[offset:offset+batch_size]
            
            images = []
            labels = []
            for _, row in batch_samples.iterrows():
                image_id = row['image_id']
                label = row['class']
                
                image_path = os.path.join(image_dir, f"{image_id}.jpg")
                
                if os.path.exists(image_path):
                    img = load_img(image_path, target_size=(128, 128))
                    img = img_to_array(img)
                    images.append(img)
                    labels.append(class_mapping[label])
            
            X = np.array(images)
            y = to_categorical(np.array(labels), num_classes=len(class_mapping))
            
            if augment:
                yield next(datagen.flow(X, y, batch_size=batch_size))
            else:
                yield X, y

batch_size = 32

train_generator = image_generator(train_data, batch_size, image_dir, class_mapping, augment=True)
val_generator = image_generator(val_data, batch_size, image_dir, class_mapping, augment=False)

# ResNet50 모델 불러오기 (사전 학습된 가중치 사용, 최상위 레이어는 제외)
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(128, 128, 3))

# 모든 레이어를 미세 조정 가능하도록 설정
for layer in base_model.layers:
    layer.trainable = True

# 새로운 레이어 추가
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.2)(x)
predictions = Dense(len(class_mapping), activation='softmax')(x)

# 모델 정의
model = Model(inputs=base_model.input, outputs=predictions)

# 모델 컴파일
model.compile(optimizer=Adam(learning_rate=0.00001), loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

# 학습률 감소 및 조기 종료 콜백 추가
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, min_lr=0.000001)
early_stopping = EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True)

# 데이터 제너레이터 테스트
X_test, y_test = next(train_generator)
print(f"Batch X shape: {X_test.shape}, Batch y shape: {y_test.shape}")


Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 128, 128, 3) 0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, 134, 134, 3)  0           input_1[0][0]                    
__________________________________________________________________________________________________
conv1_conv (Conv2D)             (None, 64, 64, 64)   9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
conv1_bn (BatchNormalization)   (None, 64, 64, 64)   256         conv1_conv[0][0]                 
______________________________________________________________________________________________

In [2]:

# 모델 훈련
history = model.fit(
    train_generator,
    steps_per_epoch=len(train_data) // batch_size,
    epochs=20,
    validation_data=val_generator,
    validation_steps=len(val_data) // batch_size,
    callbacks=[reduce_lr, early_stopping],
    verbose=1
)



Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20

KeyboardInterrupt: 

In [None]:
model.save('D:\galaxy_classification\galaxy_classifier_ver15.h5')

# 결과 플롯
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
epochs = range(len(acc))

plt.plot(epochs, acc, 'r', label='Training accuracy')
plt.plot(epochs, val_acc, 'b', label='Validation accuracy')
plt.title('Training and validation accuracy')
plt.legend()
plt.show()