Skip to content

Commit

Permalink
only convert timestamp to string before exiting (#382)
Browse files Browse the repository at this point in the history
  • Loading branch information
olgavrou committed Aug 3, 2021
1 parent 1946cd7 commit bfc0099
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 33 deletions.
24 changes: 0 additions & 24 deletions external_parser/event_processors/joined_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,29 +359,5 @@ struct joined_event {
typed_data->calc_and_set_cost(examples, default_reward, reward_function,
interaction_metadata, outcome_events);
}

// Records this joined event's id and join timestamp in `metrics`.
// While the corresponding First* field is still empty it is the one that
// gets filled; once it holds a value, the Last* field is overwritten instead.
// Does nothing when `metrics` is null or the event is not learnable.
void calculate_metrics(dsjson_metrics *metrics) {
  const bool should_record = metrics && is_joined_event_learnable();
  if (!should_record) {
    return;
  }

  // Pick the id field to write: First* until it is populated, Last* after.
  auto &event_id_slot = metrics->FirstEventId.empty() ? metrics->FirstEventId
                                                      : metrics->LastEventId;
  event_id_slot = interaction_metadata.event_id;

  // TODO does this potentially need to check and set client time utc if
  // that option is on?
  auto &event_time_slot = metrics->FirstEventTime.empty()
                              ? metrics->FirstEventTime
                              : metrics->LastEventTime;
  event_time_slot = date::format(
      "%FT%TZ", date::floor<std::chrono::microseconds>(joined_event_timestamp));
}
};
} // namespace joined_event
51 changes: 45 additions & 6 deletions external_parser/joiners/example_joiner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,25 @@ bool example_joiner::process_joined(v_array<example *> &examples) {
auto clear_event_id_on_exit = VW::scope_exit([&] {
if (je) {
if (_vw->example_parser->metrics) {
je->calculate_metrics(_vw->example_parser->metrics.get());
if (!je->is_joined_event_learnable()) {
_joiner_metrics.number_of_skipped_events++;
} else {
// TODO does this potentially need to check and set client time utc if
// that option is on?
if (_joiner_metrics.first_event_id.empty()) {
_joiner_metrics.first_event_id =
std::move(je->interaction_metadata.event_id);
_joiner_metrics.first_event_timestamp =
std::move(je->joined_event_timestamp);
} else {
_joiner_metrics.last_event_id =
std::move(je->interaction_metadata.event_id);
_joiner_metrics.last_event_timestamp =
std::move(je->joined_event_timestamp);
}
}
}

if (_binary_to_json &&
je->interaction_metadata.payload_type == v2::PayloadType_CB) {
log_converter::build_cb_json(_outfile, *je);
Expand Down Expand Up @@ -471,10 +488,6 @@ bool example_joiner::process_joined(v_array<example *> &examples) {
_reward_calculation.value());

if (!je->is_joined_event_learnable()) {
if (_vw->example_parser->metrics) { //TODO: Check if this is valid for ccb
_vw->example_parser->metrics->NumberOfSkippedEvents++;
}

_current_je_is_skip_learn = true;
clear_examples = true;
return false;
Expand All @@ -495,8 +508,34 @@ bool example_joiner::process_joined(v_array<example *> &examples) {
return true;
}

// Publishes the metrics accumulated in _joiner_metrics to the parser's
// metrics object; does nothing when the parser has no metrics object.
// Event timestamps are stored as time points while joining and are only
// formatted into "%FT%TZ" strings here.
// Note: the event id strings are moved out of _joiner_metrics, so they are
// left in a moved-from state once this function returns.
void example_joiner::persist_metrics() {
  if (!_vw->example_parser->metrics) {
    return;
  }
  auto &parser_metrics = *_vw->example_parser->metrics;

  parser_metrics.NumberOfSkippedEvents =
      _joiner_metrics.number_of_skipped_events;

  if (!_joiner_metrics.first_event_id.empty()) {
    parser_metrics.FirstEventId = std::move(_joiner_metrics.first_event_id);

    // date::format returns a temporary, which is already an rvalue, so no
    // std::move is needed to move-assign it.
    parser_metrics.FirstEventTime =
        date::format("%FT%TZ", date::floor<std::chrono::microseconds>(
                                   _joiner_metrics.first_event_timestamp));
  }

  if (!_joiner_metrics.last_event_id.empty()) {
    parser_metrics.LastEventId = std::move(_joiner_metrics.last_event_id);

    parser_metrics.LastEventTime =
        date::format("%FT%TZ", date::floor<std::chrono::microseconds>(
                                   _joiner_metrics.last_event_timestamp));
  }
}

// Returns true while _batch_event_order still holds entries.
bool example_joiner::processing_batch() {
  const bool order_is_empty = _batch_event_order.empty();
  return !order_is_empty;
}
// Returns the skip-learn flag, which process_joined sets when the joined
// event it handled is not learnable.
bool example_joiner::current_event_is_skip_learn() {
  return _current_je_is_skip_learn;
}
// No-op: this joiner performs no work in on_new_batch.
void example_joiner::on_new_batch() {}
// Implements i_joiner::on_batch_read; this joiner performs no work here.
void example_joiner::on_batch_read() {}

Expand Down
2 changes: 2 additions & 0 deletions external_parser/joiners/example_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class example_joiner : public i_joiner {

metrics::joiner_metrics get_metrics() override;

void persist_metrics() override;

private:
bool process_dedup(const v2::Event &event, const v2::Metadata &metadata);

Expand Down
2 changes: 2 additions & 0 deletions external_parser/joiners/i_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,7 @@ class i_joiner {

virtual void on_batch_read() = 0;

// Default implementation does nothing; a joiner that has metrics to publish
// (e.g. example_joiner) overrides this.
virtual void persist_metrics() {}

virtual metrics::joiner_metrics get_metrics() = 0;
};
7 changes: 7 additions & 0 deletions external_parser/metrics/metrics.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
#pragma once

#include "event_processors/timestamp_helper.h"

namespace metrics {
// Metrics accumulated by a joiner while it processes joined events.
struct joiner_metrics {
  // Incremented for each joined event that is not learnable.
  size_t number_of_skipped_events{0};
  // Join timestamp recorded together with last_event_id.
  TimePoint last_event_timestamp{};
  // Join timestamp recorded together with first_event_id.
  TimePoint first_event_timestamp{};
  // Event id recorded while this field is still empty.
  std::string first_event_id;
  // Event id recorded once first_event_id has been set; overwritten each time.
  std::string last_event_id;
};
} // namespace metrics
4 changes: 1 addition & 3 deletions external_parser/parse_example_binary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,7 @@ bool binary_parser::advance_to_next_payload_type(io_buf *input,

// Delegates to the example joiner, which publishes its accumulated metrics.
// The name/value list argument is left unnamed and is not used here.
void binary_parser::persist_metrics(
std::vector<std::pair<std::string, size_t>> &) {
_example_joiner->persist_metrics();
}

bool binary_parser::parse_examples(vw *all, v_array<example *> &examples) {
Expand Down

0 comments on commit bfc0099

Please sign in to comment.