diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java index c7e40282ae7..0caf9619eb5 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AbstractSourceData.java @@ -29,11 +29,11 @@ public abstract class AbstractSourceData implements SourceData { protected Context context; - protected boolean isContextField(String fieldName) { + public boolean isContextField(String fieldName) { return fieldName.startsWith(CTX_KEY); } - protected String getContextField(String fieldName) { + public String getContextField(String fieldName) { if (context == null) { return ""; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java index 310f6b1abb8..7351958216b 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java @@ -23,6 +23,8 @@ import lombok.Data; import org.apache.commons.lang.math.NumberUtils; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; @@ -34,63 +36,118 @@ @Data public class PbNode { + public static final Logger LOG = LoggerFactory.getLogger(PbNode.class); + private String name; private FieldDescriptor fieldDesc; - private Descriptors.Descriptor messageType; private boolean isLastNode = false; - private boolean isArray = false; - private int arrayIndex = -1; - private boolean isMap = false; + // primitive + private boolean isPrimitiveType = false; + // array + private boolean isArrayType = false; + private boolean hasArrayIndex = false; + private Integer arrayIndex; + // struct + private boolean isStructType = false; + // map private boolean isMapType = false; - private String mapKey = ""; + private boolean hasMapKey = false; + private Object mapKey; private FieldDescriptor mapKeyDesc; private FieldDescriptor mapValueDesc; + // parent path + private String parentPath; + private String currentPath; + private String currentIndexPath; - public PbNode(Descriptors.Descriptor messageDesc, String nodeString, boolean isLastNode) { - int beginIndex = nodeString.indexOf('('); - if (beginIndex < 0) { - this.name = nodeString; - if (isMapDescriptor(messageDesc)) { - FieldDescriptor valueFieldDesc = messageDesc.getFields().get(1); - Descriptors.Descriptor valueTypeDesc = valueFieldDesc.getMessageType(); - this.fieldDesc = valueTypeDesc.findFieldByName(name); - if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) { - this.messageType = this.fieldDesc.getMessageType(); + public PbNode(Descriptors.Descriptor parentDesc, String parentPath, String nodeString, boolean isLastNode) { + try { + if (parentDesc == null) { + return; + } + this.isLastNode = isLastNode; + // parse name & index + int beginIndex = nodeString.indexOf('('); + String indexString = null; + if (beginIndex < 0) { + this.name = nodeString; + } else { + this.name = StringUtils.trim(nodeString.substring(0, beginIndex)); + int endIndex = nodeString.lastIndexOf(')'); + if (endIndex >= 0) { + indexString = nodeString.substring(beginIndex + 1, endIndex); } + } + // cache path key + this.parentPath = parentPath; + if (this.parentPath == null) { + this.currentPath = this.name; + this.currentIndexPath = nodeString; } else { - this.fieldDesc = messageDesc.findFieldByName(name); - if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) { - this.messageType = this.fieldDesc.getMessageType(); - if (isMapDescriptor(messageType)) { - this.isMapType = true; - } + this.currentPath = this.parentPath + "." + this.name; + this.currentIndexPath = this.parentPath + "." + nodeString; + } + // field desc + this.fieldDesc = parentDesc.findFieldByName(name); + if (this.fieldDesc == null) { + return; + } + // map + if (this.fieldDesc.getJavaType() == JavaType.MESSAGE + && isMapDescriptor(this.fieldDesc.getMessageType())) { + this.isMapType = true; + this.mapKeyDesc = this.fieldDesc.getMessageType().getFields().get(0); + this.mapValueDesc = this.fieldDesc.getMessageType().getFields().get(1); + if (indexString != null) { + this.hasMapKey = true; + this.mapKey = parseMapKey(indexString, mapKeyDesc); } + return; } - } else { - this.name = StringUtils.trim(nodeString.substring(0, beginIndex)); - this.fieldDesc = messageDesc.findFieldByName(name); - if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) { - this.messageType = this.fieldDesc.getMessageType(); - int endIndex = nodeString.lastIndexOf(')'); - if (isMapDescriptor(messageType)) { - this.isMap = true; - if (endIndex >= 0) { - this.mapKey = nodeString.substring(beginIndex + 1, endIndex); - this.mapKeyDesc = messageType.getFields().get(0); - this.mapValueDesc = messageType.getFields().get(1); - } - } else { - this.isArray = true; - if (endIndex >= 0) { - this.arrayIndex = NumberUtils.toInt(nodeString.substring(beginIndex + 1, endIndex), -1); - if (this.arrayIndex < 0) { - this.arrayIndex = 0; - } - } + // array + if (this.fieldDesc.isRepeated()) { + this.isArrayType = true; + this.arrayIndex = NumberUtils.toInt(indexString, -1); + if (arrayIndex >= 0) { + this.hasArrayIndex = true; } + return; + } + // struct + if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) { + this.isStructType = true; + return; } + // primitive + this.isPrimitiveType = true; + } catch (RuntimeException t) { + LOG.error("Fail to PbNode,error:{},fullName:{},nodePath:{},isLastNode:{}", + t.getMessage(), parentDesc.getName(), nodeString, isLastNode, t); + throw t; + } + } + + private static Object parseMapKey(String indexString, FieldDescriptor mapKeyDesc) { + switch (mapKeyDesc.getJavaType()) { + case STRING: + return indexString; + case INT: + return NumberUtils.toInt(indexString, 0); + case LONG: + return NumberUtils.toLong(indexString, 0); + case FLOAT: + return NumberUtils.toFloat(indexString, 0); + case DOUBLE: + return NumberUtils.toDouble(indexString, 0); + case BOOLEAN: + return Boolean.TRUE.toString().equals(indexString); + case ENUM: + return mapKeyDesc.getEnumType().findValueByName(indexString); + case BYTE_STRING: + case MESSAGE: + default: + return indexString; } - this.isLastNode = isLastNode; } /** @@ -105,16 +162,61 @@ public static List parseNodePath(Descriptors.Descriptor rootDesc, String } List nodes = new ArrayList<>(); String[] nodeStrings = nodePath.split("\\."); - int lastIndex = nodeStrings.length - 1; + final int lastIndex = nodeStrings.length - 1; + String parentPath = null; + StringBuilder currentPathBuilder = new StringBuilder(); Descriptors.Descriptor current = rootDesc; for (int i = 0; i <= lastIndex; i++) { if (current == null) { return null; } + // pbNode String nodeString = nodeStrings[i]; - PbNode pbNode = new PbNode(current, nodeString, (i == lastIndex)); - current = pbNode.getMessageType(); - nodes.add(pbNode); + PbNode pbNode = new PbNode(current, parentPath, nodeString, (i == lastIndex)); + if (parentPath == null) { + currentPathBuilder.append(nodeString); + } else { + currentPathBuilder.append(".").append(nodeString); + } + parentPath = currentPathBuilder.toString(); + if (pbNode.getFieldDesc() == null) { + return null; + } + // primitive + if (pbNode.isPrimitiveType()) { + current = null; + nodes.add(pbNode); + continue; + } else if (pbNode.isArrayType()) { + // array + if (pbNode.getFieldDesc().getJavaType() == JavaType.MESSAGE) { + current = pbNode.getFieldDesc().getMessageType(); + } else { + current = null; + } + nodes.add(pbNode); + continue; + } else if (pbNode.isMapType()) { + // map + if (pbNode.isHasMapKey()) { + if (pbNode.getMapValueDesc().getJavaType() == JavaType.MESSAGE) { + current = pbNode.getMapValueDesc().getMessageType(); + } else { + current = null; + } + } else { + current = null; + } + nodes.add(pbNode); + continue; + } else if (pbNode.isStructType()) { + // struct + current = pbNode.getFieldDesc().getMessageType(); + nodes.add(pbNode); + continue; + } else { + return null; + } } return nodes; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java index e46fb47d8d9..ca63769c845 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java @@ -21,12 +21,16 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Descriptors.EnumValueDescriptor; import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; import com.google.protobuf.DynamicMessage; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.binary.BinaryStringData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** @@ -45,6 +50,8 @@ public class PbSourceData extends AbstractSourceData { private static final Logger LOG = LoggerFactory.getLogger(PbSourceData.class); + public static final String ROOT = "$root"; + public static final String ROOT_KEY = "$root."; public static final String CHILD_KEY = "$child."; @@ -61,6 +68,9 @@ public class PbSourceData extends AbstractSourceData { protected Charset srcCharset; + private Map> nodeValueCache = new HashMap<>(); + private Map>> mapNodeCache = new HashMap<>(); + /** * Constructor */ @@ -110,18 +120,216 @@ public int getRowCount() { * @return */ @Override + @SuppressWarnings({"rawtypes", "unchecked"}) public Object getField(int rowNum, String fieldName) { - Object fieldValue = ""; try { + // check(root); if (isContextField(fieldName)) { return getContextField(fieldName); } + Object fieldValue = findFieldNode(rowNum, fieldName); + List childNodes = this.columnNodeMap.get(fieldName); + if (childNodes == null || childNodes.size() == 0) { + return null; + } + PbNode lastNode = childNodes.get(childNodes.size() - 1); + // primitive + if (lastNode.isPrimitiveType()) { + if (fieldValue instanceof ByteString) { + ByteString byteString = (ByteString) fieldValue; + return byteString.toByteArray(); + } else { + return fieldValue; + } + } + // struct + if (lastNode.isStructType()) { + if (!(fieldValue instanceof DynamicMessage)) { + return null; + } + return buildStructData(lastNode.getFieldDesc().getMessageType(), (DynamicMessage) fieldValue); + } + // array + if (lastNode.isArrayType()) { + if (!lastNode.isHasArrayIndex()) { + if (!(fieldValue instanceof List)) { + return null; + } + List valueList = (List) fieldValue; + List result = new ArrayList<>(valueList.size()); + for (Object value : valueList) { + result.add(this.buildFieldValue(lastNode.getFieldDesc(), value)); + } + return new GenericArrayData(result.toArray()); + } + return this.buildFieldValue(lastNode.getFieldDesc(), fieldValue); + } + // map + if (lastNode.isMapType()) { + if (!lastNode.isHasMapKey()) { + return buildMapData(lastNode.getFieldDesc().getMessageType(), fieldValue); + } + return this.buildFieldValue(lastNode.getMapValueDesc(), fieldValue); + } + return null; + } catch (Exception e) { + LOG.error("fail to getField,error:{},rowNum:{},fieldName:{}", e.getMessage(), rowNum, fieldName, e); + return null; + } + } + + public Object buildFieldValue(FieldDescriptor fieldDesc, Object nodeValue) { + if (fieldDesc == null || nodeValue == null) { + return null; + } + switch (fieldDesc.getJavaType()) { + case STRING: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + return nodeValue; + case ENUM: + if (nodeValue instanceof EnumValueDescriptor) { + EnumValueDescriptor enumDesc = (EnumValueDescriptor) nodeValue; + return enumDesc.getIndex(); + } + return null; + case BYTE_STRING: + if (nodeValue instanceof ByteString) { + return ((ByteString) nodeValue).toByteArray(); + } else { + return nodeValue; + } + case MESSAGE: + return this.buildStructData(fieldDesc.getMessageType(), nodeValue); + default: + return String.valueOf(nodeValue); + } + } + + public Object buildStructData(Descriptors.Descriptor messageType, Object nodeValue) { + // map + if (PbNode.isMapDescriptor(messageType)) { + return this.buildMapData(messageType, nodeValue); + } + // struct + if (!(nodeValue instanceof DynamicMessage)) { + return null; + } + DynamicMessage msgObj = (DynamicMessage) nodeValue; + GenericRowData result = new GenericRowData(messageType.getFields().size()); + int index = 0; + for (FieldDescriptor fieldDesc : messageType.getFields()) { + Object fieldValue = msgObj.getField(fieldDesc); + if (fieldValue == null) { + result.setField(index++, null); + continue; + } + // field + if (!fieldDesc.isRepeated()) { + Object fieldResult = this.buildFieldValue(fieldDesc, fieldValue); + result.setField(index++, fieldResult); + continue; + } + // array + if (!(fieldValue instanceof List)) { + result.setField(index++, null); + continue; + } + // map + if (fieldDesc.getJavaType().equals(JavaType.MESSAGE) + && PbNode.isMapDescriptor(fieldDesc.getMessageType())) { + result.setField(index++, buildMapData(fieldDesc.getMessageType(), fieldValue)); + } else { + List valueList = (List) fieldValue; + List fieldResult = new ArrayList<>(valueList.size()); + for (Object value : valueList) { + fieldResult.add(this.buildFieldValue(fieldDesc, value)); + } + result.setField(index++, new GenericArrayData(fieldResult.toArray())); + } + } + return result; + } + + protected Object buildMapData(Descriptors.Descriptor messageType, Object nodeValue) { + if (!(nodeValue instanceof List)) { + return null; + } + Descriptors.FieldDescriptor keyField = messageType.findFieldByNumber(1); + Descriptors.FieldDescriptor valueField = messageType.findFieldByNumber(2); + List subNodeValueList = (List) nodeValue; + Map result = new HashMap<>(); + for (Object value : subNodeValueList) { + if (!(value instanceof DynamicMessage)) { + continue; + } + DynamicMessage subnodeValue = (DynamicMessage) value; + Object keyValue = buildFieldValue(keyField, subnodeValue.getField(keyField)); + Object valueValue = buildFieldValue(valueField, subnodeValue.getField(valueField)); + result.put(keyValue, valueValue); + } + return new GenericMapData(result); + } + + /** + * get rootDesc + * @return the rootDesc + */ + public Descriptors.Descriptor getRootDesc() { + return rootDesc; + } + + /** + * get childDesc + * @return the childDesc + */ + public Descriptors.Descriptor getChildDesc() { + return childDesc; + } + + /** + * get root + * @return the root + */ + public DynamicMessage getRoot() { + return root; + } + + /** + * get childRoot + * @return the childRoot + */ + public List getChildRoot() { + return childRoot; + } + + public Object findFieldNode(int rowNum, String fieldName) { + Object fieldValue = ""; + try { if (StringUtils.startsWith(fieldName, ROOT_KEY)) { - fieldValue = this.getRootField(fieldName); + fieldValue = this.findRootField(fieldName); } else if (StringUtils.startsWith(fieldName, CHILD_KEY)) { if (childRoot != null && rowNum < childRoot.size()) { - fieldValue = this.getChildField(rowNum, fieldName); + fieldValue = this.findChildField(rowNum, fieldName); + } + } else { + List childNodes = this.columnNodeMap.get(fieldName); + if (childNodes == null) { + childNodes = PbNode.parseNodePath(rootDesc, fieldName); + if (childNodes == null) { + childNodes = new ArrayList<>(); + } + this.columnNodeMap.put(fieldName, childNodes); + } + // error config + if (childNodes.size() == 0) { + return ""; } + // parse other node + fieldValue = this.findNodeValueByCache(childNodes, root); } return fieldValue; } catch (Exception e) { @@ -130,12 +338,25 @@ public Object getField(int rowNum, String fieldName) { return fieldValue; } - /** - * getRootField - * @param fieldName - * @return - */ - private Object getRootField(String srcFieldName) { + public List parseStructNodeList(String srcFieldName, Descriptor currentDesc) { + List childNodes = this.columnNodeMap.get(srcFieldName); + if (childNodes == null) { + String fieldName = srcFieldName; + if (StringUtils.startsWith(fieldName, ROOT_KEY)) { + fieldName = srcFieldName.substring(ROOT_KEY.length()); + } else if (StringUtils.startsWith(fieldName, CHILD_KEY)) { + fieldName = srcFieldName.substring(CHILD_KEY.length()); + } + childNodes = PbNode.parseNodePath(currentDesc, fieldName); + if (childNodes == null) { + childNodes = new ArrayList<>(); + } + this.columnNodeMap.put(srcFieldName, childNodes); + } + return childNodes; + } + + private Object findRootField(String srcFieldName) { List childNodes = this.columnNodeMap.get(srcFieldName); if (childNodes == null) { String fieldName = srcFieldName.substring(ROOT_KEY.length()); @@ -147,20 +368,14 @@ private Object getRootField(String srcFieldName) { } // error config if (childNodes.size() == 0) { - return ""; + return null; } // parse other node - Object fieldValue = this.getNodeValue(childNodes, root); + Object fieldValue = this.findNodeValueByCache(childNodes, root); return fieldValue; } - /** - * getChildField - * @param rowNum - * @param srcFieldName - * @return - */ - private Object getChildField(int rowNum, String srcFieldName) { + private Object findChildField(int rowNum, String srcFieldName) { if (this.childRoot == null || this.childDesc == null) { return ""; } @@ -179,18 +394,111 @@ private Object getChildField(int rowNum, String srcFieldName) { } // parse other node DynamicMessage child = childRoot.get(rowNum); - Object fieldValue = this.getNodeValue(childNodes, child); + Object fieldValue = this.findNodeValueByCache(childNodes, child); return fieldValue; } - /** - * getNodeValue - * @param childNodes - * @param root - * @return - */ - @SuppressWarnings({"rawtypes", "unchecked"}) - private Object getNodeValue(List childNodes, DynamicMessage root) { + public Object findNodeValueByCache(List childNodes, DynamicMessage root) { + Map subNodeValueCache = this.nodeValueCache.computeIfAbsent(root, + k -> new HashMap<>()); + Map> subMapNodeCache = this.mapNodeCache.computeIfAbsent(root, + k -> new HashMap<>()); + for (int i = childNodes.size() - 1; i >= 0; i--) { + PbNode node = childNodes.get(i); + // index path + Object subNodeValue = subNodeValueCache.get(node.getCurrentIndexPath()); + if (subNodeValue != null) { + if (i == childNodes.size() - 1) { + return subNodeValue; + } else { + if (subNodeValue instanceof DynamicMessage) { + List subChildNodes = childNodes.subList(i + 1, childNodes.size()); + return this.findNodeValue(subChildNodes, (DynamicMessage) subNodeValue); + } else { + return null; + } + } + } + // path + subNodeValue = subNodeValueCache.get(node.getCurrentPath()); + if (subNodeValue != null) { + if (i == childNodes.size() - 1) { + return subNodeValue; + } else { + // primitive + if (node.isPrimitiveType()) { + return null; + } + // struct + if (node.isStructType()) { + List subChildNodes = childNodes.subList(i + 1, childNodes.size()); + return this.findNodeValue(subChildNodes, (DynamicMessage) subNodeValue); + } + // array + if (node.isArrayType()) { + if (!node.isHasArrayIndex()) { + return null; + } + if (!(subNodeValue instanceof List)) { + return null; + } + List nodeValueList = (List) subNodeValue; + if (node.getArrayIndex() >= nodeValueList.size()) { + return null; + } + Object newNode = nodeValueList.get(node.getArrayIndex()); + if (!(newNode instanceof DynamicMessage)) { + return null; + } + List subChildNodes = childNodes.subList(i + 1, childNodes.size()); + return this.findNodeValue(subChildNodes, (DynamicMessage) newNode); + } + // map + if (node.isMapType()) { + if (!node.isHasMapKey()) { + return null; + } + final Object mapNodeValue = subNodeValue; + Map mapCache = subMapNodeCache.computeIfAbsent(node.getCurrentPath(), + k -> parseMapNode(mapNodeValue, node)); + Object fieldValue = mapCache.get(node.getMapKey()); + if (fieldValue == null || !(fieldValue instanceof DynamicMessage)) { + return null; + } + List subChildNodes = childNodes.subList(i + 1, childNodes.size()); + return this.findNodeValue(subChildNodes, (DynamicMessage) fieldValue); + } + return null; + } + } + } + return this.findNodeValue(childNodes, root); + } + + private static Map parseMapNode(Object nodeValue, PbNode node) { + if (!(nodeValue instanceof List)) { + return new HashMap<>(); + } + List nodeValueList = (List) nodeValue; + Map mapCache = new HashMap<>(); + for (Object value : nodeValueList) { + if (!(value instanceof DynamicMessage)) { + continue; + } + DynamicMessage msg = (DynamicMessage) value; + Object keyValue = msg.getField(node.getMapKeyDesc()); + Object valueValue = msg.getField(node.getMapValueDesc()); + mapCache.put(keyValue, valueValue); + } + return mapCache; + } + + // @SuppressWarnings({"rawtypes", "unchecked"}) + public Object findNodeValue(List childNodes, DynamicMessage root) { + Map subNodeValueCache = this.nodeValueCache.computeIfAbsent(root, + k -> new HashMap<>()); + Map> subMapNodeCache = this.mapNodeCache.computeIfAbsent(root, + k -> new HashMap<>()); DynamicMessage current = root; for (int i = 0; i < childNodes.size(); i++) { PbNode node = childNodes.get(i); @@ -199,120 +507,310 @@ private Object getNodeValue(List childNodes, DynamicMessage root) { // error data break; } - if (!node.isLastNode()) { - if (node.isArray()) { - current = (DynamicMessage) ((List) nodeValue).get(node.getArrayIndex()); - } else if (node.isMap()) { - List nodeValueList = (List) nodeValue; - DynamicMessage newCurrent = null; - for (DynamicMessage subnodeValue : nodeValueList) { - String keyValue = String.valueOf(subnodeValue.getField(node.getMapKeyDesc())); - if (StringUtils.equals(keyValue, node.getMapKey())) { - newCurrent = (DynamicMessage) subnodeValue.getField(node.getMapValueDesc()); - break; - } + if (node.isLastNode()) { + // primitive + if (node.isPrimitiveType()) { + if (nodeValue instanceof ByteString) { + ByteString byteString = (ByteString) nodeValue; + return byteString.toByteArray(); + } else if (node.getFieldDesc().getJavaType().equals(JavaType.STRING)) { + return new BinaryStringData(String.valueOf(nodeValue)); + } else { + return nodeValue; } - if (newCurrent == null) { + } + // struct + if (node.isStructType()) { + subNodeValueCache.put(node.getCurrentPath(), nodeValue); + return nodeValue; + } + // array + if (node.isArrayType()) { + subNodeValueCache.put(node.getCurrentPath(), nodeValue); + if (!node.isHasArrayIndex()) { + return nodeValue; + } + if (!(nodeValue instanceof List)) { + return null; + } + List nodeValueList = (List) nodeValue; + if (node.getArrayIndex() >= nodeValueList.size()) { + return null; + } + Object arrayIndexNodeValue = nodeValueList.get(node.getArrayIndex()); + subNodeValueCache.put(node.getCurrentIndexPath(), arrayIndexNodeValue); + return arrayIndexNodeValue; + } + // map + if (node.isMapType()) { + subNodeValueCache.put(node.getCurrentPath(), nodeValue); + if (!node.isHasMapKey()) { + return nodeValue; + } + final Object mapNodeValue = nodeValue; + Map mapCache = subMapNodeCache.computeIfAbsent(node.getCurrentPath(), + k -> parseMapNode(mapNodeValue, node)); + Object fieldValue = mapCache.get(node.getMapKey()); + subNodeValueCache.put(node.getCurrentIndexPath(), fieldValue); + return fieldValue; + } + return null; + } else { + // primitive + if (node.isPrimitiveType()) { + return null; + } + // struct + if (node.isStructType()) { + subNodeValueCache.put(node.getCurrentPath(), nodeValue); + if (!(nodeValue instanceof DynamicMessage)) { return null; } - current = newCurrent; - } else { current = (DynamicMessage) nodeValue; + continue; } - continue; - } - // last node - if (node.isArray()) { - return buildStructData(node.getMessageType(), ((List) nodeValue).get(node.getArrayIndex())); - } else if (node.isMap()) { - List nodeValueList = (List) nodeValue; - Object fieldValue = null; - for (DynamicMessage subnodeValue : nodeValueList) { - String keyValue = String.valueOf(subnodeValue.getField(node.getMapKeyDesc())); - if (StringUtils.equals(keyValue, node.getMapKey())) { - fieldValue = subnodeValue.getField(node.getMapValueDesc()); - break; + // array + if (node.isArrayType()) { + subNodeValueCache.put(node.getCurrentPath(), nodeValue); + if (!node.isHasArrayIndex()) { + return null; } + if (!(nodeValue instanceof List)) { + return null; + } + List nodeValueList = (List) nodeValue; + if (node.getArrayIndex() >= nodeValueList.size()) { + return null; + } + Object newNode = nodeValueList.get(node.getArrayIndex()); + subNodeValueCache.put(node.getCurrentIndexPath(), newNode); + if (!(newNode instanceof DynamicMessage)) { + return null; + } + current = (DynamicMessage) newNode; + continue; } - return this.buildFieldValue(node.getFieldDesc(), fieldValue, false); - } else if (node.isMapType()) { - return this.buildStructData(node.getMessageType(), nodeValue); - } else if (node.getFieldDesc().isRepeated()) { - List valueList = (List) nodeValue; - List result = new ArrayList<>(valueList.size()); - for (Object value : valueList) { - result.add(this.buildFieldValue(node.getFieldDesc(), value, false)); + // map + if (node.isMapType()) { + subNodeValueCache.put(node.getCurrentPath(), nodeValue); + if (!node.isHasMapKey()) { + return null; + } + final Object mapNodeValue = nodeValue; + Map mapCache = subMapNodeCache.computeIfAbsent(node.getCurrentPath(), + k -> parseMapNode(mapNodeValue, node)); + Object fieldValue = mapCache.get(node.getMapKey()); + subNodeValueCache.put(node.getCurrentIndexPath(), fieldValue); + if (fieldValue == null || !(fieldValue instanceof DynamicMessage)) { + return null; + } + current = (DynamicMessage) fieldValue; + continue; } - return new GenericArrayData(result.toArray()); - } else { - return this.buildFieldValue(node.getFieldDesc(), nodeValue, false); + return null; } } return null; } - @SuppressWarnings("unchecked") - private Object buildFieldValue(FieldDescriptor fieldDesc, Object nodeValue, boolean isRepeated) { - if (nodeValue == null) { - return null; + /** + * Clear the leaf field referenced by {@code childNodes} on a copy of {@code root}. + *

+ * Implementation notes (important): + *

    + *
  • Intermediate nodes are descended by reading the value out of the parent + * builder, creating a sub-builder via {@link DynamicMessage#toBuilder()}, + * recursing into it, and then writing the rebuilt sub-message back via + * {@code setField} / {@code setRepeatedField}. We never rely on automatic + * reverse propagation from {@code getFieldBuilder}, which is not consistent + * across protobuf-java versions for {@code DynamicMessage.Builder}.
  • + *
  • Repeated and map entries are NEVER mutated through the list returned by + * {@code getField} (it is an unmodifiable view in many protobuf versions). + * Instead the field is cleared and the kept entries are re-added via + * {@code addRepeatedField}, which is the portable way to "remove" an entry + * from a {@code DynamicMessage.Builder}.
  • + *
+ * + * @param childNodes path to the leaf node to clear + * @param root the top-level builder; modifications are applied to it + */ + public void clearNodeValue(List childNodes, DynamicMessage.Builder root) { + if (childNodes == null || childNodes.isEmpty() || root == null) { + return; } - switch (fieldDesc.getJavaType()) { - case STRING: - case INT: - case LONG: - case FLOAT: - case DOUBLE: - case BOOLEAN: - case ENUM: - return nodeValue; - case BYTE_STRING: - return ((ByteString) nodeValue).toByteArray(); - case MESSAGE: { - if (!isRepeated) { - return this.buildStructData(fieldDesc.getMessageType(), nodeValue); - } else if (PbNode.isMapDescriptor(fieldDesc.getMessageType())) { - return this.buildStructData(fieldDesc.getMessageType(), nodeValue); + clearNodeValueRec(root, childNodes, 0); + } + + /** + * Recursive helper. Modifies {@code builder} in place; for nested levels, every + * change is written back to {@code builder} via setField/setRepeatedField as we + * unwind, so the topmost caller sees the modification. + */ + private void clearNodeValueRec(DynamicMessage.Builder builder, List nodes, int from) { + PbNode node = nodes.get(from); + FieldDescriptor fd = node.getFieldDesc(); + if (fd == null) { + return; + } + + boolean isLast = (from == nodes.size() - 1); + + // ============================== LEAF ============================== + if (isLast) { + // primitive / struct / repeated-without-index / map-without-key: clearField wipes + // the whole field. This is the safe single-call PB API. + if (node.isPrimitiveType() + || node.isStructType() + || (node.isArrayType() && !node.isHasArrayIndex()) + || (node.isMapType() && !node.isHasMapKey())) { + builder.clearField(fd); + return; + } + // repeated with explicit index: rebuild the list without the target element. + if (node.isArrayType() && node.isHasArrayIndex()) { + removeRepeatedAt(builder, fd, node.getArrayIndex()); + return; + } + // map with explicit key: rebuild the entries dropping the matching key. + if (node.isMapType() && node.isHasMapKey()) { + removeMapEntryByKey(builder, fd, node.getMapKeyDesc(), node.getMapKey()); + return; + } + return; + } + + // ============================ INTERMEDIATE ============================ + // primitive intermediate: invalid path, abort + if (node.isPrimitiveType()) { + return; + } + + // struct intermediate: descend into the message field, mutate, then setField back + if (node.isStructType()) { + if (!builder.hasField(fd)) { + return; + } + Object child = builder.getField(fd); + if (!(child instanceof DynamicMessage)) { + return; + } + DynamicMessage.Builder childBuilder = ((DynamicMessage) child).toBuilder(); + clearNodeValueRec(childBuilder, nodes, from + 1); + builder.setField(fd, childBuilder.build()); + return; + } + + // array intermediate (only meaningful with an explicit index): descend into that element + if (node.isArrayType()) { + if (!node.isHasArrayIndex() || fd.getJavaType() != JavaType.MESSAGE) { + return; + } + int idx = node.getArrayIndex(); + int count = builder.getRepeatedFieldCount(fd); + if (idx < 0 || idx >= count) { + return; + } + Object element = builder.getRepeatedField(fd, idx); + if (!(element instanceof DynamicMessage)) { + return; + } + DynamicMessage.Builder eb = ((DynamicMessage) element).toBuilder(); + clearNodeValueRec(eb, nodes, from + 1); + builder.setRepeatedField(fd, idx, eb.build()); + return; + } + + // map intermediate: only meaningful with an explicit key. + // Descend into the value of the matching entry, mutate it, and replace the entry. + if (node.isMapType()) { + if (!node.isHasMapKey()) { + return; + } + FieldDescriptor mapKeyDesc = node.getMapKeyDesc(); + FieldDescriptor mapValueDesc = node.getMapValueDesc(); + if (mapKeyDesc == null || mapValueDesc == null + || mapValueDesc.getJavaType() != JavaType.MESSAGE) { + return; + } + int count = builder.getRepeatedFieldCount(fd); + for (int i = 0; i < count; i++) { + Object entry = builder.getRepeatedField(fd, i); + if (!(entry instanceof DynamicMessage)) { + continue; + } + DynamicMessage entryMsg = (DynamicMessage) entry; + Object keyVal = entryMsg.getField(mapKeyDesc); + if (keyVal == null || !Objects.equals(node.getMapKey(), keyVal)) { + continue; } - List valueList = (List) nodeValue; - List result = new ArrayList<>(valueList.size()); - for (DynamicMessage value : valueList) { - result.add(this.buildStructData(fieldDesc.getMessageType(), value)); + Object valObj = entryMsg.getField(mapValueDesc); + if (!(valObj instanceof DynamicMessage)) { + return; } - return new GenericArrayData(result.toArray()); + DynamicMessage.Builder valBuilder = ((DynamicMessage) valObj).toBuilder(); + clearNodeValueRec(valBuilder, nodes, from + 1); + DynamicMessage.Builder entryBuilder = entryMsg.toBuilder(); + entryBuilder.setField(mapValueDesc, valBuilder.build()); + builder.setRepeatedField(fd, i, entryBuilder.build()); + return; } - default: - return String.valueOf(nodeValue); } } - @SuppressWarnings("unchecked") - protected Object buildStructData(Descriptors.Descriptor messageType, Object nodeValue) { - // map - if (PbNode.isMapDescriptor(messageType)) { - Descriptors.FieldDescriptor keyField = messageType.findFieldByNumber(1); - Descriptors.FieldDescriptor valueField = messageType.findFieldByNumber(2); - List subNodeValueList = (List) nodeValue; - Map result = new HashMap<>(); - for (DynamicMessage subnodeValue : subNodeValueList) { - Object keyValue = buildFieldValue(keyField, subnodeValue.getField(keyField), false); - Object valueValue = buildFieldValue(valueField, subnodeValue.getField(valueField), false); - result.put(keyValue, valueValue); - } - return new GenericMapData(result); + /** + * Remove the {@code targetIndex}-th element from the given repeated field on + * {@code builder}. {@code DynamicMessage.Builder} does not expose + * {@code removeRepeatedField} portably across protobuf-java versions, so we + * rebuild the field via clearField + addRepeatedField. + */ + private static void removeRepeatedAt(DynamicMessage.Builder builder, + FieldDescriptor fd, int targetIndex) { + int count = builder.getRepeatedFieldCount(fd); + if (targetIndex < 0 || targetIndex >= count) { + return; } - // struct - DynamicMessage msgObj = (DynamicMessage) nodeValue; - GenericRowData result = new GenericRowData(messageType.getFields().size()); - int index = 0; - for (FieldDescriptor fieldDesc : messageType.getFields()) { - Object fieldValue = msgObj.getField(fieldDesc); - if (fieldValue == null) { - result.setField(index++, null); + List kept = new ArrayList<>(count - 1); + for (int i = 0; i < count; i++) { + if (i == targetIndex) { continue; } - Object fieldResult = this.buildFieldValue(fieldDesc, fieldValue, false); - result.setField(index++, fieldResult); + kept.add(builder.getRepeatedField(fd, i)); + } + builder.clearField(fd); + for (Object v : kept) { + builder.addRepeatedField(fd, v); + } + } + + /** + * Remove the map entry whose key equals {@code targetKey} from the given map field + * on {@code builder}. Uses the portable clearField + addRepeatedField approach. + */ + private static void removeMapEntryByKey(DynamicMessage.Builder builder, + FieldDescriptor fd, FieldDescriptor mapKeyDesc, Object targetKey) { + if (mapKeyDesc == null || targetKey == null) { + return; + } + int count = builder.getRepeatedFieldCount(fd); + List kept = new ArrayList<>(count); + boolean removed = false; + for (int i = 0; i < count; i++) { + Object entry = builder.getRepeatedField(fd, i); + if (entry instanceof DynamicMessage) { + Object keyVal = ((DynamicMessage) entry).getField(mapKeyDesc); + if (Objects.equals(targetKey, keyVal)) { + removed = true; + continue; + } + } + kept.add(entry); + } + if (!removed) { + return; + } + builder.clearField(fd); + for (Object e : kept) { + builder.addRepeatedField(fd, e); } - return result; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java index cc4a6a8e99f..46ce0215f11 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java @@ -86,7 +86,7 @@ public PbSourceDecoder(PbSourceInfo sourceInfo) { this.rowsNodePath = sourceInfo.getRowsNodePath(); this.childNodes = PbNode.parseNodePath(rootDesc, rowsNodePath); if (this.childNodes != null && this.childNodes.size() > 0) { - this.childDesc = this.childNodes.get(this.childNodes.size() - 1).getMessageType(); + this.childDesc = this.childNodes.get(this.childNodes.size() - 1).getFieldDesc().getMessageType(); } } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -127,7 +127,7 @@ public SourceData decode(byte[] srcBytes, Context context) { break; } } - if (!node.isArray()) { + if (!node.isArrayType()) { if (!(nodeValue instanceof DynamicMessage)) { // error data return new PbSourceData(root, rootDesc, columnNodeMap, srcCharset); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java index 1abe442f840..bca3ec36ddb 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/FunctionConstant.java @@ -35,4 +35,6 @@ public class FunctionConstant { public static final String TEMPORAL_TYPE = "temporal"; + public static final String PB_TYPE = "pb"; + } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ConcatStructFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ConcatStructFunction.java new file mode 100644 index 00000000000..6d23644ab14 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ConcatStructFunction.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.pb; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.FunctionConstant; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; +import org.apache.flink.table.data.GenericRowData; + +import java.util.ArrayList; +import java.util.List; + +/** + * ConcatStructFunction -> concat_struct(field1, field2, field3...) + * description: + * - Always returns a GenericRowData whose arity equals the number of input parameters. + * - If any parameter evaluates to NULL, the corresponding position in the returned + * GenericRowData is set to NULL while the other positions are populated normally. + * - Each field value is taken from the protobuf source data based on its path. + */ +@TransformFunction(type = FunctionConstant.PB_TYPE, names = { + "concat_struct"}, parameter = "(field1,field2,field3...)", descriptions = { + "- Always returns a GenericRowData whose arity equals the number of input parameters;", + "- If any parameter is NULL, the corresponding position in the returned " + + "GenericRowData is set to NULL while the other positions are populated normally;", + "- Each field value is taken from the protobuf source data based on its 'path'." + }, examples = { + "concat_struct($root.name,$root.age) = +I(\"Alice\",11)" + }) +public class ConcatStructFunction implements ValueParser { + + private final List fieldParsers; + + public ConcatStructFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + this.fieldParsers = new ArrayList<>(); + for (int i = 0; i < expressions.size(); i++) { + this.fieldParsers.add(OperatorTools.buildParser(expressions.get(i))); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + GenericRowData result = new GenericRowData(fieldParsers.size()); + int index = 0; + for (ValueParser parser : fieldParsers) { + result.setField(index++, parser.parse(sourceData, rowIndex, context)); + } + return result; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractBinaryFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractBinaryFunction.java new file mode 100644 index 00000000000..102031f82f0 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractBinaryFunction.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.pb; + +import org.apache.inlong.sdk.transform.decode.PbNode; +import org.apache.inlong.sdk.transform.decode.PbSourceData; +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.FunctionConstant; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ColumnParser; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.MessageLite; +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.table.data.GenericArrayData; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * ExtractBinaryFunction -> extract_binary(path) + * description: + * - Only works on protobuf source data; returns NULL if the source is not a PbSourceData. + * - Returns NULL if 'path' is missing/invalid, or the path cannot be resolved to a value + * in the protobuf message. + * - For primitive / struct / map nodes and array nodes with an explicit array index, + * returns the matched value serialized as a {@code byte[]}. + * - For array nodes without an array index, returns a {@link GenericArrayData} whose + * elements are the {@code byte[]} representation of each list value. + */ +@TransformFunction(type = FunctionConstant.PB_TYPE, names = { + "extract_binary"}, parameter = "(path)", descriptions = { + "- Only works on protobuf source data; returns NULL if the source is not a PbSourceData;", + "- Returns NULL if 'path' is missing/invalid, or the path cannot be resolved " + + "to a value in the protobuf message;", + "- For primitive / struct / map nodes and array nodes with an explicit array index, " + + "returns the matched value serialized as a byte[];", + "- For array nodes without an array index, returns a GenericArrayData whose elements " + + "are the byte[] representation of each list value." + }, examples = { + "extract_binary($root.feature) = [62,111]" + }) +public class ExtractBinaryFunction implements ValueParser { + + private final ValueParser pathParser; + private Descriptor parentDesc; + private DynamicMessage parentRoot; + + public ExtractBinaryFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + this.pathParser = OperatorTools.buildParser(expressions.get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + // data + if (!(sourceData instanceof PbSourceData)) { + return null; + } + if (pathParser instanceof ColumnParser) { + return this.parseByColumnParser(sourceData, rowIndex, context); + } + if (pathParser instanceof ExtractStructExcludingFunction) { + ExtractStructExcludingFunction excluding = (ExtractStructExcludingFunction) pathParser; + excluding.setKeepMessage(true); + Object result = excluding.parse(sourceData, rowIndex, context); + return toByteArray(result); + } + Object result = this.pathParser.parse(sourceData, rowIndex, context); + return toByteArray(result); + } + + public Object parseByColumnParser(SourceData sourceData, int rowIndex, Context context) { + String path = ((ColumnParser) pathParser).getFieldName(); + if (path == null) { + return null; + } + PbSourceData pbData = (PbSourceData) sourceData; + if (StringUtils.equals(PbSourceData.ROOT, path)) { + return pbData.getRoot().toByteArray(); + } + // node list + List childNodes = null; + boolean isParentData = false; + if (StringUtils.startsWith(path, PbSourceData.ROOT_KEY)) { + childNodes = pbData.parseStructNodeList(path, pbData.getRootDesc()); + } else if (StringUtils.startsWith(path, PbSourceData.CHILD_KEY)) { + if (pbData.getChildDesc() == null) { + return null; + } + childNodes = pbData.parseStructNodeList(path, pbData.getChildDesc()); + } else if (parentDesc != null) { + childNodes = pbData.parseStructNodeList(path, parentDesc); + isParentData = true; + } + if (childNodes == null || childNodes.size() <= 0) { + return null; + } + // value + Object currentNode = null; + if (isParentData) { + currentNode = pbData.findNodeValueByCache(childNodes, parentRoot); + } else { + currentNode = pbData.findFieldNode(rowIndex, path); + } + if (currentNode == null) { + return null; + } + // array + PbNode lastNode = childNodes.get(childNodes.size() - 1); + // primitive + if (lastNode.isPrimitiveType()) { + return toByteArray(currentNode); + } + // struct + if (lastNode.isStructType()) { + return toByteArray(currentNode); + } + // array + if (lastNode.isArrayType()) { + if (!lastNode.isHasArrayIndex()) { + if (!(currentNode instanceof List)) { + return null; + } + List valueList = (List) currentNode; + List fieldResult = new ArrayList<>(valueList.size()); + for (Object value : valueList) { + fieldResult.add(toByteArray(value)); + } + return new GenericArrayData(fieldResult.toArray()); + } + return toByteArray(currentNode); + } + // map + if (lastNode.isMapType()) { + return toByteArray(currentNode); + } + return null; + } + + private Object toByteArray(Object currentNode) { + if (currentNode == null) { + return null; + } + if (currentNode instanceof MessageLite) { + return ((MessageLite) currentNode).toByteArray(); + } + if (currentNode instanceof ByteString) { + return ((ByteString) currentNode).toByteArray(); + } + if (currentNode instanceof List) { + List valueList = (List) currentNode; + List fieldResult = new ArrayList<>(valueList.size()); + for (Object value : valueList) { + fieldResult.add(toByteArray(value)); + } + return new GenericArrayData(fieldResult.toArray()); + } + return String.valueOf(currentNode).getBytes(StandardCharsets.ISO_8859_1); + } + + /** + * get parentDesc + * @return the parentDesc + */ + public Descriptor getParentDesc() { + return parentDesc; + } + + /** + * set parentDesc + * @param parentDesc the parentDesc to set + */ + public void setParentDesc(Descriptor parentDesc) { + this.parentDesc = parentDesc; + } + + /** + * get parentRoot + * @return the parentRoot + */ + public DynamicMessage getParentRoot() { + return parentRoot; + } + + /** + * set parentRoot + * @param parentRoot the parentRoot to set + */ + public void setParentRoot(DynamicMessage parentRoot) { + this.parentRoot = parentRoot; + } + +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructExcludingFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructExcludingFunction.java new file mode 100644 index 00000000000..589f9433889 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructExcludingFunction.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.pb; + +import org.apache.inlong.sdk.transform.decode.PbNode; +import org.apache.inlong.sdk.transform.decode.PbSourceData; +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.FunctionConstant; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ColumnParser; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; +import com.google.protobuf.DynamicMessage; +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; +import org.apache.flink.table.data.GenericArrayData; + +import java.util.ArrayList; +import java.util.List; + +/** + * ExtractStructExcludingFunction -> extract_struct_excluding(path, excludeField1, excludeField2, ...) + * description: + * - Only works on protobuf source data; returns NULL if the source is not a PbSourceData. + * - Returns NULL if 'path' is missing/invalid, or the path cannot be resolved to a + * message-typed node in the protobuf source data. + * - Each {@code excludeFieldN} is the name (relative to 'path') of a sub-field that + * should be REMOVED from a copy of the located message before returning. Parameters + * that cannot be resolved on the located message, or are not plain column references, + * are silently ignored. The original record is never mutated. + * - When 'path' resolves to: + * - a single message: returns a GenericRowData built from a trimmed copy of that + * message (the excluded fields are cleared); + * - a repeated (array) field of messages: returns a GenericArrayData whose elements + * are the trimmed GenericRowData for every array element. + */ +@TransformFunction(type = FunctionConstant.PB_TYPE, names = { + "extract_struct_excluding"}, parameter = "(path, excludeField1, excludeField2, ...)", descriptions = { + "- Only works on protobuf source data; returns NULL if the source is not a PbSourceData;", + "- Returns NULL if 'path' is missing/invalid, or the path cannot be resolved " + + "to a message-typed node;", + "- Each excludeFieldN is the name (relative to 'path') of a sub-field to REMOVE " + + "from a copy of the located message; unknown / non-column-reference " + + "parameters are silently ignored;", + "- When 'path' resolves to a single message, returns a trimmed GenericRowData. " + + "When 'path' resolves to a repeated message field, returns a " + + "GenericArrayData whose elements are the trimmed GenericRowData for " + + "every array element. The original record is never mutated." + }, examples = { + "extract_struct_excluding($root.person,address,phone) " + + "= " + }) +public class ExtractStructExcludingFunction implements ValueParser { + + private final ValueParser pathParser; + private final List fieldParsers; + private String path; + private boolean isKeepMessage = false; + + public ExtractStructExcludingFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + this.pathParser = OperatorTools.buildParser(expressions.get(0)); + if (pathParser instanceof ColumnParser) { + this.path = ((ColumnParser) pathParser).getFieldName(); + } + this.fieldParsers = new ArrayList<>(); + for (int i = 1; i < expressions.size(); i++) { + this.fieldParsers.add(OperatorTools.buildParser(expressions.get(i))); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + if (!(sourceData instanceof PbSourceData)) { + return null; + } + if (path == null) { + return null; + } + PbSourceData pbData = (PbSourceData) sourceData; + if (PbSourceData.ROOT.equals(path)) { + return buildStruct(pbData, rowIndex, context, pbData.getRoot()); + } + // parse path + List pathChildNodes = pbData.parseStructNodeList(path, pbData.getRootDesc()); + if (pathChildNodes == null || pathChildNodes.size() == 0) { + return null; + } + // check message type + PbNode lastNode = pathChildNodes.get(pathChildNodes.size() - 1); + if (!lastNode.getFieldDesc().getJavaType().equals(JavaType.MESSAGE)) { + return null; + } + // get data + Object currentNode = pbData.findFieldNode(rowIndex, path); + if (currentNode == null) { + return null; + } + // array node + if (lastNode.isArrayType() && !lastNode.isHasArrayIndex()) { + if (!(currentNode instanceof List)) { + return null; + } + List currentNodeList = (List) currentNode; + List valueResult = new ArrayList<>(currentNodeList.size()); + for (Object nodeValue : currentNodeList) { + if (!(nodeValue instanceof DynamicMessage)) { + continue; + } + DynamicMessage currentValue = (DynamicMessage) nodeValue; + Object item = buildStruct(pbData, rowIndex, context, currentValue); + valueResult.add(item); + } + GenericArrayData result = new GenericArrayData(valueResult.toArray()); + return result; + } else { + // struct node + if (!(currentNode instanceof DynamicMessage)) { + return null; + } + DynamicMessage currentValue = (DynamicMessage) currentNode; + return buildStruct(pbData, rowIndex, context, currentValue); + } + } + + private Object buildStruct(PbSourceData pbData, int rowIndex, Context context, + DynamicMessage rawValue) { + DynamicMessage.Builder currentValue = rawValue.toBuilder(); + Descriptor currentDesc = currentValue.getDescriptorForType(); + for (ValueParser parser : fieldParsers) { + if (parser instanceof ColumnParser) { + ColumnParser columnParser = (ColumnParser) parser; + String fieldName = columnParser.getFieldName(); + List childNodes = pbData.parseStructNodeList(fieldName, currentDesc); + if (childNodes == null || childNodes.size() == 0) { + continue; + } + pbData.clearNodeValue(childNodes, currentValue); + } + } + if (isKeepMessage()) { + return currentValue.build(); + } + Object result = pbData.buildStructData(currentDesc, currentValue.build()); + return result; + } + + /** + * get isKeepMessage + * @return the isKeepMessage + */ + public boolean isKeepMessage() { + return isKeepMessage; + } + + /** + * set isKeepMessage + * @param isKeepMessage the isKeepMessage to set + */ + public void setKeepMessage(boolean isKeepMessage) { + this.isKeepMessage = isKeepMessage; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructFunction.java new file mode 100644 index 00000000000..d2c0c2c70f5 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/pb/ExtractStructFunction.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.pb; + +import org.apache.inlong.sdk.transform.decode.PbNode; +import org.apache.inlong.sdk.transform.decode.PbSourceData; +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.function.FunctionConstant; +import org.apache.inlong.sdk.transform.process.function.TransformFunction; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ColumnParser; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; +import com.google.protobuf.DynamicMessage; +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericRowData; + +import java.util.ArrayList; +import java.util.List; + +/** + * ExtractStructFunction -> extract_struct(path, field1, field2, field3...) + * description: + * - Only works on protobuf source data; returns NULL if the source is not a PbSourceData. + * - Returns NULL if 'path' is missing/invalid, or the path cannot be resolved to a + * DynamicMessage in the protobuf source data. + * - Otherwise, returns a GenericRowData whose arity equals the number of declared + * fields (field1, field2, ...). For each declared field: + * - if the field can be resolved on the located message, the corresponding + * position is filled with the resolved value; + * - otherwise (field not found, or the parameter is not a column reference), + * the corresponding position is set to NULL. + */ +@TransformFunction(type = FunctionConstant.PB_TYPE, names = { + "extract_struct"}, parameter = "(path, field1,field2,field3...)", descriptions = { + "- Only works on protobuf source data; returns NULL if the source is not a PbSourceData;", + "- Returns NULL if 'path' is missing/invalid, or the path cannot be resolved " + + "to a DynamicMessage in the protobuf source data;", + "- Otherwise, returns a GenericRowData whose arity equals the number of declared fields. " + + "Each position is filled with the resolved field value, or NULL if the field " + + "cannot be resolved on the located message." + }, examples = { + "extract_struct($root.person,name,age) = +I(\"Alice\",11)" + }) +public class ExtractStructFunction implements ValueParser { + + private final ValueParser pathParser; + private final List fieldParsers; + private String path; + + public ExtractStructFunction(Function expr) { + List expressions = expr.getParameters().getExpressions(); + this.pathParser = OperatorTools.buildParser(expressions.get(0)); + if (pathParser instanceof ColumnParser) { + this.path = ((ColumnParser) pathParser).getFieldName(); + } + this.fieldParsers = new ArrayList<>(); + for (int i = 1; i < expressions.size(); i++) { + this.fieldParsers.add(OperatorTools.buildParser(expressions.get(i))); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + if (!(sourceData instanceof PbSourceData)) { + return null; + } + if (path == null) { + return null; + } + PbSourceData pbData = (PbSourceData) sourceData; + // parse path + List pathChildNodes = pbData.parseStructNodeList(path, pbData.getRootDesc()); + if (pathChildNodes == null || pathChildNodes.size() == 0) { + return null; + } + // check message type + PbNode lastNode = pathChildNodes.get(pathChildNodes.size() - 1); + if (!lastNode.getFieldDesc().getJavaType().equals(JavaType.MESSAGE)) { + return null; + } + // get data + Object currentNode = pbData.findFieldNode(rowIndex, path); + if (currentNode == null) { + return null; + } + // array node + if (lastNode.isArrayType() && !lastNode.isHasArrayIndex()) { + if (!(currentNode instanceof List)) { + return null; + } + List currentNodeList = (List) currentNode; + List valueResult = new ArrayList<>(currentNodeList.size()); + for (Object nodeValue : currentNodeList) { + if (!(nodeValue instanceof DynamicMessage)) { + continue; + } + DynamicMessage currentValue = (DynamicMessage) nodeValue; + GenericRowData item = buildStruct(pbData, rowIndex, context, currentValue); + valueResult.add(item); + } + GenericArrayData result = new GenericArrayData(valueResult.toArray()); + return result; + } else { + // struct node + if (!(currentNode instanceof DynamicMessage)) { + return null; + } + DynamicMessage currentValue = (DynamicMessage) currentNode; + return buildStruct(pbData, rowIndex, context, currentValue); + } + } + + private GenericRowData buildStruct(PbSourceData pbData, int rowIndex, Context context, + DynamicMessage currentValue) { + Descriptor currentDesc = currentValue.getDescriptorForType(); + GenericRowData result = new GenericRowData(fieldParsers.size()); + int index = 0; + for (ValueParser parser : fieldParsers) { + if (parser instanceof ColumnParser) { + ColumnParser columnParser = (ColumnParser) parser; + String fieldName = columnParser.getFieldName(); + List childNodes = pbData.parseStructNodeList(fieldName, currentDesc); + if (childNodes == null || childNodes.size() == 0) { + result.setField(index++, null); + continue; + } + Object fieldValue = pbData.findNodeValueByCache(childNodes, currentValue); + PbNode lastNode = childNodes.get(childNodes.size() - 1); + if (lastNode.isArrayType() && !lastNode.isHasArrayIndex()) { + if (!(fieldValue instanceof List)) { + result.setField(index++, null); + continue; + } + List valueList = (List) fieldValue; + List valueResult = new ArrayList<>(valueList.size()); + for (Object value : valueList) { + Object transformedValue = pbData.buildFieldValue(lastNode.getFieldDesc(), value); + valueResult.add(transformedValue); + } + GenericArrayData arrayItem = new GenericArrayData(valueResult.toArray()); + result.setField(index++, arrayItem); + } else { + Object transformedValue = pbData.buildFieldValue(lastNode.getFieldDesc(), fieldValue); + result.setField(index++, transformedValue); + } + } else if (parser instanceof ExtractBinaryFunction) { + ExtractBinaryFunction extractBinaryFunc = (ExtractBinaryFunction) parser; + extractBinaryFunc.setParentRoot(currentValue); + extractBinaryFunc.setParentDesc(currentDesc); + Object fieldValue = extractBinaryFunc.parse(pbData, rowIndex, context); + result.setField(index++, fieldValue); + } else { + result.setField(index++, null); + } + } + return result; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java index ebec7673750..7102b3c969e 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ColumnParser.java @@ -51,4 +51,11 @@ public Object parse(SourceData sourceData, int rowIndex, Context context) { return sourceData.getField(rowIndex, fieldName); } + /** + * get fieldName + * @return the fieldName + */ + public String getFieldName() { + return fieldName; + } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java index 53ca1a4979c..4cb272a23d9 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/TestFunctionDoc.java @@ -32,7 +32,7 @@ public class TestFunctionDoc extends AbstractFunctionStringTestBase { @Test public void TestFunctionDoc() { Map> functionDocMap = FunctionTools.getFunctionDoc(); - Assert.assertEquals(8, functionDocMap.size()); + Assert.assertEquals(9, functionDocMap.size()); System.out.println(new Gson().toJson(functionDocMap)); } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java index a126f8d52dd..e6dcb6e44ee 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java @@ -32,6 +32,7 @@ import org.apache.inlong.sdk.transform.pojo.TransformConfig; import org.apache.inlong.sdk.transform.process.TransformProcessor; +import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -115,4 +116,140 @@ public void testPb2RowData() throws Exception { Assert.assertEquals(((GenericRowData) output.get(1).getRow(5, 3)).getMap(2).size(), 1); Assert.assertEquals(output.get(1).getArray(6).size(), 2); } + + @Test + public void testPb2RowData4Struct() throws Exception { + String transformBase64 = this.getPbTestDescription(); + PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); + List sinkFields = this.getTestFieldList("sid", "packageID", "msgTime"); + // concat_struct + FieldInfo concatStructField = new FieldInfo("concatStruct"); + String[] concatStructFields = new String[]{"attaID", "packageID"}; + FormatInfo[] concatStructFieldFormats = new FormatInfo[]{ + new StringFormatInfo(), + new LongFormatInfo() + }; + RowFormatInfo concatStructFormat = new RowFormatInfo(concatStructFields, concatStructFieldFormats); + concatStructField.setFormatInfo(concatStructFormat); + sinkFields.add(concatStructField); + // extract_struct + FieldInfo extractStructField = new FieldInfo("extractStruct"); + String[] extractStructFields = new String[]{"msg", "msgTime"}; + FormatInfo[] extractStructFieldFormats = new FormatInfo[]{ + new BinaryFormatInfo(Integer.MAX_VALUE), + new LongFormatInfo() + }; + RowFormatInfo extractStructFormat = new RowFormatInfo(extractStructFields, extractStructFieldFormats); + extractStructField.setFormatInfo(extractStructFormat); + sinkFields.add(extractStructField); + // extract_binary_string + FieldInfo extractBinaryStringField = new FieldInfo("extractBinaryString"); + extractBinaryStringField.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE)); + sinkFields.add(extractBinaryStringField); + // extract_binary_array_binary + FieldInfo extractBinaryArrayBinaryField = new FieldInfo("extractBinaryArrayBinary"); + ArrayFormatInfo extractBinaryArrayBinaryFormat = new ArrayFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE)); + extractBinaryArrayBinaryField.setFormatInfo(extractBinaryArrayBinaryFormat); + sinkFields.add(extractBinaryArrayBinaryField); + // extract_binary_map + FieldInfo extractBinaryMapField = new FieldInfo("extractBinaryMap"); + extractBinaryMapField.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE)); + sinkFields.add(extractBinaryMapField); + // sink + RowDataSinkInfo rowSink = new RowDataSinkInfo("UTF-8", sinkFields); + // sql + String transformSql = "select $root.sid,$root.packageID,$child.msgTime" + + ",concat_struct($root.sid,$root.packageID) as concatStruct" + + ",extract_struct($root.msgs(0),msg,msgTime) as extractStruct" + + ",extract_binary($root.sid) as extractBinaryString" + + ",extract_binary($root.msgs) as extractBinaryArrayBinary" + + ",extract_binary($child.extinfo) as extractBinaryMap " + + "from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1 + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createPbDecoder(pbSource), + SinkEncoderFactory.createRowEncoder(rowSink)); + byte[] srcBytes = this.getPbTestData(); + List output = processor.transformForBytes(srcBytes, new HashMap<>()); + Assert.assertEquals(2, output.size()); + // 0 + Assert.assertEquals(output.get(0).getString(0).toString(), "sid"); + Assert.assertEquals(output.get(0).getString(1).toString(), "1"); + Assert.assertEquals(output.get(0).getString(2).toString(), "1713243918000"); + + Assert.assertEquals(((GenericRowData) output.get(0).getRow(3, 2)).getString(0).toString(), "sid"); + Assert.assertEquals(((GenericRowData) output.get(0).getRow(3, 2)).getLong(1), 1); + + Assert.assertEquals(((GenericRowData) output.get(0).getRow(4, 2)).getBinary(0).length, 9); + Assert.assertEquals(((GenericRowData) output.get(0).getRow(4, 2)).getLong(1), 1713243918000L); + + Assert.assertEquals(output.get(0).getBinary(5).length, 3); + + Assert.assertEquals(((GenericArrayData) output.get(0).getArray(6)).size(), 2); + Assert.assertEquals(((GenericArrayData) output.get(0).getArray(6)).getBinary(0).length, 32); + Assert.assertEquals(((GenericArrayData) output.get(0).getArray(6)).getBinary(1).length, 35); + + Assert.assertEquals(output.get(0).getBinary(7).length, 53); + } + + @Test + public void testPb2RowData4ExtractStructExcluding() throws Exception { + String transformBase64 = this.getPbTestDescription(); + PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); + List sinkFields = this.getTestFieldList("sid", "packageID", "msgTime"); + // extract_struct + FieldInfo extractStructExcludingField = new FieldInfo("extractStructExcluding"); + String[] extractStructExcludingFields = new String[]{"msg", "msgTime", "extinfo"}; + FormatInfo[] extractStructExcludingFieldFormats = new FormatInfo[]{ + new BinaryFormatInfo(Integer.MAX_VALUE), + new LongFormatInfo(), + new MapFormatInfo(new StringFormatInfo(), new StringFormatInfo()) + }; + RowFormatInfo extractStructExcludingFormat = new RowFormatInfo(extractStructExcludingFields, + extractStructExcludingFieldFormats); + extractStructExcludingField.setFormatInfo(extractStructExcludingFormat); + sinkFields.add(extractStructExcludingField); + // rootBinary + FieldInfo rootBinary = new FieldInfo("rootBinary"); + rootBinary.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE)); + sinkFields.add(rootBinary); + // extractStructExcludingBinary + FieldInfo extractStructExcludingBinary = new FieldInfo("extractStructExcludingBinary"); + extractStructExcludingBinary.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE)); + sinkFields.add(extractStructExcludingBinary); + // extractStructBinary + FieldInfo extractStructBinary = new FieldInfo("extractStructBinary"); + extractStructBinary.setFormatInfo(new BinaryFormatInfo(Integer.MAX_VALUE)); + sinkFields.add(extractStructBinary); + // sink + RowDataSinkInfo rowSink = new RowDataSinkInfo("UTF-8", sinkFields); + // sql + String transformSql = "select $root.sid,$root.packageID,$child.msgTime" + + ",extract_struct_excluding($root.msgs(0),msg,extinfo) as extractStructExcluding " + + ",extract_binary($root) as rootBinary " + + ",extract_binary(extract_struct_excluding($root.msgs(0),msg,extinfo)) as extractStructExcludingBinary " + + ",extract_binary(extract_struct($root.msgs(0),msg,msgTime)) as extractStructBinary " + + " from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1 + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createPbDecoder(pbSource), + SinkEncoderFactory.createRowEncoder(rowSink)); + byte[] srcBytes = this.getPbTestData(); + List output = processor.transformForBytes(srcBytes, new HashMap<>()); + Assert.assertEquals(2, output.size()); + // 0 + Assert.assertEquals(output.get(0).getString(0).toString(), "sid"); + Assert.assertEquals(output.get(0).getString(1).toString(), "1"); + Assert.assertEquals(output.get(0).getString(2).toString(), "1713243918000"); + + Assert.assertEquals(((GenericRowData) output.get(0).getRow(3, 3)).getBinary(0).length, 0); + Assert.assertEquals(((GenericRowData) output.get(0).getRow(3, 3)).getLong(1), 1713243918000L); + Assert.assertEquals(((GenericRowData) output.get(0).getRow(3, 3)).getMap(2).size(), 0); + + Assert.assertEquals(output.get(0).getBinary(4).length, 78); + Assert.assertEquals(output.get(0).getBinary(5).length, 7); + Assert.assertEquals(output.get(0).getBinary(6).length, 60); + } }