From e40da89cc4f040b78b96534927d3297507342224 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Mon, 31 Jul 2017 17:22:18 +0800 Subject: [PATCH 1/2] fix a bug in PolicyResource.java --- .../metadata/resource/PolicyResource.java | 24 ++++++++ .../metadata/utils/PolicyIdConversions.java | 20 ++++--- .../dev/partials/alert/policyPrototypes.html | 55 ++++++++++--------- .../app/dev/public/js/ctrls/alertEditCtrl.js | 37 ++++++------- 4 files changed, 82 insertions(+), 54 deletions(-) diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java index d09da4b1de..9edfc3eed5 100644 --- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java +++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java @@ -21,6 +21,7 @@ import com.google.inject.Inject; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult; import org.apache.eagle.alert.metadata.resource.OpResult; import org.apache.eagle.common.rest.RESTResponse; @@ -72,6 +73,7 @@ public RESTResponse saveAsPolicyProto(PolicyEntity policyEntity, Preconditions.checkNotNull(policyEntity.getAlertPublishmentIds(), "alert publisher list should not be null"); PolicyDefinition policyDefinition = policyEntity.getDefinition(); + checkOutputStream(policyDefinition.getInputStreams(), policyDefinition.getOutputStreams()); OpResult result = metadataResource.addPolicy(policyDefinition); if (result.code != 200) { throw new IllegalArgumentException(result.message); @@ -153,6 +155,7 @@ public RESTResponse deletePolicyProto(@PathParam("uuid") String uuid) { private PolicyEntity importPolicyProto(PolicyEntity policyEntity) { PolicyDefinition policyDefinition = policyEntity.getDefinition(); + checkOutputStream(policyDefinition.getInputStreams(), policyDefinition.getOutputStreams()); List inputStreamType = new ArrayList<>(); String newDefinition = policyDefinition.getDefinition().getValue(); for (String inputStream : policyDefinition.getInputStreams()) { @@ -164,6 +167,7 @@ private PolicyEntity importPolicyProto(PolicyEntity policyEntity) { policyDefinition.getDefinition().setValue(newDefinition); policyDefinition.setName(PolicyIdConversions.parsePolicyId(policyDefinition.getSiteId(), policyDefinition.getName())); policyDefinition.setSiteId(null); + policyDefinition.getPartitionSpec().clear(); policyEntity.setDefinition(policyDefinition); return policyEntityService.createOrUpdatePolicyProto(policyEntity); } @@ -186,10 +190,19 @@ private Boolean exportPolicyProto(List policyProtoList, String sit policyDefinition.getDefinition().setValue(newDefinition); policyDefinition.setSiteId(site); policyDefinition.setName(PolicyIdConversions.generateUniquePolicyId(site, policyProto.getDefinition().getName())); + PolicyValidationResult validationResult = metadataResource.validatePolicy(policyDefinition); if (!validationResult.isSuccess() || validationResult.getException() != null) { throw new IllegalArgumentException(validationResult.getException()); } + + policyDefinition.getPartitionSpec().clear(); + for (StreamPartition sd : validationResult.getPolicyExecutionPlan().getStreamPartitions()) { + if (inputStreams.contains(sd.getStreamId())) { + policyDefinition.getPartitionSpec().add(sd); + } + } + OpResult result = metadataResource.addPolicy(policyDefinition); if (result.code != 200) { throw new IllegalArgumentException("fail to create policy: " + result.message); @@ -204,4 +217,15 @@ private Boolean exportPolicyProto(List policyProtoList, String sit return true; } + private void checkOutputStream(List inputStreams, List outputStreams) { + for (String inputStream : inputStreams) { + for (String outputStream : outputStreams) { + if (outputStream.contains(inputStream)) { + throw new IllegalArgumentException("OutputStream name should not contains string: " + inputStream + + ". Please rename your OutputStream name"); + } + } + } + } + } diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java index c9ccadcd73..2012a40a02 100644 --- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java +++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java @@ -22,17 +22,21 @@ public class PolicyIdConversions { public static String generateUniquePolicyId(String siteId, String policyName) { - return String.format("%s_%s", policyName, siteId); + String subffix = String.format("_%s", siteId.toLowerCase()); + if (policyName.toLowerCase().endsWith(subffix)) { + return policyName; + } + return String.format("%s_%s", policyName, siteId.toLowerCase()); } - public static String parsePolicyId(String siteId, String generatedUniquePolicyId) { - String subffix = String.format("_%s", siteId); - if (generatedUniquePolicyId.endsWith(subffix)) { - int streamTypeIdLength = generatedUniquePolicyId.length() - subffix.length(); - Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid policyId: " + generatedUniquePolicyId + ", policyId is empty"); - return generatedUniquePolicyId.substring(0, streamTypeIdLength); + public static String parsePolicyId(String siteId, String policyName) { + String subffix = String.format("_%s", siteId.toLowerCase()); + if (policyName.toLowerCase().endsWith(subffix)) { + int streamTypeIdLength = policyName.length() - subffix.length(); + Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid policyId: " + policyName + ", policyId is empty"); + return policyName.substring(0, streamTypeIdLength); } else { - return generatedUniquePolicyId; + return policyName; } } } diff --git a/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html b/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html index 9f22ce4231..0798475a88 100644 --- a/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html +++ b/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html @@ -27,35 +27,36 @@

- - - - - - - + + + + + + + - - - - - - - + + + + + + +
- - NameDefinitionPublishersOperation
+ + NameDefinitionOperation
- - {{item.name}}
{{item.definition.definition.value}}
-
    -
  • - {{publisher}} -
  • -
-
- - -
+ + {{item.name}}
{{item.definition.definition.value}}
+ + +
diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js index e4623b2a7a..fc2b494962 100644 --- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js +++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js @@ -253,12 +253,12 @@ }; /*$scope.checkInputStream = function (streamId) { - if($scope.isInputStreamSelected(streamId)) { - $scope.policy.inputStreams = common.array.remove(streamId, $scope.policy.inputStreams); - } else { - $scope.policy.inputStreams.push(streamId); - } - };*/ + if($scope.isInputStreamSelected(streamId)) { + $scope.policy.inputStreams = common.array.remove(streamId, $scope.policy.inputStreams); + } else { + $scope.policy.inputStreams.push(streamId); + } + };*/ // ============================================================== // = Definition = @@ -558,7 +558,18 @@ }); } - policyPromise._then(function () { + policyPromise._then(function (res) { + var validate = res.data; + if (!validate.success) { + $.dialog({ + title: "OPS", + content: "Create policy failed: " + (res.data.message || res.data.errors) + }); + $scope.policyLock = false; + $scope.saveLock = false; + return; + } + console.log("Create policy success..."); $.dialog({ title: "Done", @@ -566,18 +577,6 @@ }, function () { $wrapState.go("policyDetail", {name: $scope.policy.name, siteId: $scope.policy.siteId}); }); - }, function (res) { - var errormsg = ""; - if(typeof res.data.message !== 'undefined') { - errormsg = res.data.message; - } else { - errormsg = res.data.errors; - } - $.dialog({ - title: "OPS", - content: "Create policy failed: " + errormsg - }); - $scope.policyLock = false; }); }, function (args) { $.dialog({ From a92cff8327d6f6ebef5775d92812648325b82ddf Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Mon, 31 Jul 2017 17:58:37 +0800 Subject: [PATCH 2/2] fix the inconsistency of event uuid --- .../impl/AlertBoltOutputCollectorWrapper.java | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java index 606ddce652..4c749f4188 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java @@ -49,28 +49,32 @@ public AlertBoltOutputCollectorWrapper(StreamOutputCollector outputCollector, Ob @Override public void emit(AlertStreamEvent event) { + if (event == null) { + return; + } + event.ensureAlertId(); Set clonedPublishPartitions = new HashSet<>(publishPartitions); for (PublishPartition publishPartition : clonedPublishPartitions) { // skip the publish partition which is not belong to this policy and also check streamId PublishPartition cloned = publishPartition.clone(); Optional.ofNullable(event) - .filter(x -> x != null - && x.getSchema() != null - && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId()) - && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId()) - || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT))) - .ifPresent(x -> { - cloned.getColumns().stream() - .filter(y -> event.getSchema().getColumnIndex(y) >= 0 - && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size()) - .map(y -> event.getData()[event.getSchema().getColumnIndex(y)]) - .filter(y -> y != null) - .forEach(y -> cloned.getColumnValues().add(y)); - synchronized (outputLock) { - streamContext.counter().incr("alert_count"); - delegate.emit(Arrays.asList(cloned, event)); - } - }); + .filter(x -> x != null + && x.getSchema() != null + && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId()) + && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId()) + || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT))) + .ifPresent(x -> { + cloned.getColumns().stream() + .filter(y -> event.getSchema().getColumnIndex(y) >= 0 + && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size()) + .map(y -> event.getData()[event.getSchema().getColumnIndex(y)]) + .filter(y -> y != null) + .forEach(y -> cloned.getColumnValues().add(y)); + synchronized (outputLock) { + streamContext.counter().incr("alert_count"); + delegate.emit(Arrays.asList(cloned, event)); + } + }); } }