Skip to content

Commit

Permalink
HWKALERTS-50, 52 1st working test for func avg!
Browse files Browse the repository at this point in the history
- fix systemId expectation on the ExternalCondition
- fix time range (period in minutes, not seconds)
- interact better with rxjava by using BlockingObservable to ensure
  work is done before the Runnable exits.
- add the rest of the expression funcs (not tested)
- fix test so data does not overlap on consecutive runs
- make sure to inject data before the trigger is enabled
  • Loading branch information
jshaughn committed Jul 2, 2015
1 parent 2bb9a1c commit c9fea6e
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.hawkular.metrics.core.api.MetricsService;
import org.jboss.logging.Logger;

import rx.Observable;
import rx.observables.BlockingObservable;

/**
* Manages the Metrics expression evaluations and interacts with the Alerts system.
Expand All @@ -62,6 +62,9 @@ public class Manager {
private static final String TAG_CATEGORY = "HawkularMetrics";
private static final String TAG_NAME = "MetricsCondition";

// Offsets in milliseconds. They are subtracted from the System.currentTimeMillis()-based
// query window (start/end) to address the same window one day / one week earlier.
// DAY = 24 h * 60 min * 60 s * 1000 ms. The seconds factor was previously missing, which
// made DAY only 24 minutes (and WEEK only 168 minutes).
private static final long DAY = 24L * 60L * 60L * 1000L;
private static final long WEEK = 7L * DAY;

// private static final String THREAD_POOL_NAME = "HawkularAlertsMetricsExpression";
private static final Integer THREAD_POOL_SIZE = 20;

Expand Down Expand Up @@ -133,7 +136,7 @@ private void refresh() {
for (Condition condition : conditions) {
if (condition instanceof ExternalCondition) {
ExternalCondition externalCondition = (ExternalCondition) condition;
if (TAG_NAME.equals(externalCondition.getSystemId())) {
if (TAG_CATEGORY.equals(externalCondition.getSystemId())) {
log.info("Found Metrics ExternalCondition! " + externalCondition);
activeConditions.add(externalCondition);
if (expressionFutures.containsKey(externalCondition)) {
Expand Down Expand Up @@ -205,24 +208,110 @@ public void run() {
String tenantId = trigger.getTenantId();
MetricId metricId = new MetricId(expression.getMetric());
long end = System.currentTimeMillis();
long start = end - (expression.getPeriod() * 1000);
long start = end - (expression.getPeriod() * 60000);

log.info("Running External Metrics Condition: " + expression);
Double value = Double.NaN;
switch (func) {
case avg:
log.info("Running External Metrics Condition: " + expression);
Observable<Double> average = metrics.findGaugeDataAverage(tenantId, metricId, start, end);
average.first().subscribe(this::evaluate);
// The approach below did not work. I'm not 100% sure why not but I think it's due to
// the run() method exiting before the async call returns. Anyway, by converting to a
// BlockingObservable and using .last() as opposed to .single() we seem to be OK.
//
//metrics.findGaugeDataAverage(tenantId, metricId, start, end).first().subscribe(
// this::evaluate,
// t -> log.debugf("Failed get-query for %s: %s", expression, t.getMessage()));

value = metrics.findGaugeDataAverage(tenantId, metricId, start, end).toBlocking().last();
break;
case avgd: {
Double avgToday = metrics.findGaugeDataAverage(tenantId, metricId, start, end).toBlocking()
.last();
Double avgYesterday = metrics
.findGaugeDataAverage(tenantId, metricId, (start - DAY), (end - DAY)).toBlocking()
.last();
value = avgToday - avgYesterday;
break;
}
case avgdp: {
Double avgToday = metrics.findGaugeDataAverage(tenantId, metricId, start, end).toBlocking()
.last();
Double avgYesterday = metrics
.findGaugeDataAverage(tenantId, metricId, (start - DAY), (end - DAY)).toBlocking()
.last();
value = avgToday / avgYesterday;
break;
}
case avgw: {
Double avgToday = metrics.findGaugeDataAverage(tenantId, metricId, start, end).toBlocking()
.last();
Double avgLastWeek = metrics
.findGaugeDataAverage(tenantId, metricId, (start - WEEK), (end - WEEK)).toBlocking()
.last();
value = avgToday - avgLastWeek;
break;
}
case avgwp: {
Double avgToday = metrics.findGaugeDataAverage(tenantId, metricId, start, end).toBlocking()
.last();
Double avgLastWeek = metrics
.findGaugeDataAverage(tenantId, metricId, (start - WEEK), (end - WEEK)).toBlocking()
.last();
value = avgToday / avgLastWeek;
break;
}
case card:
log.errorf("Not Yet Supported Function: %s", func);
break;
case delta: {
BlockingObservable<Double> bo = metrics.findGaugeDataRange(tenantId, metricId, start, end)
.toBlocking();
Double min = bo.first();
Double max = bo.last();
value = max - min;
break;
}
case deltap: {
BlockingObservable<Double> bo = metrics.findGaugeDataRange(tenantId, metricId, start, end)
.toBlocking();
Double min = bo.first();
Double max = bo.last();
Double avg = metrics.findGaugeDataAverage(tenantId, metricId, start, end).toBlocking().last();
value = (max - min) / avg;
break;
}
case down:
log.errorf("Not Yet Supported Function: %s", func);
break;
case max:
value = metrics.findGaugeDataMax(tenantId, metricId, start, end).toBlocking().last();
break;
case min:
value = metrics.findGaugeDataMin(tenantId, metricId, start, end).toBlocking().last();
break;
case up:
log.errorf("Not Yet Supported Function: %s", func);
break;
default:
log.errorf("Unexpected Expression Function: %s", func);
break;
}
} catch (Exception e) {
log.errorf("Unexpected failure in Expression handling: %s", expression);

evaluate(value);

} catch (Throwable t) {
log.debugf("Failed data fetch for %s: %s", expression, t.getMessage());
}
}

public void evaluate(Double value) {
if (value.isNaN()) {
log.debugf("NaN value, Ignoring External Metrics evaluation of %s", expression);
return;
}

log.info("Running External Metrics Evaluation: " + expression + ":" + value);

if (!expression.isTrue(value)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ class ExternalMetricsITest extends AbstractExternalITestBase {
assertEquals(200, resp.status)

// ADD external metrics avg condition. Note: systemId must be "HawkularMetrics"
// Average over the last 5 minutes > 50, check every minute
// Average over the last 1 minute > 50, check every minute
ExternalCondition firingCond = new ExternalCondition("trigger-test-avg", Mode.FIRING, "external-data-test-avg",
"HawkularMetrics", "metric:1:avg(data-test-avg > 50),5");
"HawkularMetrics", "metric:1:avg(data-test-avg > 50),1");

resp = client.post(path: "triggers/trigger-test-avg/conditions", body: firingCond)
assertEquals(200, resp.status)
Expand All @@ -77,17 +77,11 @@ class ExternalMetricsITest extends AbstractExternalITestBase {
resp = client.post(path: "triggers/tags/", body: tag)
assertEquals(200, resp.status)

// ENABLE Trigger
triggerTestAvg.setEnabled(true);

resp = client.put(path: "triggers/trigger-test-avg/", body: triggerTestAvg)
assertEquals(200, resp.status)

// FETCH trigger and make sure it's as expected
// FETCH trigger to validate and get the tenantId
resp = client.get(path: "triggers/trigger-test-avg");
assertEquals(200, resp.status)
assertEquals("trigger-test-avg", resp.data.name)
assertEquals(true, resp.data.enabled)
assertEquals(false, resp.data.enabled)
assertEquals(true, resp.data.autoDisable);
def tenantId = resp.data.tenantId;
assert( null != tenantId )
Expand All @@ -96,13 +90,21 @@ class ExternalMetricsITest extends AbstractExternalITestBase {
resp = client.get(path: "", query: [startTime:start,triggerIds:"trigger-test-avg"] )
assertEquals(204, resp.status)

// Send in METRICS data to have the External Manager send in external data to fire the trigger
// Send in METRICS data before enabling the trigger because the external evaluations start as soon
// as the enabled Trigger is processed.
long now = System.currentTimeMillis();
DataPoint dp1 = new DataPoint( "data-test-avg", 45.0, now - 180000 ); // 3 minutes ago
DataPoint dp2 = new DataPoint( "data-test-avg", 55.0, now - 120000 ); // 2 minutes ago
DataPoint dp3 = new DataPoint( "data-test-avg", 65.0, now - 60000 ); // 1 minutes ago
DataPoint dp1 = new DataPoint( "data-test-avg", 50.0, now - 30000 ); // 30 seconds ago
DataPoint dp2 = new DataPoint( "data-test-avg", 60.0, now - 25000 ); // 25 seconds ago
DataPoint dp3 = new DataPoint( "data-test-avg", 73.0, now - 20000 ); // 20 seconds ago
sendMetricDataViaRest( tenantId, dp1, dp2, dp3 );

// ENABLE Trigger, this should get picked up by the listener and the expression should be submitted
// to a runner for processing...
triggerTestAvg.setEnabled(true);

resp = client.put(path: "triggers/trigger-test-avg/", body: triggerTestAvg)
assertEquals(200, resp.status)

// The alert processing happens async, so give it a little time before failing...
for ( int i=0; i < 10; ++i ) {
println "SLEEP!" ;
Expand Down

0 comments on commit c9fea6e

Please sign in to comment.