Skip to content

Commit

Permalink
[ISSUE #4621] Implement TransformerEngine for EventMesh Transformer (#…
Browse files Browse the repository at this point in the history
…4622)

* [ISSUE #4621] Implement TransformerEngine for EventMesh Transformer

* fix ci check error
  • Loading branch information
xwm1992 committed Dec 7, 2023
1 parent a170a79 commit 8c59c7f
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 53 deletions.
Expand Up @@ -92,6 +92,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {

private FilterEngine filterEngine;

private TransformerEngine transformerEngine;

private HttpRetryer httpRetryer;

private transient RateLimiter msgRateLimiter;
Expand Down Expand Up @@ -137,6 +139,8 @@ public void init() throws Exception {

filterEngine = new FilterEngine(metaStorage, producerManager, consumerManager);

transformerEngine = new TransformerEngine(metaStorage, producerManager, consumerManager);

super.setHandlerService(new HandlerService());
super.getHandlerService().setMetrics(this.getMetrics());

Expand Down Expand Up @@ -180,6 +184,8 @@ public void shutdown() throws Exception {

filterEngine.shutdown();

transformerEngine.shutdown();

consumerManager.shutdown();

httpClientPool.shutdown();
Expand Down Expand Up @@ -354,6 +360,10 @@ public FilterEngine getFilterEngine() {
return filterEngine;
}

public TransformerEngine getTransformerEngine() {
return transformerEngine;
}

public MetaStorage getMetaStorage() {
return metaStorage;
}
Expand Down
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

import org.apache.eventmesh.api.meta.MetaServiceListener;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
import org.apache.eventmesh.runtime.meta.MetaStorage;
import org.apache.eventmesh.transformer.Transformer;
import org.apache.eventmesh.transformer.TransformerBuilder;
import org.apache.eventmesh.transformer.TransformerParam;

import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.JsonNode;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TransformerEngine {

/**
* key:group-topic
**/
private final Map<String, Transformer> transformerMap = new HashMap<>();

private final String transformerPrefix = "transformer-";

private final MetaStorage metaStorage;

private MetaServiceListener metaServiceListener;

private final ProducerManager producerManager;

private final ConsumerManager consumerManager;

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

public TransformerEngine(MetaStorage metaStorage, ProducerManager producerManager, ConsumerManager consumerManager) {
this.metaStorage = metaStorage;
this.producerManager = producerManager;
this.consumerManager = consumerManager;
}

public void start() {
Map<String, String> transformerMetaData = metaStorage.getMetaData(transformerPrefix, true);
for (Entry<String, String> transformerDataEntry : transformerMetaData.entrySet()) {
// transformer-group
String key = transformerDataEntry.getKey();
// topic-transformerParam list
String value = transformerDataEntry.getValue();
updateTransformerMap(key, value);
}
metaServiceListener = this::updateTransformerMap;

// addListeners for producerManager & consumerManager
scheduledExecutorService.scheduleAtFixedRate(() -> {
ConcurrentHashMap<String, EventMeshProducer> producerMap = producerManager.getProducerTable();
for (String producerGroup : producerMap.keySet()) {
for (String transformerKey : transformerMap.keySet()) {
if (!StringUtils.contains(transformerKey, producerGroup)) {
addTransformerListener(producerGroup);
LogUtils.info(log, "addTransformerListener for producer group: " + producerGroup);
}
}
}
ConcurrentHashMap<String, ConsumerGroupManager> consumerMap = consumerManager.getClientTable();
for (String consumerGroup : consumerMap.keySet()) {
for (String transformerKey : transformerMap.keySet()) {
if (!StringUtils.contains(transformerKey, consumerGroup)) {
addTransformerListener(consumerGroup);
LogUtils.info(log, "addTransformerListener for consumer group: " + consumerGroup);
}
}
}
}, 10_000, 5_000, TimeUnit.MILLISECONDS);
}

private void updateTransformerMap(String key, String value) {
String group = StringUtils.substringAfter(key, transformerPrefix);

JsonNode transformerJsonNodeArray = JsonUtils.getJsonNode(value);

if (transformerJsonNodeArray != null) {
for (JsonNode transformerJsonNode : transformerJsonNodeArray) {
String topic = transformerJsonNode.get("topic").asText();
String transformerParam = transformerJsonNode.get("transformerParam").toString();
TransformerParam tfp = JsonUtils.parseObject(transformerParam, TransformerParam.class);
Transformer transformer = TransformerBuilder.buildTransformer(tfp);
transformerMap.put(group + "-" + topic, transformer);
}
}
addTransformerListener(group);
}

public void addTransformerListener(String group) {
String transformerKey = transformerPrefix + group;
try {
metaStorage.getMetaDataWithListener(metaServiceListener, transformerKey);
} catch (Exception e) {
throw new RuntimeException("addTransformerListener exception", e);
}
}

public void shutdown() {
scheduledExecutorService.shutdown();
}

public Transformer getTransformer(String key) {
return transformerMap.get(key);
}

}
Expand Up @@ -45,6 +45,7 @@
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
import org.apache.eventmesh.transformer.Transformer;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -161,6 +162,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
final String topic = event.getSubject();

Pattern filterPattern = eventMeshHTTPServer.getFilterEngine().getFilterPattern(producerGroup + "-" + topic);
Transformer transformer = eventMeshHTTPServer.getTransformerEngine().getTransformer(producerGroup + "-" + topic);

// validate body
if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
Expand Down Expand Up @@ -252,6 +254,14 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
isFiltered = filterPattern.filter(JsonUtils.toJSONString(event));
}

// apply transformer
if (isFiltered && transformer != null) {
String data = transformer.transform(JsonUtils.toJSONString(event));
event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
.getBytes(StandardCharsets.UTF_8)).build();
sendMessageContext.setEvent(event);
}

if (isFiltered) {
eventMeshProducer.send(sendMessageContext, new SendCallback() {

Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.WebhookUtil;
import org.apache.eventmesh.transformer.Transformer;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
Expand All @@ -53,6 +54,7 @@

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -138,6 +140,20 @@ public void tryHTTPRequest() {
return;
}
}
Transformer transformer = eventMeshHTTPServer.getTransformerEngine()
.getTransformer(handleMsgContext.getConsumerGroup() + "-" + handleMsgContext.getTopic());
if (transformer != null) {
try {
String data = transformer.transform(JsonUtils.toJSONString(event));
event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
.getBytes(StandardCharsets.UTF_8)).build();
} catch (Exception exception) {
LOGGER.warn("apply transformer to cloudevents error, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
this.handleMsgContext.getConsumerGroup(),
this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), exception);
return;
}
}
handleMsgContext.setEvent(event);
super.setEvent(event);

Expand Down
Expand Up @@ -72,8 +72,8 @@ public List<Variable> match(String json) throws JsonProcessingException {

List<Variable> variableList = new ArrayList<>(variablesList.size());
for (Variable element : variablesList) {
if (JsonPathUtils.isValidAndDefinite(element.getJsonPath())) {
String res = JsonPathUtils.matchJsonPathValueWithString(json, element.getJsonPath());
if (JsonPathUtils.isValidAndDefinite(element.getValue())) {
String res = JsonPathUtils.matchJsonPathValueWithString(json, element.getValue());
Variable variable = new Variable(element.getName(), res);
variableList.add(variable);
} else {
Expand Down
Expand Up @@ -34,8 +34,8 @@ public Template(String template) {
public String substitute(List<Variable> variables) throws TransformException {

Map<String, String> valuesMap = variables.stream()
.filter(variable -> variable.getJsonPath() != null)
.collect(Collectors.toMap(Variable::getName, Variable::getJsonPath));
.filter(variable -> variable.getValue() != null)
.collect(Collectors.toMap(Variable::getName, Variable::getValue));
StringSubstitutor sub = new StringSubstitutor(valuesMap);

return sub.replace(template);
Expand Down
Expand Up @@ -37,8 +37,7 @@ public String transform(String json) throws JsonProcessingException {
// 1: get variable match results
List<Variable> variableList = jsonPathParser.match(json);
// 2: use results replace template
String res = template.substitute(variableList);
return res;
return template.substitute(variableList);
}

}
Expand Up @@ -19,41 +19,20 @@

public class TransformerBuilder {

public static final class Builder {

private final TransformerType transformerType;
private String template;
private String content;

public Builder(TransformerType transformerType) {
this.transformerType = transformerType;
}

public Builder setContent(String content) {
this.content = content;
return this;
}

public Builder setTemplate(String template) {
this.template = template;
return this;
}

public Transformer build() {
switch (this.transformerType) {
case CONSTANT:
return buildConstantTransformer(this.content);
case ORIGINAL:
return buildOriginalTransformer();
case TEMPLATE:
return buildTemplateTransFormer(this.content, this.template);
default:
throw new TransformException("invalid config");
}
public static Transformer buildTransformer(TransformerParam transformerParam) {
switch (transformerParam.getTransformerType()) {
case ORIGINAL:
return buildOriginalTransformer();
case CONSTANT:
return buildConstantTransformer(transformerParam.getValue());
case TEMPLATE:
return buildTemplateTransFormer(transformerParam.getValue(), transformerParam.getTemplate());
default:
throw new TransformException("invalid config");
}

}


public static Transformer buildTemplateTransFormer(String jsonContent, String template) {
JsonPathParser jsonPathParser = new JsonPathParser(jsonContent);
Template templateEntry = new Template(template);
Expand Down
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.transformer;

public class TransformerParam {

private TransformerType transformerType;
private String value;
private String template;

public TransformerParam() {
}

public TransformerParam(TransformerType transformerType, String value, String template) {
this.transformerType = transformerType;
this.value = value;
this.template = template;
}

public TransformerParam(TransformerType transformerType, String value) {
this(transformerType, value, null);
}

public TransformerType getTransformerType() {
return transformerType;
}

public void setTransformerType(TransformerType transformerType) {
this.transformerType = transformerType;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public String getTemplate() {
return template;
}

public void setTemplate(String template) {
this.template = template;
}

}

0 comments on commit 8c59c7f

Please sign in to comment.