In [None]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('darkgrid')

import os
from glob import glob

from keras.preprocessing.image import ImageDataGenerator
from keras.applications.densenet import DenseNet121
from keras.layers import Dense, GlobalAveragePooling2D
from keras.models import Model
from keras import backend as K
from keras.models import load_model

from itertools import chain

In [None]:
# Load the metadata CSV; its first column becomes the index.
df = pd.read_csv('/content/drive/MyDrive/AI/section4/project/chunan.csv', index_col=0)
num_obs = len(df)
print('Number of observations in dataset:',num_obs)

# Collect every .jpg in the image folder.
my_glob = glob('/content/drive/MyDrive/AI/section4/project/chunan/*.jpg')
print('Number of observations in imagefolders:', len(my_glob))

# Attach the full image path to each row by looking up the file name in `Id`.
# Rows whose file name is not among the globbed images get None.
full_img_paths = {os.path.basename(x): x for x in my_glob}
df['full_path'] = df['Id'].map(full_img_paths.get)

# One-hot encode the '|'-separated labels stored in '예상육량'.
label_lists = df['예상육량'].map(lambda x: x.split('|'))
# Keep only labels longer than one character (this drops empty strings and the
# "0" value). The filter is applied to all_labels itself so that every entry in
# all_labels has a matching column in df; downstream cells index df[all_labels].
all_labels = [x for x in np.unique(list(chain(*label_lists.tolist()))) if len(x) > 1]
for c_label in all_labels:
    # Exact membership in the split list, so a label that is a substring of
    # another label is not counted as present.
    df[c_label] = label_lists.map(lambda labels: 1 if c_label in labels else 0)


In [None]:
# Show the image of the row with the second-highest '월령' value.
# The row is taken positionally from the sorted frame: sort_values().index
# yields index labels, which must not be passed to the positional .iloc.
second_oldest = df.sort_values('월령').iloc[-2]
plt.imshow(plt.imread(second_oldest['full_path']), cmap='bone')

In [None]:
# Count of rows per '예상육질' value. The column is passed as x= explicitly;
# newer seaborn versions treat the first positional argument as `data`.
sns.countplot(x=df['예상육질'])

In [None]:
# Bar chart of how many rows carry each label, bars ordered from most to least frequent.
label_totals = df[all_labels].sum()
descending_labels = label_totals.sort_values(ascending=False).index
sns.barplot(x=all_labels, y=label_totals, order=descending_labels)

plt.tick_params(axis='x')

In [None]:
from sklearn.model_selection import GroupShuffleSplit, train_test_split

n = None # use all data
subset = df[:n]

# Group-aware 70% train split: '월령' is the grouping key, so rows sharing a
# value are kept on the same side of the split.
gss = GroupShuffleSplit(n_splits=1, train_size=.7, random_state=42)
train_idx, test_idx = next(gss.split(subset, groups=subset['월령'].values))

train_df = df.iloc[train_idx]
# The held-out rows are divided again: 80% become test_df, 20% valid_df.
# TODO: should add stratified sampling
test_df, valid_df = train_test_split(
    df.iloc[test_idx],
    test_size=0.2,
    random_state=42,
)

train_df.head()
test_df.head()

In [None]:
def get_train_generator(df, image_dir, x_col, y_cols, shuffle=True, batch_size=32, seed=1, target_w = 320, target_h = 320):
    """Build the batch iterator used for training.

    Each image is normalised on its own (sample-wise centering and std
    scaling) and labels are passed through unchanged (class_mode="raw").

    Args:
        df: dataframe holding image paths and label columns.
        image_dir: directory the paths in `x_col` are relative to.
        x_col: name of the column with image paths.
        y_cols: label column name(s).
        shuffle: whether the iterator shuffles the rows.
        batch_size: number of images per batch.
        seed: random seed handed to the iterator.
        target_w, target_h: passed as target_size=(target_w, target_h).
            NOTE(review): Keras documents target_size as (height, width) - confirm
            the order if the two values ever differ.

    Returns:
        The iterator returned by ImageDataGenerator.flow_from_dataframe.
    """
    print("getting train generator...")

    # Per-image normalisation; no dataset-level statistics are needed.
    normalizer = ImageDataGenerator(
        samplewise_center=True,
        samplewise_std_normalization=True,
    )

    # Stream the rows of the dataframe in batches.
    return normalizer.flow_from_dataframe(
        dataframe=df,
        directory=image_dir,
        x_col=x_col,
        y_col=y_cols,
        class_mode="raw",
        batch_size=batch_size,
        shuffle=shuffle,
        seed=seed,
        target_size=(target_w, target_h),
    )

