Skip to content

Commit

Permalink
Modify IncomingDataPoints.addPointInternal() to accept millisecond ti…
Browse files Browse the repository at this point in the history
…mestamps

so users can bulk import data. Also disabled the data point tracking code that
was meant for pre-compacting data during imports. That can be added later.
Modify IncomingDataPoints.addPoint() to support variable length integers
instead of forcing everything to 8 bytes.
Modify the Text Importer unit tests to validate the variable length and
millisecond timestamps.

Signed-off-by: Chris Larsen <clarsen@euphoriaaudio.com>
  • Loading branch information
manolama committed Dec 30, 2013
1 parent 5d84b29 commit 729d070
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 51 deletions.
86 changes: 47 additions & 39 deletions src/core/IncomingDataPoints.java
Expand Up @@ -64,6 +64,9 @@ final class IncomingDataPoints implements WritableDataPoints {

/** Each value in the row. */
private long[] values;

/** Track the last timestamp written for this series */
private long last_ts;

/** Number of data points in this row. */
private short size;
Expand All @@ -77,8 +80,11 @@ final class IncomingDataPoints implements WritableDataPoints {
*/
IncomingDataPoints(final TSDB tsdb) {
this.tsdb = tsdb;
this.qualifiers = new short[3];
this.values = new long[3];
// the qualifiers and values were meant for pre-compacting the rows. We
// could implement this later, but for now we don't need to track the values
// as they'll just consume space during an import
//this.qualifiers = new short[3];
//this.values = new long[3];
}

/**
Expand Down Expand Up @@ -240,56 +246,48 @@ private long updateBaseTime(final long timestamp) {
*/
private Deferred<Object> addPointInternal(final long timestamp, final byte[] value,
final short flags) {
// This particular code path only expects integers on 8 bytes or floating
// point values on 4 bytes.
assert value.length == 8 || value.length == 4 : Bytes.pretty(value);
if (row == null) {
throw new IllegalStateException("setSeries() never called!");
}
if ((timestamp & 0xFFFFFFFF00000000L) != 0) {
// => timestamp < 0 || timestamp > Integer.MAX_VALUE
final boolean ms_timestamp = (timestamp & Const.SECOND_MASK) != 0;

// we only accept unix epoch timestamps in seconds or milliseconds
if (ms_timestamp &&
(timestamp < 1000000000000L || timestamp > 9999999999999L)) {
throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad")
+ " timestamp=" + timestamp
+ " when trying to add value=" + Arrays.toString(value) + " to " + this);
}

long base_time;
if (size > 0) {
base_time = baseTime();
final long last_ts = base_time + (delta(qualifiers[size - 1]));
if (timestamp <= last_ts) {
throw new IllegalArgumentException("New timestamp=" + timestamp
+ " is less than or equal to previous=" + last_ts
+ " when trying to add value=" + Arrays.toString(value)
+ " to " + this);
} else if (timestamp - base_time >= Const.MAX_TIMESPAN) {
// Need to start a new row as we've exceeded Const.MAX_TIMESPAN.
base_time = updateBaseTime(timestamp);
size = 0;
//LOG.info("Starting a new row @ " + this);
}
// always maintain last_ts in milliseconds
if ((ms_timestamp ? timestamp : timestamp * 1000) <= last_ts) {
throw new IllegalArgumentException("New timestamp=" + timestamp
+ " is less than or equal to previous=" + last_ts
+ " when trying to add value=" + Arrays.toString(value)
+ " to " + this);
}
last_ts = (ms_timestamp ? timestamp : timestamp * 1000);

long base_time = baseTime();
long incoming_base_time;
if (ms_timestamp) {
// drop the ms timestamp to seconds to calculate the base timestamp
incoming_base_time = ((timestamp / 1000) -
((timestamp / 1000) % Const.MAX_TIMESPAN));
} else {
// This is the first data point, let's record the starting timestamp.
base_time = updateBaseTime(timestamp);
Bytes.setInt(row, (int) base_time, tsdb.metrics.width());
incoming_base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
}

if (values.length == size) {
grow();

if (incoming_base_time - base_time >= Const.MAX_TIMESPAN) {
// Need to start a new row as we've exceeded Const.MAX_TIMESPAN.
base_time = updateBaseTime((ms_timestamp ? timestamp / 1000: timestamp));
}

// Java is so stupid with its auto-promotion of int to float.
final short qualifier = (short) ((timestamp - base_time) << Const.FLAG_BITS
| flags);
qualifiers[size] = qualifier;
values[size] = (value.length == 8
? Bytes.getLong(value)
: Bytes.getInt(value) & 0x00000000FFFFFFFFL);
size++;
final byte[] qualifier = Internal.buildQualifier(timestamp, flags);

final PutRequest point = new PutRequest(tsdb.table, row, TSDB.FAMILY,
Bytes.fromShort(qualifier),
value);
qualifier, value);
// TODO(tsuna): The following timing is rather useless. First of all,
// the histogram never resets, so it tends to converge to a certain
// distribution and never changes. What we really want is a moving
Expand Down Expand Up @@ -330,8 +328,18 @@ private long baseTime() {
}

public Deferred<Object> addPoint(final long timestamp, final long value) {
final short flags = 0x7; // An int stored on 8 bytes.
return addPointInternal(timestamp, Bytes.fromLong(value), flags);
final byte[] v;
if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
v = new byte[] { (byte) value };
} else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
v = Bytes.fromShort((short) value);
} else if (Integer.MIN_VALUE <= value && value <= Integer.MAX_VALUE) {
v = Bytes.fromInt((int) value);
} else {
v = Bytes.fromLong(value);
}
final short flags = (short) (v.length - 1); // Just the length.
return addPointInternal(timestamp, v, flags);
}

public Deferred<Object> addPoint(final long timestamp, final float value) {
Expand Down
179 changes: 167 additions & 12 deletions test/tools/TestTextImporter.java
Expand Up @@ -155,31 +155,156 @@ public void before() throws Exception {
}

@Test
public void importFileGoodIntegers() throws Exception {
public void importFileGoodIntegers1Byte() throws Exception {
String data =
"sys.cpu.user 1356998400 24 host=web01\n" +
"sys.cpu.user 1356998400 42 host=web02";
"sys.cpu.user 1356998400 0 host=web01\n" +
"sys.cpu.user 1356998400 127 host=web02";
setData(data);
Integer points = (Integer)importFile.invoke(null, client, tsdb, "file");
assertEquals(2, (int)points);

byte[] row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 1};
byte[] value = storage.getColumn(row, new byte[] { 0, 0 });
assertNotNull(value);
assertEquals(0, value[0]);
row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 2};
value = storage.getColumn(row, new byte[] { 0, 0 });
assertNotNull(value);
assertEquals(127, value[0]);
}

@Test
public void importFileGoodIntegers1ByteNegative() throws Exception {
String data =
"sys.cpu.user 1356998400 -0 host=web01\n" +
"sys.cpu.user 1356998400 -128 host=web02";
setData(data);
Integer points = (Integer)importFile.invoke(null, client, tsdb, "file");
assertEquals(2, (int)points);

byte[] row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 1};
byte[] value = storage.getColumn(row, new byte[] { 0, 0 });
assertNotNull(value);
assertEquals(0, value[0]);
row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 2};
value = storage.getColumn(row, new byte[] { 0, 0 });
assertNotNull(value);
assertEquals(-128, value[0]);
}

