Skip to content

Commit

Permalink
Align the start of downsampling
Browse files Browse the repository at this point in the history
Aligned the start time of downsampling by the given downsampling interval
so that the same set of data points belongs to the same downsampling period
regardless of the start time of a user-requested time range.
- Made the start time of a downsampling period its representative timestamp
  without computing the average of the timestamps of the data points of the
  period.
- Factored downsampling code out from Span.java to its own class to make it
  easier to write unit tests.
- Made the type of downsampling result double regardless of the original type
  to get rid of the loop that checked the type of the raw input data points.
  By using double, we could handle very small integers and very big integers
  within a reasonable error margin. Some values are too small to be rounded to an integer without a large error.
  For example, the integer average of four values (2, 2, 2, 1) is 1, which is
  far from the real average of 1.75. Some values are so big that it could
  cause long-integer overflow while downsampling.
- Factored out the piece of code that computes the rate of change to make
  it easier to understand and test.
- Factored out the piece of code that iterates over a list of spans to aggregate.
- Added unit tests against the AggregationIter class.

Signed-off-by: Chris Larsen <clarsen@euphoriaaudio.com>
  • Loading branch information
jesse5e authored and manolama committed Jun 25, 2014
1 parent aa8b603 commit 9c35480
Show file tree
Hide file tree
Showing 17 changed files with 2,911 additions and 937 deletions.
11 changes: 11 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dist_noinst_SCRIPTS = src/create_table.sh src/upgrade_1to2.sh src/mygnuplot.sh \
dist_noinst_DATA = pom.xml.in build-aux/rpm/opentsdb.conf \
build-aux/rpm/logback.xml build-aux/rpm/init.d/opentsdb
tsdb_SRC := \
src/core/AggregationIterator.java \
src/core/Aggregator.java \
src/core/Aggregators.java \
src/core/ByteBufferList.java \
Expand All @@ -39,12 +40,16 @@ tsdb_SRC := \
src/core/DataPoint.java \
src/core/DataPoints.java \
src/core/DataPointsIterator.java \
src/core/Downsampler.java \
src/core/IncomingDataPoint.java \
src/core/IncomingDataPoints.java \
src/core/IllegalDataException.java \
src/core/Internal.java \
src/core/MutableDataPoint.java \
src/core/MutableDoubleDataPoint.java \
src/core/Query.java \
src/core/RateOptions.java \
src/core/RateSpan.java \
src/core/RowKey.java \
src/core/RowSeq.java \
src/core/SeekableView.java \
Expand Down Expand Up @@ -136,9 +141,15 @@ tsdb_DEPS = \
$(ZOOKEEPER)

test_SRC := \
test/core/SeekableViewsForTest.java \
test/core/TestAggregationIterator.java \
test/core/TestAggregators.java \
test/core/TestCompactionQueue.java \
test/core/TestDownsampler.java \
test/core/TestInternal.java \
test/core/TestMutableDataPoint.java \
test/core/TestMutableDoubleDataPoint.java \
test/core/TestRateSpan.java \
test/core/TestRowSeq.java \
test/core/TestSpan.java \
test/core/TestTags.java \
Expand Down
689 changes: 689 additions & 0 deletions src/core/AggregationIterator.java

Large diffs are not rendered by default.

225 changes: 225 additions & 0 deletions src/core/Downsampler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// This file is part of OpenTSDB.
// Copyright (C) 2014 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import java.util.NoSuchElementException;


/**
 * Iterator that downsamples data points using an {@link Aggregator}.
 * <p>
 * Source data points are grouped into fixed-width intervals whose boundaries
 * are multiples of the interval length. Each call to {@link #next} feeds the
 * values of one interval to the aggregator and returns the result as a double
 * value stamped with the start time of that interval. Intervals that contain
 * no source data point are skipped.
 * <p>
 * NOTE: {@link #next} always returns the same mutable {@link DataPoint}
 * instance, which is overwritten by the following call. Callers that need to
 * keep a result must copy it before advancing the iterator.
 */
