Skip to content

Commit

Permalink
HWKALERTS-50 work continuing, not yet working...
Browse files Browse the repository at this point in the history
- added a Producer for finally getting a MetricsService injected into the
  alerts-metrics Manager. And finally got a session started for the
  hawkular_metrics keyspace.
  - required a tweak to metrics core to create a session that does not
    try to install the schema.
- figure out how to establish a local MetricsSession for the hawular_metrics
- fix bug in fetching triggers by tags cross-tenant
- fix issue in ExternalCondition constructor
- fix persistence issue
  - add ExternalCondition handling to setConditions/insert
  - TODO: overloading condition.operator, pattern. Should we add
          explicit fields to the schema?
- fix issue in test, needed to tag the test trigger as a metrics-trigger.
- make the Manager bean @startup so that it gets created immediately,
  registers its listener and starts looking for expressions to process.
  • Loading branch information
jshaughn committed Jul 2, 2015
1 parent a20bf3f commit 2bb9a1c
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public ExternalCondition(String triggerId, Mode triggerMode, String dataId, Stri
public ExternalCondition(String triggerId, Mode triggerMode, int conditionSetSize, int conditionSetIndex,
String dataId, String systemId, String expression) {
super(triggerId, triggerMode, conditionSetSize, conditionSetIndex, Type.EXTERNAL);

this.systemId = systemId;
this.dataId = dataId;
this.expression = expression;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,22 +646,45 @@ public Collection<Trigger> getAllTriggersByTag(String category, String name) thr
if (selectTagsTriggersAllByTag == null) {
throw new RuntimeException("selectTagsTriggersAllByTag PreparedStatement is null");
}
List<Trigger> triggers = new ArrayList<>();
Map<String, Set<String>> tenantTriggerIdsMap = new HashMap<>();
try {
BoundStatement bs = isEmpty(category) ?
selectTagsTriggersAllByTag.bind(name) :
selectTagsTriggersAllByTag.bind(category, name);
ResultSet rsTriggers = session.execute(bs);
for (Row row : rsTriggers) {
Trigger trigger = mapTrigger(row);
selectTriggerActions(trigger);
triggers.add(trigger);
ResultSet rsTriggerIds = session.execute(bs);
for (Row row : rsTriggerIds) {
String tenantId = row.getString("tenantId");
Set<String> triggerIds = row.getSet("triggers", String.class);
Set<String> storedTriggerIds = tenantTriggerIdsMap.get(tenantId);
if (null != storedTriggerIds) {
triggerIds.addAll(storedTriggerIds);
}
tenantTriggerIdsMap.put(tenantId, triggerIds);
}

// Now, generate a cross-tenant result set if Triggers using the tenantIds and triggerIds
List<Trigger> triggers = new ArrayList<>();
PreparedStatement selectTrigger = CassStatement.get(session, CassStatement.SELECT_TRIGGER);
for (Map.Entry<String, Set<String>> entry : tenantTriggerIdsMap.entrySet()) {
String tenantId = entry.getKey();
Set<String> triggerIds = entry.getValue();
List<ResultSetFuture> futures = triggerIds.stream().map(triggerId ->
session.executeAsync(selectTrigger.bind(tenantId, triggerId)))
.collect(Collectors.toList());
List<ResultSet> rsTriggers = Futures.allAsList(futures).get();
rsTriggers.stream().forEach(r -> {
for (Row row : r) {
triggers.add(mapTrigger(row));
}
});
}

return triggers;

} catch (Exception e) {
msgLog.errorDatabaseException(e.getMessage());
throw e;
}
return triggers;
}

private void selectTriggerActions(Trigger trigger) throws Exception {
Expand Down Expand Up @@ -1148,13 +1171,16 @@ public Collection<Condition> setConditions(String tenantId, String triggerId, Tr
PreparedStatement insertConditionAvailability = CassStatement.get(session,
CassStatement.INSERT_CONDITION_AVAILABILITY);
PreparedStatement insertConditionCompare = CassStatement.get(session, CassStatement.INSERT_CONDITION_COMPARE);
PreparedStatement insertConditionExternal = CassStatement
.get(session, CassStatement.INSERT_CONDITION_EXTERNAL);
PreparedStatement insertConditionString = CassStatement.get(session, CassStatement.INSERT_CONDITION_STRING);
PreparedStatement insertConditionThreshold = CassStatement.get(session,
CassStatement.INSERT_CONDITION_THRESHOLD);
PreparedStatement insertConditionThresholdRange = CassStatement.get(session,
CassStatement.INSERT_CONDITION_THRESHOLD_RANGE);
if (insertConditionAvailability == null
|| insertConditionCompare == null
|| insertConditionExternal == null
|| insertConditionString == null
|| insertConditionThreshold == null
|| insertConditionThresholdRange == null) {
Expand Down Expand Up @@ -1196,6 +1222,14 @@ public Collection<Condition> setConditions(String tenantId, String triggerId, Tr
cCond.getConditionSetIndex(), cCond.getConditionId(), cCond.getDataId(),
cCond.getOperator().name(), cCond.getData2Id(), cCond.getData2Multiplier())));

} else if (cond instanceof ExternalCondition) {

ExternalCondition eCond = (ExternalCondition) cond;
futures.add(session.executeAsync(insertConditionExternal.bind(eCond.getTenantId(), eCond
.getTriggerId(), eCond.getTriggerMode().name(), eCond.getConditionSetSize(),
eCond.getConditionSetIndex(), eCond.getConditionId(), eCond.getDataId(),
eCond.getSystemId(), eCond.getExpression())));

} else if (cond instanceof StringCondition) {

StringCondition sCond = (StringCondition) cond;
Expand All @@ -1220,6 +1254,9 @@ public Collection<Condition> setConditions(String tenantId, String triggerId, Tr
rCond.getConditionSetIndex(), rCond.getConditionId(), rCond.getDataId(),
rCond.getOperatorLow().name(), rCond.getOperatorHigh().name(), rCond.getThresholdLow(),
rCond.getThresholdHigh(), rCond.isInRange())));

} else {
throw new IllegalArgumentException("Unexpected ConditionType: " + cond);
}

