In [None]:
from tensorflow.python.client import device_lib
# Query the devices TensorFlow reports for this machine; as the cell's last
# expression the returned list is shown as the cell output.
device_lib.list_local_devices()

In [None]:
import os
import numpy as np
import pandas as pd
from glob import glob
from itertools import chain
from sklearn.metrics import roc_curve, auc, roc_auc_score, accuracy_score, average_precision_score
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import tensorflow as tf

---
## 데이터 전처리 및 데이터 분리
---

In [None]:
DATA_DIR = ''  # dataset root used when building the CSV path and the image glob
image_size = 128  # side length in pixels used for generator target_size and the model input shape
batch_size = 32  # batch size handed to the train/valid/test generators

In [None]:
# Load the metadata table. os.path.join (already used for the image glob) keeps
# the path valid when DATA_DIR is set without a trailing separator, and still
# resolves to the bare file name when DATA_DIR is ''.
df = pd.read_csv(os.path.join(DATA_DIR, 'Data_Entry_2017.csv'))

In [None]:
# Index every PNG matched by <DATA_DIR>/images*/*/*.png by its bare file name.
png_pattern = os.path.join(DATA_DIR, 'images*', '*', '*.png')
data_image_paths = {}
for png_path in glob(png_pattern):
    data_image_paths[os.path.basename(png_path)] = png_path

In [None]:
# Look up the on-disk path of each row's image by file name; dict.get returns
# None for names that were not matched by the glob.
df['path'] = df['Image Index'].map(data_image_paths.get)

In [None]:
# Collect the sorted set of individual finding names from the '|'-separated
# 'Finding Labels' column, dropping empty strings.
findings_per_row = df['Finding Labels'].map(lambda x: x.split('|'))
unique_findings = np.unique(list(chain.from_iterable(findings_per_row.tolist())))
labels = [name for name in unique_findings if len(name) > 0]

In [None]:
# Second name for the same class-name list (no copy is made).
all_labels = labels
#all_labels

In [None]:
# Add one 0.0/1.0 indicator column per finding.
# Membership is tested against the '|'-separated tokens rather than by substring,
# so a label can never match merely because it is contained in another label's
# name. The loop variable is bound as a default argument so the lambda does not
# depend on late binding.
for label in labels:
    if len(label) > 1:
        df[label] = df['Finding Labels'].map(
            lambda finding, target=label: 1.0 if target in finding.split('|') else 0.0
        )

In [None]:
#df.head()

In [None]:
from sklearn.model_selection import train_test_split

# First carve off 20% of the rows as the test set, then split the remainder
# 70/30 into train/validation. Both splits are shuffled, seeded, and stratified
# on the raw 'Finding Labels' string.
# NOTE(review): stratifying on the combined label string needs every distinct
# combination to occur often enough for both sides of each split — confirm
# against the CSV being used.
all_findings = df['Finding Labels']
train_and_valid_df, test_df = train_test_split(
    df,
    test_size=0.2,
    random_state=2021,
    shuffle=True,
    stratify=all_findings,
)

remaining_findings = train_and_valid_df['Finding Labels']
train_df, valid_df = train_test_split(
    train_and_valid_df,
    test_size=0.3,
    random_state=2021,
    shuffle=True,
    stratify=remaining_findings,
)

In [None]:
# Report the row count of each split.
print(f'train_and_valid_df {train_and_valid_df.shape[0]} train {train_df.shape[0]} valid_df {valid_df.shape[0]} Test_df: {test_df.shape[0]}')

In [None]:
# Frequency of each label combination in the validation split.
valid_df['Finding Labels'].value_counts()

In [None]:
# Frequency of each label combination in the test split.
test_df['Finding Labels'].value_counts()

In [None]:
#test_df