def get_test_and_valid_generator(valid_df, test_df, train_df, image_dir, x_col, y_cols, sample_size=100, batch_size=32, seed=1, target_w = 320, target_h = 320):
    """Build the validation and test batch iterators.

    Both iterators normalise images feature-wise, using a mean and standard
    deviation fitted on one batch of `sample_size` un-normalised training images.

    Args:
        valid_df: dataframe for the validation iterator.
        test_df: dataframe for the test iterator.
        train_df: dataframe the normalisation sample is drawn from.
        image_dir: directory the paths in `x_col` are relative to.
        x_col: name of the column with image paths.
        y_cols: label column name(s).
        sample_size: number of training images used to fit the statistics.
        batch_size: number of images per validation/test batch.
        seed: random seed handed to every iterator, including the sample draw.
        target_w, target_h: passed as target_size=(target_w, target_h).

    Returns:
        (valid_generator, test_generator), both with shuffle=False.
    """
    print("getting valid and test generators...")

    # Draw one batch of raw training images. `image_dir` (the parameter) is
    # used here rather than a module-level global, and the draw is seeded so
    # the fitted statistics are reproducible.
    raw_train_generator = ImageDataGenerator().flow_from_dataframe(
        dataframe=train_df,
        directory=image_dir,
        x_col=x_col,
        y_col=y_cols,
        class_mode="raw",
        batch_size=sample_size,
        shuffle=True,
        seed=seed,
        target_size=(target_w, target_h))
    data_sample = next(raw_train_generator)[0]

    # Fit feature-wise mean/std on the training sample only.
    image_generator = ImageDataGenerator(
        featurewise_center=True,
        featurewise_std_normalization=True)
    image_generator.fit(data_sample)

    def _flow(frame):
        # Deterministic (unshuffled) iterator over `frame` using the fitted statistics.
        return image_generator.flow_from_dataframe(
            dataframe=frame,
            directory=image_dir,
            x_col=x_col,
            y_col=y_cols,
            class_mode="raw",
            batch_size=batch_size,
            shuffle=False,
            seed=seed,
            target_size=(target_w, target_h))

    valid_generator = _flow(valid_df)
    test_generator = _flow(test_df)
    return valid_generator, test_generator


In [None]:
# No base directory is needed: the "full_path" column already holds a complete path per row.
IMAGE_DIR = None

train_generator = get_train_generator(
    df=train_df,
    image_dir=IMAGE_DIR,
    x_col="full_path",
    y_cols=all_labels,
)
valid_generator, test_generator = get_test_and_valid_generator(
    valid_df=valid_df,
    test_df=test_df,
    train_df=train_df,
    image_dir=IMAGE_DIR,
    x_col="full_path",
    y_cols=all_labels,
)

In [None]:
# Fetch the batch at index 2 from the training generator and preview its first image.
x, y = train_generator[2]
plt.imshow(x[0]);

In [None]:
def compute_class_freqs(labels):
    """Return the per-class positive and negative frequencies.

    Args:
        labels: 2-D array-like, one row per sample and one column per class.

    Returns:
        (positive_frequencies, negative_frequencies): the column sums divided
        by the number of rows, and one minus that.
    """
    # Number of samples (rows).
    sample_count = len(labels)

    positive_share = np.sum(labels, axis=0) / sample_count
    return positive_share, 1 - positive_share



def get_weighted_loss(pos_weights, neg_weights, epsilon=1e-7):
    """Create a class-weighted binary cross-entropy loss.

    Args:
        pos_weights: per-class weights applied to the positive (y_true == 1) term.
        neg_weights: per-class weights applied to the negative (y_true == 0) term.
        epsilon: small constant added inside the logs to avoid log(0).

    Returns:
        A function weighted_loss(y_true, y_pred) that returns the sum over
        classes of the batch-mean weighted cross-entropy.
    """

    def weighted_loss(y_true, y_pred):
        # Element-wise weighted log-likelihood; the weight vectors broadcast
        # along the class (last) axis.
        weighted_ll = (pos_weights * y_true * K.log(y_pred + epsilon) +
                       neg_weights * (1 - y_true) * K.log(1 - y_pred + epsilon))

        # Mean over the batch for each class, then summed across classes.
        # This equals the previous per-class loop (which re-added the same
        # all-class mean once per class) but builds the expression only once.
        return -1 * K.sum(K.mean(weighted_ll, axis=0))

    return weighted_loss

# Class frequencies measured on the training labels.
freq_pos, freq_neg = compute_class_freqs(train_generator.labels)

