From 969084367e99a6df534d54e7baf62a7aecd9ed21 Mon Sep 17 00:00:00 2001 From: Benoit Sigoure Date: Mon, 6 Jun 2011 06:29:17 -0700 Subject: [PATCH] Simplify the write path. The idea behind IncomingDataPoints never really took off. This class was originally supposed to allow the TSD to maintain an in-memory cache of "recent" data that had been written to HBase. This in-memory cache was never implemented and cannot actually be easily implemented because we can no longer guarantee that a TSD sees all the data points going to a particular row. The idea was that this in-memory cache could be used to serve queries on recent data and to ride over HBase outages (i.e. serve recent data even if HBase was down). I don't see TSD doing this any time soon, so this change starts to bypass IncomingDataPoints, which does some complicated things that aren't actually necessary. Change-Id: Ifef6227e6f1d2766b14fe5b5701ece62ea5973c4 --- src/core/IncomingDataPoints.java | 49 +++++++++++++++------------- src/core/TSDB.java | 55 +++++++++++++++++++++++++++++++ src/core/TSDBInterface.java | 56 ++++++++++++++++++++++++++++++++ src/core/WritableDataPoints.java | 5 +-- src/tsd/PutDataPointRpc.java | 43 ++---------------------- 5 files changed, 141 insertions(+), 67 deletions(-) diff --git a/src/core/IncomingDataPoints.java b/src/core/IncomingDataPoints.java index dd36a8b6eb..3efe7263cc 100644 --- a/src/core/IncomingDataPoints.java +++ b/src/core/IncomingDataPoints.java @@ -88,7 +88,11 @@ final class IncomingDataPoints implements WritableDataPoints { this.values = new long[3]; } - public void setSeries(final String metric, final Map tags) { + /** + * Validates the given metric and tags. + * @throws IllegalArgumentException if any of the arguments aren't valid. 
+ */ + static void checkMetricAndTags(final String metric, final Map tags) { if (tags.size() <= 0) { throw new IllegalArgumentException("Need at least one tags (metric=" + metric + ", tags=" + tags + ')'); @@ -102,7 +106,15 @@ public void setSeries(final String metric, final Map tags) { Tags.validateString("tag name", tag.getKey()); Tags.validateString("tag name", tag.getValue()); } + } + /** + * Returns a partially initialized row key for this metric and these tags. + * The only thing left to fill in is the base timestamp. + */ + static byte[] rowKeyTemplate(final TSDB tsdb, + final String metric, + final Map tags) { final short metric_width = tsdb.metrics.width(); final short tag_name_width = tsdb.tag_names.width(); final short tag_value_width = tsdb.tag_values.width(); @@ -111,46 +123,39 @@ public void setSeries(final String metric, final Map tags) { int row_size = (metric_width + Const.TIMESTAMP_BYTES + tag_name_width * num_tags + tag_value_width * num_tags); - if (row == null || row.length != row_size) { - row = new byte[row_size]; - } - size = 0; + final byte[] row = new byte[row_size]; short pos = 0; - copyInRowKey(pos, (AUTO_METRIC ? tsdb.metrics.getOrCreateId(metric) + copyInRowKey(row, pos, (AUTO_METRIC ? tsdb.metrics.getOrCreateId(metric) : tsdb.metrics.getId(metric))); pos += metric_width; pos += Const.TIMESTAMP_BYTES; for(final byte[] tag : Tags.resolveOrCreateAll(tsdb, tags)) { - copyInRowKey(pos, tag); + copyInRowKey(row, pos, tag); pos += tag.length; } + return row; + } + + public void setSeries(final String metric, final Map tags) { + checkMetricAndTags(metric, tags); + row = rowKeyTemplate(tsdb, metric, tags); + size = 0; } /** * Copies the specified byte array at the specified offset in the row key. + * @param row The row key into which to copy the bytes. * @param offset The offset in the row key to start writing at. * @param bytes The bytes to copy. 
*/ - private void copyInRowKey(final short offset, final byte[] bytes) { + private static void copyInRowKey(final byte[] row, final short offset, final byte[] bytes) { System.arraycopy(bytes, 0, row, offset, bytes.length); } - /** - * Copies the specified integer at the specified offset in the row key. - * @param offset The offset in the row key to start writing at. - * @param n The value to copy. - */ - private void copyInRowKey(final short offset, final int n) { - row[offset + 0] = (byte) (n >>> 24); - row[offset + 1] = (byte) (n >>> 16); - row[offset + 2] = (byte) (n >>> 8); - row[offset + 3] = (byte) (n >>> 0); - } - /** * Implements {@link #addPoint} by storing a value with a specific flag. * @param timestamp The timestamp to associate with the value. @@ -194,7 +199,7 @@ private Deferred addPointInternal(final long timestamp, final long value // because the HBase client may still hold a reference to it in its // internal datastructures. row = Arrays.copyOf(row, row.length); - copyInRowKey(tsdb.metrics.width(), (int) base_time); + Bytes.setInt(row, (int) base_time, tsdb.metrics.width()); size = 0; //LOG.info("Starting a new row @ " + this); } @@ -206,7 +211,7 @@ private Deferred addPointInternal(final long timestamp, final long value // because the HBase client may still hold a reference to it in its // internal datastructures. row = Arrays.copyOf(row, row.length); - copyInRowKey(tsdb.metrics.width(), (int) base_time); + Bytes.setInt(row, (int) base_time, tsdb.metrics.width()); } if (values.length == size) { diff --git a/src/core/TSDB.java b/src/core/TSDB.java index 0d8bf1b1cd..55b66f22e5 100644 --- a/src/core/TSDB.java +++ b/src/core/TSDB.java @@ -12,15 +12,19 @@ // see . 
package net.opentsdb.core; +import java.util.Arrays; import java.util.List; +import java.util.Map; import com.stumbleupon.async.Deferred; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.hbase.async.Bytes; import org.hbase.async.HBaseClient; import org.hbase.async.HBaseException; +import org.hbase.async.PutRequest; import net.opentsdb.uid.UniqueId; import net.opentsdb.stats.Histogram; @@ -154,6 +158,57 @@ public WritableDataPoints newDataPoints() { return new IncomingDataPoints(this); } + public Deferred addPoint(final String metric, + final long timestamp, + final long value, + final Map tags) { + final short flags = 0x7; // An int stored on 8 bytes. + return addPointInternal(metric, timestamp, Bytes.fromLong(value), + tags, flags); + } + + public Deferred addPoint(final String metric, + final long timestamp, + final float value, + final Map tags) { + if (Float.isNaN(value) || Float.isInfinite(value)) { + throw new IllegalArgumentException("value is NaN or Infinite: " + value + + " for metric=" + metric + + " timestamp=" + timestamp); + } + final short flags = Const.FLAG_FLOAT | 0x3; // A float stored on 4 bytes. + return addPointInternal(metric, timestamp, + // Note: this is actually on 8 bytes :( + Bytes.fromLong(Float.floatToRawIntBits(value)), + tags, flags); + } + + private Deferred addPointInternal(final String metric, + final long timestamp, + final byte[] value, + final Map tags, + final short flags) { + if ((timestamp & 0xFFFFFFFF00000000L) != 0) { + // => timestamp < 0 || timestamp > Integer.MAX_VALUE + throw new IllegalArgumentException((timestamp < 0 ? 
"negative " : "bad") + + " timestamp=" + timestamp + + " when trying to add value=" + Arrays.toString(value) + '/' + flags + + " to metric=" + metric + ", tags=" + tags); + } + + IncomingDataPoints.checkMetricAndTags(metric, tags); + final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags); + final long base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN)); + Bytes.setInt(row, (int) base_time, metrics.width()); + final short qualifier = (short) ((timestamp - base_time) << Const.FLAG_BITS + | flags); + final PutRequest point = new PutRequest(table, row, FAMILY, + Bytes.fromShort(qualifier), value); + // TODO(tsuna): Add a callback to time the latency of HBase and store the + // timing in a moving Histogram (once we have a class for this). + return client.put(point); + } + public Deferred flush() throws HBaseException { return client.flush(); } diff --git a/src/core/TSDBInterface.java b/src/core/TSDBInterface.java index 4b371c7328..4ce720a443 100644 --- a/src/core/TSDBInterface.java +++ b/src/core/TSDBInterface.java @@ -12,6 +12,8 @@ // see . package net.opentsdb.core; +import java.util.Map; + import com.stumbleupon.async.Deferred; import org.hbase.async.HBaseException; @@ -26,8 +28,62 @@ public interface TSDBInterface { */ Query newQuery(); + /** + * Adds a single integer value data point in the TSDB. + * @param metric A non-empty string. + * @param timestamp The timestamp associated with the value. + * @param value The value of the data point. + * @param tags The tags on this series. This map must be non-empty. + * @return A deferred object that indicates the completion of the request. + * The {@link Object} has not special meaning and can be {@code null} (think + * of it as {@code Deferred}). But you probably want to attach at + * least an errback to this {@code Deferred} to handle failures. 
+ * @throws IllegalArgumentException if the timestamp is negative or does + * not fit on 32 bits. No ordering with respect to previously added + * data points is required. + * @throws IllegalArgumentException if the metric name is empty or contains + * illegal characters. + * @throws IllegalArgumentException if the tags map is empty or one of the + * elements contains illegal characters. + * @throws HBaseException (deferred) if there was a problem while persisting + * data. + */ + Deferred<Object> addPoint(String metric, + long timestamp, + long value, + Map<String, String> tags); + + /** + * Adds a single floating-point value data point in the TSDB. + * @param metric A non-empty string. + * @param timestamp The timestamp associated with the value. + * @param value The value of the data point. + * @param tags The tags on this series. This map must be non-empty. + * @return A deferred object that indicates the completion of the request. + * The {@link Object} has no special meaning and can be {@code null} (think + * of it as {@code Deferred<Void>}). But you probably want to attach at + * least an errback to this {@code Deferred} to handle failures. + * @throws IllegalArgumentException if the timestamp is negative or does + * not fit on 32 bits. No ordering with respect to previously added + * data points is required. + * @throws IllegalArgumentException if the metric name is empty or contains + * illegal characters. + * @throws IllegalArgumentException if the value is NaN or infinite. + * @throws IllegalArgumentException if the tags map is empty or one of the + * elements contains illegal characters. + * @throws HBaseException (deferred) if there was a problem while persisting + * data. + */ + Deferred<Object> addPoint(String metric, + long timestamp, + float value, + Map<String, String> tags); + /** * Returns a new {@link WritableDataPoints} instance suitable for this TSDB. + * <p>

+ * If you want to add a single data-point, consider using {@link #addPoint} + * instead. */ WritableDataPoints newDataPoints(); diff --git a/src/core/WritableDataPoints.java b/src/core/WritableDataPoints.java index 1c19a25eb9..0a7344d5fe 100644 --- a/src/core/WritableDataPoints.java +++ b/src/core/WritableDataPoints.java @@ -32,10 +32,7 @@ public interface WritableDataPoints extends DataPoints { * adding data points to another time series without having to create a new * instance. * @param metric A non-empty string. - * @param tags The tags on this series. This non-empty list must have an - * even number of elements. The elements in the list must be alternating - * between a tag name and a tag value, starting with a tag name. Example: - * { "host", "web42", "iface", "eth2" } + * @param tags The tags on this series. This map must be non-empty. * @throws IllegalArgumentException if the metric name is empty or contains * illegal characters. * @throws IllegalArgumentException if the tags list is empty or one of the diff --git a/src/tsd/PutDataPointRpc.java b/src/tsd/PutDataPointRpc.java index 35134a1893..337237d285 100644 --- a/src/tsd/PutDataPointRpc.java +++ b/src/tsd/PutDataPointRpc.java @@ -36,44 +36,6 @@ final class PutDataPointRpc implements TelnetRpc { private static final AtomicLong illegal_arguments = new AtomicLong(); private static final AtomicLong unknown_metrics = new AtomicLong(); - /** - * Dirty rows for time series that are being written to. - * - * The key in the map is a string that uniquely identifies a time series. - * Right now we use the time series name concatenated with the stringified - * version of the map that stores all the tags, e.g. "foo{bar=a,quux=42}". - */ - private ConcurrentHashMap dirty_rows - = new ConcurrentHashMap(); - - /** - * Returns the dirty row for the given time series or creates a new one. - * @param tsdb The TSDB in which the data point should be created. - * @param metric The metric of the time series. 
- * @param tags The tags of the time series. - * @return the dirty row in which data points for the given time series - * should be appended. - */ - private WritableDataPoints getDirtyRow(final TSDB tsdb, - final String metric, - final HashMap tags) { - final String key = metric + tags; - WritableDataPoints row = dirty_rows.get(key); - if (row == null) { // Try to create a new row. - // TODO(tsuna): Properly evict old rows to save memory. - if (dirty_rows.size() >= 20000) { - dirty_rows.clear(); // free some RAM. - } - final WritableDataPoints new_row = tsdb.newDataPoints(); - new_row.setSeries(metric, tags); - row = dirty_rows.putIfAbsent(key, new_row); - if (row == null) { // We've just inserted a new row. - return new_row; // So use that. - } - } - return row; - } - public Deferred execute(final TSDB tsdb, final Channel chan, final String[] cmd) { requests.incrementAndGet(); @@ -155,11 +117,10 @@ private Deferred importDataPoint(final TSDB tsdb, final String[] words) Tags.parse(tags, words[i]); } } - final WritableDataPoints dp = getDirtyRow(tsdb, metric, tags); if (value.indexOf('.') < 0) { // integer value - return dp.addPoint(timestamp, Tags.parseLong(value)); + return tsdb.addPoint(metric, timestamp, Tags.parseLong(value), tags); } else { // floating point value - return dp.addPoint(timestamp, Float.parseFloat(value)); + return tsdb.addPoint(metric, timestamp, Float.parseFloat(value), tags); } } }