Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: 148cba11ab
Fetching contributors…

Cannot retrieve contributors at this time

414 lines (355 sloc) 13.996 kb
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
#define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
#include <glog/logging.h>
namespace folly {
template <typename VT, typename TT>
BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t numBuckets,
                                               TimeType duration)
    : firstTime_(1),
      latestTime_(0),
      duration_(duration) {
  // Starting with firstTime_ > latestTime_ is how the series represents
  // "empty" (clear() resets to the same state).
  //
  // For tracking all-time data we only use total_, and don't need to bother
  // with buckets_.
  if (!isAllTime()) {
    // Clamp numBuckets to duration_.count(): there is no point in having more
    // buckets than our timestamp granularity, because the extra buckets could
    // never be used.
    //
    // The cast makes the signed->unsigned conversion explicit, so the
    // comparison below is between two size_t values (no -Wsign-compare).
    // NOTE(review): this assumes duration_.count() is positive whenever
    // isAllTime() is false -- confirm against isAllTime()'s definition.
    const size_t maxUsefulBuckets = static_cast<size_t>(duration_.count());
    if (numBuckets > maxUsefulBuckets) {
      numBuckets = maxUsefulBuckets;
    }
    // NOTE(review): numBuckets == 0 would leave buckets_ empty. getBucketIdx()
    // would then return index 0 into an empty vector on the first update.
    buckets_.resize(numBuckets, Bucket());
  }
}
template <typename VT, typename TT>
void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
                                          const ValueType& val) {
  // Record a single sample of `val` at time `now`: this is the aggregated
  // form with a total of `val` and a sample count of one.
  return addValueAggregated(now, val, 1);
}
template <typename VT, typename TT>
void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
                                          const ValueType& val,
                                          int64_t times) {
  // `times` identical samples of `val` contribute val * times to the running
  // sum and `times` to the sample count.
  const ValueType aggregate = val * times;
  addValueAggregated(now, aggregate, times);
}
template <typename VT, typename TT>
void BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
                                                    const ValueType& sum,
                                                    int64_t nsamples) {
  // Time never moves backwards: a timestamp older than the newest one seen is
  // treated as if it arrived at latestTime_.
  const TimeType effectiveNow = std::max(now, latestTime_);

  if (!isAllTime()) {
    // Rotate the buckets forward to effectiveNow (clearing any that aged
    // out), then credit the bucket that effectiveNow lands in.
    const size_t bucketIdx = update(effectiveNow);
    buckets_[bucketIdx].add(sum, nsamples);
  } else {
    // All-time series have no buckets; only the time bounds move.
    if (empty()) {
      firstTime_ = effectiveNow;
    }
    latestTime_ = effectiveNow;
  }

  // In both modes the running aggregate absorbs the new data.
  total_.add(sum, nsamples);
}
/*
* Advance the series to time `now` and return the index of the bucket that
* latestTime_ falls into once the call completes.
*
* - If the series is empty, `now` becomes firstTime_.
* - For an all-time series only latestTime_ is advanced, and 0 is returned.
* - If `now` is not newer than latestTime_, nothing is advanced or cleared,
*   and the bucket of the existing latestTime_ is returned.
* - Otherwise latestTime_ becomes `now`. Every bucket that `now` has rotated
*   past is cleared, and its contents are also removed from total_.
*/
template <typename VT, typename TT>
size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
if (empty()) {
// This is the first data point.
firstTime_ = now;
}
// For all-time data, all we need to do is update latestTime_
if (isAllTime()) {
latestTime_ = std::max(latestTime_, now);
return 0;
}
// Make sure time doesn't go backwards.
// If the time is less than or equal to the latest time we have already seen,
// we don't need to do anything.
if (now <= latestTime_) {
return getBucketIdx(latestTime_);
}
// We could cache nextBucketStart as a member variable, so we don't have to
// recompute it each time update() is called with a new timestamp value.
// This makes things faster when update() (or addValue()) is called once
// per second, but slightly slower when update() is called multiple times a
// second. We care more about optimizing the cases where addValue() is being
// called frequently. If addValue() is only being called once every few
// seconds, it doesn't matter as much if it is fast.
// Get info about the bucket that latestTime_ points at
size_t currentBucket;
TimeType currentBucketStart;
TimeType nextBucketStart;
getBucketInfo(latestTime_, &currentBucket,
&currentBucketStart, &nextBucketStart);
// Update latestTime_
latestTime_ = now;
if (now < nextBucketStart) {
// We're still in the same bucket.
// We're done after updating latestTime_.
return currentBucket;
} else if (now >= currentBucketStart + duration_) {
// It's been a while. We have wrapped, and all of the buckets need to be
// cleared.
// Every bucket is emptied, so the aggregate is reset rather than having
// each bucket subtracted from it.
for (Bucket& bucket : buckets_) {
bucket.clear();
}
total_.clear();
return getBucketIdx(latestTime_);
} else {
// clear all the buckets between the last time and current time, meaning
// buckets in the range [(currentBucket+1), newBucket]. Note that
// the bucket (currentBucket+1) is always the oldest bucket we have. Since
// our array is circular, loop when we reach the end.
size_t newBucket = getBucketIdx(now);
size_t idx = currentBucket;
while (idx != newBucket) {
++idx;
if (idx >= buckets_.size()) {
idx = 0;
}
// Remove the stale bucket's contribution from the aggregate before
// emptying it for reuse.
total_ -= buckets_[idx];
buckets_[idx].clear();
}
return newBucket;
}
}
template <typename VT, typename TT>
void BucketedTimeSeries<VT, TT>::clear() {
  // Drop all accumulated data: the aggregate and every individual bucket.
  total_.clear();
  for (size_t i = 0; i < buckets_.size(); ++i) {
    buckets_[i].clear();
  }

  // An empty series is encoded as firstTime_ > latestTime_.
  latestTime_ = TimeType(0);
  firstTime_ = TimeType(1);
}
template <typename VT, typename TT>
TT BucketedTimeSeries<VT, TT>::elapsed() const {
  if (empty()) {
    return TimeType(0);
  }

  // The tracked window begins at firstTime_, unless older data has already
  // rotated out of the buckets.
  TimeType windowStart = firstTime_;
  if (!isAllTime()) {
    size_t latestIdx;
    TimeType latestBucketStart;
    TimeType latestBucketEnd;
    getBucketInfo(latestTime_, &latestIdx,
                  &latestBucketStart, &latestBucketEnd);
    // One full duration before the end of the current bucket is the oldest
    // timestamp any bucket can still represent.
    const TimeType oldestTrackable = latestBucketEnd - duration_;
    windowStart = std::max(oldestTrackable, windowStart);
  }

  // Both endpoints are inclusive, hence the extra unit.
  return latestTime_ - windowStart + TimeType(1);
}
template <typename VT, typename TT>
VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
  // Sum of the data in [start, end). Buckets that only partially overlap the
  // window contribute a proportional share (see rangeAdjust()).
  ValueType windowSum = ValueType();
  auto addBucket = [&](const Bucket& bucket,
                       TimeType bucketStart,
                       TimeType nextBucketStart) {
    windowSum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
                                   bucket.sum);
    return true;  // always keep walking
  };
  forEachBucket(start, end, addBucket);
  return windowSum;
}
template <typename VT, typename TT>
uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
  // Number of samples in [start, end). Buckets that only partially overlap
  // the window contribute a proportional share (see rangeAdjust()).
  uint64_t windowCount = 0;
  auto addBucket = [&](const Bucket& bucket,
                       TimeType bucketStart,
                       TimeType nextBucketStart) {
    windowCount += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
                                     bucket.count);
    return true;  // always keep walking
  };
  forEachBucket(start, end, addBucket);
  return windowCount;
}
template <typename VT, typename TT>
template <typename ReturnType>
ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
  // Average over [start, end), computed from the prorated sum and count of
  // every overlapping bucket. Returns ReturnType(0) if no samples fall in the
  // window.
  ValueType windowSum = ValueType();
  uint64_t windowCount = 0;
  auto addBucket = [&](const Bucket& bucket,
                       TimeType bucketStart,
                       TimeType nextBucketStart) {
    windowSum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
                                   bucket.sum);
    windowCount += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
                                     bucket.count);
    return true;  // always keep walking
  };
  forEachBucket(start, end, addBucket);

  if (windowCount != 0) {
    return detail::avgHelper<ReturnType>(windowSum, windowCount);
  }
  return ReturnType(0);
}
/*
* A note about some of the bucket index calculations below:
*
* buckets_.size() may not divide evenly into duration_. When this happens,
* some buckets will be wider than others. We still want to spread the data
* out as evenly as possible among the buckets (as opposed to just making the
* last bucket be significantly wider than all of the others).
*
* To make the division work out, we pretend that the buckets are each
* duration_ wide, so that the overall duration becomes
* buckets_.size() * duration_.
*
* To transform a real timestamp into the scale used by our buckets,
* we have to multiply by buckets_.size(). To figure out which bucket it goes
* into, we then divide by duration_.
*/
template <typename VT, typename TT>
size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
  // All-time series keep everything in total_ and have no buckets to index.
  DCHECK(!isAllTime());
  // Take the position of `time` within its duration_-long cycle, scale it up
  // by the number of buckets, then divide by the cycle length to get the
  // bucket index.
  const TimeType offsetInCycle = time % duration_;
  return offsetInCycle.count() * buckets_.size() / duration_.count();
}
/*
* For timestamp `time`, compute:
*   *bucketIdx       - the index into buckets_ that `time` maps to;
*   *bucketStart     - the earliest timestamp that maps to that bucket within
*                      the same duration_-long cycle as `time`;
*   *nextBucketStart - the earliest timestamp that maps to the following
*                      bucket, i.e. one past the end of this bucket.
*
* Must not be called on an all-time series (DCHECKed).
* NOTE(review): the integer arithmetic assumes `time` is non-negative. A
* negative `time` yields a negative offset and hence an invalid bucket index.
* Confirm that callers never pass negative timestamps.
*/
template <typename VT, typename TT>
void BucketedTimeSeries<VT, TT>::getBucketInfo(
TimeType time, size_t *bucketIdx,
TimeType* bucketStart, TimeType* nextBucketStart) const {
typedef typename TimeType::rep TimeInt;
DCHECK(!isAllTime());
// Split `time` into its offset within the current cycle and the number of
// whole cycles that precede it.
// Keep these two lines together. The compiler should be able to compute
// both the division and modulus with a single operation.
TimeType timeMod = time % duration_;
TimeInt numFullDurations = time / duration_;
// Work in a scaled space where every bucket is exactly duration_.count()
// units wide (so one cycle spans buckets_.size() * duration_.count() units).
TimeInt scaledTime = timeMod.count() * buckets_.size();
// Keep these two lines together. The compiler should be able to compute
// both the division and modulus with a single operation.
*bucketIdx = scaledTime / duration_.count();
TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
// Scaled boundaries of this bucket: [scaledBucketStart, scaledNextBucketStart).
TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
// Map the scaled boundaries back to real time by dividing by
// buckets_.size(), rounding up: (x + n - 1) / n is the smallest real offset
// whose scaled value is >= x, i.e. the first timestamp inside the bucket.
TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
buckets_.size());
TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
buckets_.size());
// Shift the within-cycle offsets by the start of the cycle containing `time`.
TimeType durationStart(numFullDurations * duration_.count());
*bucketStart = bucketStartMod + durationStart;
*nextBucketStart = nextBucketStartMod + durationStart;
}
/*
* Invoke fn(bucket, bucketStart, nextBucketStart) on each bucket in
* chronological order. The walk starts with the oldest bucket (the one after
* latestTime_'s bucket) and ends with the bucket that contains latestTime_.
* [bucketStart, nextBucketStart) is the real-time range the bucket covers.
* Iteration stops early as soon as fn returns false.
*
* For an all-time series, fn is called exactly once, with total_ and the
* range [firstTime_, latestTime_ + 1).
*/
template <typename VT, typename TT>
template <typename Function>
void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
if (isAllTime()) {
fn(total_, firstTime_, latestTime_ + TimeType(1));
return;
}
typedef typename TimeType::rep TimeInt;
// Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
// the same way as in getBucketInfo().
TimeType timeMod = latestTime_ % duration_;
TimeInt numFullDurations = latestTime_ / duration_;
TimeType durationStart(numFullDurations * duration_.count());
TimeInt scaledTime = timeMod.count() * buckets_.size();
size_t latestBucketIdx = scaledTime / duration_.count();
TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
// Walk through the buckets, starting one past the current bucket.
// The next bucket is from the previous cycle, so subtract 1 duration
// from durationStart.
size_t idx = latestBucketIdx;
durationStart -= duration_;
// Real start time of bucket (latestBucketIdx + 1), measured in the previous
// cycle. The first loop iteration uses it as that bucket's bucketStart.
TimeType nextBucketStart =
TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
durationStart;
while (true) {
++idx;
if (idx >= buckets_.size()) {
// Wrapped to bucket 0, which belongs to latestTime_'s own cycle.
// Bucket 0 ends where bucket 1 starts, at scaled offset
// duration_.count().
idx = 0;
durationStart += duration_;
scaledNextBucketStart = duration_.count();
} else {
// This bucket ends one scaled bucket width further along.
scaledNextBucketStart += duration_.count();
}
// Each bucket starts where the previous one ended.
TimeType bucketStart = nextBucketStart;
// Convert the scaled end boundary back to real time, rounding up (as in
// getBucketInfo()).
nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
buckets_.size()) + durationStart;
// Should we bother skipping buckets where firstTime_ >= nextBucketStart?
// For now we go ahead and invoke the function with these buckets.
// sum and count should always be 0 in these buckets.
DCHECK_LE(bucketStart.count(), latestTime_.count());
bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
if (!ret) {
break;
}
if (idx == latestBucketIdx) {
// all done
break;
}
}
}
/*
 * Scale a bucket's value down to the part of the bucket that overlaps the
 * query interval [start, end). The value is treated as if it were spread
 * evenly across the bucket's time range.
 *
 * Example: for a bucket covering [10, 20) and a query of [10, 16), 6 of the
 * bucket's 10 time units overlap, so 60% of `input` is returned.
 */
