Skip to content

Commit

Permalink
HWKALERTS-47 change strategy for cross-tenant query
Browse files Browse the repository at this point in the history
As per PR feedback, instead of using "ALLOW FILTERING", instead
query for the tenants/partitions and then perform async queries against
each partition.
  • Loading branch information
jshaughn committed Jul 8, 2015
1 parent b5d9598 commit 76e9846
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ private void initTriggers(File fFolder) throws Exception {
if (triggersFile.exists() && triggersFile.isFile()) {
Map<String, Object> triggers = objectMapper.readValue(triggersFile, Map.class);
if (triggers != null && !triggers.isEmpty() && triggers.get("triggers") != null) {
List<Map<String,Object>> aTriggers = (List<Map<String,Object>>)triggers.get("triggers");
for (Map<String,Object> t : aTriggers) {
List<Map<String, Object>> aTriggers = (List<Map<String, Object>>)triggers.get("triggers");
for (Map<String, Object> t : aTriggers) {
String tenantId = (String)t.get("tenantId");
String triggerId = (String)t.get("triggerId");
boolean enabled = (Boolean)t.get("enabled");
Expand All @@ -185,7 +185,7 @@ private void initTriggers(File fFolder) throws Exception {
TriggerTemplate.Match firingMatch = TriggerTemplate.Match.valueOf((String)t.get("firingMatch"));
TriggerTemplate.Match autoResolveMatch = TriggerTemplate.Match
.valueOf((String)t.get("autoResolveMatch"));
List<Map<String, String>> actions = (List<Map<String, String>>) t.get("actions");
List<Map<String, String>> actions = (List<Map<String, String>>)t.get("actions");

Trigger trigger = new Trigger(triggerId, name);
trigger.setEnabled(enabled);
Expand Down Expand Up @@ -214,8 +214,8 @@ private void initConditions(File initFolder) throws Exception {
if (conditionsFile.exists() && conditionsFile.isFile()) {
Map<String, Object> conditions = objectMapper.readValue(conditionsFile, Map.class);
if (conditions != null && !conditions.isEmpty() && conditions.get("conditions") != null) {
List<Map<String, Object>> aConditions = (List<Map<String, Object>>) conditions.get("conditions");
for (Map<String, Object> c: aConditions) {
List<Map<String, Object>> aConditions = (List<Map<String, Object>>)conditions.get("conditions");
for (Map<String, Object> c : aConditions) {
String tenantId = (String)c.get("tenantId");
String triggerId = (String)c.get("triggerId");
Trigger.Mode triggerMode = Trigger.Mode.valueOf((String)c.get("triggerMode"));
Expand Down Expand Up @@ -340,7 +340,7 @@ private void initDampenings(File initFolder) throws Exception {
if (dampeningFile.exists() && dampeningFile.isFile()) {
Map<String, Object> dampenings = objectMapper.readValue(dampeningFile, Map.class);
if (dampenings != null && !dampenings.isEmpty() && dampenings.get("dampenings") != null) {
List<Map<String, Object>> aDampenings = (List<Map<String, Object>>) dampenings.get("dampenings");
List<Map<String, Object>> aDampenings = (List<Map<String, Object>>)dampenings.get("dampenings");
for (Map<String, Object> d : aDampenings) {
String tenantId = (String)d.get("tenantId");
String triggerId = (String)d.get("triggerId");
Expand Down Expand Up @@ -632,35 +632,51 @@ public Collection<Trigger> getAllTriggersByTag(String category, String name) thr
if (session == null) {
throw new RuntimeException("Cassandra session is null");
}
PreparedStatement selectTagsTriggersAllByTag = isEmpty(category) ?
CassStatement.get(session, CassStatement.SELECT_TAGS_TRIGGERS_ALL_BY_NAME) :
CassStatement.get(session, CassStatement.SELECT_TAGS_TRIGGERS_ALL_BY_CATEGORY_AND_NAME);
if (selectTagsTriggersAllByTag == null) {
throw new RuntimeException("selectTagsTriggersAllByTag PreparedStatement is null");
}
Map<String, Set<String>> tenantTriggerIdsMap = new HashMap<>();

try {
BoundStatement bs = isEmpty(category) ?
selectTagsTriggersAllByTag.bind(name) :
selectTagsTriggersAllByTag.bind(category, name);
ResultSet rsTriggerIds = session.execute(bs);
for (Row row : rsTriggerIds) {
String tenantId = row.getString("tenantId");
Set<String> triggerIds = row.getSet("triggers", String.class);
Set<String> storedTriggerIds = tenantTriggerIdsMap.get(tenantId);
if (null != storedTriggerIds) {
triggerIds.addAll(storedTriggerIds);
}
tenantTriggerIdsMap.put(tenantId, triggerIds);
// first, get all the partitions for tags (i.e. tenants)
BoundStatement bs = CassStatement.get(session, CassStatement.SELECT_PARTITIONS_TAGS).bind();
Set<String> tenants = new HashSet<>();
for (Row row : session.execute(bs)) {
tenants.add(row.getString("tenantId"));
}

// next, get all of the tagged triggerIds
PreparedStatement selectTagsTriggers = isEmpty(category) ?
CassStatement.get(session, CassStatement.SELECT_TAGS_TRIGGERS_BY_NAME) :
CassStatement.get(session, CassStatement.SELECT_TAGS_TRIGGERS_BY_CATEGORY_AND_NAME);
if (selectTagsTriggers == null) {
throw new RuntimeException("selectTagsTriggersByTag PreparedStatement is null");
}

Map<String, Set<String>> tenantTriggerIdsMap = new HashMap<>();
List<ResultSetFuture> futures = isEmpty(category) ?
tenants.stream()
.map(tenantId -> session.executeAsync(selectTagsTriggers.bind(tenantId, name)))
.collect(Collectors.toList()) :
tenants.stream()
.map(tenantId -> session.executeAsync(selectTagsTriggers.bind(tenantId, category, name)))
.collect(Collectors.toList());
List<ResultSet> rsTriggerIds = Futures.allAsList(futures).get();
rsTriggerIds.stream().forEach(rs -> {
for (Row row : rs) {
String tenantId = row.getString("tenantId");
Set<String> triggerIds = row.getSet("triggers", String.class);
Set<String> storedTriggerIds = tenantTriggerIdsMap.get(tenantId);
if (null != storedTriggerIds) {
triggerIds.addAll(storedTriggerIds);
}
tenantTriggerIdsMap.put(tenantId, triggerIds);
}
});

// Now, generate a cross-tenant result set if Triggers using the tenantIds and triggerIds
List<Trigger> triggers = new ArrayList<>();
PreparedStatement selectTrigger = CassStatement.get(session, CassStatement.SELECT_TRIGGER);
for (Map.Entry<String, Set<String>> entry : tenantTriggerIdsMap.entrySet()) {
String tenantId = entry.getKey();
Set<String> triggerIds = entry.getValue();
List<ResultSetFuture> futures = triggerIds.stream().map(triggerId ->
futures = triggerIds.stream().map(triggerId ->
session.executeAsync(selectTrigger.bind(tenantId, triggerId)))
.collect(Collectors.toList());
List<ResultSet> rsTriggers = Futures.allAsList(futures).get();
Expand Down Expand Up @@ -772,7 +788,7 @@ public Trigger copyTrigger(String tenantId, String triggerId, Map<String, String
for (Condition c : conditions) {
if (c instanceof CompareCondition) {
dataIdTokens.add(c.getDataId());
dataIdTokens.add(((CompareCondition) c).getData2Id());
dataIdTokens.add(((CompareCondition)c).getData2Id());
} else {
dataIdTokens.add(c.getDataId());
}
Expand Down Expand Up @@ -800,34 +816,34 @@ public Trigger copyTrigger(String tenantId, String triggerId, Map<String, String
if (c instanceof ThresholdCondition) {
newCondition = new ThresholdCondition(newTrigger.getId(), c.getTriggerMode(),
c.getConditionSetSize(), c.getConditionSetIndex(), dataIdMap.get(c.getDataId()),
((ThresholdCondition) c).getOperator(), ((ThresholdCondition) c).getThreshold());
((ThresholdCondition)c).getOperator(), ((ThresholdCondition)c).getThreshold());

} else if (c instanceof ThresholdRangeCondition) {
newCondition = new ThresholdRangeCondition(newTrigger.getId(), c.getTriggerMode(),
c.getConditionSetSize(), c.getConditionSetIndex(), dataIdMap.get(c.getDataId()),
((ThresholdRangeCondition) c).getOperatorLow(),
((ThresholdRangeCondition) c).getOperatorHigh(),
((ThresholdRangeCondition) c).getThresholdLow(),
((ThresholdRangeCondition) c).getThresholdHigh(),
((ThresholdRangeCondition) c).isInRange());
((ThresholdRangeCondition)c).getOperatorLow(),
((ThresholdRangeCondition)c).getOperatorHigh(),
((ThresholdRangeCondition)c).getThresholdLow(),
((ThresholdRangeCondition)c).getThresholdHigh(),
((ThresholdRangeCondition)c).isInRange());

} else if (c instanceof AvailabilityCondition) {
newCondition = new AvailabilityCondition(newTrigger.getId(), c.getTriggerMode(),
c.getConditionSetSize(), c.getConditionSetIndex(), dataIdMap.get(c.getDataId()),
((AvailabilityCondition) c).getOperator());
((AvailabilityCondition)c).getOperator());

} else if (c instanceof CompareCondition) {
newCondition = new CompareCondition(newTrigger.getId(), c.getTriggerMode(),
c.getConditionSetSize(), c.getConditionSetIndex(), dataIdMap.get(c.getDataId()),
((CompareCondition) c).getOperator(),
((CompareCondition) c).getData2Multiplier(),
dataIdMap.get(((CompareCondition) c).getData2Id()));
((CompareCondition)c).getOperator(),
((CompareCondition)c).getData2Multiplier(),
dataIdMap.get(((CompareCondition)c).getData2Id()));

} else if (c instanceof StringCondition) {
newCondition = new StringCondition(newTrigger.getId(), c.getTriggerMode(),
c.getConditionSetSize(), c.getConditionSetIndex(), dataIdMap.get(c.getDataId()),
((StringCondition) c).getOperator(), ((StringCondition) c).getPattern(),
((StringCondition) c).isIgnoreCase());
((StringCondition)c).getOperator(), ((StringCondition)c).getPattern(),
((StringCondition)c).isIgnoreCase());
}
if (newCondition != null) {
newCondition.setTenantId(newTrigger.getTenantId());
Expand Down Expand Up @@ -1235,15 +1251,15 @@ public Collection<Condition> setConditions(String tenantId, String triggerId, Tr

if (cond instanceof AvailabilityCondition) {

AvailabilityCondition aCond = (AvailabilityCondition) cond;
AvailabilityCondition aCond = (AvailabilityCondition)cond;
futures.add(session.executeAsync(insertConditionAvailability.bind(aCond.getTenantId(), aCond
.getTriggerId(),
aCond.getTriggerMode().name(), aCond.getConditionSetSize(), aCond.getConditionSetIndex(),
aCond.getConditionId(), aCond.getDataId(), aCond.getOperator().name())));

} else if (cond instanceof CompareCondition) {

CompareCondition cCond = (CompareCondition) cond;
CompareCondition cCond = (CompareCondition)cond;
dataIds.add(cCond.getData2Id());
futures.add(session.executeAsync(insertConditionCompare.bind(cCond.getTenantId(),
cCond.getTriggerId(), cCond.getTriggerMode().name(), cCond.getConditionSetSize(),
Expand All @@ -1252,31 +1268,31 @@ public Collection<Condition> setConditions(String tenantId, String triggerId, Tr

} else if (cond instanceof ExternalCondition) {

ExternalCondition eCond = (ExternalCondition) cond;
ExternalCondition eCond = (ExternalCondition)cond;
futures.add(session.executeAsync(insertConditionExternal.bind(eCond.getTenantId(), eCond
.getTriggerId(), eCond.getTriggerMode().name(), eCond.getConditionSetSize(),
eCond.getConditionSetIndex(), eCond.getConditionId(), eCond.getDataId(),
eCond.getSystemId(), eCond.getExpression())));

} else if (cond instanceof StringCondition) {

StringCondition sCond = (StringCondition) cond;
StringCondition sCond = (StringCondition)cond;
futures.add(session.executeAsync(insertConditionString.bind(sCond.getTenantId(), sCond
.getTriggerId(), sCond.getTriggerMode().name(), sCond.getConditionSetSize(),
sCond.getConditionSetIndex(), sCond.getConditionId(), sCond.getDataId(),
sCond.getOperator().name(), sCond.getPattern(), sCond.isIgnoreCase())));

} else if (cond instanceof ThresholdCondition) {

ThresholdCondition tCond = (ThresholdCondition) cond;
ThresholdCondition tCond = (ThresholdCondition)cond;
futures.add(session.executeAsync(insertConditionThreshold.bind(tCond.getTenantId(),
tCond.getTriggerId(), tCond.getTriggerMode().name(), tCond.getConditionSetSize(),
tCond.getConditionSetIndex(), tCond.getConditionId(), tCond.getDataId(),
tCond.getOperator().name(), tCond.getThreshold())));

} else if (cond instanceof ThresholdRangeCondition) {

ThresholdRangeCondition rCond = (ThresholdRangeCondition) cond;
ThresholdRangeCondition rCond = (ThresholdRangeCondition)cond;
futures.add(session.executeAsync(insertConditionThresholdRange.bind(rCond.getTenantId(),
rCond.getTriggerId(), rCond.getTriggerMode().name(), rCond.getConditionSetSize(),
rCond.getConditionSetIndex(), rCond.getConditionId(), rCond.getDataId(),
Expand Down Expand Up @@ -2217,17 +2233,17 @@ private void checkTenantId(String tenantId, Object obj) {
return;
}
if (obj instanceof Trigger) {
Trigger trigger = (Trigger) obj;
Trigger trigger = (Trigger)obj;
if (trigger.getTenantId() == null || !trigger.getTenantId().equals(tenantId)) {
trigger.setTenantId(tenantId);
}
} else if (obj instanceof Dampening) {
Dampening dampening = (Dampening) obj;
Dampening dampening = (Dampening)obj;
if (dampening.getTenantId() == null || !dampening.getTenantId().equals(tenantId)) {
dampening.setTenantId(tenantId);
}
} else if (obj instanceof Tag) {
Tag tag = (Tag) obj;
Tag tag = (Tag)obj;
if (tag.getTenantId() == null || !tag.getTenantId().equals(tenantId)) {
tag.setTenantId(tenantId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ public class CassStatement {
public static final String SELECT_DAMPENING_ID;
public static final String SELECT_DAMPENINGS_ALL;
public static final String SELECT_DAMPENINGS_BY_TENANT;
public static final String SELECT_PARTITIONS_TAGS;
public static final String SELECT_TAGS;
public static final String SELECT_TAGS_BY_CATEGORY;
public static final String SELECT_TAGS_BY_CATEGORY_AND_NAME;
public static final String SELECT_TAGS_BY_NAME;
public static final String SELECT_TAGS_TRIGGERS_ALL_BY_CATEGORY_AND_NAME;
public static final String SELECT_TAGS_TRIGGERS_ALL_BY_NAME;
public static final String SELECT_TAGS_TRIGGERS_BY_CATEGORY;
public static final String SELECT_TAGS_TRIGGERS_BY_CATEGORY_AND_NAME;
public static final String SELECT_TAGS_TRIGGERS_BY_NAME;
Expand Down Expand Up @@ -290,6 +289,10 @@ public class CassStatement {
+ "FROM " + keyspace + ".dampenings "
+ "WHERE tenantId = ? ";

// This is for use as a pre-query to gather all partitions to be subsequently queried. If the
// partition key changes this should also change.
SELECT_PARTITIONS_TAGS = "SELECT DISTINCT tenantid FROM " + keyspace + ".triggers ";

SELECT_TAGS = "SELECT tenantId, triggerId, category, name, visible "
+ "FROM " + keyspace + ".tags "
+ "WHERE tenantId = ? AND triggerId = ? ORDER BY triggerId, name ";
Expand All @@ -306,23 +309,14 @@ public class CassStatement {
+ "FROM " + keyspace + ".tags "
+ "WHERE tenantId = ? AND triggerId = ? AND name = ? ";

SELECT_TAGS_TRIGGERS_ALL_BY_CATEGORY_AND_NAME = "SELECT tenantId, triggers "
+ "FROM " + keyspace + ".tags_triggers "
+ "WHERE category = ? AND name = ? ";

SELECT_TAGS_TRIGGERS_ALL_BY_NAME = "SELECT tenantId, triggers "
+ "FROM " + keyspace + ".tags_triggers "
+ "WHERE name = ? "
+ "ALLOW FILTERING";

SELECT_TAGS_TRIGGERS_BY_CATEGORY = "SELECT triggers FROM " + keyspace
SELECT_TAGS_TRIGGERS_BY_CATEGORY = "SELECT tenantId, triggers FROM " + keyspace
+ ".tags_triggers WHERE tenantId = ? AND category = ? ";

SELECT_TAGS_TRIGGERS_BY_CATEGORY_AND_NAME = "SELECT triggers "
SELECT_TAGS_TRIGGERS_BY_CATEGORY_AND_NAME = "SELECT tenantId, triggers "
+ "FROM " + keyspace + ".tags_triggers "
+ "WHERE tenantId = ? AND category = ? AND name = ? ";

SELECT_TAGS_TRIGGERS_BY_NAME = "SELECT triggers "
SELECT_TAGS_TRIGGERS_BY_NAME = "SELECT tenantId, triggers "
+ "FROM " + keyspace + ".tags_triggers "
+ "WHERE tenantId = ? AND name = ? ";

Expand Down

0 comments on commit 76e9846

Please sign in to comment.