public class Downsampler implements SeekableView {

  /** Function to use for downsampling. */
  private final Aggregator downsampler;
  /** Iterator to iterate the values of the current interval. */
  private final ValuesInInterval values_in_interval;
  // NOTE: Uses MutableDoubleDataPoint to reduce memory allocation overhead.
  // TODO: use primitives for data_point instead in order to reduce memory and
  // CPU overhead even more.
  /** The current downsample result. Reused for every call to next(). */
  private final MutableDoubleDataPoint data_point =
    new MutableDoubleDataPoint();

  /**
   * Ctor.
   * @param source The iterator to access the underlying data.
   * @param interval_ms The interval in milliseconds wanted between each data
   * point. Must be strictly positive.
   * @param downsampler The downsampling function to use.
   * @throws IllegalArgumentException if {@code interval_ms} is zero or
   * negative.
   */
  Downsampler(final SeekableView source,
              final long interval_ms,
              final Aggregator downsampler) {
    this.values_in_interval = new ValuesInInterval(source, interval_ms);
    this.downsampler = downsampler;
  }

  /**
   * Downsamples for the current interval.
   * @return A {@link DataPoint} as a downsample result.
   */
  private DataPoint downsample() {
    // The aggregator drains every value of the current interval through the
    // Aggregator.Doubles view implemented by values_in_interval.
    final double downsample_value = downsampler.runDouble(values_in_interval);
    // The interval timestamp must be read before moving on, because moving
    // to the next interval replaces the interval boundary.
    data_point.reset(values_in_interval.getIntervalTimestamp(),
                     downsample_value);
    values_in_interval.moveToNextInterval();
    return data_point;
  }

  // ------------------ //
  // Iterator interface //
  // ------------------ //

  /** Returns true if there is at least one more interval holding data. */
  public boolean hasNext() {
    return values_in_interval.hasNextValue();
  }

  /**
   * Returns the downsampled data point of the next interval.
   * @throws NoSuchElementException if there is no more data.
   */
  public DataPoint next() {
    if (hasNext()) {
      return downsample();
    }
    throw new NoSuchElementException("no more data points in " + this);
  }

  /** Unsupported: the view is read-only. */
  public void remove() {
    throw new UnsupportedOperationException();
  }

  // ---------------------- //
  // SeekableView interface //
  // ---------------------- //

  /**
   * Advances to the first interval that starts at or after the given
   * timestamp.
   * @param timestamp The timestamp to seek to, in the same unit as the
   * interval.
   */
  public void seek(final long timestamp) {
    values_in_interval.seekInterval(timestamp);
  }

  @Override
  public String toString() {
    final StringBuilder buf = new StringBuilder();
    buf.append("Downsampler: ")
       .append("interval_ms=").append(values_in_interval.interval_ms)
       .append(", downsampler=").append(downsampler)
       .append(", current data=(").append(data_point)
       .append("), values_in_interval=").append(values_in_interval);
    return buf.toString();
  }

  /** Iterates source values for an interval. */
  private static class ValuesInInterval implements Aggregator.Doubles {

    /** The iterator of original source values. */
    private final SeekableView source;
    /** The sampling interval in milliseconds. */
    private final long interval_ms;
    /** The end (exclusive) of the current interval. */
    private long timestamp_end_interval;
    /** True if the last value was successfully extracted from the source. */
    private boolean has_next_value_from_source = false;
    /** The last data point extracted from the source. */
    private final MutableDoubleDataPoint next_dp =
      new MutableDoubleDataPoint();

    /** True if it is initialized for iterating intervals. */
    private boolean initialized = false;

    /**
     * Constructor.
     * @param source The iterator to access the underlying data.
     * @param interval_ms Downsampling interval. Must be strictly positive.
     * @throws IllegalArgumentException if {@code interval_ms} is zero or
     * negative.
     */
    ValuesInInterval(final SeekableView source, final long interval_ms) {
      // A zero interval would fail later with an ArithmeticException when
      // aligning timestamps, and a negative one would silently produce an
      // empty result, so reject both up front with a clear message.
      if (interval_ms <= 0) {
        throw new IllegalArgumentException(
            "interval_ms must be positive: " + interval_ms);
      }
      this.source = source;
      this.interval_ms = interval_ms;
      // Placeholder until the first value is read from the source.
      this.timestamp_end_interval = interval_ms;
    }

