https://towardsdatascience.com/lossless-triplet-loss-7e932f990b24 참조

In [1]:
import tensorflow as tf

In [2]:
from keras import backend as K
from keras.layers import Input, BatchNormalization, LSTM, Dense, concatenate, Conv2D, MaxPooling2D, Flatten
from keras.optimizers import Adam, SGD
from keras.models import Model, Sequential
from keras.utils import plot_model
from keras.preprocessing.image import ImageDataGenerator

import numpy as np
import random
import matplotlib.pyplot as plt
import os
import cv2
import json

Using TensorFlow backend.


In [3]:
tf.__version__

'1.15.0'

# 1. Define loss & base network
- 두 가지 loss 함수
    - `triplet_loss`
        - 출처: https://thelonenutblog.wordpress.com/2017/12/18/what-siamese-dreams-are-made-of/
    - `lossless_triplet_loss`
        - 출처: https://towardsdatascience.com/lossless-triplet-loss-7e932f990b24

In [4]:
def triplet_loss(y_true, y_pred, N=128, alpha = 0.2):
    """
    Implementation of the triplet loss function
    Arguments:
    y_true -- true labels, required when you define a loss in Keras, not used in this function.
    y_pred -- python list containing three objects:
            anchor:   the encodings for the anchor data
            positive: the encodings for the positive data (similar to anchor)
            negative: the encodings for the negative data (different from anchor)
    Returns:
    loss -- real number, value of the loss
    """
    anchor = tf.convert_to_tensor(y_pred[:,0:N])
    positive = tf.convert_to_tensor(y_pred[:,N:N*2]) 
    negative = tf.convert_to_tensor(y_pred[:,N*2:N*3])
    
    # distance between the anchor and the positive
    pos_dist = tf.reduce_sum(tf.square(tf.subtract(anchor,positive)))

    # distance between the anchor and the negative
    neg_dist = tf.reduce_sum(tf.square(tf.subtract(anchor,negative)))

    # compute loss
    basic_loss = pos_dist-neg_dist + alpha

    loss = tf.maximum(basic_loss,0.0)

    return loss

In [5]:
def lossless_triplet_loss(y_true, y_pred, N=128, beta=128, epsilon=1e-8):
    """
    Implementation of the triplet loss function
    
    Arguments:
    y_true -- true labels, required when you define a loss in Keras, you don't need it in this function.
    y_pred -- python list containing three objects:
            anchor -- the encodings for the anchor data
            positive -- the encodings for the positive data (similar to anchor)
            negative -- the encodings for the negative data (different from anchor)
    N  --  The number of dimension 
    beta -- The scaling factor, N is recommended
    epsilon -- The Epsilon value to prevent ln(0)
    
    
    Returns:
    loss -- real number, value of the loss
    """
    anchor = tf.convert_to_tensor(y_pred[:,0:N])
    positive = tf.convert_to_tensor(y_pred[:,N:N*2]) 
    negative = tf.convert_to_tensor(y_pred[:,N*2:N*3])
    
    # distance between the anchor and the positive
    pos_dist = tf.reduce_sum(tf.square(tf.subtract(anchor,positive)),1)
    # distance between the anchor and the negative
    neg_dist = tf.reduce_sum(tf.square(tf.subtract(anchor,negative)),1)
    
    #Non Linear Values  
    
    # -ln(-x/N+1)
    pos_dist = -tf.math.log(-tf.divide((pos_dist),beta)+1+epsilon)
    neg_dist = -tf.math.log(-tf.divide((N-neg_dist),beta)+1+epsilon)
    
    # compute loss
    loss = neg_dist + pos_dist
    
    
    return loss

