Skip to content

Commit

Permalink
fitting with stochastic variational inference
Browse files Browse the repository at this point in the history
  • Loading branch information
David committed Aug 2, 2018
1 parent 82eb73e commit 5f41b5f
Show file tree
Hide file tree
Showing 2 changed files with 393 additions and 80 deletions.
246 changes: 227 additions & 19 deletions hpfrec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class HPF:
Model for recommending items based on probabilistic Poisson factorization
on sparse count data (e.g. number of times a user played different songs),
using variational inference with coordinate-ascent.
using either variational inference with coordinate-ascent, or stochastic variational inference.
Can use different stopping criteria for the opimization procedure:
Expand All @@ -22,9 +22,10 @@ class HPF:
default threshold (see Note).
4) Check the the difference in the user-factor matrix after every N iterations (stop_crit='diff-norm', check_every)
and stop once the *l2-norm* of this difference is below a certain threshold (stop_thr).
Note that this is *not a percent* difference as it is for log-likelihood criteria, so you should put a larger
Note that this is **not a percent** difference as it is for log-likelihood criteria, so you should put a larger
value than the default here.
This is a much faster criterion to calculate and is recommended for larger datasets.
This is a much faster criterion to calculate and is recommended for larger datasets, but it is
**NOT recommended** when using stochastic variational inference.
If passing reindex=True, it will internally reindex all user and item IDs. Your data will not require
reindexing if the IDs for users and items in counts_df meet the following criteria:
Expand Down Expand Up @@ -90,18 +91,30 @@ class HPF:
Calculate log-likelihood every N iterations.
stop_thr : float
Threshold for proportion increase in log-likelihood or l2-norm for difference between matrices.
users_per_batch : None or int
Number of users to take for each batch update in stochastic variational inference. If passing None, will
perform full-batch variational inference, which leads to better results but on larger datasets takes longer
to converge.
step_size : function -> float in (0, 1)
Function that takes the iteration/epoch number as input (starting at zero) and produces the step size
for the global parameters as output (only used when fitting with stochastic variational inference).
The step size must be a number between zero and one, and should be decresing with bigger iteration numbers.
Ignored when passing users_per_batch=None.
maxiter : int
Maximum number of iterations for which to run the optimization procedure. Recommended to use a lower
number when passing a batch size.
reindex : bool
Whether to reindex data internally.
verbose : bool
Whether to print convergence messages.
random_seed : int or None
Random seed to use when starting the parameters.
allow_inconsistent_math : bool
Whether to allow inconsistent floating-point math (producing slightly different results on each run)
which would allow parallelization of the updates for the shape parameters of Lambda and Gamma.
verbose : bool
Whether to print convergence messages.
full_llk : bool
Whether to calculate the full lok-likehood, including terms that don't depend on the model parameters
(thus are constant for a given dataset).
keep_data : bool
Whether to keep information about which user was associated with each item
in the training set, so as to exclude those items later when making Top-N
Expand All @@ -115,7 +128,7 @@ class HPF:
number of users and items).
keep_all_objs : bool
Whether to keep intermediate objects/variables in the object that are not necessary for
predictions - these are: Gamma_shp, Gamma_rte, Lambda_shp, Lambda_rte, kappa_rte, tau_rte
predictions - these are: Gamma_shp, Gamma_rte, Lambda_shp, Lambda_rte, k_rte, t_rte
(when passing True here, the model object will have these extra attributes too).
Without these objects, it's not possible to call functions that alter the model parameters
given new information after it's already fit.
Expand Down Expand Up @@ -146,9 +159,11 @@ class HPF:
def __init__(self, k=30, a=0.3, a_prime=0.3, b_prime=1.0,
c=0.3, c_prime=0.3, d_prime=1.0, ncores=-1,
stop_crit='train-llk', check_every=10, stop_thr=1e-3,
maxiter=100, reindex=True, random_seed = None,
allow_inconsistent_math=False, verbose=True, full_llk=True,
keep_data=True, save_folder=None, produce_dicts=True, keep_all_objs=True):
users_per_batch=None, step_size=lambda x: 1/np.sqrt(x+1),
maxiter=100, reindex=True, verbose=True,
random_seed = None, allow_inconsistent_math=False, full_llk=True,
keep_data=True, save_folder=None, produce_dicts=True,
keep_all_objs=True):