In [None]:
# Add a 'labels' column holding the list of findings for each row (this is the
# y_col consumed by flow_from_dataframe).
# assign() returns a new frame, which avoids pandas' SettingWithCopyWarning that
# in-place column assignment on train_test_split results can raise, and
# .str.split replaces the slower row-wise apply.
train_df = train_df.assign(labels=train_df['Finding Labels'].str.split('|'))
valid_df = valid_df.assign(labels=valid_df['Finding Labels'].str.split('|'))
test_df = test_df.assign(labels=test_df['Finding Labels'].str.split('|'))

In [None]:
#train_df

In [None]:
# Shared image pipeline: pixel values are rescaled from [0, 255] to [0, 1].
core_idg = ImageDataGenerator(rescale=1 / 255)

# Number of images pulled into memory for the fixed validation / test samples.
EVAL_SAMPLE_SIZE = 512


def _flow_from(frame, flow_batch_size, shuffle):
    """Build a multi-hot image iterator over ``frame`` with the shared settings.

    Args:
        frame: DataFrame with a 'path' column (image file) and a 'labels'
            column (list of finding names).
        flow_batch_size: Number of images per yielded batch.
        shuffle: Whether the iterator shuffles the rows.

    Returns:
        The iterator returned by ``core_idg.flow_from_dataframe``.
    """
    return core_idg.flow_from_dataframe(dataframe=frame,
                                        directory=None,
                                        x_col='path',
                                        y_col='labels',
                                        class_mode='categorical',
                                        batch_size=flow_batch_size,
                                        classes=labels,
                                        target_size=(image_size, image_size),
                                        shuffle=shuffle)


# Training data is shuffled; evaluation iterators keep the DataFrame order so
# that the samples below (and any predictions made from the iterators) are
# reproducible and stay aligned with the rows of valid_df / test_df.
train_gen = _flow_from(train_df, batch_size, shuffle=True)
valid_gen = _flow_from(valid_df, batch_size, shuffle=False)
test_gen = _flow_from(test_df, batch_size, shuffle=False)

# Fixed in-memory evaluation samples: the first EVAL_SAMPLE_SIZE usable rows of
# each split (the splits themselves were already shuffled with a fixed seed).
valid_X, valid_Y = next(_flow_from(valid_df, EVAL_SAMPLE_SIZE, shuffle=False))
test_X, test_Y = next(_flow_from(test_df, EVAL_SAMPLE_SIZE, shuffle=False))

---
## 네트워크 모델 정의 후 데이터 연결, 학습 진행
---

In [None]:
import tensorflow as tf

# DenseNet121 trained from scratch (weights=None) as a multi-label classifier.
# The targets are multi-hot vectors trained with binary cross-entropy, so each
# class needs an independent sigmoid output; a softmax head would force the
# class probabilities to sum to 1. The output width follows the label list
# instead of a hard-coded 15.
backbone = tf.keras.applications.DenseNet121(
    input_shape=(image_size, image_size, 3),
    weights=None,
    include_top=False,
    pooling='avg',
)
class_scores = tf.keras.layers.Dense(
    len(labels), activation='sigmoid', name='predictions'
)(backbone.output)
base_model = tf.keras.Model(inputs=backbone.input, outputs=class_scores)
base_model.summary()

In [None]:
from keras.metrics import AUC
import keras

# Adam optimizer with binary cross-entropy; track AUC and accuracy during training.
tracked_metrics = [AUC(name='AUC'), 'accuracy']
base_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=tracked_metrics,
)

In [None]:
# Train for 50 epochs on the training generator. Validation is computed on the
# fixed in-memory (valid_X, valid_Y) sample rather than on valid_gen.
history = base_model.fit(train_gen,
                    epochs=50,
                    validation_data=(valid_X, valid_Y),)

In [None]:
# Turn the metrics recorded by fit() into a table, save it to CSV, and display it.
history_df = pd.DataFrame(history.history)
history_df.to_csv('history_df.csv')
history_df

In [None]:
# Training vs. validation loss curve, saved next to the notebook.
loss_ax = history_df.loc[:, ['loss', 'val_loss']].plot()
loss_ax.get_figure().savefig('loss,val_loss.png')

