# A working

In [10]:
import copy
import abc
import os
import math
import datetime
import numpy as np
import tensorflow as tf

from tensorflow.keras.layers import Dense, Flatten, Conv2D
from tensorflow.keras import Model

UNL = 0
POS = 1
NEG = -1

DF = tf.keras.datasets.mnist

## Data pipes
- Should be replaced with TF dataset APIs

In [11]:
class DataIterator(object):
    def __init__(
        self,
        data_lists,
        batch_size,
        max_epoch=None,
        repeat=True,
        shuffle=True,
        epoch_finished=None,
    ):
        for idx in range(len(data_lists) - 1):
            assert len(data_lists[idx]) == len(data_lists[idx + 1])
        self._data = data_lists
        self._batch_size = batch_size
        self._repeat = repeat
        self._shuffle = shuffle
        self._num_data = len(self._data[0])
        assert self._num_data >= self._batch_size
        self._shuffle_indexes = self._maybe_generate_shuffled_indexes()
        self._epoch_finished = 0 if epoch_finished is None else epoch_finished
        self._max_epoch = max_epoch

    @property
    def num_data(self):
        return self._num_data

    @property
    def finished(self):
        if not self._repeat:
            if self.epoch_finished == 1:
                return True
        if self._max_epoch is not None:
            return self.epoch_finished > self._max_epoch
        else:
            return False

    @property
    def epoch_finished(self):
        return self._epoch_finished

    def _maybe_generate_shuffled_indexes(self):
        indexes = list(range(self._num_data))
        if self._shuffle:
            np.random.shuffle(indexes)
        return indexes

    def get_next_batch(self, batch_size=None):
        if batch_size is None:
            batch_size = self._batch_size
        else:
            assert self._num_data >= batch_size
        if len(self._shuffle_indexes) == 0:
            raise StopIteration()
        if len(self._shuffle_indexes) >= batch_size:  # when data left is enough
            indexes = self._shuffle_indexes[:batch_size]
            self._shuffle_indexes = self._shuffle_indexes[batch_size:]
        else:  # when data left is not enough.
            indexes = self._shuffle_indexes
            self._shuffle_indexes = []
        if len(self._shuffle_indexes) == 0:
            self._epoch_finished += 1
            if self._repeat:
                if self._max_epoch is not None:
                    if self._epoch_finished > self._max_epoch:
                        raise StopIteration()
                self._shuffle_indexes = self._maybe_generate_shuffled_indexes()
                num_left = batch_size - len(indexes)
                indexes.extend(self._shuffle_indexes[:num_left])
                self._shuffle_indexes = self._shuffle_indexes[num_left:]
        return [l[indexes] for l in self._data]

    def __iter__(self):
        return self

    def __next__(self):
        return self.get_next_batch()