In [6]:
def cnn_base_network(in_dims, out_dims):
    model = Sequential()

    model.add(Conv2D(input_shape = in_dims,
                     filters = 10, kernel_size = (3,3), strides = (1,1), padding = 'same', activation='relu'))

    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    
    model.add(Conv2D(32, (3, 3), activation='relu'))
    
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    
    model.add(Conv2D(32, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())    #1차원화

    model.add(Dense(32, activation = 'relu'))
    model.add(Dense(out_dims, activation = 'sigmoid'))

    return model

# 2. Define input generator

In [7]:
data_dir = '../../../../../Documents/pshopper/mmfashion/data/Attr_Predict/Img_fortest'   #클래스 5개만 있는 테스트 디렉토리

In [8]:
os.listdir(data_dir)

['Abstract_Brushstroke_Pocket_Top',
 'Abstract_Diamond_Print_Dress',
 'Abstract_Ombre_Capri_Leggings',
 'Abstract_Print_French_Terry_Shorts',
 'Abstract_Tile_Fringe-Trimmed_Kimono']

In [9]:
CLASS_DICT = {k:v for k, v in enumerate(os.listdir(data_dir))}

In [10]:
CLASS_DICT

{0: 'Abstract_Brushstroke_Pocket_Top',
 1: 'Abstract_Diamond_Print_Dress',
 2: 'Abstract_Ombre_Capri_Leggings',
 3: 'Abstract_Print_French_Terry_Shorts',
 4: 'Abstract_Tile_Fringe-Trimmed_Kimono'}

In [11]:
gen = ImageDataGenerator(rescale=1./255)
num_classes = len(CLASS_DICT) - 1

In [12]:
def listdir_fullpath(d):
    return [os.path.join(d, f) for f in os.listdir(d)]

In [13]:
anchor_label, neg_label = random.sample(range(num_classes), 2)

POS_POOL = listdir_fullpath(os.path.join(data_dir, CLASS_DICT[anchor_label]))
NEG_POOL = listdir_fullpath(os.path.join(data_dir, CLASS_DICT[neg_label]))

print('pos_pool: {}'.format(POS_POOL))
print('neg_pool: {}'.format(NEG_POOL))

pos_pool: ['../../../../../Documents/pshopper/mmfashion/data/Attr_Predict/Img_fortest\\Abstract_Brushstroke_Pocket_Top\\img_00000001.jpg', '../../../../../Documents/pshopper/mmfashion/data/Attr_Predict/Img_fortest\\Abstract_Brushstroke_Pocket_Top\\img_00000002.jpg', '../../../../../Documents/pshopper/mmfashion/data/Attr_Predict/Img_fortest\\Abstract_Brushstroke_Pocket_Top\\img_00000003.jpg', '../../../../../Documents/pshopper/mmfashion/data/Attr_Predict/Img_fortest\\Abstract_Brushstroke_Pocket_Top\\img_00000004.jpg', '../../../../../Documents/pshopper/mmfashion/data/Attr_Predict/Img_fortest\\Abstract_Brushstroke_Pocket_Top\\img_00000005.jpg', '../../../../../Documents/pshopper/mmfashion/data/Attr_Predict/Img_fortest\\Abstract_Brushstroke_Pocket_Top\\img_00000006.jpg', '../../../../../Documents/pshopper/mmfashion/data/Attr_Predict/Img_fortest\\Abstract_Brushstroke_Pocket_Top\\img_00000007.jpg', '../../../../../Documents/pshopper/mmfashion/data/Attr_Predict/Img_fortest\\Abstract_Brushstr

In [15]:
def input_triplet_gen(gen, data_dir, num_classes, batch_size):
    
    anchor_label, neg_label = random.sample(range(1, num_classes+1), 2)
    
    POS_DIR = os.path.join(data_dir, CLASS_DICT[anchor_label])
    NEG_DIR = os.path.join(data_dir, CLASS_DICT[neg_label])
    
    anchor_gen = gen.flow_from_directory(POS_DIR, 
                                              target_size=(150, 150),
                                             batch_size=batch_size)
    
    pos_gen = gen.flow_from_directory(POS_DIR, 
                                              target_size=(150, 150),
                                             batch_size=batch_size)
    
    neg_gen = gen.flow_from_directory(NEG_DIR, 
                                              target_size=(150, 150),
                                             batch_size=batch_size)

    while True:
        X1i = anchor_gen.next()
        X2i = pos_gen.next()
        X3i = neg_gen.next()
        
#        if [X1i[0], X2i[0], X3i[0]] == 
        
        yield [X1i[0], X2i[0], X3i[0]], X1i[1]

# 3. Create model

In [15]:
in_dims = (150, 150, 3)    # (28, 28, 1)
out_dims = 128

# Create the 3 inputs
anchor_in = Input(shape=in_dims, name='anchor')
pos_in = Input(shape=in_dims, name='positive')
neg_in = Input(shape=in_dims, name='negative')

# with tf.compat.v1.Session(config=config):
    # Share base network with the 3 inputs
base_network = cnn_base_network(in_dims, out_dims)
anchor_out = base_network(anchor_in)
pos_out = base_network(pos_in)
neg_out = base_network(neg_in)
    
merged_vector = concatenate([anchor_out, pos_out, neg_out], axis=-1)

# Define the trainable model
model = Model(inputs=[anchor_in, pos_in, neg_in], outputs=merged_vector)
model.compile(optimizer=Adam(lr=0.0001),
                    loss=lossless_triplet_loss)
#tf2되면서 lr -> learning_rate








In [16]:
base_network.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_1 (Conv2D)            (None, 150, 150, 10)      280       
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 148, 148, 64)      5824      
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 74, 74, 64)        0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 72, 72, 32)        18464     
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 70, 70, 64)        18496     
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 35, 35, 64)        0         
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 33, 33, 32)        18464     
__________

