diff --git a/src/tdigest.c b/src/tdigest.c
index 60a8ea5..40ef04d 100644
--- a/src/tdigest.c
+++ b/src/tdigest.c
@@ -10,41 +10,125 @@
 #include TD_MALLOC_INCLUDE
 
-void bbzero(void *to, size_t count) { memset(to, 0, count); }
+#define MAX(x, y) (((x) > (y)) ? (x) : (y))
+#define MIN(x, y) (((x) < (y)) ? (x) : (y))
 
-static inline bool is_very_small(double val) { return !(val > .000000001 || val < -.000000001); }
+static inline double weighted_average_sorted(double x1, double w1, double x2, double w2) {
+    const double x = (x1 * w1 + x2 * w2) / (w1 + w2);
+    return MAX(x1, MIN(x, x2));
+}
+
+static inline double weighted_average(double x1, double w1, double x2, double w2) {
+    if (x1 <= x2) {
+        return weighted_average_sorted(x1, w1, x2, w2);
+    } else {
+        return weighted_average_sorted(x2, w2, x1, w1);
+    }
+}
+
+static void inline swap(double *arr, int i, int j) {
+    const double temp = arr[i];
+    arr[i] = arr[j];
+    arr[j] = temp;
+}
+
+static unsigned int partition(double *means, double *weights, unsigned int start, unsigned int end,
+                              unsigned int pivot_idx) {
+    const double pivotMean = means[pivot_idx];
+    swap(means, pivot_idx, end);
+    swap(weights, pivot_idx, end);
+
+    int i = start - 1;
+
+    for (unsigned int j = start; j < end; j++) {
+        // If current element is smaller than the pivot
+        if (means[j] < pivotMean) {
+            // increment index of smaller element
+            i++;
+            swap(means, i, j);
+            swap(weights, i, j);
+        }
+    }
+    swap(means, i + 1, end);
+    swap(weights, i + 1, end);
+    return i + 1;
+}
+
+/**
+ * Standard quick sort, except that sorting rearranges parallel arrays
+ *
+ * @param means Values to sort on
+ * @param weights The auxiliary values to sort along with the means
+ * @param start The index of the first value to sort
+ * @param end The index of the last value to sort (inclusive)
+ */
+void td_qsort(double *means, double *weights, unsigned int start, unsigned int end) {
+    if (start < end) {
+        // two elements can be directly compared
+        if ((end - start) == 1) {
+            if (means[start] > means[end]) {
+                swap(means, start, end);
+                swap(weights, start, end);
+            }
+            return;
+        }
+        // generating a random pivot index was very expensive relative to the array size
+        // const unsigned int pivot_idx = start + rand()%(end - start + 1);
+        const unsigned int pivot_idx = (end + start) / 2; // central pivot
+        const unsigned int new_pivot_idx = partition(means, weights, start, end, pivot_idx);
+        if (new_pivot_idx > start) {
+            td_qsort(means, weights, start, new_pivot_idx - 1);
+        }
+        td_qsort(means, weights, new_pivot_idx + 1, end);
+    }
+}
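td_qsort above sorts the means and carries the weights along with them, so each weight stays attached to its mean. Note that end is the index of the last element, not one past it: td_compress further down calls td_qsort(h->nodes_mean, h->nodes_weight, 0, N - 1), and the two-element base case compares means[start] with means[end]. A minimal sketch of driving it directly, with made-up values and assuming the program is linked against the patched tdigest.c:

#include <stdio.h>

/* Provided by the patched tdigest.c; declared here only so the sketch is self-contained. */
void td_qsort(double *means, double *weights, unsigned int start, unsigned int end);

int main(void) {
    double means[] = {5.0, 1.0, 3.0, 2.0, 4.0};
    double weights[] = {50.0, 10.0, 30.0, 20.0, 40.0};
    /* end is inclusive: sort indices 0..4 */
    td_qsort(means, weights, 0, 4);
    for (int i = 0; i < 5; i++) {
        /* prints 1/10, 2/20, 3/30, 4/40, 5/50 - the pairs stay together */
        printf("%.0f -> %.0f\n", means[i], weights[i]);
    }
    return 0;
}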
 
 static inline int cap_from_compression(double compression) { return (6 * (int)(compression)) + 10; }
 
-static bool should_td_compress(td_histogram_t *h) {
+static inline bool should_td_compress(td_histogram_t *h) {
     return ((h->merged_nodes + h->unmerged_nodes) == h->cap);
 }
 
-static int next_node(td_histogram_t *h) { return h->merged_nodes + h->unmerged_nodes; }
+static inline int next_node(td_histogram_t *h) { return h->merged_nodes + h->unmerged_nodes; }
 
 void td_compress(td_histogram_t *h);
 
-int td_number_centroids(td_histogram_t *h) { return next_node(h); }
-
-////////////////////////////////////////////////////////////////////////////////
-// Constructors
-////////////////////////////////////////////////////////////////////////////////
+int td_centroid_count(td_histogram_t *h) { return next_node(h); }
 
-static size_t td_required_buf_size(double compression) {
-    return sizeof(td_histogram_t) + (cap_from_compression(compression) * sizeof(node_t));
+void td_reset(td_histogram_t *h) {
+    if (!h) {
+        return;
+    }
+    h->min = __DBL_MAX__;
+    h->max = __DBL_MIN__;
+    h->merged_nodes = 0;
+    h->merged_weight = 0;
+    h->unmerged_nodes = 0;
+    h->unmerged_weight = 0;
+    h->total_compressions = 0;
 }
 
 int td_init(double compression, td_histogram_t **result) {
-    size_t buf_size = td_required_buf_size(compression);
-    td_histogram_t *histogram = (td_histogram_t *)((char *)(__td_malloc(buf_size)));
+
+    const double capacity = cap_from_compression(compression);
+    td_histogram_t *histogram;
+    histogram = (td_histogram_t *)__td_calloc(1, sizeof(td_histogram_t));
     if (!histogram) {
         return 1;
     }
-    bbzero((void *)(histogram), buf_size);
-    histogram->cap = (buf_size - sizeof(td_histogram_t)) / sizeof(node_t);
+    histogram->cap = capacity;
     histogram->compression = compression;
     td_reset(histogram);
+    histogram->nodes_mean = (double *)__td_calloc(capacity, sizeof(double));
+    if (!histogram->nodes_mean) {
+        return 1;
+    }
+    histogram->nodes_weight = (double *)__td_calloc(capacity, sizeof(double));
+    if (!histogram->nodes_weight) {
+        return 1;
+    }
     *result = histogram;
+
     return 0;
 }
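With the parallel-array layout, td_init now makes three zeroed allocations (the struct plus the two arrays) instead of one flexible-array block, and td_reset only clears the counters and the min/max trackers. A minimal usage sketch, assuming tdigest.h declares these functions as public API (as in the rest of the library); cleanup is omitted for brevity:

#include <stdio.h>
#include "tdigest.h"

int main(void) {
    td_histogram_t *t = NULL;
    /* compression 100 -> cap_from_compression gives 610 centroid slots */
    if (td_init(100, &t) != 0) {
        fprintf(stderr, "allocation failed\n");
        return 1;
    }
    for (int i = 1; i <= 1000; i++) {
        td_add(t, (double)i, 1);
    }
    td_compress(t);
    printf("centroids after compression: %d\n", td_centroid_count(t));
    /* td_reset(t) would zero the counters so the digest can be reused */
    return 0;
}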
@@ -60,118 +144,226 @@ void td_merge(td_histogram_t *into, td_histogram_t *from) {
     td_compress(into);
     td_compress(from);
     for (int i = 0; i < from->merged_nodes; i++) {
-        node_t *n = &from->nodes[i];
-        td_add(into, n->mean, n->count);
-    }
-}
-
-int td_centroid_count(td_histogram_t *h) { return next_node(h); }
-
-void td_reset(td_histogram_t *h) {
-    if (h == NULL) {
-        return;
+        const double mean = from->nodes_mean[i];
+        const double count = from->nodes_weight[i];
+        td_add(into, mean, count);
     }
-    h->min = __DBL_MAX__;
-    h->max = __DBL_MIN__;
-    h->merged_nodes = 0;
-    h->merged_weight = 0;
-    h->unmerged_nodes = 0;
-    h->unmerged_weight = 0;
-    h->total_compressions = 0;
 }
 
 double td_size(td_histogram_t *h) { return h->merged_weight + h->unmerged_weight; }
 
 double td_cdf(td_histogram_t *h, double val) {
     td_compress(h);
+    // no data to examine
     if (h->merged_nodes == 0) {
         return NAN;
     }
-    double k = 0;
-    int i = 0;
-    node_t *n = NULL;
-    for (i = 0; i < h->merged_nodes; i++) {
-        n = &h->nodes[i];
-        if (n->mean >= val) {
-            break;
+    // below the lower bound
+    if (val < h->min) {
+        return 0;
+    }
+    // above the upper bound
+    if (val > h->max) {
+        return 1;
+    }
+    if (h->merged_nodes == 1) {
+        // exactly one centroid, should have max==min
+        const double width = h->max - h->min;
+        if (val - h->min <= width) {
+            // min and max are too close together to do any viable interpolation
+            return 0.5;
+        } else {
+            // interpolate if somehow we have weight > 0 and max != min
+            return (val - h->min) / width;
         }
-        k += n->count;
     }
-    if (n == NULL) {
-        return NAN;
+    const int n = h->merged_nodes;
+    // check for the left tail
+    const double left_centroid_mean = h->nodes_mean[0];
+    const double left_centroid_weight = h->nodes_weight[0];
+    if (val < left_centroid_mean) {
+        // note that this is different from h->nodes_mean[0] > min
+        // ... this guarantees we divide by a non-zero number and interpolation works
+        const double width = left_centroid_mean - h->min;
+        if (width > 0) {
+            // there must be a sample exactly at min
+            if (val == h->min) {
+                return 0.5 / h->merged_weight;
+            } else {
+                return (1 + (val - h->min) / width * (left_centroid_weight / 2 - 1)) /
+                       h->merged_weight;
+            }
+        } else {
+            // this should be redundant with the check val < h->min
+            return 0;
+        }
     }
-    if (val == n->mean) {
-        // technically this needs to find all of the nodes which contain this value and sum their
-        // weight
-        double count_at_value = n->count;
-        for (i += 1; i < h->merged_nodes && h->nodes[i].mean == n->mean; i++) {
-            count_at_value += h->nodes[i].count;
+    // and the right tail
+    const double right_centroid_mean = h->nodes_mean[n - 1];
+    const double right_centroid_weight = h->nodes_weight[n - 1];
+    if (val > right_centroid_mean) {
+        const double width = h->max - right_centroid_mean;
+        if (width > 0) {
+            if (val == h->max) {
+                return 1 - 0.5 / h->merged_weight;
+            } else {
+                // there has to be a single sample exactly at max
+                const double dq = (1 + (h->max - val) / width * (right_centroid_weight / 2 - 1)) /
+                                  h->merged_weight;
+                return 1 - dq;
+            }
+        } else {
+            return 1;
         }
-        return (k + (count_at_value / 2)) / h->merged_weight;
-    } else if (val > n->mean) { // past the largest
-        return 1;
-    } else if (i == 0) {
-        return 0;
     }
-    // we want to figure out where along the line from the prev node to this node, the value falls
-    node_t *nr = n;
-    node_t *nl = n - 1;
-    k -= (nl->count / 2);
-    // we say that at zero we're at nl->mean
-    // and at (nl->count/2 + nr->count/2) we're at nr
-    double m = (nr->mean - nl->mean) / (nl->count / 2 + nr->count / 2);
-    double x = (val - nl->mean) / m;
-    return (k + x) / h->merged_weight;
+    // we know that there are at least two centroids and mean[0] < x < mean[n-1]
+    // that means that there are either one or more consecutive centroids all at exactly x
+    // or there are consecutive centroids, c0 < x < c1
+    double weightSoFar = 0;
+    for (int it = 0; it < n - 1; it++) {
+        // weightSoFar does not include weight[it] yet
+        if (h->nodes_mean[it] == val) {
+            // we have one or more centroids == x, treat them as one
+            // dw will accumulate the weight of all of the centroids at x
+            double dw = 0;
+            while (it < n && h->nodes_mean[it] == val) {
+                dw += h->nodes_weight[it];
+                it++;
+            }
+            return (weightSoFar + dw / 2) / h->merged_weight;
+        } else if (h->nodes_mean[it] <= val && val < h->nodes_mean[it + 1]) {
+            const double node_weight = h->nodes_weight[it];
+            const double node_weight_next = h->nodes_weight[it + 1];
+            const double node_mean = h->nodes_mean[it];
+            const double node_mean_next = h->nodes_mean[it + 1];
+            // landed between centroids ... check for floating point madness
+            if (node_mean_next - node_mean > 0) {
+                // note how we handle singleton centroids here
+                // the point is that for singleton centroids, we know that their entire
+                // weight is exactly at the centroid and thus shouldn't be involved in
+                // interpolation
+                double leftExcludedW = 0;
+                double rightExcludedW = 0;
+                if (node_weight == 1) {
+                    if (node_weight_next == 1) {
+                        // two singletons means no interpolation
+                        // left singleton is in, right is out
+                        return (weightSoFar + 1) / h->merged_weight;
+                    } else {
+                        leftExcludedW = 0.5;
+                    }
+                } else if (node_weight_next == 1) {
+                    rightExcludedW = 0.5;
+                }
+                double dw = (node_weight + node_weight_next) / 2;
+
+                // adjust endpoints for any singleton
+                double dwNoSingleton = dw - leftExcludedW - rightExcludedW;
+
+                double base = weightSoFar + node_weight / 2 + leftExcludedW;
+                return (base + dwNoSingleton * (val - node_mean) / (node_mean_next - node_mean)) /
+                       h->merged_weight;
+            } else {
+                // this is simply caution against floating point madness
+                // it is conceivable that the centroids will be different
+                // but too near to allow safe interpolation
+                double dw = (node_weight + node_weight_next) / 2;
+                return (weightSoFar + dw) / h->merged_weight;
+            }
+        } else {
+            weightSoFar += h->nodes_weight[it];
+        }
+    }
+    return 1 - 0.5 / h->merged_weight;
 }
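The rewritten td_cdf above works in three zones: early exits outside [min, max], tail interpolation anchored to the samples known to sit exactly at min and max, and interpolation between adjacent centroids in the interior, with singleton centroids excluded from the interpolation. A small sketch of the expected behaviour, again assuming the public declarations in tdigest.h; the interior value is only approximate:

#include <stdio.h>
#include "tdigest.h"

int main(void) {
    td_histogram_t *t = NULL;
    if (td_init(100, &t) != 0) {
        return 1;
    }
    for (int i = 1; i <= 10000; i++) {
        td_add(t, (double)i, 1);
    }
    printf("%f\n", td_cdf(t, 0.0));     /* below min: 0.0 */
    printf("%f\n", td_cdf(t, 20000.0)); /* above max: 1.0 */
    printf("%f\n", td_cdf(t, 5000.0));  /* interior: roughly 0.5 for this uniform stream */
    return 0;
}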
 
 double td_quantile(td_histogram_t *h, double q) {
     td_compress(h);
-    if (q < 0 || q > 1 || h->merged_nodes == 0) {
+    // q should be in [0,1]
+    if (q < 0.0 || q > 1.0 || h->merged_nodes == 0) {
         return NAN;
     }
-    // if left of the first node, use the first node
-    // if right of the last node, use the last node, use it
-    double goal = q * h->merged_weight;
-    double k = 0;
-    int i = 0;
-    node_t *n = NULL;
-    for (i = 0; i < h->merged_nodes; i++) {
-        n = &h->nodes[i];
-        if (k + n->count > goal) {
-            break;
-        }
-        k += n->count;
+    // with one data point, all quantiles lead to Rome
+    if (h->merged_nodes == 1) {
+        return h->nodes_mean[0];
     }
-    if (n == NULL) {
-        return NAN;
+
+    // if values were stored in a sorted array, index would be the offset we are interested in
+    const double index = q * h->merged_weight;
+
+    // beyond the boundaries, we return min or max
+    // usually, the first centroid will have unit weight so this will make it moot
+    if (index < 1) {
+        return h->min;
     }
-    double delta_k = goal - k - (n->count / 2);
-    if (is_very_small(delta_k)) {
-        return n->mean;
+
+    // we know that there are at least two centroids now
+    const int n = h->merged_nodes;
+
+    // if the left centroid has more than one sample, we still know
+    // that one sample occurred at min so we can do some interpolation
+    const double left_centroid_weight = h->nodes_weight[0];
+    if (left_centroid_weight > 1 && index < left_centroid_weight / 2) {
+        // there is a single sample at min so we interpolate with less weight
+        return h->min + (index - 1) / (left_centroid_weight / 2 - 1) * (h->nodes_mean[0] - h->min);
     }
-    bool right = delta_k > 0;
-    if ((right && ((i + 1) == h->merged_nodes)) || (!right && (i == 0))) {
-        return n->mean;
+
+    // usually the last centroid will have unit weight so this test will make it moot
+    if (index > h->merged_weight - 1) {
+        return h->max;
     }
-    node_t *nl;
-    node_t *nr;
-    if (right) {
-        nl = n;
-        nr = &h->nodes[i + 1];
-        k += (n->count / 2);
-    } else {
-        nl = &h->nodes[i - 1];
-        nr = n;
-        k -= (n->count / 2);
+
+    // if the right-most centroid has more than one sample, we still know
+    // that one sample occurred at max so we can do some interpolation
+    const double right_centroid_weight = h->nodes_weight[n - 1];
+    if (right_centroid_weight > 1 && h->merged_weight - index <= right_centroid_weight / 2) {
+        return h->max - (h->merged_weight - index - 1) / (right_centroid_weight / 2 - 1) *
+                            (h->max - h->nodes_mean[n - 1]);
     }
-    double x = goal - k;
-    // we have two points (0, nl->mean), (nr->count, nr->mean)
-    // and we want x
-    double m = (nr->mean - nl->mean) / (nl->count / 2 + nr->count / 2);
-    return m * x + nl->mean;
+
+    // in between extremes we interpolate between centroids
+    double weightSoFar = left_centroid_weight / 2;
+    for (int i = 0; i < n - 1; i++) {
+        const double node_weight = h->nodes_weight[i];
+        const double node_weight_next = h->nodes_weight[i + 1];
+        const double node_mean = h->nodes_mean[i];
+        const double node_mean_next = h->nodes_mean[i + 1];
+        const double dw = (node_weight + node_weight_next) / 2;
+        if (weightSoFar + dw > index) {
+            // centroids i and i+1 bracket our current point
+            // check for unit weight
+            double leftUnit = 0;
+            if (node_weight == 1) {
+                if (index - weightSoFar < 0.5) {
+                    // within the singleton's sphere
+                    return node_mean;
+                } else {
+                    leftUnit = 0.5;
+                }
+            }
+            double rightUnit = 0;
+            if (node_weight_next == 1) {
+                if (weightSoFar + dw - index <= 0.5) {
+                    // no interpolation needed near singleton
+                    return node_mean_next;
+                }
+                rightUnit = 0.5;
+            }
+            const double z1 = index - weightSoFar - leftUnit;
+            const double z2 = weightSoFar + dw - index - rightUnit;
+            return weighted_average(node_mean, z2, node_mean_next, z1);
+        }
+        weightSoFar += dw;
+    }
+
+    // weightSoFar = totalWeight - weight[n-1]/2 (very nearly)
+    // so we interpolate out to max value ever seen
+    const double z1 = index - h->merged_weight - right_centroid_weight / 2.0;
+    const double z2 = right_centroid_weight / 2 - z1;
+    return weighted_average(h->nodes_mean[n - 1], z1, h->max, z2);
 }
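td_quantile above mirrors the same structure from the rank side: it converts q into a rank (index = q * merged_weight), returns min or max for the extreme ranks, interpolates in the tails against the exact min/max samples, and otherwise interpolates between the bracketing centroids via weighted_average, stepping around singletons. A sketch of typical queries; the mid-range answers depend on how the centroids happened to merge, so the comments only give ballpark values:

#include <stdio.h>
#include "tdigest.h"

int main(void) {
    td_histogram_t *t = NULL;
    if (td_init(100, &t) != 0) {
        return 1;
    }
    for (int i = 1; i <= 100000; i++) {
        td_add(t, (double)i, 1);
    }
    printf("p0   %.1f\n", td_quantile(t, 0.0));  /* index < 1, returns min: 1.0 */
    printf("p50  %.1f\n", td_quantile(t, 0.5));  /* roughly 50000 */
    printf("p99  %.1f\n", td_quantile(t, 0.99)); /* roughly 99000 */
    printf("p100 %.1f\n", td_quantile(t, 1.0));  /* index > weight - 1, returns max */
    printf("bad  %.1f\n", td_quantile(t, 1.5));  /* q outside [0,1]: NAN */
    return 0;
}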
 
-void td_add(td_histogram_t *h, double mean, double count) {
+void td_add(td_histogram_t *h, double mean, double weight) {
     if (should_td_compress(h)) {
         td_compress(h);
     }
@@ -181,26 +373,11 @@ void td_add(td_histogram_t *h, double mean, double count) {
     if (mean > h->max) {
         h->max = mean;
     }
-    h->nodes[next_node(h)] = (node_t){
-        .mean = mean,
-        .count = count,
-    };
+    const int pos = next_node(h);
+    h->nodes_mean[pos] = mean;
+    h->nodes_weight[pos] = weight;
     h->unmerged_nodes++;
-    h->unmerged_weight += count;
-}
-
-static int compare_nodes(const void *v1, const void *v2) {
-    node_t *n1 = (node_t *)(v1);
-    node_t *n2 = (node_t *)(v2);
-    const double n1m = n1->mean;
-    const double n2m = n2->mean;
-    if (n1m < n2m) {
-        return -1;
-    }
-    if (n1m > n2m) {
-        return 1;
-    }
-    return 0;
+    h->unmerged_weight += weight;
 }
 
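td_add only appends to the unmerged tail of the two arrays; the sizing decision is deferred to td_compress in the next hunk. There, after sorting, an adjacent centroid is folded into the current one only while the combined weight stays within the size bound implied by the k-scale at both ends of the candidate. The loop's test is equivalent to the helper sketched here (illustrative only, not part of the patch; would_fit is a made-up name):

#include <math.h>
#include <stdbool.h>

#define MM_PI 3.14159265358979323846

/* A candidate centroid of weight w spanning cumulative quantiles q0..q2 fits when
 * w * normalizer <= q * (1 - q) at both ends, where
 * normalizer = compression / (2 * pi * total_weight * log(total_weight)).
 * The bound shrinks toward q = 0 and q = 1, which is what keeps tail centroids small. */
static bool would_fit(double proposed_weight, double weight_so_far, double total_weight,
                      double compression) {
    const double normalizer = compression / (2 * MM_PI * total_weight * log(total_weight));
    const double z = proposed_weight * normalizer;
    const double q0 = weight_so_far / total_weight;
    const double q2 = (weight_so_far + proposed_weight) / total_weight;
    return (z <= (q0 * (1 - q0))) && (z <= (q2 * (1 - q2)));
}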
 void td_compress(td_histogram_t *h) {
@@ -208,39 +385,43 @@
         return;
     }
     int N = h->merged_nodes + h->unmerged_nodes;
-    qsort((void *)(h->nodes), N, sizeof(node_t), &compare_nodes);
-    double total_count = h->merged_weight + h->unmerged_weight;
-    double denom = 2 * MM_PI * total_count * log(total_count);
-    double normalizer = h->compression / denom;
+    td_qsort(h->nodes_mean, h->nodes_weight, 0, N - 1);
+    const double total_weight = h->merged_weight + h->unmerged_weight;
+    const double denom = 2 * MM_PI * total_weight * log(total_weight);
+    // Compute the normalizer given compression and number of points.
+    const double normalizer = h->compression / denom;
     int cur = 0;
-    double count_so_far = 0;
+    double weight_so_far = 0;
+
     for (int i = 1; i < N; i++) {
-        double proposed_count = h->nodes[cur].count + h->nodes[i].count;
-        double z = proposed_count * normalizer;
-        double q0 = count_so_far / total_count;
-        double q2 = (count_so_far + proposed_count) / total_count;
-        bool should_add = (z <= (q0 * (1 - q0))) && (z <= (q2 * (1 - q2)));
+        const double proposed_weight = h->nodes_weight[cur] + h->nodes_weight[i];
+        const double z = proposed_weight * normalizer;
+        // quantile up to cur
+        const double q0 = weight_so_far / total_weight;
+        // quantile up to cur + i
+        const double q2 = (weight_so_far + proposed_weight) / total_weight;
+        // merge only if the combined weight respects the k-scale size bound at both q0 and q2
+        const bool should_add = (z <= (q0 * (1 - q0))) && (z <= (q2 * (1 - q2)));
         // next point will fit
         // so merge into existing centroid
         if (should_add) {
-            h->nodes[cur].count += h->nodes[i].count;
-            double delta = h->nodes[i].mean - h->nodes[cur].mean;
-            double weighted_delta = (delta * h->nodes[i].count) / h->nodes[cur].count;
-            h->nodes[cur].mean += weighted_delta;
+            h->nodes_weight[cur] += h->nodes_weight[i];
+            const double delta = h->nodes_mean[i] - h->nodes_mean[cur];
+            const double weighted_delta = (delta * h->nodes_weight[i]) / h->nodes_weight[cur];
+            h->nodes_mean[cur] += weighted_delta;
         } else {
-            count_so_far += h->nodes[cur].count;
+            weight_so_far += h->nodes_weight[cur];
             cur++;
-            h->nodes[cur] = h->nodes[i];
+            h->nodes_weight[cur] = h->nodes_weight[i];
+            h->nodes_mean[cur] = h->nodes_mean[i];
        }
         if (cur != i) {
-            h->nodes[i] = (node_t){
-                .mean = 0,
-                .count = 0,
-            };
+            h->nodes_weight[i] = 0.0;
+            h->nodes_mean[i] = 0.0;
         }
     }
     h->merged_nodes = cur + 1;
-    h->merged_weight = total_count;
+    h->merged_weight = total_weight;
     h->unmerged_nodes = 0;
     h->unmerged_weight = 0;
     h->total_compressions++;
@@ -252,22 +433,20 @@ double td_max(td_histogram_t *h) { return h->max; }
 
 int td_compression(td_histogram_t *h) { return h->compression; }
 
-// TODO:
-const double *td_centroids_weight(td_histogram_t *h) { return NULL; }
+const double *td_centroids_weight(td_histogram_t *h) { return h->nodes_weight; }
 
-// TODO:
-const double *td_centroids_mean(td_histogram_t *h) { return NULL; }
+const double *td_centroids_mean(td_histogram_t *h) { return h->nodes_mean; }
 
 double td_centroids_weight_at(td_histogram_t *h, int pos) {
     if (pos < 0 || pos > h->merged_nodes) {
         return NAN;
     }
-    return h->nodes[pos].count;
+    return h->nodes_weight[pos];
 }
 
 double td_centroids_mean_at(td_histogram_t *h, int pos) {
     if (pos < 0 || pos > h->merged_nodes) {
         return NAN;
     }
-    return h->nodes[pos].mean;
+    return h->nodes_mean[pos];
 }
diff --git a/src/tdigest.h b/src/tdigest.h
index 0a0349c..fb68cf7 100644
--- a/src/tdigest.h
+++ b/src/tdigest.h
@@ -25,11 +25,6 @@
 
 #define MM_PI 3.14159265358979323846
 
-typedef struct node {
-    double mean;
-    double count;
-} node_t;
-
 struct td_histogram {
     // compression is a setting used to configure the size of centroids when merged.
     double compression;
@@ -50,7 +45,8 @@ struct td_histogram {
     double merged_weight;
     double unmerged_weight;
 
-    node_t nodes[];
+    double *nodes_mean;
+    double *nodes_weight;
 };
 
 typedef struct td_histogram td_histogram_t;
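Taken together, the header change (two parallel double arrays in place of the flexible node_t array) and the now-implemented accessors make it possible to walk the merged centroids directly. A closing sketch that merges two digests and dumps the centroid pairs, under the same assumption that these functions are declared in tdigest.h:

#include <stdio.h>
#include "tdigest.h"

int main(void) {
    td_histogram_t *a = NULL;
    td_histogram_t *b = NULL;
    if (td_init(100, &a) != 0 || td_init(100, &b) != 0) {
        return 1;
    }
    for (int i = 0; i < 5000; i++) {
        td_add(a, (double)i, 1);
        td_add(b, (double)i + 5000.0, 1);
    }
    td_merge(a, b); /* compresses both, then feeds b's centroids into a */
    td_compress(a);
    for (int i = 0; i < td_centroid_count(a); i++) {
        printf("centroid %4d: mean=%12.3f weight=%10.1f\n", i, td_centroids_mean_at(a, i),
               td_centroids_weight_at(a, i));
    }
    return 0;
}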