configs.py

In [2]:

# GPU memory fraction. Not read by any active code in this file; it is only
# mentioned in a disabled alternative inside __init_session.
GPU_MEMORY_FRAC = 0.95
# Forwarded by BatchManager to DatasetManager, where it selects the split
# strategy (-1 and 0 are ratio splits, positive values use the per-user branch).
N_SHOT = 0

# Number of anchor points; also the number of (p, q) factor pairs built by
# init_models and the width of its 'k' placeholder.
N_ANCHOR = 100

# Hyper-parameters consumed by init_models_for_pre_train.
PRE_RANK = 5
PRE_LEARNING_RATE = 2e-4
PRE_LAMBDA = 10

# Hyper-parameters consumed by init_models; BATCH_SIZE is the mini-batch row
# count used by _train.
RANK = 20
LEARNING_RATE = 1e-2
LAMBDA = 1e-3
BATCH_SIZE = 1000

# Passed as use_cache to get_p_and_q by _train.
USE_CACHE = True

In [None]:
anchor.py 

In [3]:
import random

import numpy as np
from sklearn.preprocessing import normalize




def _init_anchor_points(train_data, row_k, col_k):
    """Randomly pick ``N_ANCHOR`` distinct training rows to use as anchors.

    A candidate row ``(user, item, ...)`` is accepted only when the summed
    kernel weight ``row_k[user][u] * col_k[item][i]`` over all training rows
    ``(u, i)`` is at least 1.

    Args:
        train_data: 2-D array whose column 0 holds user indices and column 1
            holds item indices.
        row_k: square kernel matrix indexed by user index.
        col_k: square kernel matrix indexed by item index.

    Returns:
        List of ``N_ANCHOR`` distinct row indices into ``train_data``, in the
        order they were accepted.

    Raises:
        RuntimeError: if every training row has been tried and fewer than
            ``N_ANCHOR`` of them qualified (the original looped forever).
    """
    train_user_ids = train_data[:, 0].astype(np.int64)
    train_item_ids = train_data[:, 1].astype(np.int64)
    n_rows = train_data.shape[0]

    anchor_idxs = []
    # Sets give O(1) duplicate checks; ``rejected`` additionally avoids
    # recomputing the kernel sum for rows already known to fail.
    accepted = set()
    rejected = set()
    while len(anchor_idxs) < N_ANCHOR:
        if len(accepted) + len(rejected) >= n_rows:
            raise RuntimeError(
                'Only {} of the {} training rows qualify as anchor points; '
                '{} are required.'.format(len(anchor_idxs), n_rows, N_ANCHOR))

        anchor_idx = random.randint(0, n_rows - 1)
        if anchor_idx in accepted or anchor_idx in rejected:
            continue

        anchor_row = train_data[anchor_idx]
        user_id = int(anchor_row[0])
        item_id = int(anchor_row[1])

        k = np.multiply(row_k[user_id][train_user_ids],
                        col_k[item_id][train_item_ids])
        sum_a_of_anchor = np.sum(k)
        if sum_a_of_anchor < 1:
            rejected.add(anchor_idx)
            continue

        print('>> %10d\t%d' % (anchor_idx, sum_a_of_anchor))
        accepted.add(anchor_idx)
        anchor_idxs.append(anchor_idx)

    return anchor_idxs


def _get_distance_matrix(latent):
    """Return the pairwise angular distance between the rows of ``latent``.

    Rows are normalized with ``normalize(..., axis=1)``, their pairwise dot
    products are clamped to [-1, 1], and ``arccos`` turns them into angles in
    radians.

    Raises:
        AssertionError: if the resulting matrix contains any NaN.
    """
    unit_rows = normalize(latent, axis=1)

    # Clamp before arccos so values marginally outside [-1, 1] do not yield NaN.
    cosine = np.clip(np.matmul(unit_rows, unit_rows.T), -1, 1)
    angles = np.arccos(cosine)
    assert not np.isnan(angles).any()
    return angles


def _get_k_from_distance(d):
    m = np.zeros(d.shape)
    m[d < 0.8] = 1
    return np.multiply(np.subtract(np.ones(d.shape), np.square(d)), m)


def _get_ks_from_latents(row_latent, col_latent):
    """Build the row and column kernel matrices from latent factor matrices.

    Each latent matrix is converted to a pairwise distance matrix with
    ``_get_distance_matrix`` and then to kernel weights with
    ``_get_k_from_distance``.

    Returns:
        Tuple ``(row_k, col_k)``.
    """
    row_k = _get_k_from_distance(_get_distance_matrix(row_latent))
    col_k = _get_k_from_distance(_get_distance_matrix(col_latent))
    return row_k, col_k


class AnchorManager:
    """Holds the anchor points and the kernels used to weight data rows.

    On construction the row/column kernel matrices are derived from the
    initial latent factors and ``N_ANCHOR`` anchor rows are sampled from the
    training data. ``session`` and ``models`` are accepted but not used here.
    """

    def __init__(
            self,
            session,
            models,
            batch_manager,
            row_latent_init,
            col_latent_init, ):
        training_rows = batch_manager.train_data

        row_kernel, col_kernel = _get_ks_from_latents(row_latent_init,
                                                      col_latent_init)

        chosen = _init_anchor_points(training_rows, row_kernel, col_kernel)
        assert len(chosen) == N_ANCHOR

        self.train_data = training_rows
        self.valid_data = batch_manager.valid_data
        self.test_data = batch_manager.test_data

        self.anchor_idxs = chosen
        # Rows of the training data sitting at the chosen anchor indices.
        self.anchor_points = training_rows[chosen]

        self.row_k = row_kernel
        self.col_k = col_kernel

    def _get_k(self, anchor_idx, data):
        """Kernel weight between anchor ``anchor_idx`` and every row of ``data``.

        Column 0 of ``data`` is read as user index and column 1 as item index;
        the result is the element-wise product of the anchor's user-kernel row
        and item-kernel row gathered at those indices.
        """
        anchor = self.anchor_points[anchor_idx]
        anchor_user = int(anchor[0])
        anchor_item = int(anchor[1])

        users = data[:, 0].astype(np.int64)
        items = data[:, 1].astype(np.int64)

        user_weights = self.row_k[anchor_user][users]
        item_weights = self.col_k[anchor_item][items]
        return np.multiply(user_weights, item_weights)

    def get_train_k(self, anchor_idx):
        """Kernel weights of the training rows for the given anchor."""
        return self._get_k(anchor_idx, self.train_data)

    def get_valid_k(self, anchor_idx):
        """Kernel weights of the validation rows for the given anchor."""
        return self._get_k(anchor_idx, self.valid_data)

    def get_test_k(self, anchor_idx):
        """Kernel weights of the test rows for the given anchor."""
        return self._get_k(anchor_idx, self.test_data)


base-dataset

In [5]:
import os
import random

import numpy as np

# from ..configs import *


def _make_dir_if_not_exists(path):
    if not os.path.exists(path):
        os.mkdir(path)


