Skip to content

Commit

Permalink
sstables/twcs: add support to millisecond timestamp resolution
Browse files Browse the repository at this point in the history
That's blocking KairosDB users because it uses TWCS with millisecond
timestamp resolution.

Also older drivers use millisecond instead of the default microsecond.

Fixes scylladb#3152.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20180411171244.19958-1-raphaelsc@scylladb.com>
(cherry picked from commit 0c72781)
Signed-off-by: Glauber Costa <glauber@scylladb.com>
  • Loading branch information
raphaelsc authored and Glauber Costa committed Jul 9, 2018
1 parent 240b9f1 commit fd89822
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 9 deletions.
41 changes: 32 additions & 9 deletions sstables/time_window_compaction_strategy.hh
Expand Up @@ -56,7 +56,7 @@ extern logging::logger clogger;
using namespace std::chrono_literals;

class time_window_compaction_strategy_options {
private:
public:
static constexpr std::chrono::seconds DEFAULT_COMPACTION_WINDOW_UNIT(int window_size) { return window_size * 86400s; }
static constexpr int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
static constexpr std::chrono::seconds DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS() { return 600s; }
Expand All @@ -65,14 +65,21 @@ private:
static constexpr auto COMPACTION_WINDOW_UNIT_KEY = "compaction_window_unit";
static constexpr auto COMPACTION_WINDOW_SIZE_KEY = "compaction_window_size";
static constexpr auto EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds";

private:
const std::unordered_map<sstring, std::chrono::seconds> valid_window_units = { { "MINUTES", 60s }, { "HOURS", 3600s }, { "DAYS", 86400s } };
// TODO: add support to timestamp resolution other than microseconds, but it's not that important
// because new clients only use this one.
const std::unordered_set<sstring> valid_timestamp_resolutions = { "MICROSECONDS" };

enum class timestamp_resolutions {
microsecond,
millisecond,
};
const std::unordered_map<sstring, timestamp_resolutions> valid_timestamp_resolutions = {
{ "MICROSECONDS", timestamp_resolutions::microsecond },
{ "MILLISECONDS", timestamp_resolutions::millisecond },
};

std::chrono::seconds sstable_window_size = DEFAULT_COMPACTION_WINDOW_UNIT(DEFAULT_COMPACTION_WINDOW_SIZE);
db_clock::duration expired_sstable_check_frequency = DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS();
timestamp_resolutions timestamp_resolution = timestamp_resolutions::microsecond;
public:
time_window_compaction_strategy_options(const std::map<sstring, sstring>& options) {
std::chrono::seconds window_unit;
Expand Down Expand Up @@ -107,9 +114,13 @@ public:
it = options.find(TIMESTAMP_RESOLUTION_KEY);
if (it != options.end() && !valid_timestamp_resolutions.count(it->second)) {
throw exceptions::syntax_exception(sstring("Invalid timestamp resolution ") + it->second + "for " + TIMESTAMP_RESOLUTION_KEY);
} else {
timestamp_resolution = valid_timestamp_resolutions.at(it->second);
}
}

std::chrono::seconds get_sstable_window_size() const { return sstable_window_size; }

friend class time_window_compaction_strategy;
};

Expand Down Expand Up @@ -154,6 +165,18 @@ public:
return compaction_descriptor(std::move(compaction_candidates));
}
private:
static timestamp_type
to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) {
switch (resolution) {
case time_window_compaction_strategy_options::timestamp_resolutions::microsecond:
return timestamp_type(timestamp_from_sstable);
case time_window_compaction_strategy_options::timestamp_resolutions::millisecond:
return std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::milliseconds(timestamp_from_sstable)).count();
default:
throw std::runtime_error("Timestamp resolution invalid for TWCS");
};
}

std::vector<shared_sstable>
get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
Expand All @@ -178,7 +201,7 @@ private:
}

std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
auto p = get_buckets(std::move(candidate_sstables), _options.sstable_window_size);
auto p = get_buckets(std::move(candidate_sstables), _options);
// Update the highest window seen, if necessary
_highest_window_seen = std::max(_highest_window_seen, p.second);

Expand All @@ -204,16 +227,16 @@ public:
// @return A pair, where the left element is the bucket representation (map of timestamp to sstablereader),
// and the right is the highest timestamp seen
static std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
get_buckets(std::vector<shared_sstable> files, std::chrono::seconds sstable_window_size) {
get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options) {
std::map<timestamp_type, std::vector<shared_sstable>> buckets;

timestamp_type max_timestamp = 0;
// Create map to represent buckets
// For each sstable, add sstable to the time bucket
// Where the bucket is the file's max timestamp rounded to the nearest window bucket
for (auto&& f : files) {
timestamp_type ts = f->get_stats_metadata().max_timestamp;
timestamp_type lower_bound = get_window_lower_bound(sstable_window_size, ts);
timestamp_type ts = to_timestamp_type(options.timestamp_resolution, f->get_stats_metadata().max_timestamp);
timestamp_type lower_bound = get_window_lower_bound(options.sstable_window_size, ts);
buckets[lower_bound].push_back(std::move(f));
max_timestamp = std::max(max_timestamp, lower_bound);
}
Expand Down
37 changes: 37 additions & 0 deletions tests/sstable_datafile_test.cc
Expand Up @@ -3178,6 +3178,43 @@ SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
return make_ready_future<>();
}

SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
auto ts = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
auto ts_in_ms = std::chrono::milliseconds(ts);
auto ts_in_us = std::chrono::duration_cast<std::chrono::microseconds>(ts_in_ms);

auto s = schema_builder("tests", "time_window_strategy")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type).build();

{
std::map<sstring, sstring> opts = { { time_window_compaction_strategy_options::TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS" }, };
time_window_compaction_strategy_options options(opts);

auto sst = make_sstable(s, "", 1, la, big);
sstables::test(sst).set_values("key1", "key1", build_stats(ts_in_ms.count(), ts_in_ms.count(), std::numeric_limits<int32_t>::max()));

auto ret = time_window_compaction_strategy::get_buckets({ sst }, options);
auto expected = time_window_compaction_strategy::get_window_lower_bound(options.get_sstable_window_size(), ts_in_us.count());

BOOST_REQUIRE(ret.second == expected);
}

{
std::map<sstring, sstring> opts = { { time_window_compaction_strategy_options::TIMESTAMP_RESOLUTION_KEY, "MICROSECONDS" }, };
time_window_compaction_strategy_options options(opts);

auto sst = make_sstable(s, "", 1, la, big);
sstables::test(sst).set_values("key1", "key1", build_stats(ts_in_us.count(), ts_in_us.count(), std::numeric_limits<int32_t>::max()));

auto ret = time_window_compaction_strategy::get_buckets({ sst }, options);
auto expected = time_window_compaction_strategy::get_window_lower_bound(options.get_sstable_window_size(), ts_in_us.count());

BOOST_REQUIRE(ret.second == expected);
}
return make_ready_future<>();
}

SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
using namespace std::chrono;

Expand Down

0 comments on commit fd89822

Please sign in to comment.