In [10]:
import tensorflow as tf
from tqdm import tqdm
import os
from pathlib import Path
import math
import pandas as pd
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [11]:
class GeMPoolingLayer(tf.keras.layers.Layer):
    def __init__(self, p=1., train_p=False):
        super().__init__()
        if train_p:
            self.p = tf.Variable(p, dtype=tf.float32)
        else:
            self.p = p
        self.eps = 1e-6

    def call(self, inputs: tf.Tensor, **kwargs):
        inputs = tf.clip_by_value(inputs, clip_value_min=1e-6, clip_value_max=tf.reduce_max(inputs))
        inputs = tf.pow(inputs, self.p)
        inputs = tf.reduce_mean(inputs, axis=[1, 2], keepdims=False)
        inputs = tf.pow(inputs, 1./self.p)
        return inputs


class DelfArcFaceModel(tf.keras.Model):
    def __init__(self, input_shape, n_classes, margin, logit_scale, feature_size, p=None, train_p=False):
        super().__init__()
        self.backbone = tf.keras.applications.ResNet101(include_top=False, weights="imagenet", input_shape=input_shape)
        #elf.backbone.summary()

        if p is not None:
            self.global_pooling = GeMPoolingLayer(p, train_p=train_p)
        else:
            self.global_pooling = functools.partial(tf.reduce_mean, axis=[1, 2], keepdims=False)
        self.dense1 = tf.keras.layers.Dense(feature_size, activation='softmax', kernel_initializer="glorot_normal")
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.Flatten = tf.keras.layers.Flatten()
        self.arcface = ArcFaceLayer(n_classes, margin, logit_scale)
        
    def call(self, inputs, training=True, mask=None):
        images, labels = inputs
        x = self.extract_feature(images)
        x = self.arcface((x, labels))
        return x
        
    def extract_feature(self, inputs):
        x = self.backbone(inputs)
        x = self.global_pooling(x)
        x = self.Flatten(x)
        x = self.dense1(x)
        x = self.bn1(x)
        return x


class ArcFaceLayer(tf.keras.layers.Layer):
    def __init__(self, num_classes, margin, logit_scale):
        super().__init__()
        self.num_classes = num_classes
        self.margin = margin
        self.logit_scale = logit_scale

    def build(self, input_shape):
        self.w = self.add_weight("weights", shape=[int(input_shape[0][-1]), self.num_classes], initializer=tf.keras.initializers.get("glorot_normal"))
        self.cos_m = tf.identity(tf.cos(self.margin), name='cos_m')
        self.sin_m = tf.identity(tf.sin(self.margin), name='sin_m')
        self.th = tf.identity(tf.cos(math.pi - self.margin), name='th')
        self.mm = tf.multiply(self.sin_m, self.margin, name='mm')

    def call(self, inputs, training=True, mask=None):
        embeddings, labels = inputs
        normed_embeddings = tf.nn.l2_normalize(embeddings, axis=1, name='normed_embd')
        normed_w = tf.nn.l2_normalize(self.w, axis=0, name='normed_weights')

        cos_t = tf.matmul(normed_embeddings, normed_w, name='cos_t')
        sin_t = tf.sqrt(1. - cos_t ** 2, name='sin_t')

        cos_mt = tf.subtract(cos_t * self.cos_m, sin_t * self.sin_m, name='cos_mt')

        cos_mt = tf.where(cos_t > self.th, cos_mt, cos_t - self.mm)

        mask = tf.one_hot(tf.cast(labels, tf.int32), depth=self.num_classes,
                          name='one_hot_mask')

        logits = tf.where(mask == 1., cos_mt, cos_t)
        logits = tf.multiply(logits, self.logit_scale, 'arcface_logist')
        return logits

In [12]:
EPOCHS = 1000
batch_size = 64
image_size = 224
STEPS_PER_TPU_CALL = 1
learning_rate = 5e-5  # should be smaller than training on single GPU
feature_size = 2048  # Embedding size before the output layer
save_interval = 2000

# ArcFace params
margin = 0.1  # DELG used 0.1, original ArcFace paper used 0.5. When margin is 0, it should be the same as doing a normal softmax but with embedding and weight normalised.
logit_scale = int(math.sqrt(feature_size))

# GeM params
gem_p = 3.
train_p = False  # whether to learn gem_p or not
data_dirs = '/home/ubuntu/Dacon/cpt_data/landmark/'
data_dir = "/home/ubuntu/Dacon/jin/NIA"

In [13]:
checkpoint_dir = "/home/ubuntu/Dacon/jin/NIA/checkpoint/"
train_tf_records_dir = "/home/ubuntu/Dacon/jin/NIA/tfrecords/train*"
test_tf_records_dir = "/home/ubuntu/Dacon/jin/NIA/tfrecords/validation*"
training_csv_path = os.path.join(data_dir, "train.csv")
train_csv = pd.read_csv(str(training_csv_path))
num_samples = len(train_csv["id"].tolist())
unique_landmark_ids = train_csv["landmark_id"].unique().tolist()
unique_landmark_ids = tf.convert_to_tensor(unique_landmark_ids, dtype=tf.int64)

In [14]:
test_datagen = ImageDataGenerator(rescale=1./255)

In [15]:
test_data = test_datagen.flow_from_directory(
    directory = data_dirs + 'test/',
    class_mode = 'sparse',
    shuffle = True,
    target_size = (image_size, image_size),
    batch_size = batch_size)

Found 7488 images belonging to 309 classes.


In [27]:
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

model = DelfArcFaceModel(
            input_shape=(image_size, image_size, 3), n_classes=len(unique_landmark_ids), margin=margin, logit_scale=logit_scale,
            p=gem_p, train_p=train_p, feature_size=feature_size
        )
new_optimizer = tf.keras.optimizers.Adam()



In [28]:
@tf.function
def eval_step(images, labels):
  predictions = model((images, labels), training=False)
  eval_accuracy(labels, predictions)

In [29]:
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f14312fb6d0>

In [30]:
images, labels = next(iter(test_data))

In [41]:
  predictions = model((images, dummy_labels), training=False)




To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



In [39]:
dummy_labels = np.zeros(64)

In [40]:
dummy_labels

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [43]:
np.argmax(predictions, axis=1)

array([ 71,  71,  71,  71, 172,  71,  71,  71,  71,  71,  71,  71, 172,
        71,  71,  71, 172,  71,  71,  71,  71,  71,  71,  71,  71,  71,
        71,  71,  71,  71,  71,  71,  71,  71,  71,  71,  71,  71,  71,
       172,  71,  71,  71,  71,  71,  71,  71,  71, 172,  71,  71,  71,
        71,  71,  71,  71,  71,  71,  71, 172,  71,  71,  71,  71])

In [None]:
for images, labels in test_data:
  eval_step(images, labels)
  print(eval_accuracy.result()*100)
print ('전략을 사용하지 않고, 저장된 모델을 복원한 후의 정확도: {}'.format(
    eval_accuracy.result()*100))