# Each side is weighted by the frequency of the opposite side.
pos_weights, neg_weights = freq_neg, freq_pos

# Frequency times weight for each side, per class.
pos_contribution = pos_weights * freq_pos
neg_contribution = neg_weights * freq_neg

In [None]:
# 예상육량 불균형 균형화
data = pd.DataFrame({"Class": all_labels, "Label": "Positive", "Value": pos_contribution})
data = data.append([{"Class": all_labels[l], "Label": "Negative", "Value": v} 
                        for l,v in enumerate(neg_contribution)], ignore_index=True)

sns.barplot(x="Class", y="Value", hue="Label" ,data=data);

In [None]:
# ImageNet-pretrained DenseNet121 without its classification head.
base_model = DenseNet121(
    include_top=False,
    weights="imagenet",
    input_tensor=None,
    input_shape=None,
)

# Freeze the backbone so only the new head is trained. Setting `trainable`
# on the model applies to all of its layers, so no wrapper model is needed.
base_model.trainable = False

x = base_model.output

# Add a global spatial average pooling layer.
x = GlobalAveragePooling2D()(x)

# Add the output layer: one sigmoid unit per label.
predictions = Dense(len(all_labels), activation="sigmoid")(x)

model = Model(inputs=base_model.input, outputs=predictions)
# 'binary_accuracy' is named explicitly: with several sigmoid outputs the
# generic 'accuracy' string is resolved to categorical (argmax) accuracy.
model.compile(metrics=['binary_accuracy'] ,optimizer='adam', loss=get_weighted_loss(pos_weights, neg_weights))

# steps_per_epoch=5 means each epoch sees only 5 training batches.
# validation_steps is tied to the generator length so validation never asks
# for more batches than one pass over the validation set provides.
history = model.fit(train_generator, 
                    validation_data=valid_generator,
                    steps_per_epoch=5, 
                    validation_steps=len(valid_generator), 
                    epochs = 20)

In [None]:
import tensorflow

# Draw the layer graph of the model, with shapes shown.
tensorflow.keras.utils.plot_model(model=model, show_shapes=True)

In [None]:
# Training loss per epoch, read from the current `history` object.
loss_fig, loss_ax = plt.subplots()
loss_ax.plot(history.history['loss'])
loss_ax.set_ylabel("loss")
loss_ax.set_xlabel("epoch")
loss_ax.set_title("Training Loss Curve")
plt.show()

In [None]:
# Predict on the test generator. The generator already yields batches, so
# batch_size is not passed (Keras documents that it must not be specified for
# generator / Sequence inputs).
pred_Y = model.predict(test_generator, verbose=1)

In [None]:
# Continue training the same model, this time monitoring on the test generator.
# NOTE(review): this overwrites `history`, and any predictions taken from the
# model before this cell no longer reflect its final weights. Monitoring on
# the test set also means it is no longer a fully held-out set - confirm intent.
# validation_steps follows the generator length instead of a fixed 100.
history = model.fit(train_generator, 
                    validation_data=test_generator,
                    steps_per_epoch=5, 
                    validation_steps=len(test_generator), 
                    epochs = 20)

In [None]:
# Training loss per epoch for the most recent fit.
epoch_losses = history.history['loss']
plt.plot(epoch_losses)
curve_ax = plt.gca()
curve_ax.set_ylabel("loss")
curve_ax.set_xlabel("epoch")
curve_ax.set_title("Training Loss Curve")
plt.show()

In [None]:
from sklearn.metrics import roc_curve, auc

# Ground-truth label matrix of the test generator, one column per label.
test_Y = test_generator.labels
fig, c_ax = plt.subplots(1,1, figsize = (9, 9))

# One ROC curve per label, with its AUC in the legend entry.
for col, label_name in enumerate(all_labels):
    truth = test_Y[:, col].astype(int)
    scores = pred_Y[:, col]
    fpr, tpr, thresholds = roc_curve(truth, scores)
    legend_text = '%s (AUC:%0.2f)'  % (label_name, auc(fpr, tpr))
    c_ax.plot(fpr, tpr, label=legend_text)

c_ax.legend()
c_ax.set_xlabel('False Positive Rate')
c_ax.set_ylabel('True Positive Rate')

In [None]:
# Per label: mean of the ground-truth column vs. mean predicted score, both in percent.
actual_pct = 100*np.mean(test_Y,0)
predicted_pct = 100*np.mean(pred_Y,0)
for label_name, actual, predicted in zip(all_labels, actual_pct, predicted_pct):
    print('%s: 등급: %2.2f%%, predict 등급: %2.2f%%' % (label_name, actual, predicted))