From 95cc99b76941916a21930ba26d8f12639175054d Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 27 Feb 2017 17:38:49 +0800 Subject: [PATCH] support sequence in policy definition --- .../PolicyExecutionPlannerImpl.java | 3 +- .../interpreter/PolicyInterpreterTest.java | 72 +++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java index e30b3de9c4..4e6901ddc5 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java @@ -199,7 +199,8 @@ private PolicyExecutionPlan doParse() throws Exception { for (Map.Entry<String, Set<Variable>> entry : streamGroupBy.entrySet()) { if (entry.getValue().size() > 0) { StreamPartition partition = generatePartition(entry.getKey(), null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()]))); - if (((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.PATTERN)) { + if (((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.PATTERN) + || ((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.SEQUENCE)) { if (effectivePartitions.containsKey(partition.getStreamId())) { StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId()); if (!existingPartition.equals(partition) diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java index 1553e17366..4047fc1afc 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java @@ -483,4 +483,76 @@ public void testValidPolicyWithPatternSort() { Assert.assertArrayEquals(new String[]{"site", "host", "component", "metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray()); Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId()); } + + @Test + public void testValidPolicyWithSequence() { + PolicyDefinition policyDefinition = new PolicyDefinition(); + policyDefinition.setName("test_policy"); + policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1")); + policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT")); + + PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); + definition.setType("siddhi"); + String policy = + "from every a = HADOOP_JMX_METRIC_STREAM_1[component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] " + + ", b = HADOOP_JMX_METRIC_STREAM_1[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " + + "select b.metric, b.host as host, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site " + + "group by b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;"; + definition.setValue(policy); + 
definition.setInputStreams(policyDefinition.getInputStreams()); + definition.setOutputStreams(policyDefinition.getOutputStreams()); + policyDefinition.setDefinition(definition); + + PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { + { + put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1")); + } + }); + Assert.assertTrue(validation.isSuccess()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); + Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); + Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType()); + Assert.assertArrayEquals(new String[]{"metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray()); + Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId()); + } + + @Test + public void testValidPolicyWithSequenceSort() { + PolicyDefinition policyDefinition = new PolicyDefinition(); + policyDefinition.setName("test_policy"); + policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1")); + policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT")); + + PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); + definition.setType("siddhi"); + String policy = + "from HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"]#window.externalTime(timestamp, 1 min) " + + "select * group by site, host, component, metric insert into temp;\n" + + "\n" + + "from every a = HADOOP_JMX_METRIC_STREAM_1[metric == 
\"hadoop.namenode.dfs.missingblocks\"], b = HADOOP_JMX_METRIC_STREAM_1[b.component == a.component and b.metric == a.metric and b.host == a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " + + "select b.site, b.host, b.component, b.metric, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, max(b.timestamp) as timestamp " + + "group by b.site, b.host, b.component, b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;"; + definition.setValue(policy); + definition.setInputStreams(policyDefinition.getInputStreams()); + definition.setOutputStreams(policyDefinition.getOutputStreams()); + policyDefinition.setDefinition(definition); + + PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { + { + put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1")); + } + }); + Assert.assertTrue(validation.isSuccess()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size()); + Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size()); + Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); + Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); + Assert.assertEquals(60000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); + Assert.assertEquals(12000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowMargin()); + Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType()); + Assert.assertArrayEquals(new String[]{"site", "host", "component", "metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray()); + Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", 
validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId()); + } } \ No newline at end of file