better naming, no forced compression
AlexanderSaydakov committed Feb 13, 2024
1 parent 5f94bdd commit 9bfe6e6
Showing 2 changed files with 64 additions and 68 deletions.
15 changes: 7 additions & 8 deletions tdigest/include/tdigest.hpp
@@ -28,6 +28,8 @@
 namespace datasketches {
 
 // this is equivalent of K_2 (default) in the Java implementation mentioned below
+// Generates cluster sizes proportional to q*(1-q).
+// The use of a normalizing function results in a strictly bounded number of clusters no matter how many samples.
 struct scale_function {
   double k(double q, double normalizer) const {
     return limit([normalizer] (double q) { return std::log(q / (1 - q)) * normalizer; }, q, 1e-15, 1 - 1e-15);
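
For intuition about the K_2 function above, here is a standalone sketch (not library code; the normalizer value of 25 is assumed purely for illustration). It shows that one unit of k-space covers a wide rank interval near the median and a very narrow one near the tails, which is what makes cluster sizes proportional to q*(1-q):

#include <cmath>
#include <cstdio>

int main() {
  const double normalizer = 25; // assumed value for illustration
  // K_2: k(q) = log(q / (1 - q)) * normalizer, as in scale_function::k above
  auto k = [normalizer](double q) { return std::log(q / (1 - q)) * normalizer; };
  std::printf("k(0.50)  = %.1f\n", k(0.50));  // 0.0
  std::printf("k(0.99)  = %.1f\n", k(0.99));  // ~114.9
  std::printf("k(0.999) = %.1f\n", k(0.999)); // ~172.7
  return 0;
}
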
@@ -99,12 +101,11 @@ class tdigest {
   using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>>;
 
   struct centroid_cmp {
-    centroid_cmp(bool reverse): reverse_(reverse) {}
+    centroid_cmp() {}
     bool operator()(const centroid& a, const centroid& b) const {
-      if (a.get_mean() < b.get_mean()) return !reverse_;
-      return reverse_;
+      if (a.get_mean() < b.get_mean()) return true;
+      return false;
     }
-    bool reverse_;
   };
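
In this version the comparator encodes only the ascending order; descending passes are produced by reversing the stably sorted buffer afterwards (see merge_buffered in tdigest_impl.hpp below). A minimal sketch of that pattern, with assumed data:

#include <algorithm>
#include <vector>

int main() {
  std::vector<double> means{3.0, 1.0, 2.0};
  std::stable_sort(means.begin(), means.end()); // ascending, stable
  std::reverse(means.begin(), means.end());     // descending pass for alternating merges
  return 0;
}
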

/**
@@ -218,7 +219,7 @@ class tdigest {
   T max_;
   size_t centroids_capacity_;
   vector_centroid centroids_;
-  uint64_t total_weight_;
+  uint64_t centroids_weight_;
   size_t buffer_capacity_;
   vector_centroid buffer_;
   uint64_t buffered_weight_;
@@ -236,9 +237,7 @@ class tdigest {
   // for deserialize
   tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight_, const Allocator& allocator);
 
-  void merge_new_values();
-  void merge_new_values(bool force, uint16_t k);
-  void merge_new_values(uint16_t k);
+  void merge_buffered();
 
   static double weighted_average(double x1, double w1, double x2, double w2);
117 changes: 57 additions & 60 deletions tdigest/include/tdigest_impl.hpp
@@ -36,7 +36,7 @@ tdigest(false, k, std::numeric_limits<T>::infinity(), -std::numeric_limits<T>::i
 template<typename T, typename A>
 void tdigest<T, A>::update(T value) {
   if (std::isnan(value)) return;
-  if (buffer_.size() >= buffer_capacity_ - centroids_.size()) merge_new_values();
+  if (buffer_.size() >= buffer_capacity_ - centroids_.size()) merge_buffered();
   buffer_.push_back(centroid(value, 1));
   ++buffered_weight_;
   min_ = std::min(min_, value);
Expand All @@ -52,7 +52,7 @@ void tdigest<T, A>::merge(tdigest& other) {
std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(buffer_));
buffered_weight_ += other.get_total_weight();
if (num > buffer_capacity_) {
merge_new_values(internal_k_);
merge_buffered();
} else {
min_ = std::min(min_, other.get_min_value());
max_ = std::max(max_, other.get_max_value());
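
A minimal usage sketch of update() and merge() (the include path and the constructor signature taking the compression parameter k are assumptions, not taken from this diff):

#include <cstdio>
#include "tdigest.hpp" // assumed include path

int main() {
  datasketches::tdigest<double> td1(100), td2(100); // k = 100 assumed as the constructor argument
  for (int i = 0; i < 1000; ++i) {
    td1.update(static_cast<double>(i));
    td2.update(static_cast<double>(1000 + i));
  }
  td1.merge(td2);
  // the combined weight should be the sum of both inputs (2000)
  std::printf("total weight = %llu\n", (unsigned long long) td1.get_total_weight());
  return 0;
}
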
@@ -61,7 +61,7 @@
 
 template<typename T, typename A>
 void tdigest<T, A>::compress() {
-  merge_new_values(true, k_);
+  merge_buffered();
 }
 
 template<typename T, typename A>
@@ -83,7 +83,7 @@ T tdigest<T, A>::get_max_value() const {
 
 template<typename T, typename A>
 uint64_t tdigest<T, A>::get_total_weight() const {
-  return total_weight_ + buffered_weight_;
+  return centroids_weight_ + buffered_weight_;
 }
 
 template<typename T, typename A>
@@ -95,13 +95,13 @@ double tdigest<T, A>::get_rank(T value) const {
   // one centroid and value == min_ == max_
   if ((centroids_.size() + buffer_.size()) == 1) return 0.5;
 
-  const_cast<tdigest*>(this)->merge_new_values(); // side effect
+  const_cast<tdigest*>(this)->merge_buffered(); // side effect
 
   // left tail
   const T first_mean = centroids_.front().get_mean();
   if (value < first_mean) {
     if (first_mean - min_ > 0) {
-      if (value == min_) return 0.5 / total_weight_;
+      if (value == min_) return 0.5 / centroids_weight_;
       return (1.0 + (value - min_) / (first_mean - min_) * (centroids_.front().get_weight() / 2.0 - 1.0)); // ?
     }
     return 0; // should never happen
@@ -111,15 +111,15 @@ double tdigest<T, A>::get_rank(T value) const {
   const T last_mean = centroids_.back().get_mean();
   if (value > last_mean) {
     if (max_ - last_mean > 0) {
-      if (value == max_) return 1.0 - 0.5 / total_weight_;
-      return 1 - ((1 + (max_ - value) / (max_ - last_mean) * (centroids_.back().get_weight() / 2.0 - 1.0)) / total_weight_); // ?
+      if (value == max_) return 1.0 - 0.5 / centroids_weight_;
+      return 1 - ((1 + (max_ - value) / (max_ - last_mean) * (centroids_.back().get_weight() / 2.0 - 1.0)) / centroids_weight_); // ?
     }
     return 1; // should never happen
   }
 
-  auto lower = std::lower_bound(centroids_.begin(), centroids_.end(), centroid(value, 1), centroid_cmp(false));
+  auto lower = std::lower_bound(centroids_.begin(), centroids_.end(), centroid(value, 1), centroid_cmp());
   if (lower == centroids_.end()) throw std::logic_error("lower == end in get_rank()");
-  auto upper = std::upper_bound(lower, centroids_.end(), centroid(value, 1), centroid_cmp(false));
+  auto upper = std::upper_bound(lower, centroids_.end(), centroid(value, 1), centroid_cmp());
   if (upper == centroids_.begin()) throw std::logic_error("upper == begin in get_rank()");
   if (value < lower->get_mean()) --lower;
   if (upper == centroids_.end() || (upper != centroids_.begin() && !((upper - 1)->get_mean() < value))) --upper;
@@ -138,9 +138,9 @@ double tdigest<T, A>::get_rank(T value) const {
   weight_delta -= lower->get_weight() / 2.0;
   weight_delta += upper->get_weight() / 2.0;
   if (upper->get_mean() - lower->get_mean() > 0) {
-    return (weight_below + weight_delta * (value - lower->get_mean()) / (upper->get_mean() - lower->get_mean())) / total_weight_;
+    return (weight_below + weight_delta * (value - lower->get_mean()) / (upper->get_mean() - lower->get_mean())) / centroids_weight_;
   }
-  return (weight_below + weight_delta / 2.0) / total_weight_;
+  return (weight_below + weight_delta / 2.0) / centroids_weight_;
 }
 
 template<typename T, typename A>
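
A minimal usage sketch of get_rank() (again, the include path and constructor argument are assumptions):

#include <cstdio>
#include <random>
#include "tdigest.hpp" // assumed include path

int main() {
  datasketches::tdigest<double> td(100); // k = 100 assumed
  std::mt19937 gen(1);
  std::normal_distribution<double> dist(0.0, 1.0);
  for (int i = 0; i < 100000; ++i) td.update(dist(gen));
  // for a symmetric distribution, the rank of the mean should be close to 0.5
  std::printf("rank(0) ~ %f\n", td.get_rank(0.0));
  return 0;
}
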
@@ -149,20 +149,20 @@ T tdigest<T, A>::get_quantile(double rank) const {
   if ((rank < 0.0) || (rank > 1.0)) {
     throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1");
   }
-  const_cast<tdigest*>(this)->merge_new_values(); // side effect
+  const_cast<tdigest*>(this)->merge_buffered(); // side effect
   if (centroids_.size() == 1) return centroids_.front().get_mean();
 
   // at least 2 centroids
-  const double weight = rank * total_weight_;
+  const double weight = rank * centroids_weight_;
   if (weight < 1) return min_;
-  if (weight > total_weight_ - 1.0) return max_;
+  if (weight > centroids_weight_ - 1.0) return max_;
   const double first_weight = centroids_.front().get_weight();
   if (first_weight > 1 && weight < first_weight / 2.0) {
     return min_ + (weight - 1.0) / (first_weight / 2.0 - 1.0) * (centroids_.front().get_mean() - min_);
   }
   const double last_weight = centroids_.back().get_weight();
-  if (last_weight > 1 && total_weight_ - weight <= last_weight / 2.0) {
-    return max_ + (total_weight_ - weight - 1.0) / (last_weight / 2.0 - 1.0) * (max_ - centroids_.back().get_mean());
+  if (last_weight > 1 && centroids_weight_ - weight <= last_weight / 2.0) {
+    return max_ + (centroids_weight_ - weight - 1.0) / (last_weight / 2.0 - 1.0) * (max_ - centroids_.back().get_mean());
   }
 
   // interpolate between extremes
@@ -187,7 +187,7 @@ T tdigest<T, A>::get_quantile(double rank) const {
     }
     weight_so_far += dw;
   }
-  const double w1 = weight - total_weight_ - centroids_.back().get_weight() / 2.0;
+  const double w1 = weight - centroids_weight_ - centroids_.back().get_weight() / 2.0;
   const double w2 = centroids_.back().get_weight() / 2.0 - w1;
   return weighted_average(centroids_.back().get_weight(), w1, max_, w2);
 }
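
As a worked check of the tail guards in get_quantile (numbers assumed for illustration): with centroids_weight_ = 1000, rank = 0.0004 gives weight = 0.4 < 1, so min_ is returned; rank = 0.9995 gives weight = 999.5 > centroids_weight_ - 1 = 999, so max_ is returned; only ranks in between reach the interpolation code.
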
@@ -209,77 +209,76 @@ string<A> tdigest<T, A>::to_string(bool print_centroids) const {
   os << " Buffered : " << buffer_.size() << std::endl;
   os << " Centroids capacity : " << centroids_capacity_ << std::endl;
   os << " Buffer capacity : " << buffer_capacity_ << std::endl;
-  os << " Total Weight : " << total_weight_ << std::endl;
+  os << " Centroids Weight : " << centroids_weight_ << std::endl;
   os << " Buffered Weight : " << buffered_weight_ << std::endl;
+  os << " Total Weight : " << get_total_weight() << std::endl;
   os << " Reverse Merge : " << (reverse_merge_ ? "true" : "false") << std::endl;
   if (!is_empty()) {
     os << " Min : " << min_ << std::endl;
     os << " Max : " << max_ << std::endl;
   }
   os << "### End t-Digest summary" << std::endl;
   if (print_centroids) {
-    os << "Centroids:" << std::endl;
-    int i = 0;
-    for (auto centroid: centroids_) {
-      os << i << ": " << centroid.get_mean() << ", " << centroid.get_weight() << std::endl;
-      ++i;
+    if (centroids_.size() > 0) {
+      os << "Centroids:" << std::endl;
+      int i = 0;
+      for (const auto& c: centroids_) {
+        os << i++ << ": " << c.get_mean() << ", " << c.get_weight() << std::endl;
+      }
+    }
+    if (buffer_.size() > 0) {
+      os << "Buffer:" << std::endl;
+      int i = 0;
+      for (const auto& b: buffer_) {
+        os << i++ << ": " << b.get_mean() << ", " << b.get_weight() << std::endl;
+      }
+    }
   }
   return string<A>(os.str().c_str(), allocator_);
 }

 template<typename T, typename A>
-void tdigest<T, A>::merge_new_values() {
-  merge_new_values(false, internal_k_);
-}
-
-template<typename T, typename A>
-void tdigest<T, A>::merge_new_values(bool force, uint16_t k) {
-  if (total_weight_ == 0 && buffered_weight_ == 0) return;
-  if (force || buffered_weight_ > 0) merge_new_values(k);
-}
-
-template<typename T, typename A>
-void tdigest<T, A>::merge_new_values(uint16_t k) {
-  const bool reverse = USE_ALTERNATING_SORT & reverse_merge_;
-  for (const auto& centroid: centroids_) buffer_.push_back(centroid);
+void tdigest<T, A>::merge_buffered() {
+  if (buffered_weight_ == 0) return;
+  const bool reverse = USE_ALTERNATING_SORT && reverse_merge_;
+  std::copy(centroids_.begin(), centroids_.end(), std::back_inserter(buffer_));
   centroids_.clear();
-  std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp(reverse));
-  total_weight_ += buffered_weight_;
+  std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp());
+  if (reverse) std::reverse(buffer_.begin(), buffer_.end());
+  centroids_weight_ += buffered_weight_;
   auto it = buffer_.begin();
   centroids_.push_back(*it);
   ++it;
   double weight_so_far = 0;
-  const double normalizer = scale_function().normalizer(k, total_weight_);
+  const double normalizer = scale_function().normalizer(internal_k_, centroids_weight_);
   double k1 = scale_function().k(0, normalizer);
-  double w_limit = total_weight_ * scale_function().q(k1 + 1, normalizer);
+  double w_limit = centroids_weight_ * scale_function().q(k1 + 1, normalizer);
   while (it != buffer_.end()) {
     const double proposed_weight = centroids_.back().get_weight() + it->get_weight();
-    const double projected_weight = weight_so_far + proposed_weight;
     bool add_this;
-    if (USE_WEIGHT_LIMIT) {
-      const double q0 = weight_so_far / total_weight_;
-      const double q2 = (weight_so_far + proposed_weight) / total_weight_;
-      add_this = proposed_weight <= total_weight_ * std::min(scale_function().max(q0, normalizer), scale_function().max(q2, normalizer));
-    } else {
-      add_this = projected_weight <= w_limit;
-    }
+    if (std::distance(buffer_.begin(), it) == 1 || std::distance(buffer_.end(), it) == 1) {
+      add_this = false;
+    } else if (USE_WEIGHT_LIMIT) {
+      const double q0 = weight_so_far / centroids_weight_;
+      const double q2 = (weight_so_far + proposed_weight) / centroids_weight_;
+      add_this = proposed_weight <= centroids_weight_ * std::min(scale_function().max(q0, normalizer), scale_function().max(q2, normalizer));
+    } else {
+      add_this = weight_so_far + proposed_weight <= w_limit;
+    }
     if (add_this) {
       centroids_.back().add(*it);
     } else {
       weight_so_far += centroids_.back().get_weight();
       if (!USE_WEIGHT_LIMIT) {
-        k1 = scale_function().k(weight_so_far / total_weight_, normalizer);
-        w_limit = total_weight_ * scale_function().q(k1 + 1, normalizer);
+        k1 = scale_function().k(weight_so_far / centroids_weight_, normalizer);
+        w_limit = centroids_weight_ * scale_function().q(k1 + 1, normalizer);
       }
       centroids_.push_back(*it);
     }
     ++it;
   }
   if (reverse) std::reverse(centroids_.begin(), centroids_.end());
-  if (total_weight_ > 0) {
+  if (centroids_weight_ > 0) {
     min_ = std::min(min_, centroids_.front().get_mean());
     max_ = std::max(max_, centroids_.back().get_mean());
   }
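
To see how the w_limit update above shapes cluster sizes, here is a standalone sketch (the normalizer and total weight are assumed values, and the limit() clamping from tdigest.hpp is inlined as a clamp; this is not library code). A centroid finalized at rank q0 lets the next cluster grow only until rank q(k(q0) + 1), so the allowed cluster weight shrinks toward the tails:

#include <algorithm>
#include <cmath>
#include <cstdio>

int main() {
  const double W = 10000; // assumed centroids_weight_
  const double Z = 25;    // assumed normalizer
  auto clamp = [](double q) { return std::min(std::max(q, 1e-15), 1 - 1e-15); };
  auto k = [&](double q) { return std::log(clamp(q) / (1 - clamp(q))) * Z; };
  auto q = [&](double kk) { const double e = std::exp(kk / Z); return e / (1 + e); };
  for (double q0 : {0.5, 0.9, 0.99}) {
    const double span = q(k(q0) + 1) - q0; // rank span allowed for one cluster
    std::printf("q0 = %.2f -> cluster weight limit ~ %.1f\n", q0, span * W); // ~100, ~35, ~4
  }
  return 0;
}
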
@@ -295,7 +294,7 @@ double tdigest<T, A>::weighted_average(double x1, double w1, double x2, double w
 
 template<typename T, typename A>
 void tdigest<T, A>::serialize(std::ostream& os) const {
-  const_cast<tdigest*>(this)->merge_new_values(); // side effect
+  const_cast<tdigest*>(this)->merge_buffered(); // side effect
   write(os, is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NON_EMPTY);
   write(os, SERIAL_VERSION);
   write(os, SKETCH_TYPE);
@@ -319,7 +318,7 @@
 
 template<typename T, typename A>
 auto tdigest<T, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
-  const_cast<tdigest*>(this)->merge_new_values(); // side effect
+  const_cast<tdigest*>(this)->merge_buffered(); // side effect
   const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NON_EMPTY;
   const size_t size_bytes = preamble_longs * sizeof(uint64_t) + sizeof(T) * 2 + sizeof(centroid) * centroids_.size();
   vector_bytes bytes(size_bytes, 0, allocator_);
@@ -556,7 +555,7 @@ min_(min),
 max_(max),
 centroids_capacity_(0),
 centroids_(std::move(centroids)),
-total_weight_(total_weight),
+centroids_weight_(total_weight),
 buffer_capacity_(0),
 buffer_(allocator),
 buffered_weight_(0)
@@ -572,10 +571,8 @@ buffered_weight_(0)
   double scale = std::max(1.0, static_cast<double>(buffer_capacity_) / centroids_capacity_ - 1.0);
   if (!USE_TWO_LEVEL_COMPRESSION) scale = 1;
   internal_k_ = std::ceil(std::sqrt(scale) * k_);
-  if (centroids_capacity_ < internal_k_ + fudge) {
-    centroids_capacity_ = internal_k_ + fudge;
-  }
-  if (buffer_capacity_ < 2 * centroids_capacity_) buffer_capacity_ = 2 * centroids_capacity_;
+  centroids_capacity_ = std::max(centroids_capacity_, internal_k_ + fudge);
+  buffer_capacity_ = std::max(buffer_capacity_, 2 * centroids_capacity_);
   centroids_.reserve(centroids_capacity_);
   buffer_.reserve(buffer_capacity_);
 }
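
A worked pass through this sizing logic (the initial capacities and the fudge constant are assumed, since their values are set outside this hunk): with k_ = 200, centroids_capacity_ = 400, buffer_capacity_ = 2000 and fudge = 10, scale = max(1.0, 2000.0/400 - 1) = 4, so internal_k_ = ceil(sqrt(4) * 200) = 400; the clamps then give centroids_capacity_ = max(400, 410) = 410 and buffer_capacity_ = max(2000, 820) = 2000.
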