## checking input
assert isinstance(k, int)
Expand Down Expand Up @@ -221,6 +236,21 @@ def __init__(self, k=30, a=0.3, a_prime=0.3, b_prime=1.0,
verbose = bool(verbose)
if (stop_crit == 'maxiter') and (not verbose):
check_every = 0

if users_per_batch is not None:
if not isinstance(step_size, types.FunctionType):
raise ValueError("'step_size' must be a function.")
if len(inspect.getfullargspec(step_size).args) < 1:
raise ValueError("'step_size' must be able to take the iteration number as input.")
assert (step_size(0) >= 0) and (step_size(0) <= 1)
assert (step_size(1) >= 0) and (step_size(1) <= 1)
if isinstance(users_per_batch, float):
users_per_batch = int(users_per_batch)
assert isinstance(users_per_batch, int)
assert users_per_batch > 0
else:
users_per_batch = 0
step_size = None

## storing these parameters
self.k = k
Expand All @@ -245,6 +275,8 @@ def __init__(self, k=30, a=0.3, a_prime=0.3, b_prime=1.0,
self.produce_dicts = bool(produce_dicts)
self.full_llk = bool(full_llk)
self.keep_all_objs = bool(keep_all_objs)
self.step_size = step_size
self.users_per_batch = users_per_batch

## initializing other attributes
self.Theta = None
Expand All @@ -260,7 +292,8 @@ def fit(self, counts_df, val_set=None):
"""
Fit Hierarchical Poisson Model to sparse count data
Fits a hierarchical Poisson model to count data using mean-field approximation with coordinate-ascent.
Fits a hierarchical Poisson model to count data using mean-field approximation with either
full-batch coordinate-ascent or mini-batch stochastic coordinate-ascent.
Note
----
Expand Down Expand Up @@ -362,10 +395,10 @@ def _process_data(self, input_df):
self.input_df['UserId'] = self.input_df.UserId.astype(ctypes.c_int)
self.input_df['ItemId'] = self.input_df.ItemId.astype(ctypes.c_int)

if self.batch_size is not None:
if self.nusers < self.batch_size:
if self.users_per_batch != 0:
if self.nusers < self.users_per_batch:
warnings.warn("Batch size passed is larger than number of users. Will set it to nusers/20.")
self.batch_size = ctypes.c_int(self.nusers/20)
self.users_per_batch = ctypes.c_int(self.nusers/20)

return None

Expand Down Expand Up @@ -416,8 +449,8 @@ def _store_metadata(self):
self._st_ix_user = np.r_[[0], self._st_ix_user[:self._st_ix_user.shape[0]-1]]
self.seen = self.seen.ItemId.values
return None
def _fit(self):

def _cast_before_fit():
## setting all parameters and data to the right type
self.Theta = np.empty((self.nusers, self.k), dtype='float32')
self.Beta = np.empty((self.nitems, self.k), dtype='float32')
Expand All @@ -442,6 +475,10 @@ def _fit(self):

if self.save_folder is None:
self.save_folder = ""

def _fit(self):

self._cast_before_fit()

if self.val_set is None:
use_valset = cython_loops.cast_int(0)
Expand All @@ -458,6 +495,7 @@ def _fit(self):
self.input_df.Count.values, self.input_df.UserId.values, self.input_df.ItemId.values,
self.Theta, self.Beta,
self.maxiter, self.stop_crit, self.check_every, self.stop_thr,
self.users_per_batch, self.step_size,
self.save_folder, self.random_seed, self.verbose,
self.ncores, cython_loops.cast_int(self.allow_inconsistent_math),
use_valset,
Expand All @@ -470,8 +508,8 @@ def _fit(self):
self.Gamma_rte = temp[1]
self.Lambda_shp = temp[2]
self.Lambda_rte = temp[3]
self.kappa_rte = temp[4]
self.tau_rte = temp[5]
self.k_rte = temp[4]
self.t_rte = temp[5]

def _process_data_single(self, counts_df):
assert self.is_fitted
Expand Down Expand Up @@ -505,6 +543,156 @@ def _process_data_single(self, counts_df):
counts_df['Count'] = counts_df.ItemId.values.astype(ctypes.c_float)
return counts_df

def partial_fit(self, counts_df, step_size=None, nusers=None, nitems=None,
users_in_batch=None, items_in_batch=None):
"""
Updates the model with batches of user data
Note
----
You must pass the **full user-item interactions** that are non-zero for some subset of users.
Otherwise the model will not converge to reasonable results.
Note
----
All user and items IDs must be integers starting at one, without gaps in the numeration.
Note
----
For better results, fit the model with full-batch iterations (using the 'fit' method).
Parameters
----------
counts_df : data frame (n_samples, 3)
Data frame with the user-item interactions for some subset of users. Must have columns
'UserId', 'ItemId', 'Count'.
step_size : None or float in (0, 1)
Step size with which to update the global variables in the model. Must be a number between
zero and one. If passing None, will determine it according to the step size function with which
the model was initialized and the number of iterations or calls to partial fit that have been
performed. If no valid function was passed at the initialization, it will use 1/sqrt(i+1).
nusers : int
Total number of users (not just in this batch!). Only required if calling partial_fit for the
first time on a model object that hasn't been fit.
nitems : int
Total number of items (not just in this batch!). Only required if calling partial_fit for the
first time on a model object that hasn't been fit.
users_in_batch : None or array (n_users_sample,)
Users that are present int counts_df. If passing None, will determine the unique elements in
counts_df.UserId, but passing them if you already have them will skip this step.
items_in_batch : None or array (n_items_sample,)
Items that are present int counts_df. If passing None, will determine the unique elements in
counts_df.ItemId, but passing them if you already have them will skip this step.
Returns
-------
self : obj
Copy of this object.
"""

#TODO: add functionality for adding new users and items on-the-fly
if self.reindex:
raise ValueError("'partial_fit' can only be called when using reindex=False.")
if not self.keep_all_objs:
raise ValueError("'partial_fit' can only be called when using keep_all_objs=True.")
if self.keep_data:
try:
self.seen
msg = "When using 'partial_fit', the list of items seen by each user is not updated "
msg += "with the data passed here."
warnings.warn(msg)
except:
msg = "When fitting the model through 'partial_fit' without calling 'fit' beforehand, "
msg += "'keep_data' will be forced to False."
warnings.warn(msg)
self.keep_data = False

if nusers is None:
try:
nusers = self.nusers
except:
raise ValueError("Must specify total number of users when calling 'partial_fit' for the first time.")
if nitems is None:
try:
nitems = self.nitems
except:
raise ValueError("Must specify total number of items when calling 'partial_fit' for the first time.")

if self.nusers is None:
self.nusers = nusers
if self.nitems is None:
self.nitems = nitems

if step_size is None:
try:
self.step_size(0)
try:
step_size = self.step_size(self.niter)
except:
self.niter = 0
step_size = 1.0
except:
try:
step_size = 1 / np.sqrt(self.niter + 1)
except:
self.niter = 0
step_size = 1.0
assert step_size >= 0
assert step_size <= 1

if counts_df.__class__.__name__ == "ndarray":
counts_df = pd.DataFrame(counts_df)
counts_df.columns[:3] = ['UserId', 'ItemId', 'Count']

assert counts_df.__class__.__name__ == "DataFrame"
assert 'UserId' in counts_df.columns.values
assert 'ItemId' in counts_df.columns.values
assert 'Count' in counts_df.columns.values
assert counts_df.shape[0] > 0

Y_batch = counts_df.Count.astype('float32')
ix_u_batch = counts_df.UserId.astype(ctypes.c_int)
ix_i_batch = counts_df.ItemId.astype(ctypes.c_int)

if users_in_batch is None:
users_in_batch = np.unique(ix_u_batch)
else:
users_in_batch = np.array(users_in_batch).astype(ctypes.c_int)
if items_in_batch is None:
items_in_batch = np.unique(ix_i_batch)
else:
items_in_batch = np.array(items_in_batch).astype(ctypes.c_int)

if (self.Theta is None) or (self.Beta is None):
self._cast_before_fit()
self.Gamma_shp, self.Gamma_rte, self.Lambda_shp, self.Lambda_rte, self.k_rte, self.t_rte = initialize_parameters(
self.Theta, self.Beta, self.random_seed, self.a, self.a_prime, self.b_prime, self.c, self.c_prime, self.d_prime)

k_shp = cython_loops.cast_float(self.a_prime + self.k * self.a)
t_shp = cython_loops.cast_float(self.c_prime + self.k * self.c)
add_k_rte = cython_loops.cast_float(self.a_prime / self.b_prime)
add_t_rte = cython_loops.cast_float(self.c_prime / self.d_prime)
multiplier_batch = float(nusers) / users_in_batch.shape[0]

cython_loops.partial_fit(
Y_batch,
ix_u_batch, ix_i_batch,
Theta, Beta,
Gamma_shp, Gamma_rte,
Lambda_shp, Lambda_rte,
k_rte, t_rte,
add_k_rte, add_t_rte, self.a, self.c,
k_shp, t_shp, cython_loops.cast_int(self.k),
users_this_batch, items_this_batch,
cython_loops.cast_int(self.allow_inconsistent_math),
cython_loops.cast_float(step_size), cython_loops.cast_float(multiplier_batch),
self.ncores
)

self.niter += 1
self.is_fitted = True
return self

def predict_factors(self, counts_df, maxiter=10, nthreads=1, random_seed=1, stop_thr=1e-3, return_all=False):
"""
Gets latent factors for a user given her item counts
Expand Down Expand Up @@ -544,6 +732,7 @@ def predict_factors(self, counts_df, maxiter=10, nthreads=1, random_seed=1, stop
latent_factors : array (k,)
Calculated latent factors for the user, given the input data
"""

## processing the data
counts_df = self._process_data_single(counts_df)

Expand All @@ -568,7 +757,8 @@ def predict_factors(self, counts_df, maxiter=10, nthreads=1, random_seed=1, stop
else:
return Theta

def add_user(self, user_id, counts_df, update_existing=False, maxiter=10, nthreads=1, random_seed=1, stop_thr=1e-3):
def add_user(self, user_id, counts_df, update_existing=False, update_item_factors=False,
maxiter=10, nthreads=1, random_seed=1, stop_thr=1e-3):
"""
Add a new user to the model or update parameters for a user according to new data
Expand All @@ -578,7 +768,8 @@ def add_user(self, user_id, counts_df, update_existing=False, maxiter=10, nthrea
Note
----
This function only works with one user at a time.
This function only works with one user at a time. For updating many users at the same time,
use 'partial_fit' instead.
Note
----
Expand All @@ -598,6 +789,9 @@ def add_user(self, user_id, counts_df, update_existing=False, maxiter=10, nthrea
update_existing : bool
Whether this should be an update of the parameters for an existing user (when passing True), or
an addition of a new user that was not in the model before (when passing False).
update_item_factors : bool
Whether to also update the item factors after each iteration, with a step size given by the function with which
the model object was initialized.
maxiter : int
Maximum number of iterations to run.
nthreads : int
Expand All @@ -615,6 +809,8 @@ def add_user(self, user_id, counts_df, update_existing=False, maxiter=10, nthrea
True : bool
Will return True if the process finishes successfully.
"""

#TODO: add option that updates the items parameters after each iteration
if update_existing:
## checking that the user already exists
if self.produce_dicts:
Expand Down Expand Up @@ -664,6 +860,18 @@ def add_user(self, user_id, counts_df, update_existing=False, maxiter=10, nthrea
self.Gamma_shp = np.r_[self.Gamma_shp, temp[0]]
self.Gamma_rte = np.r_[self.Gamma_rte, temp[1]]

## updating the list of seen items for this user
if self.keep_data:
if update_existing:
n_seen_by_user_before = self._n_seen_by_user[user_id]
self._n_seen_by_user[user_id] = counts_df.shape[0]
self.seen = np.r_[self.seen[:user_id], counts_df.ItemId.values, self.seen[(user_id + 1):]]
self._st_ix_user[(user_id + 1):] += self._n_seen_by_user[user_id] - n_seen_by_user_before
else:
self._n_seen_by_user = np.r_[self._n_seen_by_user, counts_df.shape[0]]
self._st_ix_user = np.r_[self._st_ix_user, self._st_ix_user.shape[0] - 1]
self.seen = np.r_[self.seen, counts_df.ItemId.values]

return True

def predict(self, user, item):
Expand Down

0 comments on commit 5f41b5f

Please sign in to comment.