<a href="https://colab.research.google.com/github/YianKim/2022_uncertainty_aware_semisupervise/blob/main/uncertainty_%EA%B8%B0%EB%B0%98_labeling_resnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 라이브러리 불러오기


In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
from tensorflow import keras
import numpy as np
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

import PIL
import pickle
import random
from tqdm import tqdm
from collections import Counter

from keras.layers.core import Lambda
from keras import backend as K

from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import BatchNormalization
from keras.regularizers import l2
from keras.layers import Activation
from keras.layers import Dropout
from keras.layers import MaxPooling2D
from keras.layers import Flatten
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import LSTM
from keras import optimizers
from keras.callbacks import ReduceLROnPlateau, EarlyStopping, TensorBoard, ModelCheckpoint
from sklearn.metrics import *
from keras.models import load_model

from tqdm import tqdm

## 데이터 불러오기


In [None]:
fashion_mnist = keras.datasets.fashion_mnist
(data, labels), (test_data, test_labels) = fashion_mnist.load_data()

In [None]:
train_data = data[range(10000)].reshape([10000,28,28,1])
train_labels = labels[range(10000)].reshape([10000,1])

In [None]:
unlab_data = data[range(10000,60000)].reshape([50000,28,28,1])
unlab_labels = labels[range(10000,60000)].reshape([50000,1])

In [None]:
test_data = test_data.reshape([10000,28,28,1])
test_labels = test_labels.reshape([10000,1])

In [None]:
train_labels2 = []
unlab_labels2 = []
test_labels2 = []

In [None]:
for i in range(10000):
  white = [0,0,0,0,0,0,0,0,0,0]
  white[train_labels[i][0]] = 1
  train_labels2.append(white)

for i in range(50000):
  white = [0,0,0,0,0,0,0,0,0,0]
  white[unlab_labels[i][0]] = 1
  unlab_labels2.append(white)

for i in range(10000):
  white = [0,0,0,0,0,0,0,0,0,0]
  white[test_labels[i][0]] = 1
  test_labels2.append(white)

In [None]:
train_labels = np.array(train_labels2)
unlab_labels = np.array(unlab_labels2)
test_labels = np.array(test_labels2)

## resnet

In [None]:
from tensorflow import Tensor
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization,\
                                    Add, AveragePooling2D, Flatten, Dense
from tensorflow.keras.models import Model

def PermaDropout(rate):
    return Lambda(lambda x: K.dropout(x, level=rate))

def relu_bn(inputs: Tensor) -> Tensor:
    relu = ReLU()(inputs)
    bn = BatchNormalization()(relu)
    return bn

def residual_block(x: Tensor, downsample: bool, filters: int, kernel_size: int = 3) -> Tensor:
    y = Conv2D(kernel_size=kernel_size,
               strides= (1 if not downsample else 2),
               filters=filters,
               padding="same")(x)
    y = relu_bn(y)
    y = Conv2D(kernel_size=kernel_size,
               strides=1,
               filters=filters,
               padding="same")(y)

    if downsample:
        x = Conv2D(kernel_size=1,
                   strides=2,
                   filters=filters,
                   padding="same")(x)
    out = Add()([x, y])
    out = relu_bn(out)
    return out

def create_res_net():
    
    inputs = Input(shape=(28, 28, 1))
    num_filters = 64
    
    t = BatchNormalization()(inputs)
    t = Conv2D(kernel_size=3,
               strides=1,
               filters=num_filters,
               padding="same")(t)
    t = relu_bn(t)
    t = PermaDropout(0.5)(t)
    
    num_blocks_list = [2, 5, 5, 2]
    for i in range(len(num_blocks_list)):
        num_blocks = num_blocks_list[i]
        for j in range(num_blocks):
            t = residual_block(t, downsample=(j==0 and i!=0), filters=num_filters)
        num_filters *= 2
        t = PermaDropout(0.5)(t)
    
    t = AveragePooling2D(4)(t)
    t = Flatten()(t)
    outputs = Dense(10, activation='softmax')(t)
    
    model = Model(inputs, outputs)

    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
model = create_res_net() # or create_plain_net()
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 28, 28, 1)]  0                                            
__________________________________________________________________________________________________
batch_normalization (BatchNorma (None, 28, 28, 1)    4           input_1[0][0]                    
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 28, 28, 64)   640         batch_normalization[0][0]        
__________________________________________________________________________________________________
re_lu (ReLU)                    (None, 28, 28, 64)   0           conv2d[0][0]                     
______________________________________________________________________________________________

In [None]:
lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.9, patience=3)
early_stopper = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=30, mode='auto')