In [17]:
model.summary()

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
anchor (InputLayer)             (None, 150, 150, 3)  0                                            
__________________________________________________________________________________________________
positive (InputLayer)           (None, 150, 150, 3)  0                                            
__________________________________________________________________________________________________
negative (InputLayer)           (None, 150, 150, 3)  0                                            
__________________________________________________________________________________________________
sequential_1 (Sequential)       (None, 128)          327928      anchor[0][0]                     
                                                                 positive[0][0]                   
          

# 4. Train

In [18]:
batch_size = 1

In [None]:
# Training the model
H = model.fit_generator(input_triplet_gen(gen, data_dir, num_classes, batch_size), 
                        steps_per_epoch = 100, 
                        epochs=5, 
                        use_multiprocessing=True)




Epoch 1/5


Exception in thread Thread-6:
Traceback (most recent call last):
  File "c:\users\user\.conda\envs\pshopper\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "c:\users\user\.conda\envs\pshopper\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "c:\users\user\.conda\envs\pshopper\lib\site-packages\keras\utils\data_utils.py", line 666, in _run
    with closing(self.executor_fn(_SHARED_SEQUENCES)) as executor:
  File "c:\users\user\.conda\envs\pshopper\lib\site-packages\keras\utils\data_utils.py", line 661, in <lambda>
    initargs=(seqs, self.random_seed))
  File "c:\users\user\.conda\envs\pshopper\lib\multiprocessing\context.py", line 118, in Pool
    context=self.get_context())
  File "c:\users\user\.conda\envs\pshopper\lib\multiprocessing\pool.py", line 174, in __init__
    self._repopulate_pool()
  File "c:\users\user\.conda\envs\pshopper\lib\multiprocessing\pool.py", line 239, in _repopulate_pool
    w.start()
  File "c:\us

In [None]:
#결과 확인
plt.plot(H.history['loss'])
# plt.savefig('./CNN/loss_graph.png')
# plt.legend(['loss'], loc = 'upper left')
plt.show()

# 5. Save model & weights

In [None]:
model_dir = './trained/lossless_triplet.json'
model_weights_dir = './trained/lossless_triplet.h5'

