In [1]:
!pip install split-folders

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting split-folders
  Downloading split_folders-0.5.1-py3-none-any.whl (8.4 kB)
Installing collected packages: split-folders
Successfully installed split-folders-0.5.1


In [2]:
import os
import zipfile

import cv2
import numpy as np

from matplotlib import pyplot as plt
import matplotlib.cm as cm

import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img

import pickle
from tqdm.notebook import tqdm

import splitfolders
from tensorflow.keras.applications.resnet import ResNet50
import datetime


In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 데이터 Load



### zip파일 풀기

In [4]:
base_path = '/content/drive/MyDrive/image_recognition/data/image_data/img/'

In [14]:
name_lst = os.listdir('/content/drive/MyDrive/image_recognition/data/image_data/img')[:10]
print(name_lst)

['blepharitis.zip', 'Cataract.zip', 'conjunctivitis.zip', 'corneal.zip', 'corneal_ulcer.zip', 'Entropion.zip', 'epiphora.zip', 'Nuclear_Sclerosis.zip', 'PIH.zip', 'Xanthelasma.zip']


In [13]:
 with zipfile.ZipFile(base_path + name_lst, 'r') as zip_ref:
    zip_ref.extractall('/content')

Copying files: 10684 files [1:06:36,  2.67 files/s]


In [12]:
for name in name_lst: 
  with zipfile.ZipFile(base_path + name, 'r') as zip_ref:
    zip_ref.extractall('/content')

FileNotFoundError: ignored

### split folder

In [15]:
names = [name.split('.')[0] for name in name_lst]
print(names)

['blepharitis', 'Cataract', 'conjunctivitis', 'corneal', 'corneal_ulcer', 'Entropion', 'epiphora', 'Nuclear_Sclerosis', 'PIH', 'Xanthelasma']


In [16]:
for name in names:
  splitfolders.ratio('/content/'+name, output=f'{name}_dataset', seed=77, ratio=(0.8, 0.1, 0.1))

Copying files: 13759 files [00:06, 2086.38 files/s]
Copying files: 13758 files [00:14, 917.25 files/s] 
Copying files: 19198 files [00:23, 804.46 files/s]
Copying files: 9600 files [00:11, 803.87 files/s] 
Copying files: 13758 files [00:19, 716.65 files/s] 
Copying files: 13365 files [00:29, 538.89 files/s]

KeyboardInterrupt: ignored

## Zero-Centering

In [None]:
# with open(f'/content/drive/MyDrive/image_recognition/이중분류/zero_centering_pickle/{name]}_train.pickle', 'rb') as f:
#   mean_img = pickle.load(f)


In [None]:
# sub_mean_img = lambda image: image - mean_img

### ImageGenerator

In [None]:
batch_size = 128
epochs = 3

In [None]:
eval_lst = []
pred_lst = []
names = ['epiphora']
for name in names:
  # zero-centering 하기 위한 평균값들 pickle로 불러오기
  with open(f'/content/drive/MyDrive/image_recognition/이중분류/zero_centering_pickle/{name}_train.pickle', 'rb') as f:
    mean_img = pickle.load(f)

  sub_mean_img = lambda image: image - mean_img
  train_datagen = ImageDataGenerator(preprocessing_function = sub_mean_img)
  val_datagen = ImageDataGenerator(preprocessing_function = sub_mean_img)
  test_datagen = ImageDataGenerator(preprocessing_function = sub_mean_img)
  # ImageGenerator
  train_generator = train_datagen.flow_from_directory(f'/content/{name}_dataset/train', batch_size=batch_size, target_size=(224, 224), class_mode = 'binary' )
  val_generator = val_datagen.flow_from_directory(f'/content/{name}_dataset/val', batch_size=batch_size, target_size=(224, 224), class_mode = 'binary' )
  test_generator = test_datagen.flow_from_directory(f'/content/{name}_dataset/test', batch_size=batch_size, target_size=(224, 224), class_mode = 'binary' )

  # Modeling
  base_model_50 = ResNet50(include_top=False, input_shape = (224,224 ,3), weights = 'imagenet')

  base_model_50.trainable = True	

  for layer in base_model_50.layers[:-10]:
    layer.trainable = False		

  inputs = tf.keras.Input(shape=(224, 224, 3))

  x = base_model_50(inputs, training=False) # batchnorm 부분 update 방지

  x = tf.keras.layers.Flatten(input_shape=base_model_50.output_shape[1:])(x)
  x = tf.keras.layers.Dense(256, activation='relu')(x)
  x = tf.keras.layers.Dropout(0.5)(x)
  outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)
  s_model_res50 = tf.keras.Model(inputs, outputs)

  # Model Compile
  s_model_res50.compile(optimizer = 'adam',
                loss = 'binary_crossentropy',
                metrics=['accuracy', 'Recall'])

  # tensorboard log 저장
  # model_name = name + 'model'
  # current_time = datetime.datetime.now().strftime("%m%d%H%M")
  # log_dir = "/content/drive/MyDrive/image_recognition/data/image_data/binary_log" + "/" + 'First_trial/' +current_time + model_name
  # board = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=2) # epoch마다 히스토그램 계산


  # fit 
  history = s_model_res50.fit(train_generator, validation_data = val_generator, epochs = epochs)

  # save model

  # s_model_res50.save(f'/content/drive/MyDrive/image_recognition/data/image_data/binary_models/{name}_model')
  
  # 평가지표
  # evaluation = s_model_res50.evaluate(test_generator)
  predict = s_model_res50.predict(test_generator)
  # eval_lst.append(evaluation)  # 1. Loss값 2. Accuracy 3. Recall
  pred_lst.append(predict)