model.fit(
    x=train_data,
    y=train_labels,
    epochs=500,
    verbose=1,
    validation_split=0.3,
    batch_size=64,
    callbacks=[lr_reducer, early_stopper]
)

Epoch 1/500


KeyboardInterrupt: 

In [None]:
model.save('labeling_resnet.h5')

## 모델 결과

In [None]:
def model_eval():
  pred_mu = model.predict(test_data)
  for i in range(1, 30):
    pred_mu += model.predict(test_data)
  pred_mu = pred_mu/30
  predicted_test_labels = np.argmax(pred_mu, axis=1)
  return(accuracy_score(np.argmax(test_labels, axis=1), predicted_test_labels))

In [None]:
model = keras.models.load_model('labeling_resnet.h5')
model_eval()

# 라벨링

## 라벨링; 1

In [None]:
model = keras.models.load_model('labeling_resnet.h5')

In [None]:
# Vars : 분산들
# Outs : 결과 값들

Vars = []
Outs = []

temp1 = []

for i in tqdm(range(30)):
  temp1.append(model.predict(unlab_data))

for j in tqdm(range(unlab_data.shape[0])):

  temp2 = np.array([0,0,0,0,0,0,0,0,0,0], 'float32')
  
  for i in range(30):
    temp2 += temp1[i][j]
  Outs.append(temp2/30)
  
  outputs=[]

  for i in range(30):
    outputs.append(temp1[i][j][np.argmax(temp2)])
  Vars.append(np.var(outputs))

In [None]:
Outs = np.array(Outs)

class마다 균등하게 선택

In [None]:
Counter(np.argmax(unlab_labels, axis=1))

### 불확정성 컷 2

In [None]:
# upper bound of variance?

UB25 = []
UB50 = []
UB75 = []


for h in range(10):
  classvars = []
  for i in tqdm(range(50000)):
    if np.argmax(unlab_labels, axis=1).tolist()[i]==h:
      classvars.append(Vars[i])
  UB25.append(np.percentile(classvars, 25))
  UB50.append(np.percentile(classvars, 50))
  UB75.append(np.percentile(classvars, 75))

In [None]:
# UB25 < UB50 < UB75

In [None]:
vars25 = []
vars50 = []
vars75 = []
vars100 = []

ind = 0 


for i in Vars:
  if i <= UB25[np.argmax(unlab_labels, axis=1)[ind]]:
    vars25.append(ind)
  elif i <= UB50[np.argmax(unlab_labels, axis=1)[ind]]:
    vars50.append(ind)
  elif i <= UB75[np.argmax(unlab_labels, axis=1)[ind]]:
    vars75.append(ind)
  else:
    vars100.append(ind)
  ind += 1

In [None]:
k1 = random.sample(range(len(vars25)), len(vars25))
k2 = random.sample(range(len(vars50)), len(vars50))
k3 = random.sample(range(len(vars75)), len(vars75))
k4 = random.sample(range(len(vars100)), len(vars100))

lowvars = k1[0:len(k1)]+k2[0:len(k2)]
highvars = k3[0:len(k3)]+k4[0:len(k4)]

clstvars1 = k1[0:np.int(len(k1)/2)] + k2[0:np.int(len(k2)/2)] + k3[0:np.int(len(k3)/2)] + k4[0:np.int(len(k4)/2)]
clstvars2 = k1[np.int(len(k1)/2):len(k1)] + k2[np.int(len(k2)/2):len(k2)] + k3[np.int(len(k3)/2):len(k3)] + k4[np.int(len(k4)/2):len(k4)]

In [None]:
# 같은 길이의 랜덤하게 고른 data들에 모델로 label 부여 후 정확도 측정
randomindx = random.sample(range(50000), 50000)
randomindx2 = randomindx[0:25000]
randomindx = randomindx[25000:50000]
accuracy_score(np.argmax(np.array(Outs)[randomindx], axis=1), np.argmax(unlab_labels[randomindx], axis=1))

In [None]:
# 분산추출
accuracy_score(np.argmax(np.array(Outs)[lowvars], axis=1), np.argmax(unlab_labels[lowvars], axis=1))

In [None]:
# 층화추출
accuracy_score(np.argmax(np.array(Outs)[clstvars1], axis=1), np.argmax(unlab_labels[clstvars1], axis=1))

In [None]:
train_data_1 = unlab_data[lowvars]
train_labels_1 = Outs[lowvars]

In [None]:
shufindx = random.sample(range(train_data_1.shape[0]),train_data_1.shape[0])
train_data_1 = train_data_1[shufindx]
train_labels_1 = train_labels_1[shufindx]

In [None]:
train_data_2 = unlab_data[clstvars1]
train_labels_2 = Outs[clstvars1]

