Skip to content

Commit

Permalink
HWKALERTS-236 Watchers for events and alerts (#309)
Browse files Browse the repository at this point in the history
- Add support for filtering alerts by stime field without requiring status
- Introduce a REST stream watchers feature for alerts and events
- Make this feature available for crosstenant operations
  • Loading branch information
lucasponce authored and jshaughn committed Mar 27, 2017
1 parent 1d0eda1 commit 0cf11b8
Show file tree
Hide file tree
Showing 14 changed files with 1,003 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2015-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -34,6 +34,7 @@ public enum Field {
CTIME("ctime"),
SEVERITY("severity"),
STATUS("status"),
STIME("stime"),
CONTEXT("context");

private String text;
Expand Down Expand Up @@ -110,6 +111,8 @@ public int compare(Alert o1, Alert o2) {
return o1.getAlertId().compareTo(o2.getAlertId()) * iOrder;
case CTIME:
return (int) ((o1.getCtime() - o2.getCtime()) * iOrder);
case STIME:
return (int) ((o1.getCurrentLifecycle().getStime() - o2.getCurrentLifecycle().getStime()) * iOrder);
case SEVERITY:
if (o1.getSeverity() == null && o2.getSeverity() == null) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class AlertsCriteria {
Long endResolvedTime = null;
Long startAckTime = null;
Long endAckTime = null;
Long startStatusTime = null;
Long endStatusTime = null;
String alertId = null;
Collection<String> alertIds = null;
Alert.Status status = null;
Expand All @@ -53,7 +55,8 @@ public AlertsCriteria() {

public AlertsCriteria(Long startTime, Long endTime, String alertIds, String triggerIds,
String statuses, String severities, String tagQuery, Long startResolvedTime,
Long endResolvedTime, Long startAckTime, Long endAckTime, Boolean thin) {
Long endResolvedTime, Long startAckTime, Long endAckTime, Long startStatusTime,
Long endStatusTime, Boolean thin) {
setStartTime(startTime);
setEndTime(endTime);
if (!isEmpty(alertIds)) {
Expand Down Expand Up @@ -81,6 +84,8 @@ public AlertsCriteria(Long startTime, Long endTime, String alertIds, String trig
setEndResolvedTime(endResolvedTime);
setStartAckTime(startAckTime);
setEndAckTime(endAckTime);
setStartStatusTime(startStatusTime);
setEndStatusTime(endStatusTime);
if (null != thin) {
setThin(thin.booleanValue());
}
Expand Down Expand Up @@ -161,6 +166,30 @@ public void setEndAckTime(Long endAckTime) {
this.endAckTime = endAckTime;
}

public Long getStartStatusTime() {
return startStatusTime;
}

/**
* @param startStatusTime fetched Alerts must have at least one statusTime in the lifecycle greater than or equal to
* startStatusTime.
*/
public void setStartStatusTime(Long startStatusTime) {
this.startStatusTime = startStatusTime;
}

public Long getEndStatusTime() {
return endStatusTime;
}

/**
* @param endStatusTime fetched Alerts must have at least one statusTime in the lifecycle less than or equal to
* endStatusTime.
*/
public void setEndStatusTime(Long endStatusTime) {
this.endStatusTime = endStatusTime;
}

public String getAlertId() {
return alertId;
}
Expand Down Expand Up @@ -299,6 +328,10 @@ public boolean hasAckTimeCriteria() {
return (null != startAckTime || null != endAckTime);
}

public boolean hasStatusTimeCriteria() {
return (null != startStatusTime || null != endStatusTime);
}

public boolean hasCriteria() {
return hasAlertIdCriteria()
|| hasStatusCriteria()
Expand All @@ -307,17 +340,32 @@ public boolean hasCriteria() {
|| hasCTimeCriteria()
|| hasTriggerIdCriteria()
|| hasResolvedTimeCriteria()
|| hasAckTimeCriteria();
|| hasAckTimeCriteria()
|| hasStatusTimeCriteria();
}

@Override
public String toString() {
return "AlertsCriteria [startTime=" + startTime + ", endTime=" + endTime + ", alertId=" + alertId
+ ", alertIds=" + alertIds + ", status=" + status + ", statusSet=" + statusSet + ", severity="
+ severity + ", severities=" + severities + ", triggerId=" + triggerId + ", triggerIds=" + triggerIds
+ ", tagQuery=" + tagQuery + ", startAckTime=" + startAckTime
+ ", endAckTime=" + endAckTime + ", " + "startResolvedTime=" + startResolvedTime
+ ", endResolvedTime=" + endResolvedTime + ", " + "thin=" + thin + "]";
return "AlertsCriteria{" +
"startTime=" + startTime +
", endTime=" + endTime +
", startResolvedTime=" + startResolvedTime +
", endResolvedTime=" + endResolvedTime +
", startAckTime=" + startAckTime +
", endAckTime=" + endAckTime +
", startStatusTime=" + startStatusTime +
", endStatusTime=" + endStatusTime +
", alertId='" + alertId + '\'' +
", alertIds=" + alertIds +
", status=" + status +
", statusSet=" + statusSet +
", severity=" + severity +
", severities=" + severities +
", triggerId='" + triggerId + '\'' +
", triggerIds=" + triggerIds +
", tagQuery='" + tagQuery + '\'' +
", thin=" + thin +
'}';
}

private static boolean isEmpty(String s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void init() {
definitions.registerListener(events -> {
log.debugf("Receiving %s", events);
events.stream().forEach(e -> {
log.debugf("Received %s", e);
String tenantId = e.getTargetTenantId();
String triggerId = e.getTargetId();
TriggerKey triggerKey = new TriggerKey(tenantId, triggerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public void addAlerts(Collection<Alert> alerts) throws Exception {
PreparedStatement insertAlert = CassStatement.get(session, CassStatement.INSERT_ALERT);
PreparedStatement insertAlertTrigger = CassStatement.get(session, CassStatement.INSERT_ALERT_TRIGGER);
PreparedStatement insertAlertCtime = CassStatement.get(session, CassStatement.INSERT_ALERT_CTIME);
PreparedStatement insertAlertStime = CassStatement.get(session, CassStatement.INSERT_ALERT_STIME);
PreparedStatement insertTag = CassStatement.get(session, CassStatement.INSERT_TAG);

try {
Expand All @@ -183,6 +184,7 @@ public void addAlerts(Collection<Alert> alerts) throws Exception {
batch.add(insertAlert.bind(a.getTenantId(), a.getAlertId(), JsonUtil.toJson(a)));
batch.add(insertAlertTrigger.bind(a.getTenantId(), a.getAlertId(), a.getTriggerId()));
batch.add(insertAlertCtime.bind(a.getTenantId(), a.getAlertId(), a.getCtime()));
batch.add(insertAlertStime.bind(a.getTenantId(), a.getAlertId(), a.getCurrentLifecycle().getStime()));

a.getTags().entrySet().stream().forEach(tag -> {
batch.add(insertTag.bind(a.getTenantId(), TagType.ALERT.name(),
Expand Down Expand Up @@ -709,6 +711,22 @@ private List<Alert> getAlerts(String tenantId, AlertsCriteria criteria) throws E
activeFilter = true;
}

/*
Get alertsIds filteres by status time clause
*/
if (criteria.hasStatusTimeCriteria()) {
Set<String> alertIdsFilteredByStatusTime = filterByStatusTime(tenantId, criteria);
if (activeFilter) {
alertIds.retainAll(alertIdsFilteredByStatusTime);
} else {
alertIds.addAll(alertIdsFilteredByStatusTime);
}
if (alertIds.isEmpty()) {
return alerts;
}
activeFilter = true;
}

/*
Below this point we filter manually because the remaining filters have a low cardinality of
values, and are not efficiently handled with database indexes and the intersection-based approach.
Expand Down Expand Up @@ -949,6 +967,37 @@ private void filterByStatuses(String tenantId, AlertsCriteria criteria, Collecti
}
}

private Set<String> filterByStatusTime(String tenantId, AlertsCriteria criteria) throws Exception {
Set<String> result = Collections.emptySet();

if (criteria.getStartStatusTime() != null || criteria.getEndStatusTime() != null) {
result = new HashSet<>();

BoundStatement boundStime;
if (criteria.getStartStatusTime() != null && criteria.getEndStatusTime() != null) {
PreparedStatement selectAlertSTimeStartEnd = CassStatement.get(session,
CassStatement.SELECT_ALERT_STIME_START_END);
boundStime = selectAlertSTimeStartEnd.bind(tenantId, criteria.getStartStatusTime(),
criteria.getEndStatusTime());
} else if (criteria.getStartStatusTime() != null) {
PreparedStatement selectAlertSTimeStart = CassStatement.get(session,
CassStatement.SELECT_ALERT_STIME_START);
boundStime = selectAlertSTimeStart.bind(tenantId, criteria.getStartStatusTime());
} else {
PreparedStatement selectAlertSTimeEnd = CassStatement.get(session,
CassStatement.SELECT_ALERT_STIME_END);
boundStime = selectAlertSTimeEnd.bind(tenantId, criteria.getEndStatusTime());
}

ResultSet rsAlertsStimes = session.execute(boundStime);
for (Row row : rsAlertsStimes) {
String alertId = row.getString("alertId");
result.add(alertId);
}
}
return result;
}

private Set<String> filterByEvents(EventsCriteria criteria) {
Set<String> result = Collections.emptySet();
if (isEmpty(criteria.getEventIds())) {
Expand Down Expand Up @@ -1668,6 +1717,7 @@ public int deleteAlerts(String tenantId, AlertsCriteria criteria) throws Excepti
PreparedStatement deleteAlertCtime = CassStatement.get(session, CassStatement.DELETE_ALERT_CTIME);
PreparedStatement deleteAlertTrigger = CassStatement.get(session, CassStatement.DELETE_ALERT_TRIGGER);
PreparedStatement deleteAlertLifecycle = CassStatement.get(session, CassStatement.DELETE_ALERT_LIFECYCLE);
PreparedStatement deleteAlertStime = CassStatement.get(session, CassStatement.DELETE_ALERT_STIME);
if (deleteAlert == null || deleteAlertCtime == null || deleteAlertTrigger == null
|| deleteAlertLifecycle == null) {
throw new RuntimeException("delete*Alerts PreparedStatement is null");
Expand All @@ -1683,6 +1733,7 @@ public int deleteAlerts(String tenantId, AlertsCriteria criteria) throws Excepti
batch.add(deleteAlertTrigger.bind(tenantId, a.getTriggerId(), id));
a.getLifecycle().stream().forEach(l -> {
batch.add(deleteAlertLifecycle.bind(tenantId, l.getStatus().name(), l.getStime(), a.getAlertId()));
batch.add(deleteAlertStime.bind(tenantId, l.getStime(), a.getAlertId()));
});
i += batch.size();
if (i > batchSize) {
Expand Down Expand Up @@ -1830,6 +1881,8 @@ private Alert updateAlertStatus(Alert alert) throws Exception {
try {
PreparedStatement insertAlertLifecycle = CassStatement.get(session,
CassStatement.INSERT_ALERT_LIFECYCLE);
PreparedStatement insertAlertStime = CassStatement.get(session,
CassStatement.INSERT_ALERT_STIME);
PreparedStatement updateAlert = CassStatement.get(session,
CassStatement.UPDATE_ALERT);

Expand All @@ -1838,12 +1891,13 @@ private Alert updateAlertStatus(Alert alert) throws Exception {
if (lifecycle != null) {
futures.add(session.executeAsync(insertAlertLifecycle.bind(alert.getTenantId(), alert.getAlertId(),
lifecycle.getStatus().name(), lifecycle.getStime())));
futures.add(session.executeAsync(insertAlertStime.bind(alert.getTenantId(), alert.getAlertId(),
lifecycle.getStime())));
}
futures.add(session.executeAsync(updateAlert.bind(JsonUtil.toJson(alert), alert.getTenantId(),
alert.getAlertId())));

@SuppressWarnings("unused")
List<ResultSet> rsAlertsStatusToDelete = Futures.allAsList(futures).get();
Futures.allAsList(futures).get();

} catch (Exception e) {
msgLog.errorDatabaseException(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class CassStatement {
public static final String DELETE_ALERT;
public static final String DELETE_ALERT_CTIME;
public static final String DELETE_ALERT_LIFECYCLE;
public static final String DELETE_ALERT_STIME;
public static final String DELETE_ALERT_TRIGGER;
public static final String DELETE_CONDITIONS;
public static final String DELETE_CONDITIONS_MODE;
Expand All @@ -70,6 +71,7 @@ public class CassStatement {
public static final String INSERT_ALERT;
public static final String INSERT_ALERT_CTIME;
public static final String INSERT_ALERT_LIFECYCLE;
public static final String INSERT_ALERT_STIME;
public static final String INSERT_ALERT_TRIGGER;
public static final String INSERT_CONDITION_AVAILABILITY;
public static final String INSERT_CONDITION_COMPARE;
Expand Down Expand Up @@ -116,6 +118,9 @@ public class CassStatement {
public static final String SELECT_ALERT_LIFECYCLE_END;
public static final String SELECT_ALERT_LIFECYCLE_START;
public static final String SELECT_ALERT_LIFECYCLE_START_END;
public static final String SELECT_ALERT_STIME_END;
public static final String SELECT_ALERT_STIME_START;
public static final String SELECT_ALERT_STIME_START_END;
public static final String SELECT_ALERT_TRIGGER;
public static final String SELECT_ALERTS_BY_TENANT;
public static final String SELECT_CONDITION_ID;
Expand Down Expand Up @@ -188,6 +193,9 @@ public class CassStatement {
DELETE_ALERT_LIFECYCLE = "DELETE FROM " + keyspace + ".alerts_lifecycle "
+ "WHERE tenantId = ? AND status = ? AND stime = ? AND alertId = ? ";

DELETE_ALERT_STIME = "DELETE FROM " + keyspace + ".alerts_stimes "
+ "WHERE tenantId = ? AND stime = ? AND alertId = ? ";

DELETE_ALERT_TRIGGER = "DELETE FROM " + keyspace + ".alerts_triggers "
+ "WHERE tenantId = ? AND triggerId = ? AND alertId = ? ";

Expand Down Expand Up @@ -257,6 +265,9 @@ public class CassStatement {
INSERT_ALERT_LIFECYCLE = "INSERT INTO " + keyspace + ".alerts_lifecycle "
+ "(tenantId, alertId, status, stime) VALUES (?, ?, ?, ?) ";

INSERT_ALERT_STIME = "INSERT INTO " + keyspace + ".alerts_stimes "
+ "(tenantId, alertId, stime) VALUES (?, ?, ?) ";

INSERT_ALERT_TRIGGER = "INSERT INTO " + keyspace + ".alerts_triggers "
+ "(tenantId, alertId, triggerId) VALUES (?, ?, ?) ";

Expand Down Expand Up @@ -406,6 +417,15 @@ public class CassStatement {
SELECT_ALERT_LIFECYCLE_START_END = "SELECT alertId FROM " + keyspace + ".alerts_lifecycle "
+ "WHERE tenantId = ? AND status = ? AND stime >= ? AND stime <= ? ";

SELECT_ALERT_STIME_END = "SELECT alertId FROM " + keyspace + ".alerts_stimes "
+ "WHERE tenantId = ? AND stime <= ? ";

SELECT_ALERT_STIME_START = "SELECT alertId FROM " + keyspace + ".alerts_stimes "
+ "WHERE tenantId = ? AND stime >= ? ";

SELECT_ALERT_STIME_START_END = "SELECT alertId FROM " + keyspace + ".alerts_stimes "
+ "WHERE tenantId = ? AND stime >= ? AND stime <= ? ";

SELECT_ALERTS_BY_TENANT = "SELECT payload FROM " + keyspace + ".alerts " + "WHERE tenantId = ? ";

SELECT_ALERT_TRIGGER = "SELECT alertId FROM " + keyspace + ".alerts_triggers "
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2015-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -25,3 +25,4 @@ include '/org/hawkular/alerts/schema/updates/schema-1.2.1.groovy'
include '/org/hawkular/alerts/schema/updates/schema-1.2.3.groovy'
include '/org/hawkular/alerts/schema/updates/schema-1.4.0.groovy'
include '/org/hawkular/alerts/schema/updates/schema-1.5.0.groovy'
include '/org/hawkular/alerts/schema/updates/schema-1.6.0.groovy'
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2015-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

setKeyspace keyspace

schemaChange {
version '5.0'
author 'lponce'
tags '1.6.x'
cql """
CREATE TABLE alerts_stimes (
tenantId text,
alertId text,
stime bigint,
PRIMARY KEY (tenantId, stime, alertId)
)
"""
verify { tableExists(keyspace, "alerts_stimes") }
}
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ class ActionsITest extends AbstractITestBase {

// Check actions generated
// This used to fail randomly, therefore try several times before failing
for ( int i=0; i < 20; ++i ) {
for ( int i=0; i < 30; ++i ) {
resp = client.get(path: "actions/history",
query: [startTime:start,actionPlugins:"email",
actionIds:"global-action-notify-to-admins,global-action-notify-to-developers"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class AlertsITest extends AbstractITestBase {

resp = client.get(path: "", query: [endAckTime:now, startAckTime:"0"] )
assert resp.status == 200 : resp.status

resp = client.get(path: "", query: [endStatusTime:now, startStatusTime:"0"] )
assert resp.status == 200 : resp.status
}

@Test
Expand Down Expand Up @@ -89,6 +92,9 @@ class AlertsITest extends AbstractITestBase {

resp = client.put(path: "delete", query: [endAckTime:now, startAckTime:"0"] )
assert resp.status == 200 : resp.status

resp = client.put(path: "delete", query: [endStatusTime:now, startStatusTime:"0"] )
assert resp.status == 200 : resp.status
}

}

0 comments on commit 0cf11b8

Please sign in to comment.