Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async delete of datapoints through HTTP #503

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/core/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ public interface Query {
*/
long getEndTime();

/**
* Sets whether or not the data queried will be deleted.
* @param delete True if data should be deleted, false otherwise.
*/
void setDelete(boolean delete);

/**
* Returns whether or not the data queried will be deleted.
* @return A boolean
*/
boolean getDelete();

/**
* Sets the time series to the query.
* @param metric The metric to retrieve from the TSDB.
Expand Down
66 changes: 47 additions & 19 deletions src/core/SaltScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import net.opentsdb.meta.Annotation;

import org.hbase.async.Bytes.ByteMap;
import org.hbase.async.DeleteRequest;
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;
import org.slf4j.Logger;
Expand Down Expand Up @@ -85,11 +86,14 @@ public class SaltScanner {
* are done.*/
private long start_time; // milliseconds.

/** Whether or not to delete the queried data */
private boolean delete;

/** A holder for storing the first exception thrown by a scanner if something
* goes pear shaped. Make sure to synchronize on this object when checking
* for null or assigning from a scanner's callback. */
private volatile Exception exception;

/**
* Default ctor that performs some validation. Call {@link scan} after
* construction to actually start fetching data.
Expand All @@ -103,6 +107,24 @@ public class SaltScanner {
public SaltScanner(final TSDB tsdb, final byte[] metric,
final List<Scanner> scanners,
final TreeMap<byte[], Span> spans) {
this(tsdb, metric, scanners, spans, false);
}

/**
* Default ctor that performs some validation. Call {@link scan} after
* construction to actually start fetching data.
* @param tsdb The TSDB to which we belong
* @param metric The metric we're expecting to fetch
* @param scanners A list of HBase scanners, one for each bucket
* @param spans The span map to store results in
* @param delete Whether or not to delete the queried data
* @throws IllegalArgumentException if any required data was missing or
* we had invalid parameters.
*/
public SaltScanner(final TSDB tsdb, final byte[] metric,
final List<Scanner> scanners,
final TreeMap<byte[], Span> spans,
boolean delete) {
if (Const.SALT_WIDTH() < 1) {
throw new IllegalArgumentException(
"Salting is disabled. Use the regular scanner");
Expand Down Expand Up @@ -137,6 +159,7 @@ public SaltScanner(final TSDB tsdb, final byte[] metric,
this.spans = spans;
this.metric = metric;
this.tsdb = tsdb;
this.delete = delete;
}

/**
Expand Down Expand Up @@ -301,25 +324,30 @@ public Object call(final ArrayList<ArrayList<KeyValue>> rows)
return null;
}

List<Annotation> notes = annotations.get(key);
if (notes == null) {
notes = new ArrayList<Annotation>();
annotations.put(key, notes);
}
if (delete) {
final DeleteRequest del = new DeleteRequest(tsdb.dataTable(), key);
tsdb.getClient().delete(del);
} else {
List<Annotation> notes = annotations.get(key);
if (notes == null) {
notes = new ArrayList<Annotation>();
annotations.put(key, notes);
}

final KeyValue compacted;
try{
compacted = tsdb.compact(row, notes);
} catch (final IllegalDataException idex) {
LOG.error("Caught IllegalDataException exception while parsing the "
+ "row " + key + ", skipping it on scanner " + this, idex);
scanner.close();
handleException(idex);
return null;
}

if (compacted != null) { // Can be null if we ignored all KVs.
kvs.add(compacted);
final KeyValue compacted;
try{
compacted = tsdb.compact(row, notes);
} catch (final IllegalDataException idex) {
LOG.error("Caught IllegalDataException exception while parsing the "
+ "row " + key + ", skipping it on scanner " + this, idex);
scanner.close();
handleException(idex);
return null;
}

if (compacted != null) { // Can be null if we ignored all KVs.
kvs.add(compacted);
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/core/TSQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public final class TSQuery {

/** Whether or not to include stats summary in the output */
private boolean show_summary;

/** Whether or not to delete the queried data */
private boolean delete = false;

/** The query status for tracking over all performance of this query */
private QueryStats query_stats;
Expand Down Expand Up @@ -349,6 +352,11 @@ public boolean getShowStats() {
public boolean getShowSummary() {
return this.show_summary;
}

/** @return Whether or not to delete the queried data */
public boolean getDelete() {
return this.delete;
}

/** @return the query stats object. Ignored during JSON serialization */
@JsonIgnore
Expand Down Expand Up @@ -430,6 +438,11 @@ public void setShowSummary(boolean show_summary) {
this.show_summary = show_summary;
}

/** @param delete whether or not to delete the queried data */
public void setDelete(boolean delete) {
this.delete = delete;
}

/** @param query_stats the query stats object to associate with this query */
public void setQueryStats(final QueryStats query_stats) {
this.query_stats = query_stats;
Expand Down
50 changes: 39 additions & 11 deletions src/core/TsdbQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.hbase.async.Bytes;
import org.hbase.async.DeleteRequest;
import org.hbase.async.HBaseException;
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;
Expand Down Expand Up @@ -75,6 +76,9 @@ final class TsdbQuery implements Query {
/** End time (UNIX timestamp in seconds) on 32 bits ("unsigned" int). */
private long end_time = UNSET;

/** Whether or not to delete the queried data */
private boolean delete;

/** ID of the metric being looked up. */
private byte[] metric;

Expand Down Expand Up @@ -191,6 +195,24 @@ public long getEndTime() {
return end_time;
}

/**
* Sets whether or not the data queried will be deleted.
* @param delete True if data should be deleted, false otherwise.
*/
@Override
public void setDelete(boolean delete) {
this.delete = delete;
}

/**
* Returns whether or not the data queried will be deleted.
* @return A boolean
*/
@Override
public boolean getDelete() {
return delete;
}

@Override
public void setTimeSeries(final String metric,
final Map<String, String> tags,
Expand Down Expand Up @@ -274,6 +296,7 @@ public Deferred<Object> configureFromQuery(final TSQuery query,
final TSSubQuery sub_query = query.getQueries().get(index);
setStartTime(query.startTime());
setEndTime(query.endTime());
setDelete(query.getDelete());
query_index = index;

// set common options
Expand Down Expand Up @@ -663,17 +686,22 @@ public Object call(final ArrayList<ArrayList<KeyValue>> rows)
+ " our scanner (" + scanner + ")! " + row + " does not start"
+ " with " + Arrays.toString(metric));
}
Span datapoints = spans.get(key);
if (datapoints == null) {
datapoints = new Span(tsdb);
spans.put(key, datapoints);
}
final KeyValue compacted =
tsdb.compact(row, datapoints.getAnnotations());
seenAnnotation |= !datapoints.getAnnotations().isEmpty();
if (compacted != null) { // Can be null if we ignored all KVs.
datapoints.addRow(compacted);
nrows++;
if (delete) {
final DeleteRequest del = new DeleteRequest(tsdb.dataTable(), key);
tsdb.getClient().delete(del);
} else {
Span datapoints = spans.get(key);
if (datapoints == null) {
datapoints = new Span(tsdb);
spans.put(key, datapoints);
}
final KeyValue compacted =
tsdb.compact(row, datapoints.getAnnotations());
seenAnnotation |= !datapoints.getAnnotations().isEmpty();
if (compacted != null) { // Can be null if we ignored all KVs.
datapoints.addRow(compacted);
nrows++;
}
}
}

Expand Down
24 changes: 21 additions & 3 deletions src/tsd/QueryRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

import org.hbase.async.Bytes.ByteMap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.hbase.async.DeleteRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
Expand All @@ -33,6 +37,7 @@

import net.opentsdb.core.DataPoints;
import net.opentsdb.core.IncomingDataPoint;
import net.opentsdb.core.Internal;
import net.opentsdb.core.Query;
import net.opentsdb.core.QueryException;
import net.opentsdb.core.RateOptions;
Expand All @@ -46,6 +51,8 @@
import net.opentsdb.uid.NoSuchUniqueName;
import net.opentsdb.uid.UniqueId;
import net.opentsdb.utils.DateTime;
import net.opentsdb.uid.UniqueId.UniqueIdType;
import net.opentsdb.utils.JSON;

/**
* Handles queries for timeseries datapoints. Each request is parsed into a
Expand All @@ -70,12 +77,18 @@ final class QueryRpc implements HttpRpc {
public void execute(final TSDB tsdb, final HttpQuery query)
throws IOException {

// only accept GET/POST
if (query.method() != HttpMethod.GET && query.method() != HttpMethod.POST) {
// only accept GET/POST/DELETE
if (query.method() != HttpMethod.GET && query.method() != HttpMethod.POST &&
query.method() != HttpMethod.DELETE) {
throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED,
"Method not allowed", "The HTTP method [" + query.method().getName() +
"] is not permitted for this endpoint");
}
if (query.method() == HttpMethod.DELETE && !tsdb.getConfig().allow_delete()) {
throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
"Bad request",
"Deleting data is not enabled (tsd.http.query.allow_delete=false)");
}

final String[] uri = query.explodeAPIPath();
final String endpoint = uri.length > 1 ? uri[1] : "";
Expand Down Expand Up @@ -109,7 +122,7 @@ private void handleQuery(final TSDB tsdb, final HttpQuery query) {
} else {
data_query = this.parseQuery(tsdb, query);
}

// validate and then compile the queries
try {
LOG.debug(data_query.toString());
Expand Down Expand Up @@ -229,6 +242,11 @@ public Object call(final List<Annotation> annotations) throws Exception {
return data_query.buildQueriesAsync(tsdb).addCallback(new BuildCB());
}
}

if (query.method() == HttpMethod.DELETE &&
tsdb.getConfig().allow_delete()) {
data_query.setDelete(true);
}

// if we the caller wants to search for global annotations, fire that off
// first then scan for the notes, then pass everything off to the formatter
Expand Down
12 changes: 11 additions & 1 deletion src/utils/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ public class Config {

/** tsd.http.request.enable_chunked */
private boolean enable_chunked_requests = false;


/** tsd.http.query.allow_delete */
private boolean allow_delete = false;

/** tsd.storage.fix_duplicates */
private boolean fix_duplicates = false;

Expand Down Expand Up @@ -197,6 +200,11 @@ public boolean enable_tsuid_incrementing() {
public boolean enable_tsuid_tracking() {
return enable_tsuid_tracking;
}

/** @return whether or not deleting data is allowed */
public boolean allow_delete() {
return allow_delete;
}

/** @return whether or not chunked requests are supported */
public boolean enable_chunked_requests() {
Expand Down Expand Up @@ -487,6 +495,7 @@ protected void setDefaults() {
default_map.put("tsd.storage.compaction.max_concurrent_flushes", "10000");
default_map.put("tsd.storage.compaction.flush_speed", "2");
default_map.put("tsd.http.show_stack_trace", "true");
default_map.put("tsd.http.query.allow_delete", "false");
default_map.put("tsd.http.request.enable_chunked", "false");
default_map.put("tsd.http.request.max_chunk", "4096");
default_map.put("tsd.http.request.cors_domains", "");
Expand Down Expand Up @@ -592,6 +601,7 @@ protected void loadStaticVariables() {
auto_tagk = this.getBoolean("tsd.core.auto_create_tagks");
auto_tagv = this.getBoolean("tsd.core.auto_create_tagvs");
enable_compactions = this.getBoolean("tsd.storage.enable_compaction");
allow_delete = this.getBoolean("tsd.http.query.allow_delete");
enable_chunked_requests = this.getBoolean("tsd.http.request.enable_chunked");
enable_realtime_ts = this.getBoolean("tsd.core.meta.enable_realtime_ts");
enable_realtime_uid = this.getBoolean("tsd.core.meta.enable_realtime_uid");
Expand Down
19 changes: 19 additions & 0 deletions test/core/TestTsdbQueryQueries.java
Original file line number Diff line number Diff line change
Expand Up @@ -1328,4 +1328,23 @@ public void runInterpolationMsDownsampled() throws Exception {
}
assertEquals(151, dps[0].size());
}

@Test
public void deleteDatapoints() throws Exception {
setDataPointStorage();

final HashMap<String, String> tags = new HashMap<String, String>(1);

tags.put("host", "web01");
tsdb.addPoint(METRIC_STRING, 1356998400, 42, tags).joinUninterruptibly();
query.setStartTime(1356998400);
query.setTimeSeries(METRIC_STRING, tags, Aggregators.SUM, false);

final DataPoints[] dps1 = query.run();
assertEquals(1, dps1.length);

query.setDelete(true);
final DataPoints[] dps2 = query.run();
assertEquals(0, dps2.length);
}
}