    /** Initializes to iterate intervals. */
    private void initializeIfNotDone() {
      // NOTE: Delay initialization is required to not access any data point
      // from the source until a user requests it explicitly to avoid the severe
      // performance penalty by accessing the unnecessary first data of a span.
      if (!initialized) {
        initialized = true;
        moveToNextValue();
        resetEndOfInterval();
      }
    }

    /** Extracts the next value from the source. */
    private void moveToNextValue() {
      if (source.hasNext()) {
        has_next_value_from_source = true;
        // Copies the value because the source may reuse its data point.
        next_dp.reset(source.next());
      } else {
        has_next_value_from_source = false;
      }
    }

    /**
     * Resets the current interval with the interval of the timestamp of
     * the next value read from source. It is the first value of the next
     * interval. Does nothing when the source is exhausted.
     */
    private void resetEndOfInterval() {
      if (has_next_value_from_source) {
        final long timestamp = next_dp.timestamp();
        // Sets the end of the interval of the timestamp.
        timestamp_end_interval = alignTimestamp(timestamp) + interval_ms;
      }
    }

    /** Moves to the next available interval. */
    void moveToNextInterval() {
      initializeIfNotDone();
      resetEndOfInterval();
    }

    /** Advances the interval iterator to the given timestamp. */
    void seekInterval(final long timestamp) {
      // To make sure that the interval of the given timestamp is fully filled,
      // rounds up the seeking timestamp to the smallest timestamp that is
      // a multiple of the interval and is greater than or equal to the given
      // timestamp.
      source.seek(alignTimestamp(timestamp + interval_ms - 1));
      // Forces the next access to re-read the first value after the seek.
      initialized = false;
    }

    /** Returns the representative timestamp of the current interval. */
    private long getIntervalTimestamp() {
      // NOTE: It is well-known practice taking the start time of
      // a downsample interval as a representative timestamp of it. It also
      // provides the correct context for seek.
      return alignTimestamp(timestamp_end_interval - interval_ms);
    }

    /**
     * Returns timestamp aligned by interval, i.e. rounded down to a multiple
     * of the interval.
     * NOTE: Assumes a non-negative timestamp; the remainder operator rounds
     * toward zero, so a negative timestamp would be rounded up instead.
     */
    private long alignTimestamp(final long timestamp) {
      return timestamp - (timestamp % interval_ms);
    }

    // ---------------------- //
    // Doubles interface      //
    // ---------------------- //

    /** Returns true if the current interval still has an unread value. */
    @Override
    public boolean hasNextValue() {
      initializeIfNotDone();
      return has_next_value_from_source &&
          next_dp.timestamp() < timestamp_end_interval;
    }

    /**
     * Returns the next value of the current interval and reads ahead one
     * value from the source.
     * @throws NoSuchElementException if the current interval is exhausted.
     */
    @Override
    public double nextDoubleValue() {
      if (hasNextValue()) {
        final double value = next_dp.toDouble();
        moveToNextValue();
        return value;
      }
      throw new NoSuchElementException("no more values in interval of "
          + timestamp_end_interval);
    }

    @Override
    public String toString() {
      final StringBuilder buf = new StringBuilder();
      buf.append("ValuesInInterval: ")
         .append("interval_ms=").append(interval_ms)
         .append(", timestamp_end_interval=").append(timestamp_end_interval)
         .append(", has_next_value_from_source=")
         .append(has_next_value_from_source);
      if (has_next_value_from_source) {
        buf.append(", nextValue=(").append(next_dp).append(')');
      }
      buf.append(", source=").append(source);
      return buf.toString();
    }
  }
}
Loading

0 comments on commit 9c35480

Please sign in to comment.