Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Binary parser] only convert timestamp to string when we persist metrics #382

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 0 additions & 24 deletions external_parser/event_processors/joined_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,29 +359,5 @@ struct joined_event {
typed_data->calc_and_set_cost(examples, default_reward, reward_function,
interaction_metadata, outcome_events);
}

void calculate_metrics(dsjson_metrics *metrics) {
if (!metrics || !is_joined_event_learnable()) {
return;
}

if (metrics->FirstEventId.empty()) {
metrics->FirstEventId = interaction_metadata.event_id;
} else {
metrics->LastEventId = interaction_metadata.event_id;
}

// TODO does this potentially need to check and set client time utc if
// that option is on?
if (metrics->FirstEventTime.empty()) {
metrics->FirstEventTime = date::format(
"%FT%TZ",
date::floor<std::chrono::microseconds>(joined_event_timestamp));
} else {
metrics->LastEventTime = date::format(
"%FT%TZ",
date::floor<std::chrono::microseconds>(joined_event_timestamp));
}
}
};
} // namespace joined_event
51 changes: 45 additions & 6 deletions external_parser/joiners/example_joiner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,25 @@ bool example_joiner::process_joined(v_array<example *> &examples) {
auto clear_event_id_on_exit = VW::scope_exit([&] {
if (je) {
if (_vw->example_parser->metrics) {
je->calculate_metrics(_vw->example_parser->metrics.get());
if (!je->is_joined_event_learnable()) {
_joiner_metrics.number_of_skipped_events++;
} else {
// TODO does this potentially need to check and set client time utc if
// that option is on?
if (_joiner_metrics.first_event_id.empty()) {
_joiner_metrics.first_event_id =
std::move(je->interaction_metadata.event_id);
_joiner_metrics.first_event_timestamp =
std::move(je->joined_event_timestamp);
} else {
_joiner_metrics.last_event_id =
std::move(je->interaction_metadata.event_id);
_joiner_metrics.last_event_timestamp =
std::move(je->joined_event_timestamp);
}
}
}

if (_binary_to_json &&
je->interaction_metadata.payload_type == v2::PayloadType_CB) {
log_converter::build_cb_json(_outfile, *je);
Expand Down Expand Up @@ -471,10 +488,6 @@ bool example_joiner::process_joined(v_array<example *> &examples) {
_reward_calculation.value());

if (!je->is_joined_event_learnable()) {
if (_vw->example_parser->metrics) { //TODO: Check if this is valid for ccb
_vw->example_parser->metrics->NumberOfSkippedEvents++;
}

_current_je_is_skip_learn = true;
clear_examples = true;
return false;
Expand All @@ -495,8 +508,34 @@ bool example_joiner::process_joined(v_array<example *> &examples) {
return true;
}

void example_joiner::persist_metrics() {
if (_vw->example_parser->metrics) {
_vw->example_parser->metrics->NumberOfSkippedEvents =
_joiner_metrics.number_of_skipped_events;

if (!_joiner_metrics.first_event_id.empty()) {
_vw->example_parser->metrics->FirstEventId =
std::move(_joiner_metrics.first_event_id);

_vw->example_parser->metrics->FirstEventTime = std::move(
date::format("%FT%TZ", date::floor<std::chrono::microseconds>(
_joiner_metrics.first_event_timestamp)));
}
if (!_joiner_metrics.last_event_id.empty()) {
_vw->example_parser->metrics->LastEventId =
std::move(_joiner_metrics.last_event_id);

_vw->example_parser->metrics->LastEventTime = std::move(
date::format("%FT%TZ", date::floor<std::chrono::microseconds>(
_joiner_metrics.last_event_timestamp)));
}
}
}

bool example_joiner::processing_batch() { return !_batch_event_order.empty(); }
bool example_joiner::current_event_is_skip_learn() {return _current_je_is_skip_learn;}
bool example_joiner::current_event_is_skip_learn() {
return _current_je_is_skip_learn;
}
void example_joiner::on_new_batch() {}
void example_joiner::on_batch_read() {}

Expand Down
2 changes: 2 additions & 0 deletions external_parser/joiners/example_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class example_joiner : public i_joiner {

metrics::joiner_metrics get_metrics() override;

void persist_metrics() override;

private:
bool process_dedup(const v2::Event &event, const v2::Metadata &metadata);

Expand Down
2 changes: 2 additions & 0 deletions external_parser/joiners/i_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,7 @@ class i_joiner {

virtual void on_batch_read() = 0;

virtual void persist_metrics() {}

virtual metrics::joiner_metrics get_metrics() = 0;
};
7 changes: 7 additions & 0 deletions external_parser/metrics/metrics.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
#pragma once

#include "event_processors/timestamp_helper.h"

namespace metrics {
struct joiner_metrics {
size_t number_of_skipped_events = 0;
TimePoint last_event_timestamp = TimePoint();
TimePoint first_event_timestamp = TimePoint();
std::string first_event_id = "";
std::string last_event_id = "";
};
} // namespace metrics
4 changes: 1 addition & 3 deletions external_parser/parse_example_binary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,7 @@ bool binary_parser::advance_to_next_payload_type(io_buf *input,

void binary_parser::persist_metrics(
std::vector<std::pair<std::string, size_t>> &) {
// metrics::joiner_metrics joiner_metrics = _example_joiner->get_metrics();
// this isn't currently needed but leaving it here since we might want to
// persist more metrics in the future
_example_joiner->persist_metrics();
}

bool binary_parser::parse_examples(vw *all, v_array<example *> &examples) {
Expand Down