# Training vs. validation accuracy curve.
accuracy_ax = history_df.loc[:, ['accuracy', 'val_accuracy']].plot()
accuracy_ax.get_figure().savefig('accuracy,val_accuracy.png')

# Best validation figures over all epochs.
best_val_loss = history_df['val_loss'].min()
best_val_accuracy = history_df['val_accuracy'].max()
print("Minimum validation loss: {}".format(best_val_loss))
print("Maximum validation accuracy: {}".format(best_val_accuracy))

### validation_data 에 대한 정보

In [None]:
# Percentage of the in-memory validation sample that carries each label.
valid_prevalence = 100 * np.mean(valid_Y, 0)
for class_name, percent in zip(all_labels, valid_prevalence):
    print('%s: %2.2f%%' % (class_name, percent))

In [None]:
# Predict with the network trained above. It is bound to `base_model`; the name
# `model` is only assigned much later in the notebook, so using it here fails
# on a fresh kernel.
valid_pred_Y = base_model.predict(valid_X, batch_size = 32, verbose = True)

In [None]:
from sklearn.metrics import roc_curve, auc

# Draw one ROC curve per class for the validation sample and save the figure.
fig, c_ax = plt.subplots(1, 1, figsize=(9, 9))
for class_idx, class_name in enumerate(all_labels):
    truth = valid_Y[:, class_idx].astype(int)
    scores = valid_pred_Y[:, class_idx]
    fpr, tpr, thresholds = roc_curve(truth, scores)
    curve_label = '%s (AUC:%0.2f)' % (class_name, auc(fpr, tpr))
    c_ax.plot(fpr, tpr, label=curve_label)
c_ax.legend()
c_ax.set_xlabel('False Positive Rate')
c_ax.set_ylabel('True Positive Rate')
fig.savefig('valid_trained_net.png')

In [None]:
from sklearn.metrics import roc_auc_score

# ROC AUC is undefined for a class whose ground truth is all 0 or all 1 in this
# sample, so average only over classes with both outcomes present. When every
# class qualifies this is the same score as scoring all columns.
valid_scorable = [
    col for col in range(valid_Y.shape[1])
    if 0 < valid_Y[:, col].sum() < valid_Y.shape[0]
]
# Stored under its own name: rebinding `auc` would shadow sklearn.metrics.auc,
# which the ROC-curve cells call as a function.
valid_roc_auc = roc_auc_score(valid_Y[:, valid_scorable], valid_pred_Y[:, valid_scorable])
print('ROC AUC: %f' % valid_roc_auc)

### test_data 에 대한 정보

In [None]:
# Percentage of the in-memory test sample that carries each label.
test_prevalence = 100 * np.mean(test_Y, 0)
for class_name, percent in zip(all_labels, test_prevalence):
    print('%s: %2.2f%%' % (class_name, percent))

In [None]:
# Predict with the network trained above. It is bound to `base_model`; the name
# `model` is only assigned much later in the notebook, so using it here fails
# on a fresh kernel.
test_pred_Y = base_model.predict(test_X, batch_size = 32, verbose = True)

In [None]:
from sklearn.metrics import roc_curve, auc

# Draw one ROC curve per class for the test sample and save the figure.
fig, c_ax = plt.subplots(1, 1, figsize=(9, 9))
for class_idx, class_name in enumerate(all_labels):
    truth = test_Y[:, class_idx].astype(int)
    scores = test_pred_Y[:, class_idx]
    fpr, tpr, thresholds = roc_curve(truth, scores)
    curve_label = '%s (AUC:%0.2f)' % (class_name, auc(fpr, tpr))
    c_ax.plot(fpr, tpr, label=curve_label)
c_ax.legend()
c_ax.set_xlabel('False Positive Rate')
c_ax.set_ylabel('True Positive Rate')
fig.savefig('test_trained_net.png')

In [None]:
from sklearn.metrics import roc_auc_score