In [None]:
['blepharitis', 'Cataract', 'conjunctivitis', 'corneal', 'corneal_ulcer', 'Entropion', 'epiphora', 'Nuclear_Sclerosis', 'PIH', 'Xanthelasma']

In [None]:
!unzip '/content/drive/MyDrive/image_recognition/data/image_data/test_img/test_balanced_10.zip'

In [None]:
# inference 용 각
test_dataset = {}
base_path = '/content/test_balanced_10/'
name_lst = os.listdir(base_path)

for name in name_lst:
  sub_path = base_path + name + '/'
  img_path = [sub_path + img for img in os.listdir(sub_path)]

  test_imgs = [cv2.imread(img) for img in img_path]
  test_imgs = np.array(test_imgs)
  test_dataset[name] = test_imgs



In [None]:
# 질병 dict
eye_dic = {0:'blepharitis', 1: 'Cataract', 2: 'conjunctivitis', 3: 'corneal', 4: 'corneal_ulcer', 5: 'Entropion', 6: 'epiphora', 
           7: 'Nuclear_Sclerosis', 8: 'PIH', 9: 'Xanthelasma'}
names = ['blepharitis', 'Cataract', 'conjunctivitis', 'corneal', 'corneal_ulcer', 'Entropion', 'epiphora', 'Nuclear_Sclerosis', 'PIH', 'Xanthelasma']
# 각 질병 모델의 예측값을 담을 list
model_predicts = []

for name in names: 
  # 1. 각각 saved 된 model 을 불러오기
  model_path = f'/content/drive/MyDrive/image_recognition/data/image_data/binary_models/{name}_model'
  eye_model = tf.keras.models.load_model(model_path)

  # zero-centering을 위한 mean 값
  # with open(f'/content/drive/MyDrive/image_recognition/이중분류/zero_centering_pickle/{name}_train.pickle', 'rb') as f:
  #   mean_img = pickle.load(f)
  

  # # test image 가져오기 
  # test_imgs = test_imgs - mean_img
  # y_pred 
  y_predict = eye_model.predict(test_dataset['PIH'])

  # 각 사진들의 평균 확률값
  mean_predict = np.mean(y_predict, axis = 0)

  model_predicts.append(mean_predict)


In [None]:
model_predicts

[array([0.71762884], dtype=float32),
 array([1.], dtype=float32),
 array([0.90791315], dtype=float32),
 array([0.9992721], dtype=float32),
 array([0.99659693], dtype=float32),
 array([0.9999615], dtype=float32),
 array([0.49949834], dtype=float32),
 array([0.9994904], dtype=float32),
 array([0.9994952], dtype=float32)]

In [None]:
# 5. 뽑히는 index 값을 이용해서 질병명 print 하기 
standard_value = 0.6
if all(standard_value > x for x in model_predicts):
  print('정상일 확률이 높습니다')
else:
  for idx, val in enumerate(model_predicts):
    if val >= standard_value:
      print(f'{eye_dic[idx]}이 {val * 100}% 확률로 의심이 됩니다')


blepharitis이 [71.762886]% 확률로 의심이 됩니다
Cataract이 [100.]% 확률로 의심이 됩니다
conjunctivitis이 [90.79131]% 확률로 의심이 됩니다
corneal이 [99.92721]% 확률로 의심이 됩니다
corneal_ulcer이 [99.65969]% 확률로 의심이 됩니다
Entropion이 [99.99615]% 확률로 의심이 됩니다
Nuclear_Sclerosis이 [99.949036]% 확률로 의심이 됩니다
PIH이 [99.949524]% 확률로 의심이 됩니다


In [None]:
# 6. label 값이 뽑히는 질병들에 포함되면 정답(1) / 없는경우 실패(0) (= model 에측값)
# 7. confusion matrix -> prediction: 포함되면(positive) 포함안되면(negative) / reality: 각 질병의 유무의 따라 True / False
# 8. standard value를 바꾸면서 confusion matrix가 어떻게 바뀌는지 (최대한 false negative 낮추는 방향으로 --> Recall 높이는 방향)