In [None]:
shufindx = random.sample(range(train_data_2.shape[0]),train_data_2.shape[0])
train_data_2 = train_data_2[shufindx]
train_labels_2 = train_labels_2[shufindx]

In [None]:
train_data_3 = unlab_data[randomindx]
train_labels_3 = Outs[randomindx]

In [None]:
shufindx = random.sample(range(train_data_3.shape[0]),train_data_3.shape[0])
train_data_3 = train_data_3[shufindx]
train_labels_3 = train_labels_3[shufindx]

In [None]:
unlab_data_1 = unlab_data[highvars]
unlab_labels_1 = unlab_labels[highvars]
unlab_data_2 = unlab_data[clstvars2]
unlab_labels_2 = unlab_labels[clstvars2]
unlab_data_3 = unlab_data[randomindx2]
unlab_labels_3 = unlab_labels[randomindx2]

In [None]:
Counter(np.argmax(train_labels_1, axis=1))

In [None]:
Counter(np.argmax(train_labels_2, axis=1))

In [None]:
Counter(np.argmax(train_labels_3, axis=1))

In [None]:
Counter(np.argmax(unlab_labels_1, axis=1))

In [None]:
Counter(np.argmax(unlab_labels_2, axis=1))

In [None]:
Counter(np.argmax(unlab_labels_3, axis=1))

## 모델링

간단한 모델에서 받은 지식를 복잡한 모델에서 학습 

uncertainty aware data vs random sampling data

#### Resnet

In [None]:
def relu_bn(inputs: Tensor) -> Tensor:
    relu = ReLU()(inputs)
    bn = BatchNormalization()(relu)
    return bn

def create_simple_net():
    
    inputs = Input(shape=(28, 28, 1))
    num_filters = 64
    
    t = BatchNormalization()(inputs)
    t = Conv2D(kernel_size=3,
               strides=1,
               filters=num_filters,
               padding="same")(t)
    t = relu_bn(t)
    t = Conv2D(kernel_size=3,
               strides=1,
               filters=num_filters,
               padding="same")(t)
    t = relu_bn(t)
    t = Dropout(0.5)(t)
    t = Flatten()(t)
    outputs = Dense(10, activation='softmax')(t)
    
    model = Model(inputs, outputs)

    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.9, patience=3)
early_stopper = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=30, mode='auto')
model = create_simple_net()

model.fit(
    x=train_data_1,
    y=train_labels_1,
    epochs=500,
    verbose=1,
    validation_split=0.3,
    batch_size=64,
    callbacks=[lr_reducer, early_stopper]
)

In [None]:
for i in range(len(model.layers)-1):
    model.layers[i].trainable = False
    
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.00001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.9, patience=3)
early_stopper = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=30, mode='auto')

model.fit(
    x=train_data,
    y=train_labels,
    epochs=500,
    verbose=1,
    validation_split=0.3,
    batch_size=64,
    callbacks=[lr_reducer, early_stopper]
)

model.save('labeling_resnet_a1.h5')
model_eval()

In [None]:
lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.9, patience=3)
early_stopper = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=30, mode='auto')
model = create_simple_net()

model.fit(
    x=train_data_2,
    y=train_labels_2,
    epochs=500,
    verbose=1,
    validation_split=0.3,
    batch_size=64,
    callbacks=[lr_reducer, early_stopper]
)

for i in range(len(model.layers)-1):
    model.layers[i].trainable = False
    
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.00001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.9, patience=3)
early_stopper = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=30, mode='auto')

model.fit(
    x=train_data,
    y=train_labels,
    epochs=500,
    verbose=1,
    validation_split=0.3,
    batch_size=64,
    callbacks=[lr_reducer, early_stopper]
)

model.save('labeling_resnet_b1.h5')
model_eval()

In [None]:
lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.9, patience=3)
early_stopper = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=30, mode='auto')
model = create_simple_net()

model.fit(
    x=train_data_3,
    y=train_labels_3,
    epochs=500,
    verbose=1,
    validation_split=0.3,
    batch_size=64,
    callbacks=[lr_reducer, early_stopper]
)

for i in range(len(model.layers)-1):
    model.layers[i].trainable = False
    
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.00001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

lr_reducer = ReduceLROnPlateau(monitor='val_loss', factor=0.9, patience=3)
early_stopper = EarlyStopping(monitor='val_accuracy', min_delta=0, patience=30, mode='auto')

model.fit(
    x=train_data,
    y=train_labels,
    epochs=500,
    verbose=1,
    validation_split=0.3,
    batch_size=64,
    callbacks=[lr_reducer, early_stopper]
)

model.save('labeling_resnet_c1.h5')
model_eval()