# ROC AUC is undefined for a class whose ground truth is all 0 or all 1 in this
# sample, so average only over classes with both outcomes present. When every
# class qualifies this is the same score as scoring all columns.
test_scorable = [
    col for col in range(test_Y.shape[1])
    if 0 < test_Y[:, col].sum() < test_Y.shape[0]
]
# Stored under its own name: rebinding `auc` would shadow sklearn.metrics.auc,
# which the ROC-curve cells call as a function.
test_roc_auc = roc_auc_score(test_Y[:, test_scorable], test_pred_Y[:, test_scorable])
print('ROC AUC: %f' % test_roc_auc)

### test 셋에 대한 평가

In [None]:
# Evaluate the trained network on the in-memory test sample using `evaluate`.
# The trained network is `base_model`; `model` is not defined until the
# Grad-CAM section, so referencing it here fails on a fresh kernel.
print("Evaluate on test data")
results = base_model.evaluate(test_X, test_Y, batch_size=128, return_dict=True)
print("test loss, test acc:", results)

# Generate predictions (probabilities -- the output of the last layer)
# on new data using `predict`
print("Generate predictions for 3 samples")
predictions = base_model.predict(test_X[:3])
print("predictions shape:", predictions.shape)

In [None]:
# Wrap the sample predictions in a DataFrame (rebinding `predictions`) and write them to CSV.
predictions=pd.DataFrame(predictions)
predictions.to_csv("predictions.csv")

In [None]:
#pd.DataFrame(predictions)

In [None]:
#test_X[:3]

In [None]:
#test_df[:3]

In [None]:
# Persist the test split so the evaluated rows can be inspected later.
test_df.to_csv("test_df.csv")

### 모델 저장

In [None]:
# Save the trained network (`base_model`) — full model plus a weights-only file.
# `model` is not defined at this point in a top-to-bottom run.
base_model.save('model.h5')
base_model.save_weights('model_weights.h5')

## 저장한 모델 불러와서 Feature Map, Grad-CAM 확인하기

In [None]:
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import Model
from matplotlib import pyplot
from numpy import expand_dims
from matplotlib import pyplot

In [None]:
from keras.models import load_model
# Reload the network saved to model.h5 and print its layer listing.
# NOTE(review): this uses the standalone `keras` loader while the model was
# built through tf.keras — confirm both resolve to the same Keras here.
model_2 = load_model('model.h5')
model_2.summary()

In [None]:
# Truncated model mapping the network input to the output of layer index 3.
# NOTE(review): index 3 is presumably an early convolution-stage layer —
# confirm against the summary printed for model_2.
model2 = Model(inputs=model_2.input, outputs=model_2.layers[3].output)
model2.summary()

In [None]:
# Load the sample image at the resolution the network was built for
# (image_size from the config cell instead of a second hard-coded 128).
image = load_img("images/images/00000001_000.PNG" , target_size=(image_size, image_size))

# Match the training pipeline, which feeds pixels rescaled by 1/255;
# without this the network sees inputs on a 0-255 scale it was never trained on.
image = img_to_array(image) / 255.0

# Add the leading batch axis.
image = expand_dims(image, axis=0)

In [None]:
# Run the truncated model to obtain the activation maps; `features` was never
# assigned anywhere, so the plotting loop failed with a NameError.
features = model2.predict(image)