@Test
public void importFileGoodIntegers2Byte() throws Exception {
String data =
"sys.cpu.user 1356998400 128 host=web01\n" +
"sys.cpu.user 1356998400 32767 host=web02";
setData(data);
Integer points = (Integer)importFile.invoke(null, client, tsdb, "file");
assertEquals(2, (int)points);

byte[] row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 1};
byte[] value = storage.getColumn(row, new byte[] { 0, 1 });
assertNotNull(value);
assertEquals(128, Bytes.getShort(value));
row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 2};
value = storage.getColumn(row, new byte[] { 0, 1 });
assertNotNull(value);
assertEquals(32767, Bytes.getShort(value));
}

@Test
public void importFileGoodIntegers2ByteNegative() throws Exception {
String data =
"sys.cpu.user 1356998400 -129 host=web01\n" +
"sys.cpu.user 1356998400 -32768 host=web02";
setData(data);
Integer points = (Integer)importFile.invoke(null, client, tsdb, "file");
assertEquals(2, (int)points);

byte[] row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 1};
byte[] value = storage.getColumn(row, new byte[] { 0, 1 });
assertNotNull(value);
assertEquals(-129, Bytes.getShort(value));
row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 2};
value = storage.getColumn(row, new byte[] { 0, 1 });
assertNotNull(value);
assertEquals(-32768, Bytes.getShort(value));
}

@Test
public void importFileGoodIntegers4Byte() throws Exception {
String data =
"sys.cpu.user 1356998400 32768 host=web01\n" +
"sys.cpu.user 1356998400 2147483647 host=web02";
setData(data);
Integer points = (Integer)importFile.invoke(null, client, tsdb, "file");
assertEquals(2, (int)points);

byte[] row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 1};
byte[] value = storage.getColumn(row, new byte[] { 0, 3 });
assertNotNull(value);
assertEquals(32768, Bytes.getInt(value));
row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 2};
value = storage.getColumn(row, new byte[] { 0, 3 });
assertNotNull(value);
assertEquals(2147483647, Bytes.getInt(value));
}