template<typename VT, typename TT>
VT BucketedTimeSeries<VT, TT>::rangeAdjust(
    TimeType bucketStart, TimeType nextBucketStart,
    TimeType start, TimeType end, ValueType input) const {
  // The bucket holding latestTime_ only contains data up to latestTime_, so
  // treat it as ending right after latestTime_ rather than at its nominal
  // end. This keeps queries for "everything up to latestTime_" from being
  // scaled down just because latestTime_ is partway through the bucket.
  const bool holdsLatest =
      bucketStart <= latestTime_ && nextBucketStart > latestTime_;
  const TimeType bucketEnd =
      holdsLatest ? latestTime_ + TimeType(1) : nextBucketStart;

  // If the whole (effective) bucket lies inside [start, end), no scaling is
  // needed.
  const bool fullyCovered = start <= bucketStart && end >= bucketEnd;
  if (fullyCovered) {
    return input;
  }

  // Otherwise keep only the overlapping fraction of the bucket.
  const TimeType overlapStart = std::max(start, bucketStart);
  const TimeType overlapEnd = std::min(end, bucketEnd);
  return input * (overlapEnd - overlapStart) / (bucketEnd - bucketStart);
}
template <typename VT, typename TT>
template <typename Function>
void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
                                               Function fn) const {
  // Visit only the buckets that overlap [start, end). The unbounded
  // forEachBucket() walks buckets oldest-first. Anything ending at or before
  // `start` is therefore skipped, and the first bucket starting at or after
  // `end` ends the walk.
  auto visitInRange = [&start, &end, &fn](const Bucket& bucket,
                                          TimeType bucketStart,
                                          TimeType nextBucketStart) -> bool {
    if (nextBucketStart <= start) {
      return true;   // entirely before the window: skip, keep going
    }
    if (bucketStart >= end) {
      return false;  // at or past the window's end: stop
    }
    return fn(bucket, bucketStart, nextBucketStart);
  };
  forEachBucket(visitInRange);
}
} // folly
#endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
Jump to Line
Something went wrong with that request. Please try again.