Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

12 12 user exclude indexing #1262

Merged
merged 5 commits into from Dec 19, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
260 changes: 86 additions & 174 deletions src/unity/toolkits/evaluation/metrics.cpp
Expand Up @@ -5,13 +5,12 @@
*/
#include <sframe/sarray.hpp>
#include <sframe/sframe.hpp>
#include <unity/lib/gl_sframe.hpp>
#include <unity/lib/gl_sarray.hpp>
#include <unity/toolkits/util/precision_recall.hpp>

// ML-Data
#include <unity/toolkits/ml_data_2/ml_data.hpp>
#include <unity/toolkits/ml_data_2/ml_data_iterators.hpp>

#include <ml_data/column_indexer.hpp>

// Evaluation
#include <unity/toolkits/util/indexed_sframe_tools.hpp>
Expand Down Expand Up @@ -123,201 +122,114 @@ variant_type _supervised_streaming_evaluator(
//////////////////////////////////////////////////////////////////////////////////////

sframe precision_recall_by_user(
const sframe& validation_data,
const sframe& recommend_output,
const sframe& _validation_data,
const sframe& _recommend_output,
const std::vector<size_t>& cutoffs) {

turi::timer timer;
timer timer;
timer.start();

std::map<std::string, flexible_type> opts
= { {"sort_by_first_two_columns", true} };

const std::string& user_column = recommend_output.column_name(0);
const std::string& item_column = recommend_output.column_name(1);

std::map<std::string, v2::ml_column_mode> col_modes =
{ {user_column, v2::ml_column_mode::CATEGORICAL},
{item_column, v2::ml_column_mode::CATEGORICAL} };

std::vector<std::pair<size_t, size_t> > recommendations;
std::shared_ptr<v2::ml_metadata> metadata;

{
// Map the recommend output first
v2::ml_data md_rec(opts);
md_rec.set_data(recommend_output.select_columns({user_column,item_column}),
"", {}, col_modes);

md_rec.fill();
metadata = md_rec.metadata();

// Dump this into a vector.
recommendations.resize(md_rec.num_rows());

in_parallel([&](size_t thread_idx, size_t num_threads) {

for(auto it = md_rec.get_iterator(thread_idx, num_threads); !it.done(); ++it) {
std::vector<v2::ml_data_entry> v;
it.fill_observation(v);

recommendations[it.row_index()] = {v[0].index, v[1].index};
}
});

DASSERT_TRUE(std::is_sorted(recommendations.begin(), recommendations.end()));
}
gl_sframe recommend_output(_recommend_output);
gl_sframe validation_data(_validation_data);

v2::ml_data md_val(metadata);
md_val.fill(validation_data.select_columns({user_column,item_column}));

size_t num_threads = thread::cpu_count();
const auto& column_names = recommend_output.column_names();

sframe ret;
ret.open_for_write({user_column, "cutoff", "precision", "recall", "count"},
{metadata->column_type(user_column),
flex_type_enum::INTEGER,
flex_type_enum::FLOAT,
flex_type_enum::FLOAT,
flex_type_enum::INTEGER},
"", num_threads);
if (column_names != recommend_output.column_names()) {
log_and_throw("Column names in test SFrame do not match recommender SFrame.");
}

const std::string& user_column = column_names[0];
const std::string& item_column = column_names[1];

typedef decltype(ret.get_output_iterator(0)) out_iter_type;
std::vector<out_iter_type> output_iterators(num_threads);

std::vector<std::vector<flexible_type> > out_vv(num_threads);
// First, index the columns.
gl_sframe indexed_validation_data_1, indexed_recommend_output_1;

auto add_to_output = [&](
size_t thread_idx,
size_t user,
const std::vector<size_t>& pr, const std::vector<size_t>& vr) {
auto user_indexer = std::make_shared<ml_data_internal::column_indexer>(
user_column, ml_column_mode::CATEGORICAL,
_recommend_output.column_type(user_column));

// Only record ones that are non-empty in the predictions.
if(pr.empty()) {
return;
}

auto& it_out = output_iterators[thread_idx];
auto& out_v = out_vv[thread_idx];
auto item_indexer = std::make_shared<ml_data_internal::column_indexer>(
item_column, ml_column_mode::CATEGORICAL,
_recommend_output.column_type(item_column));

const std::vector<std::pair<double, double> > prv =
turi::recsys::precision_and_recall(vr, pr, cutoffs);
user_indexer->initialize();
item_indexer->initialize();

for(size_t j = 0; j < cutoffs.size(); ++j, ++it_out) {
out_v = {metadata->indexer(0)->map_index_to_value(user), cutoffs[j],
prv[j].first, prv[j].second, vr.size()};
*it_out = out_v;
}
std::function<flexible_type(const flexible_type&)> user_index_f =
[=](const flexible_type& f) -> flexible_type {
return user_indexer->map_value_to_index(thread::thread_id(), f);
};
std::function<flexible_type(const flexible_type&)> item_index_f =
[=](const flexible_type& f) -> flexible_type {
return item_indexer->map_value_to_index(thread::thread_id(), f);
};

// We need to record all the recommendations that were given that have
// no presence in
std::vector<int> recommendations_processed(metadata->index_size(0), 0);
atomic<size_t> num_users_processed = 0;

in_parallel([&](size_t thread_idx, size_t num_threads) {

// Set the output iterator
output_iterators[thread_idx] = ret.get_output_iterator(thread_idx);

// do squirrels barf nuts?
auto val_it = md_val.get_block_iterator(thread_idx, num_threads);

std::vector<v2::ml_data_entry> v;
std::vector<size_t> vr, pr;

if(val_it.done()) { return; }

val_it.fill_observation(v);
size_t user = v[0].index;

// Find the starting tracking iterator for this place
auto rec_it = std::lower_bound(
recommendations.begin(),
recommendations.end(), std::pair<size_t, size_t>{user, 0});

while(!val_it.done()) {

vr.clear();

val_it.fill_observation(v);
user = v[0].index;
vr.push_back(v[1].index);
++val_it;

for(; !val_it.done() && !val_it.is_start_of_new_block(); ++val_it) {
val_it.fill_observation(v);
DASSERT_EQ(v[0].index, user);
vr.push_back(v[1].index);
}
indexed_validation_data_1[user_column] =
validation_data[user_column].apply(user_index_f, flex_type_enum::INTEGER);
indexed_validation_data_1[item_column] =
validation_data[item_column].apply(item_index_f, flex_type_enum::INTEGER);

// Find the starting location of the recommendations for this user
while(rec_it != recommendations.end() && rec_it->first < user) {
++rec_it;
}
sframe indexed_validation_data =
indexed_validation_data_1.materialize_to_sframe();

// Now, dump them into the predictions.
pr.clear();
indexed_recommend_output_1[user_column] = recommend_output[user_column].apply(
user_index_f, flex_type_enum::INTEGER);
indexed_recommend_output_1[item_column] = recommend_output[item_column].apply(
item_index_f, flex_type_enum::INTEGER);

// Copy the recommended items into a buffer.
while(rec_it != recommendations.end() && rec_it->first == user) {
pr.push_back(rec_it->second);
++rec_it;
}
sframe indexed_recommend_output =
indexed_recommend_output_1.materialize_to_sframe();

// Record that it's been processed.
DASSERT_LT(user, recommendations_processed.size());
DASSERT_EQ(recommendations_processed[user], 0);
recommendations_processed[user] = 1;
++num_users_processed;

add_to_output(thread_idx, user, pr, vr);
}
});
user_indexer->finalize();
item_indexer->finalize();

// Have we been able to process everything? If not, add in the leftovers.
if(num_users_processed < recommendations_processed.size()) {

in_parallel([&](size_t thread_idx, size_t num_threads) {

// Block it up by threads
size_t start_idx = (thread_idx * recommendations_processed.size()) / num_threads;
size_t end_idx = ((thread_idx+1) * recommendations_processed.size()) / num_threads;

std::vector<size_t> pr;

for(size_t user = start_idx; user < end_idx; ++user) {

if(recommendations_processed[user]) {
continue;
}

// Find these recommendations in the block.
auto rec_it = std::lower_bound(
recommendations.begin(),
recommendations.end(), std::pair<size_t, size_t>{user, 0});

// The only way the above iterator would have missed it is if it's here,
// but not in the validation set.
DASSERT_EQ(rec_it->first, user);

// Copy the recommended items into a buffer.
while(rec_it != recommendations.end() && rec_it->first == user) {
pr.push_back(rec_it->second);
++rec_it;
}

add_to_output(thread_idx, user, pr, {});
size_t num_users = user_indexer->indexed_column_size();

// should preserve the order
indexed_column_groupby pred_ranks(
indexed_recommend_output.select_column(USER_COLUMN_INDEX),
indexed_recommend_output.select_column(ITEM_COLUMN_INDEX), false, false);

indexed_column_groupby val_ranks(
indexed_validation_data.select_column(USER_COLUMN_INDEX),
indexed_validation_data.select_column(ITEM_COLUMN_INDEX), false, false);

sframe ret;
ret.open_for_write(
{user_column, "cutoff", "precision", "recall", "count"},
{user_indexer->column_type(), flex_type_enum::INTEGER,
flex_type_enum::FLOAT, flex_type_enum::FLOAT, flex_type_enum::INTEGER});

size_t num_segments = ret.num_segments();


parallel_for(0, num_segments, [&](size_t sidx) {

size_t start_idx = (sidx * num_users) / num_segments;
size_t end_idx = ((sidx + 1) * num_users) / num_segments;

auto it_out = ret.get_output_iterator(sidx);
std::vector<flexible_type> out_v;

for (size_t i = start_idx; i < end_idx; ++i) {
const std::vector<size_t>& vr = val_ranks.dest_group(i);
const std::vector<size_t>& pr = pred_ranks.dest_group(i);

const std::vector<std::pair<double, double>> prv =
recsys::precision_and_recall(vr, pr, cutoffs);

for (size_t j = 0; j < cutoffs.size(); ++j, ++it_out) {
out_v = {user_indexer->map_index_to_value(i), cutoffs[j], prv[j].first,
prv[j].second, vr.size()};
*it_out = out_v;
}
});
}
}
});

ret.close();

return ret;
return ret;
}


}
}