In [3]:
# !pip install scikeras
from scikeras import KerasClassifier
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import cross_val_score

class KerasClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, build_fn=None, epochs=1, batch_size=32, verbose=0):
        self.build_fn = build_fn
        self.epochs = epochs
        self.batch_size = batch_size
        self.verbose = verbose
        self.model_ = None
    def fit(self, X, y):
        self.model_ = self.build_fn()
        self.model_.fit(X, y, epochs=self.epochs, batch_size=self.batch_size, verbose=self.verbose)
        return self
    def predict(self, X):
        return (self.model_.predict(X)>0.5).astype('int32')
    def predict_proba(self, X):
        return self.model_.predict(X)

# Define your Keras model building function
def build_fn():
    model = Sequential([
        Dense(12, input_dim=8, activation='relu'),
        Dense(8, activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

# Initialize your KerasClassifier with the build function and other parameters
model = KerasClassifier(build_fn=build_fn, epochs=1, batch_size=32, verbose=0)


import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split


# image crop
def crop_knee(image, crop_height, crop_width):
  h, w = image.shape[:2]
  center = (h//2, w//2)
  cropped_img = image[
      center[0] - crop_height//2 : center[0] + crop_height//2,
      center[1] - crop_width//2 : center[1] + crop_width//2
  ]
  return cropped_img

# HE CLAHE
def apply_clahe(image):
  clahe = cv2.createCLAHE(clipLimit=2, tileGridSize=(8, 8))
  return clahe.apply(image)

# equalize-normalize
def processed(image):
    # equalized = cv2.equalizeHist(image)
    normalized = cv2.normalize(image, None, 0,255, cv2.NORM_MINMAX)
    return normalized

# Data Load & Preprocessing
def process_load_img(input_dir, crop_height, crop_width):
  images = []
  labels = []
  for class_folder in os.listdir(input_dir):
    class_input_path = os.path.join(input_dir, class_folder)
    class_label = int(class_folder)
    print(f"class_folder:{class_input_path}")
    print("==============================")
    
    for img_file in os.listdir(class_input_path):
      img_path = os.path.join(input_dir, class_folder)
      img_r_path = os.path.join(img_path, img_file)
      print(f"img path: {img_path}")
      print("==============================")
      
      img = cv2.imread(img_r_path, cv2.IMREAD_GRAYSCALE)
      print(f"img:{img}")
      
      if img is not None:
        cropped_img = crop_knee(img, crop_height, crop_width) # 그려서
        clahe_img = apply_clahe(cropped_img) # 전처리
        prep_img = processed(clahe_img)
        images.append(prep_img) 
        labels.append(class_label)
    return np.array(images), np.array(labels)

# Data Merge & Split(new dataset)
def merge_split_datas(train_dir, val_dir, crop_height, crop_width, test_size=.2):
  X_train, y_train = process_load_img(train_dir, crop_height, crop_width)
  X_val, y_val = process_load_img(val_dir, crop_height, crop_width)
  X_comb = np.concatenate((X_train, X_val), axis=0)
  y_comb = np.concatenate((y_train, y_val), axis=0)
  X_train_n, X_val_n, y_train_n, y_val_n = train_test_split(X_comb, y_comb, test_size=test_size, random_state=42, stratify=y_comb)
  return X_train_n, X_val_n, y_train_n, y_val_n




train_dir = './datas/osteoarthritis/train'
val_dir = './datas/osteoarthritis/val'

crop_height = 256
crop_width = 256

def test_load_img(test_dir, t_crop_height, t_crop_width):
  t_images = []
  t_labels = []
  for class_folder in os.listdir(test_dir):
    # 0, 1, 2, 3, 4
    class_input_path = os.path.join(test_dir, class_folder)
    class_label = int(class_folder)
    print(f"class_folder:{class_input_path}")
    print("==============================")
    
    for img_file in os.listdir(class_input_path):
      img_path = os.path.join(test_dir, class_folder)
      img_path = os.path.join(img_path, img_file)
      print(f"img path: {img_path}")
      print("==============================")
      
      img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
      print(f"img:{img}")
      
  return np.array(t_images), np.array(t_labels)

# process_load_img(train_dir, crop_height, crop_width)
t_crop_height = 256
t_crop_width = 256
test_dir = './datas/osteoarthritis/test'
X_test,y_test = test_load_img(test_dir, t_crop_height, t_crop_width)



from tensorflow.keras.applications import ResNet50, DenseNet121
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Flatten, Dropout, Input, Conv2D
from scikeras.wrappers import KerasClassifier
from sklearn.model_selection import GridSearchCV
import numpy as np
import tensorflow as tf


input_shape = (crop_height, crop_width)
class_num = 5

def create_model(model_name='Resnet50', optimizer='adam', dropout_rate=.5, fc_units=512, class_num=class_num):
  input_layer = Input(shape=input_shape)
  conv_layer = Conv2D(3, (3, 3), padding='same')(input_layer)
  if model_name == 'ResNet50':
    base = ResNet50(weights='imagenet', include_top=False, input_shape=(crop_height, crop_width))
  elif model_name == 'VGG16':
    base = VGG16(weights='imagenet', include_top=False, input_shape=(crop_height, crop_width))
  else:
    base = DenseNet121(weights='imagenet', include_top=False, input_shape=(crop_height, crop_width))
    
  base_out = base(conv_layer)
  flatten = Flatten()(base_out)
  fc = Dense(fc_units, activation='relu')(flatten)
  dropout = Dropout(dropout_rate)(fc)
  output = Dense(class_num, activation='softmax')(dropout)
  model = Model(inputs=input_layer, outputs=output)
  model.Compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
  return model

model = KerasClassifier(build_fn=create_model, verbose=0)
param_grid = {
    'model_name':['ResNet50', 'DenseNet', 'VGG16'],
    'batch_size':[16, 32],
    'epochs':[30, 50, 80],
    'optimizer':['adam', 'rmsprop']
    # 'dropout_rate':[0.2, 0.4, 0.5],
    # 'fc_units':[512, 1024]
}

grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=-1, cv=3)
grid_res = grid.fit(X_train_n, y_train_n)
means = grid_res.cv_results_['mean_test_score']
std = grid_res.cv_results_['std_test_score']
params = grid_res.cv_results_['params']
best_model = grid_res.best_estimator_.model

test_loss, test_acc = best_model.evaluate(X_test, y_test, verbose=0)
print(f"Test Accuracy: {test_acc}")

X_train_n, X_val_n, y_train_n, y_val_n = merge_split_datas(train_dir, val_dir, crop_height, crop_width, test_size=.2)