@Test
public void importFileGoodIntegers4ByteNegative() throws Exception {
String data =
"sys.cpu.user 1356998400 -32769 host=web01\n" +
"sys.cpu.user 1356998400 -2147483648 host=web02";
setData(data);
Integer points = (Integer)importFile.invoke(null, client, tsdb, "file");
assertEquals(2, (int)points);

byte[] row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 1};
byte[] value = storage.getColumn(row, new byte[] { 0, 3 });
assertNotNull(value);
assertEquals(-32769, Bytes.getInt(value));
row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 2};
value = storage.getColumn(row, new byte[] { 0, 3 });
assertNotNull(value);
assertEquals(-2147483648, Bytes.getInt(value));
}

@Test
public void importFileGoodIntegers8Byte() throws Exception {
String data =
"sys.cpu.user 1356998400 2147483648 host=web01\n" +
"sys.cpu.user 1356998400 9223372036854775807 host=web02";
setData(data);
Integer points = (Integer)importFile.invoke(null, client, tsdb, "file");
assertEquals(2, (int)points);
byte[] row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 1};
byte[] value = storage.getColumn(row, new byte[] { 0, 7 });
assertNotNull(value);
assertEquals(24, value[7]);
assertEquals(2147483648L, Bytes.getLong(value));
row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 2};
value = storage.getColumn(row, new byte[] { 0, 7 });
assertNotNull(value);
assertEquals(42, value[7]);
assertEquals(9223372036854775807L, Bytes.getLong(value));
}

@Test
public void importFileGoodIntegersNegative() throws Exception {
public void importFileGoodIntegers8ByteNegative() throws Exception {
String data =
"sys.cpu.user 1356998400 -24 host=web01\n" +
"sys.cpu.user 1356998400 -42 host=web02";
"sys.cpu.user 1356998400 -2147483649 host=web01\n" +
"sys.cpu.user 1356998400 -9223372036854775808 host=web02";
setData(data);
Integer points = (Integer)importFile.invoke(null, client, tsdb, "file");
assertEquals(2, (int)points);
Expand All @@ -188,12 +313,42 @@ public void importFileGoodIntegersNegative() throws Exception {
0, 0, 1, 0, 0, 1};
byte[] value = storage.getColumn(row, new byte[] { 0, 7 });
assertNotNull(value);
assertEquals(-24, value[7]);
assertEquals(-2147483649L, Bytes.getLong(value));
row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 2};
value = storage.getColumn(row, new byte[] { 0, 7 });
assertNotNull(value);
assertEquals(-42, value[7]);
assertEquals(-9223372036854775808L, Bytes.getLong(value));
}

@Test
public void importFileMSTimestamp() throws Exception {
String data =
"sys.cpu.user 1356998400500 24 host=web01\n" +
"sys.cpu.user 1356998400500 42 host=web02";
setData(data);
Integer points = (Integer)importFile.invoke(null, client, tsdb, "file");
assertEquals(2, (int)points);

byte[] row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 1};
byte[] value = storage.getColumn(row, new byte[] { (byte) 0xF0, 0, 0x7D, 0 });
assertNotNull(value);
assertEquals(24, value[0]);
row = new byte[] { 0, 0, 1, 0x50, (byte) 0xE2, 0x27, 0,
0, 0, 1, 0, 0, 2};
value = storage.getColumn(row, new byte[] { (byte) 0xF0, 0, 0x7D, 0 });
assertNotNull(value);
assertEquals(42, value[0]);
}

@Test (expected = IllegalArgumentException.class)
public void importFileMSTimestampTooBig() throws Exception {
String data =
"sys.cpu.user 13569984005001 24 host=web01\n" +
"sys.cpu.user 13569984005001 42 host=web02";
setData(data);
importFile.invoke(null, client, tsdb, "file");
}

@Test
Expand Down Expand Up @@ -329,10 +484,10 @@ public void importFile0Timestamp() throws Exception {
}

@Test (expected = RuntimeException.class)
public void importFileMSTimestamp() throws Exception {
public void importFileNegativeTimestamp() throws Exception {
String data =
"sys.cpu.user 1356998400 24 host=web01\n" +
"sys.cpu.user 1356998400500 42 host=web02";
"sys.cpu.user -1356998400 42 host=web02";
setData(data);
importFile.invoke(null, client, tsdb, "file");
}
Expand Down

0 comments on commit 729d070

Please sign in to comment.