From 41818fa290ebb4591459900a5aec4a4966bd6630 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=8D=A2=E6=98=A5=E4=BA=AE?= <946240095@qq.com>
Date: Mon, 6 May 2024 19:32:44 +0800
Subject: [PATCH] [INLONG-10117][SDK] Support to transform from PB protocol to
CSV/KV protocol by single SQL
---
inlong-sdk/transform-sdk/pom.xml | 5 +
.../inlong/sdk/transform/decode/JsonNode.java | 4 +-
.../inlong/sdk/transform/decode/PbNode.java | 95 +++++++
.../sdk/transform/decode/PbSourceData.java | 240 ++++++++++++++++++
.../sdk/transform/decode/PbSourceDecoder.java | 165 ++++++++++++
.../transform/decode/TransformException.java | 39 +++
.../sdk/transform/pojo/PbSourceInfo.java | 44 +++-
.../transform/process/TransformProcessor.java | 4 +
.../process/parser/ColumnParser.java | 2 +-
.../transform/pojo/TestTransformConfig.java | 7 +-
.../process/TestTransformProcessor.java | 117 +++++++--
11 files changed, 680 insertions(+), 42 deletions(-)
create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/TransformException.java
diff --git a/inlong-sdk/transform-sdk/pom.xml b/inlong-sdk/transform-sdk/pom.xml
index aeb3e100b05..8d089f41d02 100644
--- a/inlong-sdk/transform-sdk/pom.xml
+++ b/inlong-sdk/transform-sdk/pom.xml
@@ -53,6 +53,11 @@
org.xerial.snappy
snappy-java
+
+ com.google.protobuf
+ protobuf-java-util
+ ${protobuf.version}
+
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java
index e60406162f9..adf6cc10976 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonNode.java
@@ -33,12 +33,12 @@ public class JsonNode {
private int arrayIndex = -1;
public JsonNode(String nodeString) {
- int beginIndex = nodeString.indexOf('[');
+ int beginIndex = nodeString.indexOf('(');
if (beginIndex < 0) {
this.name = nodeString;
} else {
this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
- int endIndex = nodeString.lastIndexOf(']');
+ int endIndex = nodeString.lastIndexOf(')');
if (endIndex >= 0) {
this.isArray = true;
this.arrayIndex = NumberUtils.toInt(nodeString.substring(beginIndex + 1, endIndex), -1);
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
new file mode 100644
index 00000000000..63fa6106d9d
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+import lombok.Data;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * PbNode
+ *
+ */
+@Data
+public class PbNode {
+
+ private String name;
+ private FieldDescriptor fieldDesc;
+ private Descriptors.Descriptor messageType;
+ private boolean isArray = false;
+ private int arrayIndex = -1;
+ private boolean isLastNode = false;
+
+ public PbNode(Descriptors.Descriptor messageDesc, String nodeString, boolean isLastNode) {
+ int beginIndex = nodeString.indexOf('(');
+ if (beginIndex < 0) {
+ this.name = nodeString;
+ this.fieldDesc = messageDesc.findFieldByName(name);
+ if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
+ this.messageType = this.fieldDesc.getMessageType();
+ }
+ } else {
+ this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
+ this.fieldDesc = messageDesc.findFieldByName(name);
+ if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
+ this.messageType = this.fieldDesc.getMessageType();
+ }
+ int endIndex = nodeString.lastIndexOf(')');
+ if (endIndex >= 0) {
+ this.isArray = true;
+ this.arrayIndex = NumberUtils.toInt(nodeString.substring(beginIndex + 1, endIndex), -1);
+ if (this.arrayIndex < 0) {
+ this.arrayIndex = 0;
+ }
+ }
+ }
+ this.isLastNode = isLastNode;
+ }
+
+ /**
+ * parseNodePath
+ * @param rootDesc
+ * @param nodePath
+ * @return
+ */
+ public static List parseNodePath(Descriptors.Descriptor rootDesc, String nodePath) {
+ if (StringUtils.isBlank(nodePath)) {
+ return null;
+ }
+ List nodes = new ArrayList<>();
+ String[] nodeStrings = nodePath.split("\\.");
+ int lastIndex = nodeStrings.length - 1;
+ Descriptors.Descriptor current = rootDesc;
+ for (int i = 0; i <= lastIndex; i++) {
+ if (current == null) {
+ return null;
+ }
+ String nodeString = nodeStrings[i];
+ PbNode pbNode = new PbNode(current, nodeString, (i == lastIndex));
+ current = pbNode.getMessageType();
+ nodes.add(pbNode);
+ }
+ return nodes;
+ }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
new file mode 100644
index 00000000000..147eaa67d13
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * JsonSourceData
+ *
+ */
+public class PbSourceData implements SourceData {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PbSourceData.class);
+
+ public static final String ROOT_KEY = "$root.";
+
+ public static final String CHILD_KEY = "$child.";
+
+ private Descriptors.Descriptor rootDesc;
+
+ private Descriptors.Descriptor childDesc;
+
+ private Map> columnNodeMap = new ConcurrentHashMap<>();
+
+ private DynamicMessage root;
+
+ private List childRoot;
+
+ private Charset srcCharset;
+
+ /**
+ * Constructor
+ */
+ public PbSourceData(DynamicMessage root, List childRoot,
+ Descriptors.Descriptor rootDesc, Descriptors.Descriptor childDesc,
+ Map> columnNodeMap,
+ Charset srcCharset) {
+ this.root = root;
+ this.childRoot = childRoot;
+ this.rootDesc = rootDesc;
+ this.childDesc = childDesc;
+ this.columnNodeMap = columnNodeMap;
+ this.srcCharset = srcCharset;
+ }
+
+ /**
+ * Constructor
+ */
+ public PbSourceData(DynamicMessage root,
+ Descriptors.Descriptor rootDesc,
+ Map> columnNodeMap,
+ Charset srcCharset) {
+ this.root = root;
+ this.rootDesc = rootDesc;
+ this.columnNodeMap = columnNodeMap;
+ this.srcCharset = srcCharset;
+ }
+
+ /**
+ * getRowCount
+ * @return
+ */
+ @Override
+ public int getRowCount() {
+ if (this.childRoot == null) {
+ return 1;
+ } else {
+ return this.childRoot.size();
+ }
+ }
+
+ /**
+ * getField
+ * @param rowNum
+ * @param fieldName
+ * @return
+ */
+ @Override
+ public String getField(int rowNum, String fieldName) {
+ String fieldValue = "";
+ try {
+ if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
+ fieldValue = this.getRootField(fieldName);
+ } else if (StringUtils.startsWith(fieldName, CHILD_KEY)) {
+ if (childRoot != null && rowNum < childRoot.size()) {
+ fieldValue = this.getChildField(rowNum, fieldName);
+ }
+ }
+ return fieldValue;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ return fieldValue;
+ }
+
+ /**
+ * getRootField
+ * @param fieldName
+ * @return
+ */
+ private String getRootField(String srcFieldName) {
+ List childNodes = this.columnNodeMap.get(srcFieldName);
+ if (childNodes == null) {
+ String fieldName = srcFieldName.substring(ROOT_KEY.length());
+ childNodes = PbNode.parseNodePath(rootDesc, fieldName);
+ if (childNodes == null) {
+ childNodes = new ArrayList<>();
+ }
+ this.columnNodeMap.put(srcFieldName, childNodes);
+ }
+ // error config
+ if (childNodes.size() == 0) {
+ return "";
+ }
+ // parse other node
+ String fieldValue = this.getNodeValue(childNodes, root);
+ return fieldValue;
+ }
+
+ /**
+ * getChildField
+ * @param rowNum
+ * @param srcFieldName
+ * @return
+ */
+ private String getChildField(int rowNum, String srcFieldName) {
+ if (this.childRoot == null || this.childDesc == null) {
+ return "";
+ }
+ List childNodes = this.columnNodeMap.get(srcFieldName);
+ if (childNodes == null) {
+ String fieldName = srcFieldName.substring(CHILD_KEY.length());
+ childNodes = PbNode.parseNodePath(childDesc, fieldName);
+ if (childNodes == null) {
+ childNodes = new ArrayList<>();
+ }
+ this.columnNodeMap.put(srcFieldName, childNodes);
+ }
+ // error config
+ if (childNodes.size() == 0) {
+ return "";
+ }
+ // parse other node
+ DynamicMessage child = childRoot.get(rowNum);
+ String fieldValue = this.getNodeValue(childNodes, child);
+ return fieldValue;
+ }
+
+ /**
+ * getNodeValue
+ * @param childNodes
+ * @param root
+ * @return
+ */
+ private String getNodeValue(List childNodes, DynamicMessage root) {
+ String fieldValue = "";
+ DynamicMessage current = root;
+ for (int i = 0; i < childNodes.size(); i++) {
+ PbNode node = childNodes.get(i);
+ Object nodeValue = current.getField(node.getFieldDesc());
+ if (nodeValue == null) {
+ // error data
+ break;
+ }
+ if (node.isLastNode()) {
+ switch (node.getFieldDesc().getJavaType()) {
+ case STRING:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ fieldValue = String.valueOf(nodeValue);
+ break;
+ case BYTE_STRING:
+ ByteString byteString = (ByteString) nodeValue;
+ fieldValue = new String(byteString.toByteArray(), srcCharset);
+ break;
+ case ENUM:
+ fieldValue = String.valueOf(nodeValue);
+ break;
+ case MESSAGE:
+ fieldValue = String.valueOf(nodeValue);
+ break;
+ }
+ break;
+ }
+ if (!node.isArray()) {
+ if (!(nodeValue instanceof DynamicMessage)) {
+ // error data
+ break;
+ }
+ current = (DynamicMessage) nodeValue;
+ } else {
+ if (!(nodeValue instanceof List)) {
+ // error data
+ break;
+ }
+ List> nodeList = (List>) nodeValue;
+ if (node.getArrayIndex() >= nodeList.size()) {
+ // error data
+ break;
+ }
+ Object nodeElement = nodeList.get(node.getArrayIndex());
+ if (!(nodeElement instanceof DynamicMessage)) {
+ // error data
+ break;
+ }
+ current = (DynamicMessage) nodeElement;
+ }
+ }
+ return fieldValue;
+ }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
new file mode 100644
index 00000000000..5ac13cf28f8
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * PbSourceDecoder
+ *
+ */
+public class PbSourceDecoder implements SourceDecoder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PbSourceDecoder.class);
+
+ protected PbSourceInfo sourceInfo;
+ private Charset srcCharset = Charset.defaultCharset();
+ private String protoDescription;
+ private String rootMessageType;
+ private Descriptors.Descriptor rootDesc;
+ private String rowsNodePath;
+ private List childNodes;
+ private Descriptors.Descriptor childDesc;
+
+ private Map> columnNodeMap = new ConcurrentHashMap<>();
+
+ /**
+ * Constructor
+ * @param sourceInfo
+ * @throws DescriptorValidationException
+ * @throws InvalidProtocolBufferException
+ */
+ public PbSourceDecoder(PbSourceInfo sourceInfo) {
+ try {
+ this.sourceInfo = sourceInfo;
+ if (!StringUtils.isBlank(sourceInfo.getCharset())) {
+ this.srcCharset = Charset.forName(sourceInfo.getCharset());
+ }
+ this.protoDescription = sourceInfo.getProtoDescription();
+ this.rootMessageType = sourceInfo.getRootMessageType();
+ // parse description
+ byte[] protoBytes = Base64.getDecoder().decode(protoDescription);
+ DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(protoBytes);
+ DescriptorProtos.FileDescriptorProto fileDesc = descriptorSet.getFile(0);
+ Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(fileDesc,
+ new Descriptors.FileDescriptor[]{});
+ this.rootDesc = fileDescriptor.findMessageTypeByName(rootMessageType);
+ // child
+ this.rowsNodePath = sourceInfo.getRowsNodePath();
+ this.childNodes = PbNode.parseNodePath(rootDesc, rowsNodePath);
+ if (this.childNodes != null && this.childNodes.size() > 0) {
+ this.childDesc = this.childNodes.get(this.childNodes.size() - 1).getMessageType();
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new TransformException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * decode
+ * @param srcBytes
+ * @param extParams
+ * @return
+ * @throws InvalidProtocolBufferException
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public SourceData decode(byte[] srcBytes, Map extParams) {
+ try {
+ // decode
+ DynamicMessage.Builder builder = DynamicMessage.newBuilder(rootDesc);
+ DynamicMessage root = builder.mergeFrom(srcBytes).build();
+ // child
+ List childRoot = null;
+ if (this.childNodes != null && this.childNodes.size() > 0) {
+ DynamicMessage current = root;
+ for (PbNode node : childNodes) {
+ Object nodeValue = current.getField(node.getFieldDesc());
+ if (nodeValue == null) {
+ // error data
+ return new PbSourceData(root, rootDesc, columnNodeMap, srcCharset);
+ }
+ if (node.isLastNode()) {
+ if (!(nodeValue instanceof List)) {
+ // error data
+ return new PbSourceData(root, rootDesc, columnNodeMap, srcCharset);
+ } else {
+ childRoot = (List) nodeValue;
+ break;
+ }
+ }
+ if (!node.isArray()) {
+ if (!(nodeValue instanceof DynamicMessage)) {
+ // error data
+ return new PbSourceData(root, rootDesc, columnNodeMap, srcCharset);
+ }
+ current = (DynamicMessage) nodeValue;
+ } else {
+ if (!(nodeValue instanceof List)) {
+ // error data
+ return new PbSourceData(root, rootDesc, columnNodeMap, srcCharset);
+ }
+ List> nodeList = (List>) nodeValue;
+ if (node.getArrayIndex() >= nodeList.size()) {
+ // error data
+ return new PbSourceData(root, rootDesc, columnNodeMap, srcCharset);
+ }
+ Object nodeElement = nodeList.get(node.getArrayIndex());
+ if (!(nodeElement instanceof DynamicMessage)) {
+ // error data
+ return new PbSourceData(root, rootDesc, columnNodeMap, srcCharset);
+ }
+ current = (DynamicMessage) nodeElement;
+ }
+ }
+ }
+ return new PbSourceData(root, childRoot, rootDesc, childDesc, columnNodeMap, srcCharset);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ /**
+ * decode
+ * @param srcString
+ * @param extParams
+ * @return
+ */
+ @Override
+ public SourceData decode(String srcString, Map extParams) {
+ byte[] srcBytes = Base64.getDecoder().decode(srcString);
+ return this.decode(srcBytes, extParams);
+ }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/TransformException.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/TransformException.java
new file mode 100644
index 00000000000..3009acdac0b
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/TransformException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.decode;
+
+/**
+ * TransformException
+ *
+ */
+public class TransformException extends RuntimeException {
+
+ /**
+ * serialVersionUID long
+ */
+ private static final long serialVersionUID = -6459186664919206191L;
+
+ /**
+ * Constructor
+ * @param error
+ * @param e
+ */
+ public TransformException(String error, Exception e) {
+ super(error, e);
+ }
+}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java
index ba226bd8071..5742a4af7a6 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/PbSourceInfo.java
@@ -28,34 +28,54 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class PbSourceInfo extends SourceInfo {
- private String protoDefine;
+ private String protoDescription;
+ private String rootMessageType;
private String rowsNodePath;
@JsonCreator
public PbSourceInfo(
@JsonProperty("charset") String charset,
- @JsonProperty("protoDefine") String protoDefine,
+ @JsonProperty("protoDescription") String protoDescription,
+ @JsonProperty("rootMessageType") String rootMessageType,
@JsonProperty("rowsNodePath") String rowsNodePath) {
super(charset);
- this.protoDefine = protoDefine;
+ this.protoDescription = protoDescription;
+ this.rootMessageType = rootMessageType;
this.rowsNodePath = rowsNodePath;
}
/**
- * get protoDefine
- * @return the protoDefine
+ * get protoDescription
+ * @return the protoDescription
*/
- @JsonProperty("protoDefine")
- public String getProtoDefine() {
- return protoDefine;
+ @JsonProperty("protoDescription")
+ public String getProtoDescription() {
+ return protoDescription;
}
/**
- * set protoDefine
- * @param protoDefine the protoDefine to set
+ * set protoDescription
+ * @param protoDescription the protoDescription to set
*/
- public void setProtoDefine(String protoDefine) {
- this.protoDefine = protoDefine;
+ public void setProtoDescription(String protoDescription) {
+ this.protoDescription = protoDescription;
+ }
+
+ /**
+ * get rootMessageType
+ * @return the rootMessageType
+ */
+ @JsonProperty("rootMessageType")
+ public String getRootMessageType() {
+ return rootMessageType;
+ }
+
+ /**
+ * set rootMessageType
+ * @param rootMessageType the rootMessageType to set
+ */
+ public void setRootMessageType(String rootMessageType) {
+ this.rootMessageType = rootMessageType;
}
/**
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index ad4c4b43041..23ca6644fa1 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -20,6 +20,7 @@
import org.apache.inlong.sdk.transform.decode.CsvSourceDecoder;
import org.apache.inlong.sdk.transform.decode.JsonSourceDecoder;
import org.apache.inlong.sdk.transform.decode.KvSourceDecoder;
+import org.apache.inlong.sdk.transform.decode.PbSourceDecoder;
import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.decode.SourceDecoder;
import org.apache.inlong.sdk.transform.encode.CsvSinkEncoder;
@@ -33,6 +34,7 @@
import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
import org.apache.inlong.sdk.transform.pojo.SinkInfo;
import org.apache.inlong.sdk.transform.pojo.SourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
@@ -108,6 +110,8 @@ private void initDecoder(TransformConfig config) {
this.decoder = new KvSourceDecoder((KvSourceInfo) sourceInfo);
} else if (sourceInfo instanceof JsonSourceInfo) {
this.decoder = new JsonSourceDecoder((JsonSourceInfo) sourceInfo);
+ } else if (sourceInfo instanceof PbSourceInfo) {
+ this.decoder = new PbSourceDecoder((PbSourceInfo) sourceInfo);
}
}
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
index 1216cfff740..afc58b422e4 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java
@@ -35,7 +35,7 @@ public ColumnParser(Column expr) {
}
public ColumnParser(Function expr) {
- this.fieldName = expr.toString().replace('(', '[').replace(')', ']');
+ this.fieldName = expr.toString();
}
/**
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
index 3e36bfda263..c26b3f9cca2 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/pojo/TestTransformConfig.java
@@ -80,7 +80,12 @@ public void testPb() {
ftime.setName("ftime");
List fields = new ArrayList<>();
fields.add(ftime);
- SourceInfo pbSource = new PbSourceInfo("UTF-8", "syntax = \"proto3\";", "root");
+ String transformBase64 = "CrcCCg90cmFuc2Zvcm0ucHJvdG8SBHRlc3QirQEKClNka01lc3NhZ2USEAoDbXNnGAEgASgMUg"
+ + "Ntc2cSGAoHbXNnVGltZRgCIAEoA1IHbXNnVGltZRI3CgdleHRpbmZvGAMgAygLMh0udGVzdC5TZGtNZXNzYWdlLk"
+ + "V4dGluZm9FbnRyeVIHZXh0aW5mbxo6CgxFeHRpbmZvRW50cnkSEAoDa2V5GAEgASgJUgNrZXkSFAoFdmFsdWUY"
+ + "AiABKAlSBXZhbHVlOgI4ASJmCg5TZGtEYXRhUmVxdWVzdBIQCgNzaWQYASABKAlSA3NpZBIkCgRtc2dzGAIgAygLMh"
+ + "AudGVzdC5TZGtNZXNzYWdlUgRtc2dzEhwKCXBhY2thZ2VJRBgDIAEoBFIJcGFja2FnZUlEYgZwcm90bzM=";
+ SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select ftime from source";
TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
index 498fad7f79b..282e45edfb2 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java
@@ -23,6 +23,7 @@
import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.PbSourceInfo;
import org.apache.inlong.sdk.transform.pojo.SinkInfo;
import org.apache.inlong.sdk.transform.pojo.SourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
@@ -31,6 +32,7 @@
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.HashMap;
import java.util.List;
@@ -101,19 +103,7 @@ public void testKv2Csv() {
@Test
public void testJson2Csv() {
try {
- List fields = new ArrayList<>();
- FieldInfo sid = new FieldInfo();
- sid.setName("sid");
- fields.add(sid);
- FieldInfo packageID = new FieldInfo();
- packageID.setName("packageID");
- fields.add(packageID);
- FieldInfo msgTime = new FieldInfo();
- msgTime.setName("msgTime");
- fields.add(msgTime);
- FieldInfo msg = new FieldInfo();
- msg.setName("msg");
- fields.add(msg);
+ List fields = this.getTestFieldList();
SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "msgs");
SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source";
@@ -140,19 +130,7 @@ public void testJson2Csv() {
@Test
public void testJson2CsvForOne() {
try {
- List fields = new ArrayList<>();
- FieldInfo sid = new FieldInfo();
- sid.setName("sid");
- fields.add(sid);
- FieldInfo packageID = new FieldInfo();
- packageID.setName("packageID");
- fields.add(packageID);
- FieldInfo msgTime = new FieldInfo();
- msgTime.setName("msgTime");
- fields.add(msgTime);
- FieldInfo msg = new FieldInfo();
- msg.setName("msg");
- fields.add(msg);
+ List fields = this.getTestFieldList();
SourceInfo jsonSource = new JsonSourceInfo("UTF-8", "");
SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql =
@@ -204,4 +182,91 @@ public void testKvCsvByJsonConfig() {
e.printStackTrace();
}
}
+
+ @Test
+ public void testPb2Csv() {
+ try {
+ List fields = this.getTestFieldList();
+ String transformBase64 = this.getPbTestDescription();
+ SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
+ SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+ TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
+ // case1
+ TransformProcessor processor = new TransformProcessor(config);
+ byte[] srcBytes = this.getPbTestData();
+ List output = processor.transform(srcBytes, new HashMap<>());
+ Assert.assertTrue(output.size() == 2);
+ Assert.assertEquals(output.get(0), "sid|1|1713243918000|msgValue4");
+ Assert.assertEquals(output.get(1), "sid|1|1713243918002|msgValue42");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private List getTestFieldList() {
+ List fields = new ArrayList<>();
+ FieldInfo sid = new FieldInfo();
+ sid.setName("sid");
+ fields.add(sid);
+ FieldInfo packageID = new FieldInfo();
+ packageID.setName("packageID");
+ fields.add(packageID);
+ FieldInfo msgTime = new FieldInfo();
+ msgTime.setName("msgTime");
+ fields.add(msgTime);
+ FieldInfo msg = new FieldInfo();
+ msg.setName("msg");
+ fields.add(msg);
+ return fields;
+ }
+
+ private byte[] getPbTestData() {
+ String srcString =
+ "CgNzaWQSIAoJbXNnVmFsdWU0ELCdrqruMRoMCgNrZXkSBXZhbHVlEiMKCm1zZ1ZhbHVlNDIQsp2uqu4xGg4KBGtleTISBnZhbHVlMhgB";
+ byte[] srcBytes = Base64.getDecoder().decode(srcString);
+ return srcBytes;
+ }
+
+ private String getPbTestDescription() {
+ final String transformProto = "syntax = \"proto3\";\n"
+ + "package test;\n"
+ + "message SdkMessage {\n"
+ + " bytes msg = 1;\n"
+ + " int64 msgTime = 2;\n"
+ + " map extinfo = 3;\n"
+ + "}\n"
+ + "message SdkDataRequest {\n"
+ + " string sid = 1;\n"
+ + " repeated SdkMessage msgs = 2;\n"
+ + " uint64 packageID = 3;\n"
+ + "}";
+ String transformBase64 = "CrcCCg90cmFuc2Zvcm0ucHJvdG8SBHRlc3QirQEKClNka01lc3NhZ2USEAoDbXNnGAEgASgMUg"
+ + "Ntc2cSGAoHbXNnVGltZRgCIAEoA1IHbXNnVGltZRI3CgdleHRpbmZvGAMgAygLMh0udGVzdC5TZGtNZXNzYWdlLk"
+ + "V4dGluZm9FbnRyeVIHZXh0aW5mbxo6CgxFeHRpbmZvRW50cnkSEAoDa2V5GAEgASgJUgNrZXkSFAoFdmFsdWUY"
+ + "AiABKAlSBXZhbHVlOgI4ASJmCg5TZGtEYXRhUmVxdWVzdBIQCgNzaWQYASABKAlSA3NpZBIkCgRtc2dzGAIgAygLMh"
+ + "AudGVzdC5TZGtNZXNzYWdlUgRtc2dzEhwKCXBhY2thZ2VJRBgDIAEoBFIJcGFja2FnZUlEYgZwcm90bzM=";
+ return transformBase64;
+ }
+
+ @Test
+ public void testPb2CsvForOne() {
+ try {
+ List fields = this.getTestFieldList();
+ String transformBase64 = this.getPbTestDescription();
+ SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null);
+ SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
+ String transformSql =
+ "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
+ TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
+ // case1
+ TransformProcessor processor = new TransformProcessor(config);
+ byte[] srcBytes = this.getPbTestData();
+ List output = processor.transform(srcBytes, new HashMap<>());
+ Assert.assertTrue(output.size() == 1);
+ Assert.assertEquals(output.get(0), "sid|1|1713243918002|msgValue4");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
}