small efficiency improvements
david-cortes committed Nov 11, 2018
1 parent 016278a commit 69be92b
Showing 3 changed files with 103 additions and 26 deletions.
64 changes: 50 additions & 14 deletions hpfrec/__init__.py
@@ -186,6 +186,8 @@ class HPF:
Whether the model has been fit to some data.
niter : int
Number of iterations for which the fitting procedure was run.
train_llk : int
Final training likelihood calculated when the model was fit (only when passing 'verbose=True').
References
----------
@@ -337,6 +339,7 @@ def __init__(self, k=30, a=0.3, a_prime=0.3, b_prime=1.0,
self.item_dict_ = None
self.is_fitted = False
self.niter = None
self.train_llk = None

def fit(self, counts_df, val_set=None):
"""
@@ -364,10 +367,11 @@ def fit(self, counts_df, val_set=None):
Parameters
----------
counts_df : pandas data frame (nobs, 3)
counts_df : pandas data frame (nobs, 3) or coo_matrix
Input data with one row per non-zero observation, consisting of triplets ('UserId', 'ItemId', 'Count').
Must contain columns 'UserId', 'ItemId', and 'Count'.
Combinations of users and items not present are implicitly assumed to be zero by the model.
Can also pass a sparse coo_matrix, in which case 'reindex' will be forced to 'False'.
val_set : pandas data frame (nobs, 3)
Validation set on which to monitor log-likelihood. Same format as counts_df.
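
As a quick illustration of the two accepted input formats, a minimal usage sketch with made-up IDs and counts (it assumes 'verbose' is the constructor flag referred to above, and uses a small 'k' purely for illustration):

import numpy as np, pandas as pd
from scipy.sparse import coo_matrix
from hpfrec import HPF

## toy data: four observed (user, item, count) triplets
counts_df = pd.DataFrame({
    'UserId' : [0, 0, 1, 2],
    'ItemId' : [1, 3, 0, 2],
    'Count'  : [2., 1., 5., 1.]
})

## the same data as a sparse COO matrix - 'reindex' is then forced to False
counts_coo = coo_matrix((counts_df.Count, (counts_df.UserId, counts_df.ItemId)))

model = HPF(k=10, verbose=True)
model.fit(counts_coo)                ## or: model.fit(counts_df)
print(model.niter, model.train_llk)  ## train_llk is only filled when verbose=True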
@@ -412,6 +416,8 @@ def fit(self, counts_df, val_set=None):
return self

def _process_data(self, input_df):
calc_n = True

if isinstance(input_df, np.ndarray):
assert len(input_df.shape) > 1
assert input_df.shape[1] >= 3
@@ -424,8 +430,18 @@ def _process_data(self, input_df):
assert 'ItemId' in input_df.columns.values
assert 'Count' in input_df.columns.values
self.input_df = input_df[['UserId', 'ItemId', 'Count']]
elif input_df.__class__.__name__ == 'coo_matrix':
self.nusers = input_df.shape[0]
self.nitems = input_df.shape[1]
input_df = pd.DataFrame({
'UserId' : input_df.row,
'ItemId' : input_df.col,
'Count' : input_df.data
})
self.reindex = False
calc_n = False
else:
raise ValueError("'input_df' must be a pandas data frame or a numpy array")
raise ValueError("'input_df' must be a pandas data frame, numpy array, or scipy sparse coo_matrix.")

if self.stop_crit in ['maxiter', 'diff-norm']:
thr = 0
@@ -453,8 +469,9 @@ def _process_data(self, input_df):
pd.Series(self.user_mapping_).to_csv(os.path.join(self.save_folder, 'users.csv'), index=False)
pd.Series(self.item_mapping_).to_csv(os.path.join(self.save_folder, 'items.csv'), index=False)
else:
self.nusers = self.input_df.UserId.max() + 1
self.nitems = self.input_df.ItemId.max() + 1
if calc_n:
self.nusers = self.input_df.UserId.max() + 1
self.nitems = self.input_df.ItemId.max() + 1

if self.save_folder is not None:
with open(os.path.join(self.save_folder, "hyperparameters.txt"), "w") as pf:
@@ -470,9 +487,12 @@ def _process_data(self, input_df):
else:
pf.write("random seed: None\n")

self.input_df['Count'] = self.input_df.Count.astype('float32')
self.input_df['UserId'] = self.input_df.UserId.astype(ctypes.c_size_t)
self.input_df['ItemId'] = self.input_df.ItemId.astype(ctypes.c_size_t)
if self.input_df['Count'].dtype != ctypes.c_float:
self.input_df['Count'] = self.input_df.Count.astype('float32')
if self.input_df['UserId'].dtype != ctypes.c_size_t:
self.input_df['UserId'] = self.input_df.UserId.astype(ctypes.c_size_t)
if self.input_df['ItemId'].dtype != ctypes.c_size_t:
self.input_df['ItemId'] = self.input_df.ItemId.astype(ctypes.c_size_t)

if self.users_per_batch != 0:
if self.nusers < self.users_per_batch:
@@ -496,8 +516,16 @@ def _process_valset(self, val_set, valset=True):
assert 'ItemId' in val_set.columns.values
assert 'Count' in val_set.columns.values
self.val_set = val_set[['UserId', 'ItemId', 'Count']]
elif val_set.__class__.__name__ == 'coo_matrix':
assert val_set.shape[0] <= self.nusers
assert val_set.shape[1] <= self.nitems
val_set = pd.DataFrame({
'UserId' : val_set.row,
'ItemId' : val_set.col,
'Count' : val_set.data
})
else:
raise ValueError("'val_set' must be a pandas data frame or a numpy array")
raise ValueError("'val_set' must be a pandas data frame, numpy array, or sparse coo_matrix.")

if self.stop_crit == 'val-llk':
thr = 0
@@ -526,9 +554,13 @@ def _process_valset(self, val_set, valset=True):
"in common with the training set.")
else:
self.val_set.reset_index(drop=True, inplace=True)
self.val_set['Count'] = self.val_set.Count.astype('float32')
self.val_set['UserId'] = self.val_set.UserId.astype(ctypes.c_size_t)
self.val_set['ItemId'] = self.val_set.ItemId.astype(ctypes.c_size_t)

if self.val_set['Count'].dtype != ctypes.c_float:
self.val_set['Count'] = self.val_set.Count.astype('float32')
if self.val_set['UserId'].dtype != ctypes.c_size_t:
self.val_set['UserId'] = self.val_set.UserId.astype(ctypes.c_size_t)
if self.val_set['ItemId'].dtype != ctypes.c_size_t:
self.val_set['ItemId'] = self.val_set.ItemId.astype(ctypes.c_size_t)
return None

def _store_metadata(self, for_partial_fit=False):
@@ -585,7 +617,7 @@ def _fit(self):
if self.users_per_batch == 0:
self._st_ix_user = np.arange(1).astype(ctypes.c_size_t)

self.niter, temp = cython_loops.fit_hpf(
self.niter, temp, self.train_llk = cython_loops.fit_hpf(
self.a, self.a_prime, self.b_prime,
self.c, self.c_prime, self.d_prime,
self.input_df.Count.values, self.input_df.UserId.values, self.input_df.ItemId.values,
@@ -1214,12 +1246,16 @@ def predict(self, user, item):
else:
nan_entries = (user == -1) | (item == -1)
if nan_entries.sum() == 0:
return (self.Theta[user] * self.Beta[item]).sum(axis=1)
if user.dtype != ctypes.c_size_t:
user = user.astype(ctypes.c_size_t)
if item.dtype != ctypes.c_size_t:
item = item.astype(ctypes.c_size_t)
return cython_loops.predict_arr(self.Theta, self.Beta, user, item, self.ncores)
else:
non_na_user = user[~nan_entries]
non_na_item = item[~nan_entries]
out = np.empty(user.shape[0], dtype=self.Theta.dtype)
out[~nan_entries] = (self.Theta[non_na_user] * self.Beta[non_na_item]).sum(axis=1)
out[~nan_entries] = cython_loops.predict_arr(self.Theta, self.Beta, non_na_user.astype(ctypes.c_size_t), non_na_item.astype(ctypes.c_size_t), self.ncores)
out[nan_entries] = np.nan
return out
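
Continuing the toy example from the fit() docstring above, a short sketch of this vectorized prediction path (the IDs are illustrative):

users = np.array([0, 1, 2])
items = np.array([2, 0, 3])
scores = model.predict(user=users, item=items)  ## one score per (user, item) pair
## pairs whose user or item cannot be matched to the training data come back as NaN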

63 changes: 52 additions & 11 deletions hpfrec/cython_loops.pyx
@@ -75,7 +75,7 @@ def assess_convergence(int i, check_every, stop_crit, last_crit, stop_thr,
llk_plus_rmse(&Theta[0,0], &Beta[0,0], &Yval[0],
&ix_u_val[0], &ix_i_val[0], nYv, k,
&errs[0], nthreads, verbose, full_llk)
errs[0] -= Theta[ix_u_val].sum(axis=0).dot(Beta[ix_i_val].sum(axis=0))
errs[0] -= sum_prediction(&Theta[0,0], &Beta[0,0], &ix_u_val[0], &ix_i_val[0], nYv, <int> k, nthreads)
errs[1] = np.sqrt(errs[1]/nYv)
else:
llk_plus_rmse(&Theta[0,0], &Beta[0,0], &Y[0],
@@ -116,6 +116,7 @@ def eval_after_term(stop_crit, int verbose, int nthreads, int full_llk, size_t k
&errs[0], nthreads, verbose, full_llk)
errs[0] -= Theta.sum(axis=0).dot(Beta.sum(axis=0))
errs[1] = np.sqrt(errs[1]/nY)
return errs[0]

### Random initializer for parameters
#####################################
@@ -247,12 +248,15 @@ def fit_hpf(float a, float a_prime, float b_prime,
&phi[0,0], &Y[0], k, sum_exp_trick,
&ix_u[0], &ix_i[0], nY, nthreads)

Gamma_rte = k_shp/k_rte + Beta.sum(axis=0, keepdims=True)
if par_sh:
pass
else:
Gamma_rte = k_shp/k_rte + Beta.sum(axis=0, keepdims=True)

### Comment: don't put this part before the update for Gamma rate
Gamma_shp[:,:] = a
Lambda_shp[:,:] = c
if par_sh>0:
if par_sh:
## this produces inconsistent results across runs, so there's a non-parallel version too
update_G_n_L_sh_par(&Gamma_shp[0,0], &Lambda_shp[0,0],
&phi[0,0], k,
@@ -265,7 +269,7 @@ def fit_hpf(float a, float a_prime, float b_prime,
Theta[:,:] = Gamma_shp/Gamma_rte

### Comment: these operations are pretty fast in numpy, so I preferred not to parallelize them.
### Moreover, compiler optimizations do a very poor job at parallelizing sums by columns.
### Moreover, compiler optimizations from .pyx files do a very poor job at parallelizing sums by columns.
Lambda_rte = t_shp/t_rte + Theta.sum(axis=0, keepdims=True)
Beta[:,:] = Lambda_shp/Lambda_rte

@@ -408,11 +412,12 @@ def fit_hpf(float a, float a_prime, float b_prime,
break

## last metrics once it finishes optimizing
eval_after_term(stop_crit, verbose, nthreads, full_llk, k, nY, nYv, has_valset,
Theta, Beta, errs,
Y, ix_u, ix_i,
Yval, ix_u_val, ix_i_val
)
last_llk = eval_after_term(
stop_crit, verbose, nthreads, full_llk, k, nY, nYv, has_valset,
Theta, Beta, errs,
Y, ix_u, ix_i,
Yval, ix_u_val, ix_i_val
)

cdef double end_tm = (time.time()-st_time)/60
if verbose:
@@ -428,7 +433,7 @@ def fit_hpf(float a, float a_prime, float b_prime,
temp = (Gamma_shp, Gamma_rte, Lambda_shp, Lambda_rte, k_rte, t_rte)
else:
temp = None
return i, temp
return i, temp, last_llk


### Functions for updates without a complete refit
@@ -543,9 +548,19 @@ def calc_llk(np.ndarray[float, ndim=1] Y, np.ndarray[size_t, ndim=1] ix_u, np.nd
&Y[0], &ix_u[0], &ix_i[0],
<size_t> Y.shape[0], k,
&o[0], nthreads, 0, full_llk)
o[0] -= Theta[ix_u].sum(axis=0).dot(Beta[ix_i].sum(axis=0))
cdef int kint = k
o[0] -= sum_prediction(&Theta[0,0], &Beta[0,0], &ix_u[0], &ix_i[0], <size_t> Y.shape[0], kint, nthreads)
return o[0]

### External prediction function
################################
def predict_arr(np.ndarray[float, ndim=2] M1, np.ndarray[float, ndim=2] M2, np.ndarray[size_t, ndim=1] ix_u, np.ndarray[size_t, ndim=1] ix_i, int nthreads):
cdef size_t n = ix_u.shape[0]
cdef int k = M1.shape[1]
cdef np.ndarray[float, ndim=1] out = np.zeros(n, dtype='float32')
predict_multiple(&out[0], &M1[0,0], &M2[0,0], &ix_u[0], &ix_i[0], n, k, nthreads)
return out
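
For reference, a plain-numpy sketch of the quantity predict_arr returns (toy arrays; the compiled version computes the same row-wise dot products, but through BLAS sdot inside an OpenMP loop):

import numpy as np
rng = np.random.default_rng(123)
Theta = rng.random((5, 3), dtype='float32')   ## user factors, shape (nusers, k)
Beta  = rng.random((4, 3), dtype='float32')   ## item factors, shape (nitems, k)
ix_u  = np.array([0, 2, 4], dtype=np.uintp)   ## np.uintp matches size_t
ix_i  = np.array([1, 1, 3], dtype=np.uintp)
out_np = (Theta[ix_u] * Beta[ix_i]).sum(axis=1)  ## out_np[i] = Theta[ix_u[i]] . Beta[ix_i[i]]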

### Internal C functions
########################
@cython.boundscheck(False)
@@ -790,6 +805,32 @@ cdef void get_i_batch_pass2(size_t* st_ix_u, size_t* st_ix_out, size_t* out, siz
for i in range(n_uid):
out[st_out + i] = ix_i[st_ix_u[uid] + i]

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
@cython.cdivision(True)
cdef void predict_multiple(float* out, float* M1, float* M2, size_t* ix_u, size_t* ix_i, size_t n, int k, int nthreads) nogil:

cdef int one = 1
cdef size_t kszt = k
cdef size_t i
for i in prange(n, schedule='static', num_threads=nthreads):
out[i] = sdot(&k, &M1[ix_u[i] * kszt], &one, &M2[ix_i[i] * kszt], &one)

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
@cython.cdivision(True)
cdef long double sum_prediction(float* M1, float* M2, size_t* ix_u, size_t* ix_i, size_t n, int k, int nthreads) nogil:

cdef long double out = 0
cdef int one = 1
cdef size_t kszt = k
cdef size_t i
for i in prange(n, schedule='static', num_threads=nthreads):
out += sdot(&k, &M1[ix_u[i] * kszt], &one, &M2[ix_i[i] * kszt], &one)
return out
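
And a numpy sketch of what sum_prediction accumulates, reusing the toy arrays from the predict_arr sketch above (the compiled loop keeps the running total in a long double and parallelizes the reduction):

total = float((Theta[ix_u] * Beta[ix_i]).sum())  ## sum over pairs of Theta[u] . Beta[i]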


### Printing output
###################
2 changes: 1 addition & 1 deletion setup.py
@@ -28,7 +28,7 @@ def build_extensions(self):
'scipy',
'cython'
],
version = '0.2.2.6',
version = '0.2.2.7',
description = 'Hierarchical Poisson matrix factorization for recommender systems',
author = 'David Cortes',
author_email = 'david.cortes.rivera@gmail.com',
