Skip to content

Commit

Permalink
[WIP][ISSUE #4043]initial implementation of filter and transform (#4365)
Browse files Browse the repository at this point in the history
* initial implementation -informal

* initial implementation -informal

* fix code style problem

* remove unnecessary code snippets.

* add license header

* add some dep

* fix JsonPathUtils checkstyle problem

* fix test class checkstyle problem

* fix CI problem
  • Loading branch information
pmupkin committed Oct 25, 2023
1 parent 7634404 commit a717d35
Show file tree
Hide file tree
Showing 25 changed files with 1,564 additions and 0 deletions.
5 changes: 5 additions & 0 deletions eventmesh-common/build.gradle
Expand Up @@ -25,6 +25,11 @@ dependencies {
api "org.apache.commons:commons-text"
api "org.apache.commons:commons-lang3"

implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.7.0'
implementation 'commons-net:commons-net:3.9.0'
api "io.cloudevents:cloudevents-core"
api "io.cloudevents:cloudevents-json-jackson"

implementation "org.apache.logging.log4j:log4j-api"
implementation "org.apache.logging.log4j:log4j-core"
implementation "org.apache.logging.log4j:log4j-slf4j-impl"
Expand Down
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.filter;

import org.apache.eventmesh.common.filter.condition.Condition;

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;

public class PatternEntry {

private final String patternPath;

private final List<Condition> conditionList = new ArrayList<>();

public PatternEntry(final String patternPath) {
this.patternPath = patternPath;
}

public void addRuleCondition(Condition patternCondition) {
this.conditionList.add(patternCondition);
}

public String getPatternName() {
return "123";
}

public String getPatternPath() {
return patternPath;
}

public boolean match(JsonNode jsonElement) {
for (final Condition patternCondition : conditionList) {
if (patternCondition.match(jsonElement)) {
return true;
}
}

return false;

}

/**
* Returns the condition list for test only
*
* @return the condition list
*/
List<Condition> getConditionList() {
return conditionList;
}
}
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.filter.condition;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;

import com.fasterxml.jackson.databind.JsonNode;

class AnythingButCondition implements Condition {

private List<Condition> conditionList = new ArrayList<>();

AnythingButCondition(JsonNode condition) {

if (condition.isValueNode()) {
this.conditionList.add(new SpecifiedCondition(condition));
}
// []
if (condition.isArray()) {
for (JsonNode element : condition) {
this.conditionList.add(new SpecifiedCondition(element));
}
}

// prefix,suffix
if (condition.isObject()) {
Iterator<Entry<String, JsonNode>> iterator = condition.fields();
while (iterator.hasNext()) {
Entry<String, JsonNode> entry = iterator.next();
String key = entry.getKey();
JsonNode value = entry.getValue();
Condition condition1 = new ConditionsBuilder().withKey(key).withParams(value).build();
this.conditionList.add(condition1);
}
}

}

@Override
public boolean match(JsonNode inputEvent) {
if (inputEvent == null) {
return false;
}

for (Condition condition : conditionList) {
if (condition.match(inputEvent)) {
return false;
}
}
return true;
}

}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.filter.condition;

import com.fasterxml.jackson.databind.JsonNode;

/**
*
*/
public interface Condition {

boolean match(JsonNode inputEvent);

}
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.filter.condition;

import com.fasterxml.jackson.databind.JsonNode;

public class ConditionsBuilder {

private String key;
private JsonNode jsonNode;

public ConditionsBuilder withKey(String key) {
this.key = key;
return this;
}

public ConditionsBuilder withParams(JsonNode jsonNode) {
this.jsonNode = jsonNode;
return this;
}

public Condition build() {
Condition condition = null;
switch (this.key) {
case "prefix":
condition = new PrefixxCondition(this.jsonNode);
break;
case "suffix":
condition = new SuffixCondition(this.jsonNode);
break;
case "anything-but":
condition = new AnythingButCondition(this.jsonNode);
break;
case "numeric":
condition = new NumericCondition(this.jsonNode);
break;
case "exists":
condition = new ExistsCondition(this.jsonNode);
break;
case "specified":
condition = new SpecifiedCondition(this.jsonNode);
break;
default:
throw new RuntimeException("INVALID CONDITION");
// Add cases for other keys and conditions
}

return condition;
}
}
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.filter.condition;

import com.fasterxml.jackson.databind.JsonNode;

class ExistsCondition implements Condition {

private JsonNode exists;

ExistsCondition(JsonNode exists) {
this.exists = exists;
}

@Override
public boolean match(JsonNode inputEvent) {

return this.exists.asBoolean() == (inputEvent != null);
}
}
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.filter.condition;

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;

class NumericCondition implements Condition {

List<String> operators = new ArrayList<>();
List<Double> nums = new ArrayList<>();

NumericCondition(JsonNode numeric) {
if (numeric.isArray()) {
if (numeric.size() % 2 != 0) {
throw new RuntimeException("NUMERIC NO RIGHT FORMAT");
}
for (int i = 0; i < numeric.size(); i += 2) {
JsonNode opt = numeric.get(i);
JsonNode number = numeric.get(i + 1);
operators.add(opt.asText());
nums.add(number.asDouble());
}
} else {
throw new RuntimeException("NUMERIC MUST BE ARRAY");
}
}

private boolean compareNums(Double rule, Double target, String opt) {
int res = Double.compare(target, rule);
switch (opt) {
case "=":
return res == 0;
case "!=":
return res != 0;
case ">":
return res > 0;
case ">=":
return res >= 0;
case "<":
return res < 0;
case "<=":
return res <= 0;
default: // Never be here
return false;
}
}

@Override
public boolean match(JsonNode inputEvent) {
if (inputEvent.isNumber()) {
for (int i = 0; i < operators.size(); ++i) {
if (!compareNums(nums.get(i), inputEvent.asDouble(), operators.get(i))) {
return false;
}

}

}

return true;
}
}

0 comments on commit a717d35

Please sign in to comment.