class DatasetManager:
    """Downloads, parses and splits a rating dataset under ``data/<kind>/``.

    The parsed ratings are cached as ``data/<kind>/data.npy`` with rows
    ``(user_index, item_index, rating, timestamp)``, where user and item
    indices are assigned consecutively from 0 in order of first appearance.
    The train/valid/test splits are cached under ``data/<kind>/shot-<n_shot>/``.
    """

    KIND_MOVIELENS_100K = 'movielens-100k'
    KIND_MOVIELENS_1M = 'movielens-1m'
    KIND_MOVIELENS_10M = 'movielens-10m'
    KIND_MOVIELENS_20M = 'movielens-20m'
    KIND_NETFLIX = 'netflix'

    # (kind, download URL) pairs; a URL of None means nothing can be fetched.
    KIND_OBJECTS = (
        (KIND_MOVIELENS_100K, 'http://files.grouplens.org/datasets/movielens/ml-100k.zip'),
        (KIND_MOVIELENS_1M,  'http://files.grouplens.org/datasets/movielens/ml-1m.zip'),
        (KIND_MOVIELENS_10M, 'http://files.grouplens.org/datasets/movielens/ml-10m.zip'),
        (KIND_MOVIELENS_20M, 'http://files.grouplens.org/datasets/movielens/ml-20m.zip'),
        (KIND_NETFLIX, None),
    )

    def _set_kind_and_url(self, kind):
        """Store ``kind`` and its download URL on the instance.

        Returns:
            True when ``kind`` is listed in ``KIND_OBJECTS``.

        Raises:
            NotImplementedError: if ``kind`` is not a known dataset kind.
        """
        self.kind = kind
        for k, url in self.KIND_OBJECTS:
            if k == kind:
                self.url = url
                return True
        raise NotImplementedError()

    def _download_data_if_not_exists(self):
        """Fetch and unzip the dataset unless ``data/<kind>`` already exists.

        Relies on the external ``wget`` and ``unzip`` commands. Kinds with no
        registered URL are skipped instead of invoking ``wget None``.
        """
        if os.path.exists('data/{}'.format(self.kind)):
            return
        if self.url is None:
            return

        os.system('wget {url} -O data/{kind}.zip'.format(
            url=self.url, kind=self.kind))
        os.system(
            'unzip data/{kind}.zip -d data/{kind}/'.format(kind=self.kind))

    def __init_data(self, detail_path, delimiter, header=False):
        """Parse the raw ratings file and save it as ``data/<kind>/data.npy``.

        Args:
            detail_path: path of the ratings file relative to ``data/<kind>``.
            delimiter: column separator; every line must have 4 columns.
            header: when True, the first line is skipped.
        """
        u_dict = {}
        i_dict = {}

        data = []
        # The ``with`` block closes the file; no explicit close() is needed.
        with open('data/{}{}'.format(self.kind, detail_path), 'r') as f:
            if header:
                f.readline()

            for line in f:
                cols = line.strip().split(delimiter)
                assert len(cols) == 4
                user_id = cols[0]
                item_id = cols[1]
                r = float(cols[2])
                t = int(cols[3])

                # First-seen ids receive the next free consecutive index.
                u = u_dict.setdefault(user_id, len(u_dict))
                i = i_dict.setdefault(item_id, len(i_dict))

                data.append((u, i, r, t))

        data = np.array(data)
        np.save('data/{}/data.npy'.format(self.kind), data)

    def _init_data(self):
        """Parse the raw file of the current kind using its known layout.

        Raises:
            NotImplementedError: for kinds without a known file layout.
        """
        if self.kind == self.KIND_MOVIELENS_100K:
            self.__init_data('/ml-100k/u.data', '\t')
        elif self.kind == self.KIND_MOVIELENS_1M:
            self.__init_data('/ml-1m/ratings.dat', '::')
        elif self.kind == self.KIND_MOVIELENS_10M:
            self.__init_data('/ml-10M100K/ratings.dat', '::')
        elif self.kind == self.KIND_MOVIELENS_20M:
            self.__init_data('/ml-20m/ratings.csv', ',', header=True)
        else:
            raise NotImplementedError()

    def _load_base_data(self):
        """Load the parsed ratings array from ``data/<kind>/data.npy``."""
        return np.load('data/{}/data.npy'.format(self.kind))

    def _save_splits(self, train_data, valid_data, test_data):
        """Write the three splits to their ``.npy`` files."""
        np.save(self._get_npy_path('train'), train_data)
        np.save(self._get_npy_path('valid'), valid_data)
        np.save(self._get_npy_path('test'), test_data)

    def _split_data(self):
        """Shuffle ``self.data`` in place and write train/valid/test splits.

        * ``n_shot == -1``: sparse setting, 10% of the ratings form train+valid
          (split 90/10 between them) and the remaining 90% form the test set.
        * ``n_shot == 0``: like other algorithms, 90% of the ratings form
          train+valid (split 98/2) and the remaining 10% form the test set.
        * ``n_shot > 0``: 20% of the users are drawn as test users; their first
          ``n_shot`` ratings (in shuffled order) stay in training and the rest
          go to the test set. 2% of the training rows become validation.
        """
        data = self.data
        n_shot = self.n_shot
        np.random.shuffle(data)

        if n_shot in (-1, 0):
            train_fraction, valid_fraction = (
                (0.1, 0.9) if n_shot == -1 else (0.9, 0.98))
            n_train = int(data.shape[0] * train_fraction)
            n_valid = int(n_train * valid_fraction)

            self._save_splits(data[:n_valid], data[n_valid:n_train],
                              data[n_train:])
            return

        # Set aside 20% of all users as test users. A set keeps the per-row
        # membership test O(1) instead of scanning a list for every rating.
        test_user_ids = set(random.sample(
            list(range(self.n_user)), self.n_user // 5))

        train_data = []
        test_data = []
        count_dict = {}
        for row in data:
            user_id = int(row[0])
            if user_id not in test_user_ids:
                train_data.append(row)
                continue

            count = count_dict.get(user_id, 0)
            if count < n_shot:
                train_data.append(row)
            else:
                test_data.append(row)
            count_dict[user_id] = count + 1

        train_data = np.array(train_data)
        n_valid = int(train_data.shape[0] * 0.98)
        train_data, valid_data = train_data[:n_valid], train_data[n_valid:]

        self._save_splits(train_data, valid_data, np.array(test_data))

    def _get_npy_path(self, split_kind):
        """Return the ``.npy`` path of a split ('train', 'valid' or 'test')."""
        return 'data/{}/shot-{}/{}.npy'.format(self.kind, self.n_shot,
                                               split_kind)

    def __init__(self, kind, n_shot=0):
        """Prepare the dataset of ``kind`` and load its splits.

        Args:
            kind: one of the ``KIND_*`` constants.
            n_shot: split strategy selector, an int >= -1 (see ``_split_data``).
        """
        assert type(n_shot) == int and n_shot >= -1

        _make_dir_if_not_exists('data')
        self._set_kind_and_url(kind)
        self._download_data_if_not_exists()
        self.n_shot = n_shot

        # Parse the raw file only when the cleaned-up npy file is missing.
        if not os.path.exists('data/{}/data.npy'.format(kind)):
            self._init_data()
        self.data = self._load_base_data()

        _make_dir_if_not_exists(
            'data/{}/shot-{}'.format(self.kind, self.n_shot))

        self.n_user = int(np.max(self.data[:, 0])) + 1
        self.n_item = int(np.max(self.data[:, 1])) + 1
        self.n_row = self.n_user
        self.n_col = self.n_item

        # Split the data only when some split file is missing.
        split_paths = [
            self._get_npy_path(name) for name in ('train', 'valid', 'test')
        ]
        if not all(os.path.exists(path) for path in split_paths):
            self._split_data()

        self.train_data = np.load(self._get_npy_path('train'))
        self.valid_data = np.load(self._get_npy_path('valid'))
        self.test_data = np.load(self._get_npy_path('test'))

    def get_train_data(self):
        """Return the training split."""
        return self.train_data

    def get_valid_data(self):
        """Return the validation split."""
        return self.valid_data

    def get_test_data(self):
        """Return the test split."""
        return self.test_data

# if __name__ == '__main__':
#     kind = DatasetManager.KIND_MOVIELENS_100K
#     kind = DatasetManager.KIND_MOVIELENS_1M
#     kind = DatasetManager.KIND_MOVIELENS_10M
#     kind = DatasetManager.KIND_MOVIELENS_20M
#     dataset_manager = DatasetManager(kind)

batch.py

In [7]:
import numpy as np



class BatchManager:
    """Loads the splits of a dataset kind and derives basic statistics.

    Attributes set on construction: the three splits, ``n_user`` / ``n_item``
    (largest index found in columns 0 / 1 of any split, plus one), and the
    mean ``mu`` and standard deviation ``std`` of column 2 of the training
    split.
    """

    def __init__(self, kind):
        self.kind = kind
        dataset_manager = DatasetManager(kind, N_SHOT)
        self.train_data = dataset_manager.get_train_data()
        self.valid_data = dataset_manager.get_valid_data()
        self.test_data = dataset_manager.get_test_data()

        splits = (self.train_data, self.valid_data, self.test_data)
        self.n_user = int(max(np.max(split[:, 0]) for split in splits)) + 1
        self.n_item = int(max(np.max(split[:, 1]) for split in splits)) + 1

        train_ratings = self.train_data[:, 2]
        self.mu = np.mean(train_ratings)
        self.std = np.std(train_ratings)

base/rprop.py 

In [9]:
"""
    RProp (Resilient Backpropagation) for TensorFlow.
    This code is forked from "https://raw.githubusercontent.com/dirkweissenborn/genie-kb/master/rprop.py".
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
from tensorflow.python.framework import ops
from tensorflow.python.training import optimizer


class RPropOptimizer(optimizer.Optimizer):
    """
        Optimizer that implements the RProp algorithm.

        Every variable gets a per-element step size that is multiplied by
        ``etaplus`` when the gradient keeps its sign between two updates and
        by ``etaminus`` when the sign flips, bounded by ``stepsizemin`` and
        ``stepsizemax``. Only dense gradients are supported.
    """

    def __init__(self,
                 stepsize=0.1,
                 etaplus=1.2,
                 etaminus=0.5,
                 stepsizemax=50.0,
                 stepsizemin=1e-6,
                 use_locking=False,
                 name="RProp"):
        # stepsize: initial value of every per-element step.
        # etaplus / etaminus: factors applied to the step when the gradient
        #   sign is unchanged / flipped.
        # stepsizemax / stepsizemin: upper / lower bound applied after scaling.
        super(RPropOptimizer, self).__init__(use_locking, name)
        self._stepsize = stepsize
        self._etaplus = etaplus
        self._etaminus = etaminus
        self._stepsizemax = stepsizemax
        self._stepsizemin = stepsizemin

    def _create_slots(self, var_list):
        '''
        Create the three flat (1-D) slots kept for every variable.

        :param var_list: variables to create slots for.
        :return: None
        '''
        # Each slot has one entry per element of the variable:
        #   "step"  - current step size, initialised to ``stepsize``
        #   "delta" - last applied update, initialised to zero
        #   "grad"  - gradient stored by the previous update, initialised to zero
        for v in var_list:
            self._get_or_make_slot(
                v,
                tf.ones([v.get_shape().num_elements()], dtype=tf.float32) *
                self._stepsize,
                "step",
                self._name, )
            self._get_or_make_slot(
                v,
                tf.zeros([v.get_shape().num_elements()], dtype=tf.float32),
                "delta",
                self._name, )
            self._get_or_make_slot(
                v,
                tf.zeros([v.get_shape().num_elements()], dtype=tf.float32),
                "grad",
                self._name, )

    def _apply_dense(self, grad, var):
        # Builds the op that applies one RProp update of ``var`` given its
        # dense gradient ``grad``; returns a grouped op covering the variable
        # update and the three slot updates.
        grad_slot = self.get_slot(var, "grad")
        step_slot = self.get_slot(var, "step")
        delta_slot = self.get_slot(var, "delta")

        # Work on the flattened gradient so it lines up with the 1-D slots.
        grad = tf.reshape(grad, [-1])
        # Sign of (stored gradient * new gradient): 1 = same sign, -1 = sign
        # flipped, 0 = at least one of the two is zero.
        sign = tf.cast(tf.sign(grad_slot * grad), tf.int64)
        with tf.control_dependencies([sign]):
            # Overwrite the stored gradient only after ``sign`` was computed.
            grad = grad_slot.assign(grad)

            p_indices = tf.where(tf.equal(sign, 1))  # positive indices
            m_indices = tf.where(tf.equal(sign, -1))  # minus indices
            z_indices = tf.where(tf.equal(sign, 0))  # zero indices

        # New step sizes: grow (capped at stepsizemax) where the sign is
        # unchanged, shrink (floored at stepsizemin) where it flipped, and
        # keep the current value where the product is zero.
        step_p_update = tf.expand_dims(
            tf.minimum(
                tf.gather_nd(step_slot, p_indices) * self._etaplus,
                self._stepsizemax), 1)
        step_m_update = tf.expand_dims(
            tf.maximum(
                tf.gather_nd(step_slot, m_indices) * self._etaminus,
                self._stepsizemin), 1)
        step_z_update = tf.expand_dims(tf.gather_nd(step_slot, z_indices), 1)
        with tf.control_dependencies(
            [step_p_update, step_m_update, step_z_update]):
            step = tf.scatter_update(step_slot, p_indices, step_p_update)
            step = tf.scatter_update(step, m_indices, step_m_update)
            step = tf.scatter_update(step, z_indices, step_z_update)
            step = step_slot.assign(step)

        # The update is sign(grad) * step, written only at the "same sign" and
        # "zero" positions; "flipped" positions keep the delta already stored
        # in the slot.
        delta_p_update = tf.expand_dims(
            tf.gather_nd(tf.sign(grad) * step, p_indices), 1)
        delta_z_update = tf.expand_dims(
            tf.gather_nd(tf.sign(grad) * step, z_indices), 1)
        with tf.control_dependencies([delta_p_update, delta_z_update]):
            delta = tf.scatter_update(delta_slot, p_indices, delta_p_update)
            delta = tf.scatter_update(delta, z_indices, delta_z_update)
            delta = delta_slot.assign(delta)

        with tf.control_dependencies([sign]):
            # Zero the stored gradient where the sign flipped, so the next
            # sign product at those positions is 0.
            grad = tf.scatter_update(grad, m_indices,
                                     tf.zeros_like(m_indices, tf.float32))
            grad = grad_slot.assign(grad)

        # Reshape the flat delta back to the variable's shape and subtract it.
        up = tf.reshape(delta, var.get_shape())
        var_update = var.assign_sub(up, use_locking=self._use_locking)

        return tf.group(*[var_update, step, delta, grad])

    def _apply_sparse(self, grad, var):
        # Sparse gradients are not supported.
        raise NotImplementedError("RProp should be used only in batch_mode.")

base-train.py

In [10]:
import math

import tensorflow as tf


def cosine_decay_learning_rate(learning_rate,
                               global_step,
                               decay_steps=200,
                               alpha=0.01):
    """Scale ``learning_rate`` by a cosine curve that repeats every ``decay_steps``.

    The position inside the current period is ``global_step mod decay_steps``;
    the multiplier starts at 1 at position 0 and decreases towards ``alpha``
    as the position approaches ``decay_steps``.
    """
    # Implemented by hand because it cannot be used in tensorflow==1.4.0.
    position = tf.mod(tf.cast(global_step, tf.int64), decay_steps)
    position = tf.cast(position, tf.float32)
    cosine_term = 0.5 * (1.0 + tf.cos(math.pi * position / decay_steps))
    multiplier = (1 - alpha) * cosine_term + alpha
    return learning_rate * multiplier

local.py

In [11]:
import time
import math

import numpy as np



class LocalModel:
    """Kernel weights of one anchor over the train, valid and test splits.

    The weights are fetched from ``anchor_manager`` at construction time and
    stored as ``train_k``, ``valid_k`` and ``test_k``.
    """

    def __init__(self, session, models, anchor_idx, anchor_manager,
                 batch_manager):
        self.session, self.models = session, models
        self.batch_manager = batch_manager
        self.anchor_idx, self.anchor_manager = anchor_idx, anchor_manager

        print('>> update k in anchor_idx [{}].'.format(anchor_idx))
        manager = self.anchor_manager
        self.train_k = manager.get_train_k(anchor_idx)
        self.valid_k = manager.get_valid_k(anchor_idx)
        self.test_k = manager.get_test_k(anchor_idx)

model.py


In [12]:
import math

import tensorflow as tf

import numpy as np



def _create_p_or_q_variable(n, rank, batch_manager):
    """Create an ``[n, rank]`` float64 factor variable with a truncated-normal init.

    The mean and standard deviation of the initializer are derived from the
    rating mean ``batch_manager.mu`` and standard deviation
    ``batch_manager.std``, both scaled by ``rank``.
    """
    rating_mean, rating_std = batch_manager.mu, batch_manager.std

    init_mean = math.sqrt(rating_mean / rank)
    root_second_moment = math.sqrt(
        rating_mean * rating_mean + rating_std * rating_std)
    init_std = math.sqrt((root_second_moment - rating_mean) / rank)

    initial_value = tf.truncated_normal(
        [n, rank], init_mean, init_std, dtype=tf.float64)
    return tf.Variable(initial_value)


def init_models_for_pre_train(batch_manager):
    """Build the graph of the single rank-``PRE_RANK`` pre-training model.

    Returns a dict with the placeholders 'u', 'i', 'r', the two training ops
    in 'train_ops' (one updating only ``p``, one updating only ``q``), the
    'loss' and 'rmse' tensors and the factor variables 'p' and 'q'.
    """
    n_row, n_col = batch_manager.n_user, batch_manager.n_item

    # Inputs: user indices, item indices and observed ratings of a batch.
    u = tf.placeholder(tf.int64, [None], name='u')
    i = tf.placeholder(tf.int64, [None], name='i')
    r = tf.placeholder(tf.float64, [None], name='r')

    # init weights
    # NOTE(review): mu and std are not used below; _create_p_or_q_variable
    # reads them from batch_manager itself.
    mu = batch_manager.mu
    std = batch_manager.std
    p = _create_p_or_q_variable(n_row, PRE_RANK, batch_manager)
    q = _create_p_or_q_variable(n_col, PRE_RANK, batch_manager)

    # Predicted rating: dot product of the looked-up user and item factors.
    p_lookup = tf.nn.embedding_lookup(p, u)
    q_lookup = tf.nn.embedding_lookup(q, i)
    r_hat = tf.reduce_sum(tf.multiply(p_lookup, q_lookup), 1)

    # Squared-error loss plus PRE_LAMBDA times the sum of squares of the
    # full p and q matrices.
    reg_loss = tf.add_n(
        [tf.reduce_sum(tf.square(p)),
         tf.reduce_sum(tf.square(q))])
    loss = tf.reduce_sum(tf.square(r - r_hat)) + PRE_LAMBDA * reg_loss
    rmse = tf.sqrt(tf.reduce_mean(tf.square(r - r_hat)))

    optimizer = tf.train.MomentumOptimizer(PRE_LEARNING_RATE, 0.9)
    # optimizer = tf.train.GradientDescentOptimizer(PRE_LEARNING_RATE)
    # Two separate ops so that p and q can be updated one after the other.
    train_ops = [
        optimizer.minimize(loss, var_list=[p]),
        optimizer.minimize(loss, var_list=[q])
    ]

    return {
        'u': u,
        'i': i,
        'r': r,
        'train_ops': train_ops,
        'loss': loss,
        'rmse': rmse,
        'p': p,
        'q': q,
    }


def _get_train_op(optimizer, loss, var_list):
    gvs = optimizer.compute_gradients(loss, var_list=var_list)
    # capped_gvs = [(tf.clip_by_value(grad, -100.0, 100.0), var)
    #               for grad, var in gvs]
    capped_gvs = gvs
    train_op = optimizer.apply_gradients(capped_gvs)
    return train_op


def init_models(batch_manager):
    """Build the graph combining ``N_ANCHOR`` rank-``RANK`` factor models.

    Returns a dict with the placeholders 'u', 'i', 'r', 'k', the list
    'train_ops' (one op per anchor) and the 'rmse' tensor.
    """
    n_row, n_col = batch_manager.n_user, batch_manager.n_item

    # Inputs: user indices, item indices, observed ratings and, per example,
    # one weight for each of the N_ANCHOR models.
    u = tf.placeholder(tf.int64, [None], name='u')
    i = tf.placeholder(tf.int64, [None], name='i')
    r = tf.placeholder(tf.float64, [None], name='r')
    k = tf.placeholder(tf.float64, [None, N_ANCHOR], name='k')
    k_sum = tf.reduce_sum(k, axis=1)

    # init weights
    # NOTE(review): ``losses`` is never filled or read.
    ps, qs, losses, r_hats = [], [], [], []
    for anchor_idx in range(N_ANCHOR):
        p = _create_p_or_q_variable(n_row, RANK, batch_manager)
        q = _create_p_or_q_variable(n_col, RANK, batch_manager)
        ps.append(p)
        qs.append(q)

        # Prediction of this anchor's model: dot product of the factors.
        p_lookup = tf.nn.embedding_lookup(p, u)
        q_lookup = tf.nn.embedding_lookup(q, i)
        r_hat = tf.reduce_sum(tf.multiply(p_lookup, q_lookup), axis=1)
        r_hats.append(r_hat)

    # Final prediction: k-weighted sum of the per-anchor predictions. Examples
    # whose weights sum to at most 1e-2 get the constant prediction 3 instead.
    r_hat = tf.reduce_sum(tf.multiply(k, tf.stack(r_hats, axis=1)), axis=1)
    r_hat = tf.where(tf.greater(k_sum, 1e-2), r_hat, tf.ones_like(r_hat) * 3)
    rmse = tf.sqrt(tf.reduce_mean(tf.square(r - r_hat)))

    optimizer = tf.train.GradientDescentOptimizer(LEARNING_RATE)
    # Squared error of the combined prediction plus LAMBDA times the sum of
    # squares of every p and q matrix.
    loss = tf.reduce_sum(tf.square(r_hat - r)) + LAMBDA * tf.reduce_sum(
        [tf.reduce_sum(tf.square(p_or_q)) for p_or_q in ps + qs])
    # One op per anchor; each minimizes the shared loss with respect to that
    # anchor's own (p, q) pair only.
    train_ops = [
        _get_train_op(optimizer, loss, [p, q]) for p, q in zip(ps, qs)
    ]

    return {
        'u': u,
        'i': i,
        'r': r,
        'k': k,
        'train_ops': train_ops,
        'rmse': rmse,
    }

pre_trainer.py

In [13]:
import os
import time
import math
import random

import tensorflow as tf
import numpy as np



def _validate(session, batch_manager, models):
    valid_rmse = session.run(
        models['rmse'],
        feed_dict={
            models['u']: batch_manager.valid_data[:, 0],
            models['i']: batch_manager.valid_data[:, 1],
            models['r']: batch_manager.valid_data[:, 2]
        })

    test_rmse = session.run(
        models['rmse'],
        feed_dict={
            models['u']: batch_manager.test_data[:, 0],
            models['i']: batch_manager.test_data[:, 1],
            models['r']: batch_manager.test_data[:, 2]
        })

    return valid_rmse, test_rmse


def get_p_and_q(kind, use_cache=True):
    """Return the pre-trained factor matrices ``(p, q)`` for dataset ``kind``.

    With ``use_cache`` the matrices are loaded from ``llorma_g/<kind>-p.npy``
    and ``llorma_g/<kind>-q.npy`` when both can be read. Otherwise the
    pre-training model is trained on the full training split until the
    validation RMSE has not improved for 100 iterations; the variables of the
    best iteration are restored, saved to the cache files and returned.

    Args:
        kind: dataset kind, forwarded to ``BatchManager``.
        use_cache: whether to try the cached ``.npy`` files first.

    Returns:
        Tuple ``(p, q)`` of NumPy arrays.
    """
    p_path = 'llorma_g/{}-p.npy'.format(kind)
    q_path = 'llorma_g/{}-q.npy'.format(kind)

    if use_cache:
        try:
            p = np.load(p_path)
            q = np.load(q_path)
            return p, q
        except (IOError, OSError, ValueError, EOFError):
            # Missing or unreadable cache: fall through to training. A bare
            # ``except`` would also have swallowed KeyboardInterrupt.
            print('>> There is no cached p and q.')

    batch_manager = BatchManager(kind)
    models = init_models_for_pre_train(batch_manager)

    # Create the output directories up front so that neither the checkpoint
    # nor the final np.save fails after training has already been done.
    for directory in ('tmp', 'llorma_g'):
        if not os.path.isdir(directory):
            os.makedirs(directory)

    session = tf.Session()
    try:
        session.run(tf.global_variables_initializer())

        min_valid_rmse = float("Inf")
        min_valid_iter = 0
        final_test_rmse = float("Inf")

        # Random suffix keeps concurrent runs from sharing a checkpoint file.
        random_model_idx = random.randint(0, 1000000)
        file_path = "tmp/model-{}.ckpt".format(random_model_idx)

        train_feed = {
            models['u']: batch_manager.train_data[:, 0],
            models['i']: batch_manager.train_data[:, 1],
            models['r']: batch_manager.train_data[:, 2]
        }

        saver = tf.train.Saver()
        for iteration in range(1000000):
            # One full-batch step for p, then one for q.
            for train_op in models['train_ops']:
                _, loss, train_rmse = session.run(
                    (train_op, models['loss'], models['rmse']),
                    feed_dict=train_feed)

            valid_rmse, test_rmse = _validate(session, batch_manager, models)

            if valid_rmse < min_valid_rmse:
                min_valid_rmse = valid_rmse
                min_valid_iter = iteration
                final_test_rmse = test_rmse
                saver.save(session, file_path)

            # Early stopping: no validation improvement for 100 iterations.
            if iteration >= min_valid_iter + 100:
                break

            print('>> ITER:',
                  "{:3d}".format(iteration),
                  "{:3f}, {:3f} {:3f} / {:3f}".format(
                      train_rmse, valid_rmse, test_rmse, final_test_rmse))

        # Go back to the variables of the best validation iteration.
        saver.restore(session, file_path)
        p, q = session.run(
            (models['p'], models['q']), feed_dict=train_feed)
        np.save(p_path, p)
        np.save(q_path, q)
    finally:
        # Release the session even when training or saving raised.
        session.close()

    return p, q

trainer.py

In [14]:
import os
import time
import math
import random

import tensorflow as tf
import numpy as np

#from . import pre_trainer
#from .anchor import AnchorManager
#from .batch import BatchManager
#from .configs import *
#from .local import LocalModel
#from .model import init_models


def __init_session():
    """Create a TensorFlow session with GPU memory growth enabled.

    All global variables are initialized before the session is returned.
    """
    # Disabled alternative: cap the GPU memory with
    # tf.GPUOptions(per_process_gpu_memory_fraction=GPU_MEMORY_FRAC)
    # instead of allowing it to grow.
    session_config = tf.ConfigProto()
    session_config.gpu_options.allow_growth = True

    new_session = tf.Session(config=session_config)
    new_session.run(tf.global_variables_initializer())
    return new_session


def _get_k(local_models, kind='train'):
    k = np.stack(
        [
            getattr(local_model, '{}_k'.format(kind))
            for local_model in local_models
        ],
        axis=1)
    k = np.clip(k, 0.0, 1.0)
    k = np.divide(k, np.sum(k, axis=1, keepdims=1))
    k[np.isnan(k)] = 0
    return k


def _validate(
        session,
        models,
        batch_manager,
        valid_k,
        test_k, ):
    valid_rmse = session.run(
        models['rmse'],
        feed_dict={
            models['u']: batch_manager.valid_data[:, 0],
            models['i']: batch_manager.valid_data[:, 1],
            models['r']: batch_manager.valid_data[:, 2],
            models['k']: valid_k,
        })

    test_rmse = session.run(
        models['rmse'],
        feed_dict={
            models['u']: batch_manager.test_data[:, 0],
            models['i']: batch_manager.test_data[:, 1],
            models['r']: batch_manager.test_data[:, 2],
            models['k']: test_k,
        })

    return valid_rmse, test_rmse


def _train(kind):
    """Train the combined local models on dataset ``kind``.

    Obtains the initial latent factors from the pre-trainer, builds the
    graph, the anchors and the per-anchor kernel weights, and then runs
    mini-batch epochs. After every epoch the validation and test RMSE are
    evaluated and printed together with the test RMSE observed at the best
    validation RMSE so far. The loop has no early-stopping condition.
    """
    row_latent_init, col_latent_init = pre_trainer.get_p_and_q(
        kind, use_cache=USE_CACHE)

    batch_manager = BatchManager(kind)
    models = init_models(batch_manager)

    session = __init_session()
    anchor_manager = AnchorManager(
        session,
        models,
        batch_manager,
        row_latent_init,
        col_latent_init, )

    local_models = []
    for anchor_idx in range(N_ANCHOR):
        local_models.append(
            LocalModel(session, models, anchor_idx, anchor_manager,
                       batch_manager))

    # Normalized kernel weight matrices, one column per anchor.
    train_k, valid_k, test_k = (
        _get_k(local_models, kind=split)
        for split in ('train', 'valid', 'test'))

    best_valid_rmse = float("Inf")
    best_valid_epoch = 0
    final_test_rmse = float("Inf")
    start_time = time.time()

    train_data = batch_manager.train_data
    n_train = train_data.shape[0]
    # A single run fetches the RMSE and executes every anchor's train op.
    fetches = [models['rmse']] + models['train_ops']

    epoch_rmses = []
    for epoch in range(10000000):
        for start in range(0, n_train, BATCH_SIZE):
            stop = min(start + BATCH_SIZE, n_train)
            batch = train_data[start:stop]
            results = session.run(
                fetches,
                feed_dict={
                    models['u']: batch[:, 0],
                    models['i']: batch[:, 1],
                    models['r']: batch[:, 2],
                    models['k']: train_k[start:stop, :],
                })
            epoch_rmses.append(results[0])

            # Progress line every 100 batches.
            if start % (BATCH_SIZE * 100) == 0:
                print('  - ', results[:1])

        valid_rmse, test_rmse = _validate(session, models, batch_manager,
                                          valid_k, test_k)
        if valid_rmse < best_valid_rmse:
            best_valid_rmse = valid_rmse
            best_valid_epoch = epoch
            final_test_rmse = test_rmse

        mean_batch_rmse = sum(epoch_rmses) / len(epoch_rmses)
        epoch_rmses = []
        print('  - ITER{:4d}:'.format(epoch),
              "{:.5f}, {:.5f} {:.5f} / {:.5f}".format(
                  mean_batch_rmse, valid_rmse, test_rmse, final_test_rmse))


def main(kind):
    """Entry point: run ``_train`` for the given dataset ``kind``."""
    _train(kind)

base/memory_saving_gradients.py

In [16]:
!pip install toposort

Collecting toposort
  Downloading https://files.pythonhosted.org/packages/e9/8a/321cd8ea5f4a22a06e3ba30ef31ec33bea11a3443eeb1d89807640ee6ed4/toposort-1.5-py2.py3-none-any.whl
Installing collected packages: toposort
Successfully installed toposort-1.5


In [17]:

from toposort import toposort
import contextlib
import numpy as np
import tensorflow as tf
import tensorflow.contrib.graph_editor as ge
import time
import sys
# Raise the interpreter's recursion limit to 10000
# (presumably needed for walking deep graphs; TODO confirm).
sys.setrecursionlimit(10000)
# refers back to current module if we decide to split helpers out
util = sys.modules[__name__]

# getting rid of "WARNING:tensorflow:VARIABLES collection name is deprecated"
setattr(tf.GraphKeys, "VARIABLES", "variables")

# save original gradients since tf.gradient could be monkey-patched to point
# to our version
from tensorflow.python.ops import gradients as tf_gradients_lib
tf_gradients = tf_gradients_lib.gradients

MIN_CHECKPOINT_NODE_SIZE=1024    # use lower value during testing

# specific versions we can use to do process-wide replacement of tf.gradients
def gradients_speed(ys, xs, grad_ys=None, **kwargs):
    """Call ``gradients`` with ``checkpoints='speed'``; other arguments are forwarded."""
    return gradients(ys, xs, grad_ys, checkpoints='speed', **kwargs)

def gradients_memory(ys, xs, grad_ys=None, **kwargs):
    """Call ``gradients`` with ``checkpoints='memory'``; other arguments are forwarded."""
    return gradients(ys, xs, grad_ys, checkpoints='memory', **kwargs)

def gradients_collection(ys, xs, grad_ys=None, **kwargs):
    """Call ``gradients`` with ``checkpoints='collection'``; other arguments are forwarded."""
    return gradients(ys, xs, grad_ys, checkpoints='collection', **kwargs)

def gradients(ys, xs, grad_ys=None, checkpoints='collection', **kwargs):
    '''
    Authors: Tim Salimans & Yaroslav Bulatov
    memory efficient gradient implementation inspired by "Training Deep Nets with Sublinear Memory Cost"
    by Chen et al. 2016 (https://arxiv.org/abs/1604.06174)
    ys,xs,grad_ys,kwargs are the arguments to standard tensorflow tf.gradients
    (https://www.tensorflow.org/versions/r0.12/api_docs/python/train.html#gradients)
    'checkpoints' can either be
        - a list consisting of tensors from the forward pass of the neural net
          that we should re-use when calculating the gradients in the backward pass
          all other tensors that do not appear in this list will be re-computed
        - a string specifying how this list should be determined. currently we support
            - 'speed':  checkpoint all outputs of convolutions and matmuls. these ops are usually the most expensive,
                        so checkpointing them maximizes the running speed
                        (this is a good option if nonlinearities, concats, batchnorms, etc are taking up a lot of memory)
            - 'memory': try to minimize the memory usage
                        (currently using a very simple strategy that identifies a number of bottleneck tensors in the graph to checkpoint)
            - 'collection': look for a tensorflow collection named 'checkpoints', which holds the tensors to checkpoint

    Returns:
        a list with one gradient per element of xs (entries may be None),
        accumulated over the recomputed graph segments

    Raises:
        Exception: if 'checkpoints' is neither a list nor one of the supported
            strings, if 'memory' mode finds no bottleneck tensors, or if no
            checkpoint tensors remain after filtering
    '''

    if not isinstance(ys,list):
        ys = [ys]
    if not isinstance(xs,list):
        xs = [xs]

    bwd_ops = ge.get_backward_walk_ops([y.op for y in ys],
                                       inclusive=True)

    debug_print("bwd_ops: %s", bwd_ops)

    # forward ops are all ops that are candidates for recomputation
    fwd_ops = ge.get_forward_walk_ops([x.op for x in xs],
                                      inclusive=True,
                                      within_ops=bwd_ops)
    debug_print("fwd_ops: %s", fwd_ops)

    # exclude ops with no inputs
    fwd_ops = [op for op in fwd_ops if op.inputs]

    # don't recompute xs, remove variables
    xs_ops = _to_ops(xs)
    fwd_ops = [op for op in fwd_ops if op not in xs_ops]
    fwd_ops = [op for op in fwd_ops if '/assign' not in op.name]
    fwd_ops = [op for op in fwd_ops if '/Assign' not in op.name]
    fwd_ops = [op for op in fwd_ops if '/read' not in op.name]
    ts_all = ge.filter_ts(fwd_ops, True) # get the tensors
    ts_all = [t for t in ts_all if '/read' not in t.name]
    ts_all = set(ts_all) - set(xs) - set(ys)

    # construct list of tensors to checkpoint during forward pass, if not
    # given as input
    if not isinstance(checkpoints, list):
        if checkpoints == 'collection':
            checkpoints = tf.get_collection('checkpoints')

        elif checkpoints == 'speed':
            # checkpoint all expensive ops to maximize running speed
            checkpoints = ge.filter_ts_from_regex(fwd_ops, 'conv2d|Conv|MatMul')

        elif checkpoints == 'memory':

            # remove very small tensors and some weird ops
            def fixdims(t): # tf.Dimension values are not compatible with int, convert manually
                try:
                    # unknown dimensions are counted as 64 elements
                    return [int(e if e.value is not None else 64) for e in t]
                except Exception:
                    # (was a bare ``except:``, which also swallowed
                    # KeyboardInterrupt / SystemExit)
                    return [0]  # unknown shape
            ts_all = [t for t in ts_all if np.prod(fixdims(t.shape)) > MIN_CHECKPOINT_NODE_SIZE]
            ts_all = [t for t in ts_all if 'L2Loss' not in t.name]
            ts_all = [t for t in ts_all if 'entropy' not in t.name]
            ts_all = [t for t in ts_all if 'FusedBatchNorm' not in t.name]
            ts_all = [t for t in ts_all if 'Switch' not in t.name]
            ts_all = [t for t in ts_all if 'dropout' not in t.name]
            # DV: FP16_FIX - need to add 'Cast' layer here to make it work for FP16
            ts_all = [t for t in ts_all if 'Cast' not in t.name]

            # filter out all tensors that are inputs of the backward graph
            # (a throw-away standard backward graph is built inside a
            # capture scope just to discover which tensors it reads)
            with util.capture_ops() as bwd_ops:
                tf_gradients(ys, xs, grad_ys, **kwargs)

            bwd_inputs = [t for op in bwd_ops for t in op.inputs]
            # list of tensors in forward graph that is in input to bwd graph
            ts_filtered = list(set(bwd_inputs).intersection(ts_all))
            debug_print("Using tensors %s", ts_filtered)

            # try two slightly different ways of getting bottlenecks tensors
            # to checkpoint
            for ts in [ts_filtered, ts_all]:

                # get all bottlenecks in the graph
                bottleneck_ts = []
                for t in ts:
                    b = set(ge.get_backward_walk_ops(t.op, inclusive=True, within_ops=fwd_ops))
                    f = set(ge.get_forward_walk_ops(t.op, inclusive=False, within_ops=fwd_ops))
                    # check that there are not shortcuts
                    b_inp = set([inp for op in b for inp in op.inputs]).intersection(ts_all)
                    f_inp = set([inp for op in f for inp in op.inputs]).intersection(ts_all)
                    if not set(b_inp).intersection(f_inp) and len(b_inp)+len(f_inp) >= len(ts_all):
                        bottleneck_ts.append(t)  # we have a bottleneck!
                    else:
                        debug_print("Rejected bottleneck candidate and ops %s", [t] + list(set(ts_all) - set(b_inp) - set(f_inp)))

                # success? or try again without filtering?
                if len(bottleneck_ts) >= np.sqrt(len(ts_filtered)): # yes, enough bottlenecks found!
                    break

            if not bottleneck_ts:
                raise Exception('unable to find bottleneck tensors! please provide checkpoint nodes manually, or use checkpoints="speed".')

            # sort the bottlenecks
            bottlenecks_sorted_lists = tf_toposort(bottleneck_ts, within_ops=fwd_ops)
            sorted_bottlenecks = [t for ts in bottlenecks_sorted_lists for t in ts]

            # save an approximately optimal number ~ sqrt(N)
            N = len(ts_filtered)
            if len(bottleneck_ts) <= np.ceil(np.sqrt(N)):
                checkpoints = sorted_bottlenecks
            else:
                step = int(np.ceil(len(bottleneck_ts) / np.sqrt(N)))
                checkpoints = sorted_bottlenecks[step::step]

        else:
            raise Exception('%s is unsupported input for "checkpoints"' % (checkpoints,))

    checkpoints = list(set(checkpoints).intersection(ts_all))

    # at this point automatic selection happened and checkpoints is list of nodes
    assert isinstance(checkpoints, list)

    debug_print("Checkpoint nodes used: %s", checkpoints)
    # better error handling of special cases
    # xs are already handled as checkpoint nodes, so no need to include them
    xs_intersect_checkpoints = set(xs).intersection(set(checkpoints))
    if xs_intersect_checkpoints:
        debug_print("Warning, some input nodes are also checkpoint nodes: %s",
                    xs_intersect_checkpoints)
    ys_intersect_checkpoints = set(ys).intersection(set(checkpoints))
    debug_print("ys: %s, checkpoints: %s, intersect: %s", ys, checkpoints,
                ys_intersect_checkpoints)
    # saving an output node (ys) gives no benefit in memory while creating
    # new edge cases, exclude them
    if ys_intersect_checkpoints:
        debug_print("Warning, some output nodes are also checkpoints nodes: %s",
              format_ops(ys_intersect_checkpoints))

    # remove initial and terminal nodes from checkpoints list if present
    checkpoints = list(set(checkpoints) - set(ys) - set(xs))

    # check that we have some nodes to checkpoint
    if not checkpoints:
        raise Exception('no checkpoints nodes found or given as input! ')

    # disconnect dependencies between checkpointed tensors
    checkpoints_disconnected = {}
    for x in checkpoints:
        if x.op and x.op.name is not None:
            grad_node = tf.stop_gradient(x, name=x.op.name+"_sg")
        else:
            grad_node = tf.stop_gradient(x)
        checkpoints_disconnected[x] = grad_node

    # partial derivatives to the checkpointed tensors and xs
    ops_to_copy = fast_backward_ops(seed_ops=[y.op for y in ys],
                                    stop_at_ts=checkpoints, within_ops=fwd_ops)
    debug_print("Found %s ops to copy within fwd_ops %s, seed %s, stop_at %s",
                    len(ops_to_copy), fwd_ops, [r.op for r in ys], checkpoints)
    debug_print("ops_to_copy = %s", ops_to_copy)
    debug_print("Processing list %s", ys)
    copied_sgv, info = ge.copy_with_input_replacements(ge.sgv(ops_to_copy), {})
    # keep every copied op on the same device as the op it was copied from
    for origin_op, op in info._transformed_ops.items():
        op._set_device(origin_op.node_def.device)
    copied_ops = info._transformed_ops.values()
    debug_print("Copied %s to %s", ops_to_copy, copied_ops)
    ge.reroute_ts(checkpoints_disconnected.values(), checkpoints_disconnected.keys(), can_modify=copied_ops)
    debug_print("Rewired %s in place of %s restricted to %s",
                checkpoints_disconnected.values(), checkpoints_disconnected.keys(), copied_ops)

    # get gradients with respect to current boundary + original x's
    copied_ys = [info._transformed_ops[y.op]._outputs[0] for y in ys]
    boundary = list(checkpoints_disconnected.values())
    dv = tf_gradients(ys=copied_ys, xs=boundary+xs, grad_ys=grad_ys, **kwargs)
    debug_print("Got gradients %s", dv)
    debug_print("for %s", copied_ys)
    debug_print("with respect to %s", boundary+xs)

    inputs_to_do_before = [y.op for y in ys]
    if grad_ys is not None:
        # Control inputs must be Operations: convert the grad_ys tensors to
        # their producing ops (appending the tensors themselves makes
        # ge.add_control_inputs reject them). A single, non-list grad_ys is
        # accepted as well and None entries are skipped.
        grad_ys_list = grad_ys if isinstance(grad_ys, (list, tuple)) else [grad_ys]
        inputs_to_do_before += [_to_op(g) for g in grad_ys_list if g is not None]
    wait_to_do_ops = list(copied_ops) + [g.op for g in dv if g is not None]
    my_add_control_inputs(wait_to_do_ops, inputs_to_do_before)

    # partial derivatives to the checkpointed nodes
    # dictionary of "node: backprop" for nodes in the boundary
    d_checkpoints = {r: dr for r,dr in zip(checkpoints_disconnected.keys(),
                                        dv[:len(checkpoints_disconnected)])}
    # partial derivatives to xs (usually the params of the neural net)
    d_xs = dv[len(checkpoints_disconnected):]

    # defined once, outside the loop below (it does not depend on loop state)
    def _unsparsify(x):
        # turn an IndexedSlices gradient into a dense tensor; pass others through
        if not isinstance(x, tf.IndexedSlices):
            return x
        assert x.dense_shape is not None, "memory_saving_gradients encountered sparse gradients of unknown shape"
        indices = x.indices
        while indices.shape.ndims < x.values.shape.ndims:
            indices = tf.expand_dims(indices, -1)
        return tf.scatter_nd(indices, x.values, x.dense_shape)

    # incorporate derivatives flowing through the checkpointed nodes
    checkpoints_sorted_lists = tf_toposort(checkpoints, within_ops=fwd_ops)
    for ts in checkpoints_sorted_lists[::-1]:
        debug_print("Processing list %s", ts)
        checkpoints_other = [r for r in checkpoints if r not in ts]
        checkpoints_disconnected_other = [checkpoints_disconnected[r] for r in checkpoints_other]

        # copy part of the graph below current checkpoint node, stopping at
        # other checkpoints nodes
        ops_to_copy = fast_backward_ops(within_ops=fwd_ops, seed_ops=[r.op for r in ts], stop_at_ts=checkpoints_other)
        debug_print("Found %s ops to copy within %s, seed %s, stop_at %s",
                    len(ops_to_copy), fwd_ops, [r.op for r in ts],
                    checkpoints_other)
        debug_print("ops_to_copy = %s", ops_to_copy)
        if not ops_to_copy: # we're done!
            break
        copied_sgv, info = ge.copy_with_input_replacements(ge.sgv(ops_to_copy), {})
        for origin_op, op in info._transformed_ops.items():
            op._set_device(origin_op.node_def.device)
        copied_ops = info._transformed_ops.values()
        debug_print("Copied %s to %s", ops_to_copy, copied_ops)
        ge.reroute_ts(checkpoints_disconnected_other, checkpoints_other, can_modify=copied_ops)
        debug_print("Rewired %s in place of %s restricted to %s",
                    checkpoints_disconnected_other, checkpoints_other, copied_ops)

        # gradient flowing through the checkpointed node
        boundary = [info._transformed_ops[r.op]._outputs[0] for r in ts]
        substitute_backprops = [d_checkpoints[r] for r in ts]
        dv = tf_gradients(boundary,
                          checkpoints_disconnected_other+xs,
                          grad_ys=substitute_backprops, **kwargs)
        debug_print("Got gradients %s", dv)
        debug_print("for %s", boundary)
        debug_print("with respect to %s", checkpoints_disconnected_other+xs)
        debug_print("with boundary backprop substitutions %s", substitute_backprops)

        inputs_to_do_before = [d_checkpoints[r].op for r in ts]
        wait_to_do_ops = list(copied_ops) + [g.op for g in dv if g is not None]
        my_add_control_inputs(wait_to_do_ops, inputs_to_do_before)

        # partial derivatives to the checkpointed nodes
        for r, dr in zip(checkpoints_other, dv[:len(checkpoints_other)]):
            if dr is not None:
                if d_checkpoints[r] is None:
                    d_checkpoints[r] = dr
                else:
                    d_checkpoints[r] += dr

        # partial derivatives to xs (usually the params of the neural net)
        d_xs_new = dv[len(checkpoints_other):]
        for j, d_new in enumerate(d_xs_new):
            if d_new is not None:
                if d_xs[j] is None:
                    d_xs[j] = _unsparsify(d_new)
                else:
                    d_xs[j] += _unsparsify(d_new)


    return d_xs

def tf_toposort(ts, within_ops=None):
    """Group the tensors in ``ts`` into topologically ordered levels.

    The ops reachable by a forward walk from the ops producing ``ts``
    (optionally restricted to ``within_ops``) are collected, each of their
    output tensors is mapped to the set of input tensors of its op, and that
    mapping is handed to ``toposort``. Every resulting level is reduced to
    the tensors that appear in ``ts``; levels left empty are dropped.

    Returns:
        a list of lists of tensors from ``ts``, in the order ``toposort``
        yields their levels
    """
    seed_ops = [tensor.op for tensor in ts]
    reachable_ops = ge.get_forward_walk_ops(seed_ops, within_ops=within_ops)

    # a fresh input set per output tensor (several outputs of one op must
    # not share the same set object)
    dependencies = {
        out: set(op.inputs)
        for op in reachable_ops
        for out in op.outputs
    }

    # only keep the tensors from our original list
    levels = (list(set(level).intersection(ts))
              for level in toposort(dependencies))
    return [kept for kept in levels if kept]

def fast_backward_ops(within_ops, seed_ops, stop_at_ts):
    """Return the ops found by a backward walk from ``seed_ops``.

    The walk stops at the tensors in ``stop_at_ts``. The result is restricted
    to ops contained in ``within_ops`` and excludes the ops that produce the
    ``stop_at_ts`` tensors. Returned as a list.
    """
    reached = set(ge.get_backward_walk_ops(seed_ops, stop_at_ts=stop_at_ts))
    inside = reached.intersection(within_ops)
    kept = inside.difference([tensor.op for tensor in stop_at_ts])
    return list(kept)

@contextlib.contextmanager
def capture_ops():
  """Context manager that collects the ops created inside its block.

  A name scope named after the current time in microseconds is opened for
  the duration of the block. Once the block has finished, the ops of the
  default graph selected by the pattern ``<scope>/.*`` are appended to the
  list that was yielded.

  with capture_ops() as ops:
    # create some ops
  print(ops) # => prints ops created.
  """
  scope_name = str(int(time.time() * 10**6))
  captured = []
  with tf.name_scope(scope_name):
    yield captured

  # the list handed to the caller is filled in place, after the block
  captured.extend(
      ge.select_ops(scope_name + "/.*", graph=tf.get_default_graph()))

def _to_op(tensor_or_op):
  if hasattr(tensor_or_op, "op"):
    return tensor_or_op.op
  return tensor_or_op

def _to_ops(iterable):
  """Map ``_to_op`` over ``iterable`` and return the results as a list.

  An argument that ``_is_iterable`` rejects is returned unchanged.
  """
  if _is_iterable(iterable):
    return [_to_op(item) for item in iterable]
  return iterable

def _is_iterable(o):
  try:
    _ = iter(o)
  except Exception:
    return False
  return True

# debug_print() only produces output while this flag is true
DEBUG_LOGGING = False
def debug_print(s, *args):
  """Print a "DEBUG "-prefixed, %-formatted message when DEBUG_LOGGING is set.

  Each argument is passed through ``format_ops`` first, so TensorFlow
  ops/tensors show up by name. Does nothing while DEBUG_LOGGING is false.
  Usage:
    debug_print("see tensors %s for %s", tensorlist, [1,2,3])
  """
  if not DEBUG_LOGGING:
    return

  rendered = tuple(format_ops(arg) for arg in args)
  print("DEBUG " + (s % rendered))

def format_ops(ops, sort_outputs=True):
  """Render ops/tensors for printing.

  An object with a ``name`` attribute is rendered as that name, anything
  else as ``str(obj)``. A non-string object exposing ``__iter__`` is
  rendered element by element into a list, sorted when ``sort_outputs`` is
  true; any other argument is rendered as a single value.
  """

  def _label(item):
    return item.name if hasattr(item, "name") else str(item)

  if isinstance(ops, str) or not hasattr(ops, '__iter__'):
    return _label(ops)

  labels = [_label(op) for op in ops]
  return sorted(labels) if sort_outputs else labels

def my_add_control_inputs(wait_to_do_ops, inputs_to_do_before):
    """Add ``inputs_to_do_before`` as control inputs of every op in ``wait_to_do_ops``.

    For each op, entries already present in its ``control_inputs`` are left
    out before calling ``ge.add_control_inputs``.
    """
    for target in wait_to_do_ops:
        missing = []
        for dependency in inputs_to_do_before:
            if target.control_inputs is None or dependency not in target.control_inputs:
                missing.append(dependency)
        ge.add_control_inputs(target, missing)

ModuleNotFoundError: ignored