In [34]:
import os
import cv2
from tensorflow.keras.applications import ResNet50, DenseNet121, VGG16, VGG19, MobileNetV2
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Flatten, Dropout, Input, Conv2D
# from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import GridSearchCV
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
import numpy as np
from keras.layers import Input, Conv2D, Flatten, Dense, Dropout
from keras.optimizers import Adam, RMSprop, SGD
from sklearn.model_selection import GridSearchCV

In [28]:
"""
  Colab에서 scikeras를 설치하는 경우 세션이 강제 종료되는 경우가 발생하여
  설치가 불가능하고 GridSearch가 불가능할 경우 해당 클래스를 선언한 뒤에 사용하면 됩니다.
  로컬 pc에서는 keras 버전을 수정할 수 있어서 scikeras 설치 가능
  GPU가 없는 PC라면 굳이...
  !pip install scikeras
  from scikeras import KerasClassifier
"""
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import cross_val_score
class KerasClassifier(BaseEstimator, ClassifierMixin):
  def __init__(self, build_fn=None, epochs=1, batch_size=32, verbose=0):
    self.build_fn = build_fn
    self.epochs = epochs
    self.batch_size = batch_size
    self.verbose = verbose
    self.model_ = None
  def fit(self, X, y):
    self.model_ = self.build_fn()
    self.model_.fit(X, y, epochs=self.epochs, batch_size=self.batch_size, verbose=self.verbose)
    return self
  def predict(self, X):
    return (self.model_.predict(X)>0.5).astype('int32')
  def predict_proba(self, X):
    return self.model_.predict(X)
# model = KerasClassifier(build_fn="모델명", epochs="에폭 값", batch_size="배치 값", verbose=0)

In [29]:
import tensorflow as tf

gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    print("GPU 없음.")
else:
    print(f"사용 가능한 GPU: {gpus}")

from tensorflow.python.client import device_lib
print("사용 가능한 장치 목록:")
print(device_lib.list_local_devices())


사용 가능한 GPU: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
사용 가능한 장치 목록:
[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 13865507343045769809
xla_global_id: -1
, name: "/device:GPU:0"
device_type: "GPU"
memory_limit: 10062135296
locality {
  bus_id: 1
  links {
  }
}
incarnation: 3621622776237905020
physical_device_desc: "device: 0, name: NVIDIA GeForce RTX 3060, pci bus id: 0000:08:00.0, compute capability: 8.6"
xla_global_id: 416903419
]


In [30]:
# image crop
def crop_knee(image, crop_height, crop_width):
  h, w = image.shape[:2]
  center = (h//2, w//2)
  cropped_img = image[
      center[0] - crop_height//2 : center[0] + crop_height//2,
      center[1] - crop_width//2 : center[1] + crop_width//2
  ]
  return cropped_img
# HE CLAHE
def apply_clahe(image):
  clahe = cv2.createCLAHE(clipLimit=2, tileGridSize=(8, 8))
  return clahe.apply(image)
# Data Load & Preprocessing
def process_load_img(input_dir, crop_height, crop_width):
  images = []
  labels = []
  for class_folder in os.listdir(input_dir):
    class_input_path = os.path.join(input_dir, class_folder)
    class_label = int(class_folder)
    for img_file in os.listdir(class_input_path):
      img_path = os.path.join(input_dir, class_folder)
      img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
      if img is not None:
        cropped_img = crop_knee(img, crop_height, crop_width)
        clahe_img = apply_clahe(cropped_img)
        images.append(clahe_img)
        labels.append(class_label)
    return np.array(images), np.array(labels)
# Data Merge & Split(new dataset)
def merge_split_datas(train_dir, val_dir, crop_height, crop_width, test_size=.2):
  X_train, y_train = process_load_img(train_dir, crop_height, crop_width)
  X_val, y_val = process_load_img(val_dir, crop_height, crop_width)
  X_comb = np.concatenate((X_train, X_val), axis=0)
  y_comb = np.concatenate((y_train, y_val), axis=0)
  X_train_n, X_val_n, y_train_n, y_val_n = train_test_split(X_comb, y_comb, test_size=test_size, random_state=42, stratify=y_comb)
  return X_train_n, X_val_n, y_train_n, y_val_n

# Data Merge & Split(new dataset)
def merge_split_datas(train_dir, val_dir, test_dir, crop_height, crop_width, test_size=.2):
    X_train, y_train = process_load_img(train_dir, crop_height, crop_width)
    X_val, y_val = process_load_img(val_dir, crop_height, crop_width)
    X_test, y_test = process_load_img(test_dir, crop_height, crop_width)
    X_comb = np.concatenate((X_train, X_val), axis=0)
    y_comb = np.concatenate((y_train, y_val), axis=0)
    X_train_n, X_val_n, y_train_n, y_val_n = train_test_split(X_comb, y_comb, test_size=test_size, random_state=42, stratify=y_comb)

    return X_train_n, X_val_n, X_test, y_train_n, y_val_n, y_test

In [35]:

input_shape = (crop_height, crop_width, 1)
class_num = 3

def create_model(model_name='ResNet50', optimizer='adam', dropout_rate=.2, fc_units=512, class_num=3):
    input_layer = Input(shape=input_shape)
    conv_layer = Conv2D(3, (3, 3), padding='same')(input_layer)
    if model_name == 'ResNet50':
        base = ResNet50(weights='imagenet', include_top=False, input_tensor=conv_layer)
    elif model_name == 'VGG16':
        base = VGG16(weights='imagenet', include_top=False, input_tensor=conv_layer)
    elif model_name == 'VGG19':
        base = VGG19(weights='imagenet', include_top=False, input_tensor=conv_layer)
    elif model_name == 'MobileNetV2':
        base = MobileNetV2(weights='imagenet', include_top=False, input_tensor=conv_layer)
    else:
        base = DenseNet121(weights='imagenet', include_top=False, input_tensor=conv_layer)
    base_out = base.output
    flatten = Flatten()(base_out)
    fc = Dense(fc_units, activation='relu')(flatten)
    dropout = Dropout(dropout_rate)(fc)
    output = Dense(class_num, activation='softmax')(dropout)
    model = Model(inputs=input_layer, outputs=output)
    
    if optimizer == 'adam':
        optimizer_instance = Adam()
    elif optimizer == 'rmsprop':
        optimizer_instance = RMSprop()
    else:
        optimizer_instance = SGD()
    
    model.compile(optimizer=optimizer_instance, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

model = KerasClassifier(build_fn=create_model, verbose=0)

param_grid = {
    'model_name': ['ResNet50', 'DenseNet', 'VGG16', 'VGG19', 'MobileNetV2'],
    'batch_size': [16, 32],
    'epochs': [5, 10, 20],
    'optimizer': ['adam', 'rmsprop', 'SGD'],
    'dropout_rate': [0.2, 0.4, 0.5],
    'fc_units': [512, 1024],
    'class_num': [class_num]  # Passing class_num here
}

grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=-1, cv=3)
grid_res = grid.fit(X_train_n, y_train_n)
means = grid_res.cv_results_['mean_test_score']
std = grid_res.cv_results_['std_test_score']
params = grid_res.cv_results_['params']
best_model = grid_res.best_estimator_.model
test_loss, test_acc = best_model.evaluate(X_test, y_test, verbose=0)
print(f"Test Accuracy: {test_acc}")


BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.