In [None]:
#Save model
model_json = model.to_json()
with open(model_dir, "w") as json_file : 
    json_file.write(model_json)

#Save weights
model.save_weights(model_weights_dir)
print("Saved model to disk")

In [None]:
#Load model & weights
from keras.models import model_from_json
json_file = open(model_dir, "r")
loaded_model_json = json_file.read()
json_file.close()
loaded_model = model_from_json(loaded_model_json)
loaded_model.load_weights(model_weights_dir)
print("Loaded model from disk")

# 6. Application

In [None]:
def item_to_encoding(item, model):
    return model.predict(np.array([item]))

## 6.1 Load test data

In [None]:
test_dir = '../../img/mmfashion_mini'    #클래스 5개만 있는 테스트 디렉토리

In [None]:
test_batch_size = 2000

test_datagen = ImageDataGenerator(rescale=1./255)

test_generator = test_datagen.flow_from_directory(
                        data_dir,  # this is the target directory
                        target_size=(150, 150),  # 모든 이미지의 크기가 150x150로 조정됩니다.
                        batch_size=test_batch_size)

batch = test_generator.next()
images = batch[0]
labels = batch[1]

# images = images.reshape(-1, 150, 150)
labels = np.array([np.argmax(x) for x in labels])

In [None]:
def get_triplets(x, y, sample_size):
    a = []
    p = []
    n = []
    
    num_classes = len(set(y))
    for _ in range(sample_size):
        num1, num2 = random.sample(range(num_classes), 2)
        positive_pool = x[np.where(y == num1)[0]]
        negative_pool = x[np.where(y == num2)[0]]
        num3, num4 = random.sample(range(len(positive_pool)), 2)

        anchor = positive_pool[num3]
        positive = positive_pool[num4]
        negative = negative_pool[random.randint(0, len(negative_pool)-1)]
        
        a.append(anchor)
        p.append(positive)
        n.append(negative)
        
    return np.asarray(a), np.asarray(p), np.asarray(n)

In [None]:
sample_size = 500
t_a, t_p, t_n = get_triplets(images, labels, 500)

In [None]:
count_true = 0
for k in range(sample_size):
    anchor_encoding = item_to_encoding(t_a[k], base_network)
    pos_encoding = item_to_encoding(t_p[k], base_network)
    neg_encoding = item_to_encoding(t_n[k], base_network)

    anchor_pos_dist = np.linalg.norm(anchor_encoding-pos_encoding)
    anchor_neg_dist = np.linalg.norm(anchor_encoding-neg_encoding)

    cond = anchor_pos_dist < anchor_neg_dist
    
    if cond: count_true += 1
        
    else: 
        print('---------- wrong embedding : index %d -----------'%k)
        plt.imshow(np.hstack([t_a[k, :, :], t_p[k, :, :], t_n[k, :, :]]))
        plt.show()
        
print("true: {} false: {}".format(count_true, sample_size-count_true))
print("Accuracy: {}".format(count_true / sample_size))

In [None]:
def sub_dir(o_dir, n_dir):
    """
    함수 설명
    Arguments:
    - o_dir: old directory. 뎁스 바꿔주고 싶은 이미지 폴더(루트)
    e.g. '../../img/mmfashion'
    
    -n_dir: new directory. 새로운 폴더명. 아무거나 상관없음
    e.g. '../../img/mmfashion_flow'
    
    Return:
    없음
    
    """
    dirlist=os.listdir(o_dir)
    for i in dirlist:
        if not os.path.exists(n_dir):
            os.mkdir(n_dir)
        if not os.path.exists(n_dir+'/'+i):
            os.mkdir(n_dir+'/'+i)
        shutil.move(o_dir+"/"+i,n_dir+'/'+i)
        
sub_dir( '../../img/mmfashion',  '../../img/mmfashion_flow')