In [12]:
class PuDataIterator(object):
    def __init__(
        self,
        u_data,
        l_data,
        batch_size,
        max_epoch=None,
        epoch_finished=0,
        repeat=True,
        shuffle=True,
    ):
        self._u_num = u_data[0].shape[0]
        self._l_num = l_data[0].shape[0]
        self._data_num = self._u_num + self._l_num
        self._p_u = float(self._u_num) / float(self._u_num + self._l_num)
        self._p_l = float(self._l_num) / float(self._u_num + self._l_num)
        self._batch_size = batch_size
        self._used_u_num, self._used_l_num = 0, 0
        self._u_iterator = DataIterator(
            u_data,
            int(batch_size * self._p_u),
            repeat=repeat,
            shuffle=shuffle,
            epoch_finished=epoch_finished,
            max_epoch=max_epoch,
        )
        self._l_iterator = DataIterator(
            l_data,
            int(batch_size * self._p_l),
            repeat=repeat,
            shuffle=shuffle,
            epoch_finished=epoch_finished,
            max_epoch=max_epoch,
        )
        self._finished_epoch = epoch_finished
        self._max_epoch = max_epoch
        self._repeat = repeat
        self._shuffle = shuffle

    @property
    def epoch_finished(self):
        return self._finished_epoch

    @property
    def num_data(self):
        return self._data_num

    @property
    def finished(self):
        if self._max_epoch is not None:
            return self.epoch_finished > self._max_epoch
        else:
            return False

    def __next__(self):
        used_num = self._used_l_num + self._used_u_num + self._batch_size
        next_u_num = round(used_num * self._p_u - self._used_u_num)
        self._used_u_num += next_u_num
        next_l_num = round(used_num * self._p_l - self._used_l_num)
        next_l_num += self._batch_size - next_u_num - next_l_num
        self._used_l_num += next_l_num
        # Whatever the case, at least one sample is expected from each iterator
        # (though the iterator may be empty, when self._repeat == False).
        assert next_l_num != 0 and next_u_num != 0

        if self._max_epoch is not None:
            if self._finished_epoch >= self._max_epoch:
                raise StopIteration()
        # Stop iteration only if both iterator is finished. So no data will
        # be missed.
        if self._u_iterator.finished and self._l_iterator.finished:
            raise StopIteration()

        try:
            u_data = self._u_iterator.get_next_batch(int(next_u_num))
        except StopIteration:
            u_data = None

        try:
            l_data = self._l_iterator.get_next_batch(int(next_l_num))
        except StopIteration:
            l_data = None

        if not self._repeat:

            # It is guaranteed here that, if one of the iterator is finished,
            # another one will make up the missing part.
            if self._u_iterator.finished and not self._l_iterator.finished:
                u_num = 0 if u_data is None else u_data[0].shape[0]
                left = self._l_iterator.get_next_batch(int(next_u_num - u_num))
                l_data = [
                    np.concatenate((l_data[i], left[i])) for i in range(len(l_data))
                ]
            if self._l_iterator.finished and not self._u_iterator.finished:
                l_num = 0 if l_data is None else l_data[0].shape[0]
                left = self._u_iterator.get_next_batch(int(next_l_num - l_num))
                u_data = [
                    np.concatenate((u_data[i], left[i])) for i in range(len(u_data))
                ]

        self._finished_epoch = min(
            self._u_iterator.epoch_finished, self._l_iterator.epoch_finished
        )
        if u_data is None:
            return l_data
        elif l_data is None:
            return u_data
        else:
            return [np.concatenate((u_data[i], l_data[i])) for i in range(len(l_data))]

    def __iter__(self):
        return self

In [28]:
class PuLearningDataSet(object):
    def __init__(self, cfg):
        
        self.max_epoch = cfg["max_epoch"]
        self.batch_size = cfg["batch_size"]
        self.prior = cfg["prior"]
        
        (_x, _y), _ = DF.load_data()
        _x= _x / 255.0
        _x = _x[..., tf.newaxis].astype(np.float32)
        _y = _y.astype(np.int32)
        
        _y[_y % 2 == 1] = UNL
        _y[(_y % 2 == 0) & (_y != UNL)] = POS
        
        # This is ugly, but edit here to adjust how you want to specify the number of positive samples
        _y[(_y == POS)][:int(60000*self.prior)] = UNL
        
        self.num_labeled = (_y == POS).sum()
        
        shuffled_indexes = np.array(range(len(_y)))
        np.random.shuffle(shuffled_indexes)
        _y = _y[shuffled_indexes]
        _x = _x[shuffled_indexes]
        
        unlabeled_mask = (_y == UNL)
        labeled_mask = (_y == POS)
        
        _y = np.expand_dims(_y, axis=1)
        
        u_x = _x[unlabeled_mask]
        u_y = _y[unlabeled_mask]
        
        l_x = _x[labeled_mask]
        l_y = _y[labeled_mask]
        
        self.iterator = PuDataIterator(
            (u_x, u_y),
            (l_x, l_y),
            self.batch_size,
            max_epoch=self.max_epoch,
            repeat=True,
            shuffle=True,
        )

## Loss Def.

