Skip to content

Commit

Permalink
Simplify the write path.
Browse files Browse the repository at this point in the history
The idea behind IncomingDataPoints never really took off.  This class
was originally supposed to allow the TSD to maintain an in-memory cache
of "recent" data that had been written to HBase.  This in-memory cache
was never implemented and cannot actually be easily implemented because
we can no longer guarantee that a TSD sees all the data points going to
a particular row.  The idea was that this in-memory cache could be used
to serve queries on recent data and to ride over HBase outages (i.e.
serve recent data even if HBase was down).  I don't see TSD doing this
any time soon, so this change starts to bypass IncomingDataPoints, which
does some complicated things that aren't actually necessary.

Change-Id: Ifef6227e6f1d2766b14fe5b5701ece62ea5973c4
  • Loading branch information
tsuna committed Jun 14, 2011
1 parent f23c1a1 commit 9690843
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 67 deletions.
49 changes: 27 additions & 22 deletions src/core/IncomingDataPoints.java
Expand Up @@ -88,7 +88,11 @@ final class IncomingDataPoints implements WritableDataPoints {
this.values = new long[3];
}

public void setSeries(final String metric, final Map<String, String> tags) {
/**
* Validates the given metric and tags.
* @throws IllegalArgumentException if any of the arguments aren't valid.
*/
static void checkMetricAndTags(final String metric, final Map<String, String> tags) {
if (tags.size() <= 0) {
throw new IllegalArgumentException("Need at least one tags (metric="
+ metric + ", tags=" + tags + ')');
Expand All @@ -102,7 +106,15 @@ public void setSeries(final String metric, final Map<String, String> tags) {
Tags.validateString("tag name", tag.getKey());
Tags.validateString("tag name", tag.getValue());
}
}

/**
* Returns a partially initialized row key for this metric and these tags.
* The only thing left to fill in is the base timestamp.
*/
static byte[] rowKeyTemplate(final TSDB tsdb,
final String metric,
final Map<String, String> tags) {
final short metric_width = tsdb.metrics.width();
final short tag_name_width = tsdb.tag_names.width();
final short tag_value_width = tsdb.tag_values.width();
Expand All @@ -111,46 +123,39 @@ public void setSeries(final String metric, final Map<String, String> tags) {
int row_size = (metric_width + Const.TIMESTAMP_BYTES
+ tag_name_width * num_tags
+ tag_value_width * num_tags);
if (row == null || row.length != row_size) {
row = new byte[row_size];
}
size = 0;
final byte[] row = new byte[row_size];

short pos = 0;

copyInRowKey(pos, (AUTO_METRIC ? tsdb.metrics.getOrCreateId(metric)
copyInRowKey(row, pos, (AUTO_METRIC ? tsdb.metrics.getOrCreateId(metric)
: tsdb.metrics.getId(metric)));
pos += metric_width;

pos += Const.TIMESTAMP_BYTES;

for(final byte[] tag : Tags.resolveOrCreateAll(tsdb, tags)) {
copyInRowKey(pos, tag);
copyInRowKey(row, pos, tag);
pos += tag.length;
}
return row;
}

/**
 * Switches this instance to the given time series.
 * <p>
 * The arguments are validated first.  The row key is then rebuilt from
 * scratch via {@link #rowKeyTemplate} and the {@code size} counter is
 * reset to zero.
 * @param metric The name of the metric.
 * @param tags The tags on this series.
 * @throws IllegalArgumentException if {@link #checkMetricAndTags} rejects
 * the metric or the tags.
 */
public void setSeries(final String metric, final Map<String, String> tags) {
  checkMetricAndTags(metric, tags);
  // Build the key in a local first: `row' is only replaced once the
  // template has been successfully created.
  final byte[] template = rowKeyTemplate(tsdb, metric, tags);
  this.row = template;
  this.size = 0;
}

/**
* Copies the specified byte array at the specified offset in the row key.
* @param row The row key into which to copy the bytes.
* @param offset The offset in the row key to start writing at.
* @param bytes The bytes to copy.
*/
private void copyInRowKey(final short offset, final byte[] bytes) {
private static void copyInRowKey(final byte[] row, final short offset, final byte[] bytes) {
System.arraycopy(bytes, 0, row, offset, bytes.length);
}

/**
 * Writes a 32-bit integer into the row key, most significant byte first.
 * @param offset The offset in the row key at which the first (most
 * significant) byte is written.
 * @param n The value to write.
 */
private void copyInRowKey(final short offset, final int n) {
  // Walk from the high-order byte (shift of 24) down to the low-order
  // byte (shift of 0), one byte per position in the row key.
  int shift = 24;
  for (int i = 0; i < 4; i++) {
    row[offset + i] = (byte) (n >>> shift);
    shift -= 8;
  }
}

/**
* Implements {@link #addPoint} by storing a value with a specific flag.
* @param timestamp The timestamp to associate with the value.
Expand Down Expand Up @@ -194,7 +199,7 @@ private Deferred<Object> addPointInternal(final long timestamp, final long value
// because the HBase client may still hold a reference to it in its
// internal datastructures.
row = Arrays.copyOf(row, row.length);
copyInRowKey(tsdb.metrics.width(), (int) base_time);
Bytes.setInt(row, (int) base_time, tsdb.metrics.width());
size = 0;
//LOG.info("Starting a new row @ " + this);
}
Expand All @@ -206,7 +211,7 @@ private Deferred<Object> addPointInternal(final long timestamp, final long value
// because the HBase client may still hold a reference to it in its
// internal datastructures.
row = Arrays.copyOf(row, row.length);
copyInRowKey(tsdb.metrics.width(), (int) base_time);
Bytes.setInt(row, (int) base_time, tsdb.metrics.width());
}

if (values.length == size) {
Expand Down
55 changes: 55 additions & 0 deletions src/core/TSDB.java
Expand Up @@ -12,15 +12,19 @@
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.stumbleupon.async.Deferred;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.hbase.async.Bytes;
import org.hbase.async.HBaseClient;
import org.hbase.async.HBaseException;
import org.hbase.async.PutRequest;

import net.opentsdb.uid.UniqueId;
import net.opentsdb.stats.Histogram;
Expand Down Expand Up @@ -154,6 +158,57 @@ public WritableDataPoints newDataPoints() {
return new IncomingDataPoints(this);
}

/**
 * Adds a single integer value data point in the TSDB.
 * <p>
 * The value is serialized with {@code Bytes.fromLong} and written with
 * the flags {@code 0x7}.
 * @param metric The name of the metric.
 * @param timestamp The timestamp associated with the value.
 * @param value The value of the data point.
 * @param tags The tags on this series.
 * @return The deferred result of the write.
 * @throws IllegalArgumentException if the timestamp, the metric or the
 * tags are rejected during validation.
 */
public Deferred<Object> addPoint(final String metric,
                                 final long timestamp,
                                 final long value,
                                 final Map<String, String> tags) {
  // An int stored on 8 bytes.
  final short int_on_8_bytes = 0x7;
  final byte[] encoded = Bytes.fromLong(value);
  return addPointInternal(metric, timestamp, encoded, tags, int_on_8_bytes);
}

/**
 * Adds a single floating-point value data point in the TSDB.
 * @param metric The name of the metric.
 * @param timestamp The timestamp associated with the value.
 * @param value The value of the data point.  Must be a finite number.
 * @param tags The tags on this series.
 * @return The deferred result of the write.
 * @throws IllegalArgumentException if the value is NaN or infinite, or if
 * the timestamp, the metric or the tags are rejected during validation.
 */
public Deferred<Object> addPoint(final String metric,
                                 final long timestamp,
                                 final float value,
                                 final Map<String, String> tags) {
  final boolean finite = !Float.isNaN(value) && !Float.isInfinite(value);
  if (!finite) {
    throw new IllegalArgumentException("value is NaN or Infinite: " + value
                                       + " for metric=" + metric
                                       + " timestamp=" + timestamp);
  }
  final short flags = Const.FLAG_FLOAT | 0x3;  // A float stored on 4 bytes.
  // Note: despite what the flags say, the value is actually serialized on
  // 8 bytes :(  The raw IEEE 754 bits are widened from int to long (which
  // sign-extends them) before being handed to Bytes.fromLong.
  final long raw_bits = Float.floatToRawIntBits(value);
  final byte[] encoded = Bytes.fromLong(raw_bits);
  return addPointInternal(metric, timestamp, encoded, tags, flags);
}

/**
 * Writes a single, already serialized data point to HBase.
 * <p>
 * The row key is built from the metric and tags, with the base time
 * (the timestamp rounded down to a multiple of
 * {@code Const.MAX_TIMESPAN}) stored right after the metric ID.  The
 * column qualifier holds the offset of the timestamp from that base time,
 * shifted left by {@code Const.FLAG_BITS}, OR'ed with the flags.
 * @param metric The name of the metric.
 * @param timestamp The timestamp associated with the value.  Must be
 * non-negative and fit on 32 bits.
 * @param value The serialized value, stored as-is in the cell.
 * @param tags The tags on this series.
 * @param flags The flags to store in the low bits of the qualifier.
 * @return The deferred result of the {@code put} sent to HBase.
 * @throws IllegalArgumentException if the timestamp is negative or doesn't
 * fit on 32 bits, or if the metric or tags are rejected by
 * {@code IncomingDataPoints.checkMetricAndTags}.
 */
private Deferred<Object> addPointInternal(final String metric,
                                          final long timestamp,
                                          final byte[] value,
                                          final Map<String, String> tags,
                                          final short flags) {
  if ((timestamp & 0xFFFFFFFF00000000L) != 0) {
    // => timestamp < 0 || timestamp doesn't fit on 32 (unsigned) bits.
    // No trailing space after "negative": the next fragment already
    // starts with one.
    throw new IllegalArgumentException((timestamp < 0 ? "negative" : "bad")
        + " timestamp=" + timestamp
        + " when trying to add value=" + Arrays.toString(value) + '/' + flags
        + " to metric=" + metric + ", tags=" + tags);
  }

  IncomingDataPoints.checkMetricAndTags(metric, tags);
  final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags);
  // The template leaves the base timestamp blank; fill it in right after
  // the metric ID.
  final long base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
  Bytes.setInt(row, (int) base_time, metrics.width());
  // `<<' binds tighter than `|': the time delta is shifted first, then the
  // flags are OR'ed into the low bits.
  final short qualifier = (short) ((timestamp - base_time) << Const.FLAG_BITS
                                   | flags);
  final PutRequest point = new PutRequest(table, row, FAMILY,
                                          Bytes.fromShort(qualifier), value);
  // TODO(tsuna): Add a callback to time the latency of HBase and store the
  // timing in a moving Histogram (once we have a class for this).
  return client.put(point);
}

/**
 * Asks the underlying HBase client to flush.
 * @return The deferred result returned by the HBase client's own
 * {@code flush()}.
 * @throws HBaseException as declared by this method's contract.
 */
public Deferred<Object> flush() throws HBaseException {
  return this.client.flush();
}
Expand Down
56 changes: 56 additions & 0 deletions src/core/TSDBInterface.java
Expand Up @@ -12,6 +12,8 @@
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import java.util.Map;

import com.stumbleupon.async.Deferred;

import org.hbase.async.HBaseException;
Expand All @@ -26,8 +28,62 @@ public interface TSDBInterface {
*/
Query newQuery();

/**
* Adds a single integer value data point in the TSDB.
* @param metric A non-empty string.
* @param timestamp The timestamp associated with the value.
* @param value The value of the data point.
* @param tags The tags on this series. This map must be non-empty.
* @return A deferred object that indicates the completion of the request.
* The {@link Object} has no special meaning and can be {@code null} (think
* of it as {@code Deferred<Void>}). But you probably want to attach at
* least an errback to this {@code Deferred} to handle failures.
* @throws IllegalArgumentException if the timestamp is negative or is too
* large to fit on 32 bits.
* @throws IllegalArgumentException if the metric name is empty or contains
* illegal characters.
* @throws IllegalArgumentException if the tags map is empty or one of the
* elements contains illegal characters.
* @throws HBaseException (deferred) if there was a problem while persisting
* data.
*/
Deferred<Object> addPoint(String metric,
long timestamp,
long value,
Map<String, String> tags);

/**
* Adds a single floating-point value data point in the TSDB.
* @param metric A non-empty string.
* @param timestamp The timestamp associated with the value.
* @param value The value of the data point.
* @param tags The tags on this series. This map must be non-empty.
* @return A deferred object that indicates the completion of the request.
* The {@link Object} has no special meaning and can be {@code null} (think
* of it as {@code Deferred<Void>}). But you probably want to attach at
* least an errback to this {@code Deferred} to handle failures.
* @throws IllegalArgumentException if the timestamp is negative or is too
* large to fit on 32 bits.
* @throws IllegalArgumentException if the metric name is empty or contains
* illegal characters.
* @throws IllegalArgumentException if the value is NaN or infinite.
* @throws IllegalArgumentException if the tags map is empty or one of the
* elements contains illegal characters.
* @throws HBaseException (deferred) if there was a problem while persisting
* data.
*/
Deferred<Object> addPoint(String metric,
long timestamp,
float value,
Map<String, String> tags);

/**
* Returns a new {@link WritableDataPoints} instance suitable for this TSDB.
* <p>
* If you want to add a single data-point, consider using {@link #addPoint}
* instead.
* @return A new {@link WritableDataPoints} instance.
*/
WritableDataPoints newDataPoints();

Expand Down
5 changes: 1 addition & 4 deletions src/core/WritableDataPoints.java
Expand Up @@ -32,10 +32,7 @@ public interface WritableDataPoints extends DataPoints {
* adding data points to another time series without having to create a new
* instance.
* @param metric A non-empty string.
* @param tags The tags on this series. This non-empty list must have an
* even number of elements. The elements in the list must be alternating
* between a tag name and a tag value, starting with a tag name. Example:
* <code>{ "host", "web42", "iface", "eth2" }</code>
* @param tags The tags on this series. This map must be non-empty.
* @throws IllegalArgumentException if the metric name is empty or contains
* illegal characters.
* @throws IllegalArgumentException if the tags list is empty or one of the
Expand Down
43 changes: 2 additions & 41 deletions src/tsd/PutDataPointRpc.java
Expand Up @@ -36,44 +36,6 @@ final class PutDataPointRpc implements TelnetRpc {
private static final AtomicLong illegal_arguments = new AtomicLong();
private static final AtomicLong unknown_metrics = new AtomicLong();

/**
* Dirty rows for time series that are being written to.
*
* The key in the map is a string that uniquely identifies a time series.
* Right now we use the time series name concatenated with the stringified
* version of the map that stores all the tags, e.g. "foo{bar=a,quux=42}".
*/
private ConcurrentHashMap<String, WritableDataPoints> dirty_rows
= new ConcurrentHashMap<String, WritableDataPoints>();

/**
 * Looks up the dirty row of a time series, creating it if needed.
 * <p>
 * When no row exists yet, a new one is created and inserted with
 * {@code putIfAbsent}.  If another row was inserted for the same key in
 * the meantime, that row is returned and the new one is dropped.
 * @param tsdb The TSDB in which the data point should be created.
 * @param metric The metric of the time series.
 * @param tags The tags of the time series.
 * @return the dirty row in which data points for the given time series
 * should be appended.
 */
private WritableDataPoints getDirtyRow(final TSDB tsdb,
                                       final String metric,
                                       final HashMap<String, String> tags) {
  final String key = metric + tags;
  final WritableDataPoints existing = dirty_rows.get(key);
  if (existing != null) {
    return existing;
  }

  // No row yet for this time series: try to create one.
  // TODO(tsuna): Properly evict old rows to save memory.
  if (dirty_rows.size() >= 20000) {
    dirty_rows.clear();  // free some RAM.
  }
  final WritableDataPoints fresh = tsdb.newDataPoints();
  fresh.setSeries(metric, tags);
  final WritableDataPoints winner = dirty_rows.putIfAbsent(key, fresh);
  // A null return means our row was the one inserted, so use it.
  // Otherwise use the row that was already mapped to this key.
  return winner == null ? fresh : winner;
}

public Deferred<Object> execute(final TSDB tsdb, final Channel chan,
final String[] cmd) {
requests.incrementAndGet();
Expand Down Expand Up @@ -155,11 +117,10 @@ private Deferred<Object> importDataPoint(final TSDB tsdb, final String[] words)
Tags.parse(tags, words[i]);
}
}
final WritableDataPoints dp = getDirtyRow(tsdb, metric, tags);
if (value.indexOf('.') < 0) { // integer value
return dp.addPoint(timestamp, Tags.parseLong(value));
return tsdb.addPoint(metric, timestamp, Tags.parseLong(value), tags);
} else { // floating point value
return dp.addPoint(timestamp, Float.parseFloat(value));
return tsdb.addPoint(metric, timestamp, Float.parseFloat(value), tags);
}
}
}

0 comments on commit 9690843

Please sign in to comment.