# One grayscale subplot per channel, four per row. The row count is derived
# from the channel count (ceiling division) instead of being fixed at 16.
n_maps = features.shape[3]
n_cols = 4
n_rows = -(-n_maps // n_cols)

fig = pyplot.figure(figsize=(10,10))
for i in range(1, n_maps + 1):
    pyplot.subplot(n_rows, n_cols, i)
    pyplot.imshow(features[0, :, :, i - 1], cmap='gray')

pyplot.show()

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

from IPython.display import Image, display
import matplotlib.pyplot as plt
import matplotlib.cm as cm

In [None]:
              
# Settings for the Grad-CAM cells.
img_size = (128, 128)

preprocess_input = keras.applications.densenet.preprocess_input
# Bind the decoder here; this name had been pointed at preprocess_input.
decode_predictions = keras.applications.densenet.decode_predictions

# NOTE(review): by its name "conv1/conv" looks like the network's first
# convolution, whereas Grad-CAM is usually taken at the last convolutional
# feature map — confirm this is the intended layer.
last_conv_layer_name = "conv1/conv"

img_path = "images/images/00000001_000.PNG"

display(Image(img_path))

In [None]:
def get_img_array(img_path, size):
    """Load the image at ``img_path`` resized to ``size`` as a batch of one.

    Returns the ``img_to_array`` result with a new leading axis inserted by
    ``np.expand_dims(..., axis=0)``.
    """
    image_utils = keras.preprocessing.image
    pil_img = image_utils.load_img(img_path, target_size=size)
    return np.expand_dims(image_utils.img_to_array(pil_img), axis=0)


def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """Compute a Grad-CAM heatmap for the first image in ``img_array``.

    Args:
        img_array: Input batch fed to ``model``.
        model: Keras model to explain.
        last_conv_layer_name: Name of the layer whose output is weighted by
            the gradients of the selected output.
        pred_index: Index of the output unit to explain. Defaults to the
            arg-max of the first prediction.

    Returns:
        NumPy array with the non-negative heatmap divided by its maximum.
        If no location contributes positively the result is all zeros
        instead of NaN.
    """
    # Model that returns both the chosen layer's activations and the predictions.
    grad_model = tf.keras.models.Model(
        model.inputs, [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Record the forward pass so the class score can be differentiated
    # with respect to the layer activations.
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    grads = tape.gradient(class_channel, last_conv_layer_output)

    # Mean gradient per channel: the importance weight of each feature map.
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Weight the first sample's feature maps by channel importance and sum them.
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # Keep positive evidence only and normalise by the peak; divide_no_nan
    # yields 0 rather than NaN when the peak is 0.
    heatmap = tf.maximum(heatmap, 0)
    heatmap = tf.math.divide_no_nan(heatmap, tf.math.reduce_max(heatmap))
    return heatmap.numpy()

In [None]:
# Scale pixels to [0, 1] exactly as the training generator did
# (ImageDataGenerator(rescale=1 / 255)); the DenseNet ImageNet preprocessing
# would present the network with a different input distribution.
img_array = get_img_array(img_path, size=img_size) / 255.0

model = load_model('model.h5')

# Remove the final activation so the heatmap is computed from the raw class score.
model.layers[-1].activation = None

preds = model.predict(img_array)

heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name)

plt.matshow(heatmap)
plt.show()

In [None]:
def save_and_display_gradcam(img_path, heatmap, cam_path="cam.jpg", alpha=0.4):
    """Overlay ``heatmap`` on the image at ``img_path``, save it, and display it.

    Args:
        img_path: Path of the original image.
        heatmap: 2-D array of values in [0, 1].
        cam_path: Where the blended image is written.
        alpha: Weight applied to the coloured heatmap before it is added to
            the image.
    """
    # Original image at its native size.
    img = keras.preprocessing.image.load_img(img_path)
    img = keras.preprocessing.image.img_to_array(img)

    # Quantise the heatmap to 0-255 so it can index the colour table.
    heatmap = np.uint8(255 * heatmap)

    # plt.get_cmap is used because matplotlib.cm.get_cmap was deprecated in
    # Matplotlib 3.7 and later removed.
    jet = plt.get_cmap("jet")

    # RGB colour for every quantised heatmap value (alpha channel dropped).
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Resize the coloured heatmap to the image size.
    jet_heatmap = keras.preprocessing.image.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = keras.preprocessing.image.img_to_array(jet_heatmap)

    # Blend heatmap and image.
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = keras.preprocessing.image.array_to_img(superimposed_img)

    superimposed_img.save(cam_path)

    display(Image(cam_path))


# Blend the computed heatmap onto the source image, write it to the default
# cam.jpg, and show the result.
save_and_display_gradcam(img_path, heatmap)