// generate the automatic dataId tags for search
Expand Down Expand Up @@ -1587,8 +1624,8 @@ private Condition mapCondition(Row row) throws Exception {
eCondition.setConditionSetSize(row.getInt("conditionSetSize"));
eCondition.setConditionSetIndex(row.getInt("conditionSetIndex"));
eCondition.setDataId(row.getString("dataId"));
eCondition.setSystemId(row.getString("systemId"));
eCondition.setExpression(row.getString("expression"));
eCondition.setSystemId(row.getString("operator"));
eCondition.setExpression(row.getString("pattern"));
condition = eCondition;
break;
case RANGE:
Expand Down Expand Up @@ -2085,11 +2122,14 @@ public List<Tag> getTriggerTags(String tenantId, String triggerId, String catego

@Override
public void registerListener(DefinitionsListener listener, EventType eventType, EventType... eventTypes) {
listeners.put(listener, EnumSet.of(eventType, eventTypes));
EnumSet<EventType> types = EnumSet.of(eventType, eventTypes);
log.debugf("Registering listeners %s for event types", listener, types);
listeners.put(listener, types);
}

private void notifyListeners(EventType eventType) {
DefinitionsEvent de = new DefinitionsEvent(eventType);
log.debugf("Notifying applicable listeners %s of event %s", listeners, eventType.name());
for (Map.Entry<DefinitionsListener, Set<EventType>> me : listeners.entrySet()) {
if (me.getValue().contains(eventType)) {
log.debugf("Notified Listener %s", eventType.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class CassStatement {
public static final String INSERT_ALERT_STATUS;
public static final String INSERT_CONDITION_AVAILABILITY;
public static final String INSERT_CONDITION_COMPARE;
public static final String INSERT_CONDITION_EXTERNAL;
public static final String INSERT_CONDITION_STRING;
public static final String INSERT_CONDITION_THRESHOLD;
public static final String INSERT_CONDITION_THRESHOLD_RANGE;
Expand Down Expand Up @@ -180,6 +181,10 @@ public class CassStatement {
+ "(tenantId, triggerId, triggerMode, type, conditionSetSize, conditionSetIndex, conditionId, "
+ "dataId, operator, data2Id, data2Multiplier) VALUES (?, ?, ?, 'COMPARE', ?, ?, ?, ?, ?, ?, ?) ";

INSERT_CONDITION_EXTERNAL = "INSERT INTO " + keyspace + ".conditions "
+ "(tenantId, triggerId, triggerMode, type, conditionSetSize, conditionSetIndex, conditionId, "
+ "dataId, operator, pattern) VALUES (?, ?, ?, 'EXTERNAL', ?, ?, ?, ?, ?, ?) ";

INSERT_CONDITION_STRING = "INSERT INTO " + keyspace + ".conditions "
+ "(tenantId, triggerId, triggerMode, type, conditionSetSize, conditionSetIndex, conditionId, "
+ "dataId, operator, pattern, ignoreCase) VALUES (?, ?, ?, 'STRING', ?, ?, ?, ?, ?, ?, ?) ";
Expand Down Expand Up @@ -301,11 +306,11 @@ public class CassStatement {
+ "FROM " + keyspace + ".tags "
+ "WHERE tenantId = ? AND triggerId = ? AND name = ? ";

SELECT_TAGS_TRIGGERS_ALL_BY_CATEGORY_AND_NAME = "SELECT triggers "
SELECT_TAGS_TRIGGERS_ALL_BY_CATEGORY_AND_NAME = "SELECT tenantId, triggers "
+ "FROM " + keyspace + ".tags_triggers "
+ "WHERE category = ? AND name = ? ";

SELECT_TAGS_TRIGGERS_ALL_BY_NAME = "SELECT triggers "
SELECT_TAGS_TRIGGERS_ALL_BY_NAME = "SELECT tenantId, triggers "
+ "FROM " + keyspace + ".tags_triggers "
+ "WHERE name = ? ";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.alerts.external.metrics;

import org.jboss.logging.Logger;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Session;

/**
* Cassandra cluster representation and session factory.
*
* @author Lucas Ponce
*/
public class CassCluster {
private static final Logger log = Logger.getLogger(CassCluster.class);
private static final String ALERTS_CASSANDRA_PORT = "hawkular-alerts-metrics.cassandra-cql-port";
private static final String ALERTS_CASSANDRA_NODES = "hawkular-alerts-metrics.cassandra-nodes";
private static final String ALERTS_CASSANDRA_RETRY_ATTEMPTS = "hawkular-alerts-metrics.cassandra-retry-attempts";
private static final String ALERTS_CASSANDRA_RETRY_TIMEOUT = "hawkular-alerts-metrics.cassandra-retry-timeout";

private static Cluster cluster = null;

private static Session session = null;

private CassCluster() { }

public static Session getSession() throws Exception {
if (cluster == null && session == null) {
String cqlPort = System.getProperty(ALERTS_CASSANDRA_PORT, "9042");
String nodes = System.getProperty(ALERTS_CASSANDRA_NODES, "127.0.0.1");
int attempts = Integer.parseInt(System.getProperty(ALERTS_CASSANDRA_RETRY_ATTEMPTS, "5"));
int timeout = Integer.parseInt(System.getProperty(ALERTS_CASSANDRA_RETRY_TIMEOUT, "2000"));
/*
It might happen that alerts component is faster than embedded cassandra deployed in hawkular.
We will provide a simple attempt/retry loop to avoid issues at initialization.
*/
while(session == null && !Thread.currentThread().isInterrupted() && attempts >= 0) {
try {
cluster = new Cluster.Builder()
.addContactPoints(nodes.split(","))
.withPort(new Integer(cqlPort))
.withProtocolVersion(ProtocolVersion.V3)
.build();
session = cluster.connect();
} catch (Exception e) {
log.warn("Could not connect to Cassandra cluster - assuming is not up yet. Cause: " +
((e.getCause() == null) ? e : e.getCause()));
if (attempts == 0) {
throw e;
}
}
if (session == null) {
log.warn("[" + attempts + "] Retrying connecting to Cassandra cluster in [" + timeout + "]ms...");
attempts--;
try {
Thread.sleep(timeout);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
return session;
}

public static void shutdown() {
if (session != null && !session.isClosed()) {
session.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.annotation.PreDestroy;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.inject.Inject;

import org.hawkular.alerts.api.model.condition.Condition;
Expand All @@ -53,6 +54,7 @@
* @author Jay Shaughnessy
* @author Lucas Ponce
*/
@Startup
@Singleton
public class Manager {
private final Logger log = Logger.getLogger(Manager.class);
Expand All @@ -69,6 +71,9 @@ public class Manager {
@Inject
private MetricsService metrics;

//private MetricsServiceImpl metrics;
//private DataAccess dataAccess;

@EJB
private DefinitionsService definitions;

Expand All @@ -77,8 +82,10 @@ public class Manager {

@PostConstruct
public void init() {
log.debugf("Initializing Hawkular Alerts-Metrics Manager...");
expressionExecutor = new ScheduledThreadPoolExecutor(THREAD_POOL_SIZE);

log.debugf("Registering Trigger UPDATE/REMOVE listener");
definitions.registerListener(new DefinitionsListener() {
@Override
public void onChange(DefinitionsEvent event) {
Expand All @@ -89,6 +96,7 @@ public void onChange(DefinitionsEvent event) {

@PreDestroy
public void shutdown() {
log.debugf("Shutting down Hawkular Alerts-Metrics Manager...");
if (null != expressionFutures) {
expressionFutures.values().forEach(f -> f.cancel(true));
}
Expand All @@ -104,13 +112,16 @@ private void refresh() {

// get all of the triggers tagged for hawkular metrics
Collection<Trigger> triggers = definitions.getAllTriggersByTag(TAG_CATEGORY, TAG_NAME);
log.infof("Found [%s] External Metrics Triggers!", triggers.size());

// for each trigger look for Metrics Conditions and start running them
Collection<Condition> conditions = null;
for (Trigger trigger : triggers) {
try {
if (trigger.isEnabled()) {
conditions = definitions.getTriggerConditions(trigger.getTenantId(), trigger.getId(), null);
log.infof("Checking [%s] Conditions for enabled trigger [%s]!", conditions.size(),
trigger.getName());
}
} catch (Exception e) {
log.error("Failed to fetch Conditions when scheduling metrics conditions for " + trigger, e);
Expand Down Expand Up @@ -154,6 +165,7 @@ private void refresh() {
for (Map.Entry<ExternalCondition, ScheduledFuture<?>> me : expressionFutures.entrySet()) {
ExternalCondition ec = me.getKey();
if (!activeConditions.contains(ec)) {
log.infof("Canceling evaluation of obsolete External Metric Condition %s", ec);
me.getValue().cancel(true);
temp.add(ec);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.alerts.external.metrics;

import javax.enterprise.inject.Produces;

import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.core.impl.MetricsServiceImpl;

import com.datastax.driver.core.Session;

/**
* @author jay shaughnessy
* @author john sanda
*/
public class MetricsServiceProducer {

private MetricsServiceImpl metricsService;

@Produces
public MetricsService getMetricsService() throws Exception {
if (metricsService == null) {
metricsService = new MetricsServiceImpl();
String keyspace = "hawkular_metrics";
Session session = CassCluster.getSession();
boolean resetDB = false;
boolean createSchema = false;
metricsService.startUp(session, keyspace, resetDB, createSchema);
}

return metricsService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.util.List

import org.hawkular.alerts.api.model.data.StringData
import org.hawkular.alerts.api.model.condition.ExternalCondition
import org.hawkular.alerts.api.model.trigger.Tag
import org.hawkular.alerts.api.model.trigger.Trigger

import static org.hawkular.alerts.api.model.trigger.Trigger.Mode
Expand Down Expand Up @@ -71,6 +72,11 @@ class ExternalMetricsITest extends AbstractExternalITestBase {
assertEquals(200, resp.status)
assertEquals(1, resp.data.size())

// Tag the trigger as a HawkularMetrics:MetricsCondition so it gets picked up for processing
Tag tag = new Tag( "trigger-test-avg", "HawkularMetrics", "MetricsCondition" );
resp = client.post(path: "triggers/tags/", body: tag)
assertEquals(200, resp.status)

// ENABLE Trigger
triggerTestAvg.setEnabled(true);

Expand Down

0 comments on commit 2bb9a1c

Please sign in to comment.