In [29]:
def calculate_losses(network_out, labels, prior):
    assert network_out.shape.ndims == 2
    assert network_out.shape[1] == 1
    loss_func = lambda network_out, y: tf.nn.sigmoid(
        -network_out * y
    )
    positive = tf.cast(tf.equal(labels, POS), tf.float32)
    unlabeled = tf.cast(tf.equal(labels, UNL), tf.float32)
    num_positive = tf.maximum(1.0, tf.reduce_sum(positive))
    num_unlabeled = tf.maximum(1.0, tf.reduce_sum(unlabeled))
    losses_positive = loss_func(network_out, POS)
    losses_negative = loss_func(network_out, NEG)
    positive_risk = tf.reduce_sum(
        prior * positive / num_positive * losses_positive
    )
    negative_risk = tf.reduce_sum(
        (unlabeled / num_unlabeled - prior * positive / num_positive) * losses_negative,
    )
    return positive_risk, negative_risk

def nnpu_loss(network_out, labels):
    positive_risk, negative_risk = calculate_losses(
        network_out, labels, pu_dataset.prior
    )
    is_ga = tf.less(negative_risk, -0.0)
    return tf.cond(
        is_ga,
        lambda: -1.0 * negative_risk,
        lambda: positive_risk + negative_risk,
    )

## Model Def.

In [30]:
class MNISTModel(tf.keras.Model):
    def __init__(self):
        super(MNISTModel, self).__init__()
        self.conv1 = Conv2D(32, 3, activation='relu')
        self.flatten = Flatten()
        self.d1 = Dense(128, activation='relu')
        self.d2 = Dense(1, activation=None)

    def call(self, x):
        x = self.conv1(x)
        x = self.flatten(x)
        x = self.d1(x)
        return self.d2(x)

## A custom training loop
Note: only that it works, not optimized for testing performance

In [None]:
optimizer = tf.keras.optimizers.Adam(0.0001)
gs_model = MNISTModel()

@tf.function
def train_step(data, labels):
    with tf.GradientTape() as tape:
        predictions = gs_model(data, training=True)
        loss = nnpu_loss(predictions, labels)

    gradients = tape.gradient(loss, gs_model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, gs_model.trainable_variables))
    train_loss(loss)

cfg = {
    "batch_size": 128,
    "max_epoch": 1,
    "prior": 0.3
}
print(cfg)

EPOCHS = 10
step = 0

dt_str = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = "./log/{}".format(dt_str)
print('log_dir={}'.format(log_dir))

model_dir = "./model/{}".format(dt_str)
print('model_dir={}'.format(model_dir))

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_summary_writer = tf.summary.create_file_writer(os.path.join(log_dir, "train"))

for epoch in range(EPOCHS):
    
    train_loss.reset_states()
    pu_dataset = PuLearningDataSet(cfg)

    for data, labels in pu_dataset.iterator:
        step += 1
        train_step(data, labels)
        # print(train_loss.result().numpy(), end='>', flush=True)
        with train_summary_writer.as_default():
            tf.summary.scalar('loss', train_loss.result(), step=step)
            
    print(epoch, ", loss=", train_loss.result().numpy(), flush=True)

    gs_model.save(
        filepath=model_dir,
        overwrite=True,
        include_optimizer=True,
        save_format="tf"
    )
    # print('Model saved to {}'.format(model_dir))

## Some testing?

In [155]:
(_x, _y), _ = DF.load_data()
_x= _x / 255.0
_x = _x[..., tf.newaxis].astype(np.float32)
_y = _y.astype(np.int32)

loaded_model = tf.saved_model.load(model_dir)
f = loaded_model.signatures["serving_default"]
pred = f(tf.constant(_x, dtype=tf.float32))['output_1'].numpy()
pred

array([[-3.731093 ],
       [-4.939483 ],
       [ 4.5985384],
       ...,
       [-3.303848 ],
       [ 4.30934  ],
       [ 3.5497808]], dtype=float32)

In [156]:
((pred>0).squeeze() == (_y>0)).sum() / len(_y)

0.6484833333333333