diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java index 4fe166832b8..310f6b1abb8 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbNode.java @@ -41,6 +41,7 @@ public class PbNode { private boolean isArray = false; private int arrayIndex = -1; private boolean isMap = false; + private boolean isMapType = false; private String mapKey = ""; private FieldDescriptor mapKeyDesc; private FieldDescriptor mapValueDesc; @@ -60,6 +61,9 @@ public PbNode(Descriptors.Descriptor messageDesc, String nodeString, boolean isL this.fieldDesc = messageDesc.findFieldByName(name); if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) { this.messageType = this.fieldDesc.getMessageType(); + if (isMapDescriptor(messageType)) { + this.isMapType = true; + } } } } else { diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java index dbeeade6f31..e46fb47d8d9 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceData.java @@ -21,13 +21,18 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -54,7 +59,7 @@ public class PbSourceData extends AbstractSourceData { private List childRoot; - private Charset srcCharset; + protected Charset srcCharset; /** * Constructor @@ -105,8 +110,8 @@ public int getRowCount() { * @return */ @Override - public String getField(int rowNum, String fieldName) { - String fieldValue = ""; + public Object getField(int rowNum, String fieldName) { + Object fieldValue = ""; try { if (isContextField(fieldName)) { return getContextField(fieldName); @@ -130,7 +135,7 @@ public String getField(int rowNum, String fieldName) { * @param fieldName * @return */ - private String getRootField(String srcFieldName) { + private Object getRootField(String srcFieldName) { List childNodes = this.columnNodeMap.get(srcFieldName); if (childNodes == null) { String fieldName = srcFieldName.substring(ROOT_KEY.length()); @@ -145,7 +150,7 @@ private String getRootField(String srcFieldName) { return ""; } // parse other node - String fieldValue = this.getNodeValue(childNodes, root); + Object fieldValue = this.getNodeValue(childNodes, root); return fieldValue; } @@ -155,7 +160,7 @@ private String getRootField(String srcFieldName) { * @param srcFieldName * @return */ - private String getChildField(int rowNum, String srcFieldName) { + private Object getChildField(int rowNum, String srcFieldName) { if (this.childRoot == null || this.childDesc == null) { return ""; } @@ -174,7 +179,7 @@ private String getChildField(int rowNum, String srcFieldName) { } // parse other node DynamicMessage child = childRoot.get(rowNum); - String fieldValue = this.getNodeValue(childNodes, child); + Object fieldValue = this.getNodeValue(childNodes, child); return fieldValue; } @@ -184,9 +189,8 @@ private String getChildField(int rowNum, String srcFieldName) { * @param root * @return */ - @SuppressWarnings("rawtypes") - private String getNodeValue(List childNodes, DynamicMessage root) { - String fieldValue = ""; + @SuppressWarnings({"rawtypes", "unchecked"}) + private Object getNodeValue(List childNodes, DynamicMessage root) { DynamicMessage current = root; for (int i = 0; i < childNodes.size(); i++) { PbNode node = childNodes.get(i); @@ -195,62 +199,120 @@ private String getNodeValue(List childNodes, DynamicMessage root) { // error data break; } - if (node.isLastNode()) { - switch (node.getFieldDesc().getJavaType()) { - case STRING: - case INT: - case LONG: - case FLOAT: - case DOUBLE: - case BOOLEAN: - fieldValue = String.valueOf(nodeValue); - break; - case BYTE_STRING: - ByteString byteString = (ByteString) nodeValue; - fieldValue = new String(byteString.toByteArray(), srcCharset); - break; - case ENUM: - fieldValue = String.valueOf(nodeValue); - break; - case MESSAGE: - if (node.isArray()) { - fieldValue = String.valueOf(((List) nodeValue).get(node.getArrayIndex())); - } else if (node.isMap()) { - List nodeValueList = (List) nodeValue; - for (DynamicMessage subnodeValue : nodeValueList) { - String keyValue = String.valueOf(subnodeValue.getField(node.getMapKeyDesc())); - if (StringUtils.equals(keyValue, node.getMapKey())) { - fieldValue = String.valueOf(subnodeValue.getField(node.getMapValueDesc())); - break; - } - } - } else { - fieldValue = String.valueOf(nodeValue); + if (!node.isLastNode()) { + if (node.isArray()) { + current = (DynamicMessage) ((List) nodeValue).get(node.getArrayIndex()); + } else if (node.isMap()) { + List nodeValueList = (List) nodeValue; + DynamicMessage newCurrent = null; + for (DynamicMessage subnodeValue : nodeValueList) { + String keyValue = String.valueOf(subnodeValue.getField(node.getMapKeyDesc())); + if (StringUtils.equals(keyValue, node.getMapKey())) { + newCurrent = (DynamicMessage) subnodeValue.getField(node.getMapValueDesc()); + break; } - break; + } + if (newCurrent == null) { + return null; + } + current = newCurrent; + } else { + current = (DynamicMessage) nodeValue; } - break; + continue; } + // last node if (node.isArray()) { - current = (DynamicMessage) ((List) nodeValue).get(node.getArrayIndex()); + return buildStructData(node.getMessageType(), ((List) nodeValue).get(node.getArrayIndex())); } else if (node.isMap()) { List nodeValueList = (List) nodeValue; - DynamicMessage newCurrent = null; + Object fieldValue = null; for (DynamicMessage subnodeValue : nodeValueList) { String keyValue = String.valueOf(subnodeValue.getField(node.getMapKeyDesc())); if (StringUtils.equals(keyValue, node.getMapKey())) { - newCurrent = (DynamicMessage) subnodeValue.getField(node.getMapValueDesc()); + fieldValue = subnodeValue.getField(node.getMapValueDesc()); break; } } - if (newCurrent == null) { - return fieldValue; + return this.buildFieldValue(node.getFieldDesc(), fieldValue, false); + } else if (node.isMapType()) { + return this.buildStructData(node.getMessageType(), nodeValue); + } else if (node.getFieldDesc().isRepeated()) { + List valueList = (List) nodeValue; + List result = new ArrayList<>(valueList.size()); + for (Object value : valueList) { + result.add(this.buildFieldValue(node.getFieldDesc(), value, false)); } - current = newCurrent; + return new GenericArrayData(result.toArray()); } else { - current = (DynamicMessage) nodeValue; + return this.buildFieldValue(node.getFieldDesc(), nodeValue, false); } } - return fieldValue; + return null; + } + + @SuppressWarnings("unchecked") + private Object buildFieldValue(FieldDescriptor fieldDesc, Object nodeValue, boolean isRepeated) { + if (nodeValue == null) { + return null; + } + switch (fieldDesc.getJavaType()) { + case STRING: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case ENUM: + return nodeValue; + case BYTE_STRING: + return ((ByteString) nodeValue).toByteArray(); + case MESSAGE: { + if (!isRepeated) { + return this.buildStructData(fieldDesc.getMessageType(), nodeValue); + } else if (PbNode.isMapDescriptor(fieldDesc.getMessageType())) { + return this.buildStructData(fieldDesc.getMessageType(), nodeValue); + } + List valueList = (List) nodeValue; + List result = new ArrayList<>(valueList.size()); + for (DynamicMessage value : valueList) { + result.add(this.buildStructData(fieldDesc.getMessageType(), value)); + } + return new GenericArrayData(result.toArray()); + } + default: + return String.valueOf(nodeValue); + } + } + + @SuppressWarnings("unchecked") + protected Object buildStructData(Descriptors.Descriptor messageType, Object nodeValue) { + // map + if (PbNode.isMapDescriptor(messageType)) { + Descriptors.FieldDescriptor keyField = messageType.findFieldByNumber(1); + Descriptors.FieldDescriptor valueField = messageType.findFieldByNumber(2); + List subNodeValueList = (List) nodeValue; + Map result = new HashMap<>(); + for (DynamicMessage subnodeValue : subNodeValueList) { + Object keyValue = buildFieldValue(keyField, subnodeValue.getField(keyField), false); + Object valueValue = buildFieldValue(valueField, subnodeValue.getField(valueField), false); + result.put(keyValue, valueValue); + } + return new GenericMapData(result); + } + // struct + DynamicMessage msgObj = (DynamicMessage) nodeValue; + GenericRowData result = new GenericRowData(messageType.getFields().size()); + int index = 0; + for (FieldDescriptor fieldDesc : messageType.getFields()) { + Object fieldValue = msgObj.getField(fieldDesc); + if (fieldValue == null) { + result.setField(index++, null); + continue; + } + Object fieldResult = this.buildFieldValue(fieldDesc, fieldValue, false); + result.setField(index++, fieldResult); + } + return result; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java index 97b43c02b90..6c9c99c9dd0 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java @@ -63,7 +63,7 @@ public String encode(SinkData sinkData, Context context) { sinkData.keyList().forEach(k -> builder.append(sinkData.getField(k)).append(delimiter)); } else { for (String fieldName : sinkData.keyList()) { - String fieldValue = sinkData.getField(fieldName); + String fieldValue = formatFieldValue(sinkData.getField(fieldName)); if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) { builder.append(fieldValue); } else { @@ -78,7 +78,7 @@ public String encode(SinkData sinkData, Context context) { } else { for (FieldInfo field : fields) { String fieldName = field.getName(); - String fieldValue = sinkData.getField(fieldName); + String fieldValue = formatFieldValue(sinkData.getField(fieldName)); EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue); builder.append(delimiter); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java index 2b470cdc246..dcc5d84e209 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/DefaultSinkData.java @@ -26,13 +26,12 @@ /** * DefaultSinkData - * */ @Data public class DefaultSinkData implements SinkData { private List keyList = new ArrayList<>(); - private Map currentRow = new HashMap<>(); + private Map currentRow = new HashMap<>(); /** * addField @@ -40,7 +39,7 @@ public class DefaultSinkData implements SinkData { * @param fieldValue */ @Override - public void addField(String fieldName, String fieldValue) { + public void addField(String fieldName, Object fieldValue) { this.keyList.add(fieldName); this.currentRow.put(fieldName, fieldValue); } @@ -51,7 +50,7 @@ public void addField(String fieldName, String fieldValue) { * @return */ @Override - public String getField(String fieldName) { + public Object getField(String fieldName) { return this.currentRow.getOrDefault(fieldName, ""); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java index 094f4d6884e..444c73ff6cf 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java @@ -60,7 +60,7 @@ public String encode(SinkData sinkData, Context context) { builder.delete(0, builder.length()); if (fields == null || fields.size() == 0) { for (String fieldName : sinkData.keyList()) { - String fieldValue = sinkData.getField(fieldName); + String fieldValue = formatFieldValue(sinkData.getField(fieldName)); if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) { builder.append(fieldValue).append(entryDelimiter); } else { @@ -70,7 +70,7 @@ public String encode(SinkData sinkData, Context context) { } else { for (FieldInfo field : fields) { String fieldName = field.getName(); - String fieldValue = sinkData.getField(fieldName); + String fieldValue = formatFieldValue(sinkData.getField(fieldName)); builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter); } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java index c76c4e80ff4..cdcc8d3fa73 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java @@ -49,7 +49,7 @@ public Map encode(SinkData sinkData, Context context) { Map esMap = new HashMap<>(); for (FieldInfo fieldInfo : fields) { String fieldName = fieldInfo.getName(); - String strValue = sinkData.getField(fieldName); + String strValue = formatFieldValue(sinkData.getField(fieldName)); TypeConverter converter = converters.get(fieldName); if (converter == null) { esMap.put(fieldName, strValue); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java index 6d377b061c1..bbe2d018637 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java @@ -70,7 +70,7 @@ public ByteArrayOutputStream encode(SinkData sinkData, Context context) { Object[] rowsInfo = new Object[size]; Arrays.fill(rowsInfo, ""); for (int i = 0; i < size; i++) { - String fieldData = sinkData.getField(this.fields.get(i).getName()); + String fieldData = formatFieldValue(sinkData.getField(this.fields.get(i).getName())); if (fieldData == null) { continue; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java index 226405c5159..e0263558ca1 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java @@ -58,7 +58,7 @@ public byte[] encode(SinkData sinkData, Context context) { for (String key : sinkData.keyList()) { Descriptors.FieldDescriptor fieldDescriptor = dynamicDescriptor.findFieldByName(key); if (fieldDescriptor != null) { - String fieldValue = sinkData.getField(key); + String fieldValue = formatFieldValue(sinkData.getField(key)); if (fieldValue != null) { Object value = convertValue(fieldDescriptor, fieldValue); dynamicBuilder.setField(fieldDescriptor, value); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java index 1cab9f6fe3c..507741d5c35 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/RowDataSinkEncoder.java @@ -42,10 +42,9 @@ public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) { @Override public RowData encode(SinkData sinkData, Context context) { GenericRowData rowData = new GenericRowData(fieldToRowDataConverters.length); - for (int i = 0; i < fields.size(); i++) { String fieldName = fields.get(i).getName(); - String fieldValue = sinkData.getField(fieldName); + Object fieldValue = sinkData.getField(fieldName); rowData.setField(i, fieldToRowDataConverters[i].convert(fieldValue)); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java index 1ad0c38c688..6137eb15222 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkData.java @@ -25,9 +25,9 @@ */ public interface SinkData { - void addField(String fieldName, String fieldValue); + void addField(String fieldName, Object fieldValue); - String getField(String fieldName); + Object getField(String fieldName); List keyList(); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java index bd804cd771d..05d2e6a9adf 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java @@ -44,4 +44,13 @@ public SinkEncoder(List fields) { } public abstract Output encode(SinkData sinkData, Context context); + + protected String formatFieldValue(Object fieldValue) { + if (fieldValue == null) { + return null; + } else if (fieldValue instanceof byte[]) { + return new String((byte[]) fieldValue); + } + return String.valueOf(fieldValue); + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java index e6392bbe2d5..7346a205d1d 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -227,7 +227,7 @@ public List transformForBytes(byte[] input, Map extParams) { if (fieldValue == null) { sinkData.addField(fieldName, ""); } else { - sinkData.addField(fieldName, fieldValue.toString()); + sinkData.addField(fieldName, fieldValue); } } catch (Throwable t) { sinkData.addField(fieldName, ""); diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java index 85eba0567f8..3f5faa57101 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/string/ConcatFunction.java @@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j; import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.expression.Function; +import org.apache.flink.table.data.StringData; import java.util.ArrayList; import java.util.List; @@ -64,7 +65,16 @@ public ConcatFunction(Function expr) { public Object parse(SourceData sourceData, int rowIndex, Context context) { StringBuilder builder = new StringBuilder(); for (ValueParser node : nodeList) { - builder.append(node.parse(sourceData, rowIndex, context)); + Object itemValue = node.parse(sourceData, rowIndex, context); + if (itemValue == null) { + continue; + } else if (itemValue instanceof byte[]) { + builder.append(new String((byte[]) itemValue)); + } else if (itemValue instanceof StringData) { + builder.append(((StringData) itemValue).toString()); + } else { + builder.append(String.valueOf(itemValue)); + } } return builder.toString(); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java index ff61d912b3c..2743921c589 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/utils/FieldToRowDataUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.ArrayType; @@ -39,6 +40,7 @@ import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -72,7 +74,6 @@ public interface FieldToRowDataConverter extends Serializable { converterMap.put(LogicalTypeRoot.BIGINT, (obj) -> parseLong(obj)); converterMap.put(LogicalTypeRoot.FLOAT, (obj) -> parseFloat(obj)); converterMap.put(LogicalTypeRoot.DOUBLE, (obj) -> parseDouble(obj)); - converterMap.put(LogicalTypeRoot.BINARY, (obj) -> parseBinary(obj)); converterMap.put(LogicalTypeRoot.VARBINARY, (obj) -> parseBinary(obj)); converterMap.put(LogicalTypeRoot.CHAR, (obj) -> parseVarchar(obj)); converterMap.put(LogicalTypeRoot.VARCHAR, (obj) -> parseVarchar(obj)); @@ -82,6 +83,10 @@ public interface FieldToRowDataConverter extends Serializable { converterMap.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, (obj) -> parseTimestampWithLocalTimeZone(obj)); converterMap.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, (obj) -> parseTimestampWithLocalTimeZone(obj)); converterMap.put(LogicalTypeRoot.DECIMAL, (obj) -> parseDecimal(obj)); + converterMap.put(LogicalTypeRoot.BINARY, (obj) -> parseBinary(obj)); + converterMap.put(LogicalTypeRoot.ARRAY, (obj) -> parseArray(obj)); + converterMap.put(LogicalTypeRoot.MAP, (obj) -> parseMap(obj)); + converterMap.put(LogicalTypeRoot.ROW, (obj) -> parseRow(obj)); } private static final ThreadLocal> formatLocal = new ThreadLocal<>(); @@ -122,8 +127,8 @@ private static FieldToRowDataConverter createFieldRowConverter(LogicalType field case ARRAY: return obj -> { final Object[] array = (Object[]) obj; - FieldToRowDataConverter elementConverter = - createFieldRowConverter(((ArrayType) fieldType).getElementType()); + FieldToRowDataConverter elementConverter = createFieldRowConverter( + ((ArrayType) fieldType).getElementType()); Object[] converted = Arrays.stream(array) .map(elementConverter::convert) .toArray(); @@ -131,10 +136,9 @@ private static FieldToRowDataConverter createFieldRowConverter(LogicalType field }; case MAP: return obj -> { - FieldToRowDataConverter keyConverter = - createFieldRowConverter(((MapType) fieldType).getKeyType()); - FieldToRowDataConverter valueConverter = - createFieldRowConverter(((MapType) fieldType).getValueType()); + FieldToRowDataConverter keyConverter = createFieldRowConverter(((MapType) fieldType).getKeyType()); + FieldToRowDataConverter valueConverter = createFieldRowConverter( + ((MapType) fieldType).getValueType()); Map map = (Map) obj; Map internalMap = new HashMap<>(); for (Object k : map.keySet()) { @@ -153,7 +157,13 @@ private static FieldToRowDataConverter createFieldRowConverter(LogicalType field private static Object parseBoolean(Object obj) { try { - return Boolean.parseBoolean(obj.toString()); + if (obj == null) { + return null; + } + if (obj instanceof Boolean) { + return obj; + } + return Boolean.parseBoolean(String.valueOf(obj)); } catch (RuntimeException e) { if (isIgnoreError()) { return null; @@ -164,7 +174,13 @@ private static Object parseBoolean(Object obj) { private static Object parseTinyint(Object obj) { try { - return Byte.parseByte(obj.toString()); + if (obj == null) { + return null; + } + if (obj instanceof Byte) { + return obj; + } + return Byte.parseByte(String.valueOf(obj)); } catch (RuntimeException e) { if (isIgnoreError()) { return null; @@ -175,7 +191,13 @@ private static Object parseTinyint(Object obj) { private static Object parseSmallint(Object obj) { try { - return Short.parseShort(obj.toString()); + if (obj == null) { + return null; + } + if (obj instanceof Short) { + return obj; + } + return Short.parseShort(String.valueOf(obj)); } catch (RuntimeException e) { if (isIgnoreError()) { return null; @@ -186,7 +208,13 @@ private static Object parseSmallint(Object obj) { private static Object parseInteger(Object obj) { try { - return Integer.parseInt(obj.toString()); + if (obj == null) { + return null; + } + if (obj instanceof Integer) { + return obj; + } + return Integer.parseInt(String.valueOf(obj)); } catch (RuntimeException e) { if (isIgnoreError()) { return null; @@ -197,7 +225,13 @@ private static Object parseInteger(Object obj) { private static Object parseLong(Object obj) { try { - return Long.parseLong(obj.toString()); + if (obj == null) { + return null; + } + if (obj instanceof Long) { + return obj; + } + return Long.parseLong(String.valueOf(obj)); } catch (RuntimeException e) { if (isIgnoreError()) { return null; @@ -208,7 +242,13 @@ private static Object parseLong(Object obj) { private static Object parseFloat(Object obj) { try { - return Float.parseFloat(obj.toString()); + if (obj == null) { + return null; + } + if (obj instanceof Float) { + return obj; + } + return Float.parseFloat(String.valueOf(obj)); } catch (RuntimeException e) { if (isIgnoreError()) { return null; @@ -219,18 +259,13 @@ private static Object parseFloat(Object obj) { private static Object parseDouble(Object obj) { try { - return Double.parseDouble(obj.toString()); - } catch (RuntimeException e) { - if (isIgnoreError()) { + if (obj == null) { return null; } - throw e; - } - } - - private static Object parseBinary(Object obj) { - try { - return obj.toString().getBytes(); + if (obj instanceof Double) { + return obj; + } + return Double.parseDouble(String.valueOf(obj)); } catch (RuntimeException e) { if (isIgnoreError()) { return null; @@ -241,7 +276,13 @@ private static Object parseBinary(Object obj) { private static Object parseVarchar(Object obj) { try { - return StringData.fromString((String) obj); + if (obj == null) { + return null; + } + if (obj instanceof byte[]) { + return StringData.fromString(new String((byte[]) obj)); + } + return StringData.fromString(String.valueOf(obj)); } catch (RuntimeException e) { if (isIgnoreError()) { return null; @@ -258,7 +299,7 @@ private static Object parseDate(Object obj) { if (obj instanceof Date) { return ((Date) obj).toLocalDate().toEpochDay(); } - String strObj = obj.toString(); + String strObj = String.valueOf(obj); Date date = parseDateTime(strObj); return date.toLocalDate().toEpochDay(); } catch (RuntimeException e) { @@ -305,7 +346,7 @@ private static Object parseTimeWithoutTimeZone(Object obj) { if (obj instanceof Time) { return ((Time) obj).toLocalTime().toSecondOfDay() * 1000; } - String strObj = obj.toString(); + String strObj = String.valueOf(obj); Date date = parseDateTime(strObj); return new Time(date.getTime()).toLocalTime().toSecondOfDay() * 1000; } catch (RuntimeException e) { @@ -324,7 +365,7 @@ private static Object parseTimestampWithLocalTimeZone(Object obj) { if (obj instanceof Timestamp) { return TimestampData.fromTimestamp((Timestamp) obj); } - String strObj = obj.toString(); + String strObj = String.valueOf(obj); Date date = parseDateTime(strObj); return TimestampData.fromTimestamp(new Timestamp(date.getTime())); } catch (RuntimeException e) { @@ -346,7 +387,7 @@ private static Object parseDecimal(Object obj) { DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE); } - String strObj = obj.toString(); + String strObj = String.valueOf(obj); return DecimalData.fromBigDecimal( new BigDecimal(strObj), DecimalType.DEFAULT_PRECISION, @@ -358,4 +399,82 @@ private static Object parseDecimal(Object obj) { throw e; } } + + private static Object parseBinary(Object obj) { + try { + if (obj == null) { + return null; + } + if (obj instanceof byte[]) { + return obj; + } + return String.valueOf(obj).getBytes(); + } catch (RuntimeException e) { + if (isIgnoreError()) { + return null; + } + throw e; + } + } + + private static Object parseArray(Object obj) { + try { + if (obj == null) { + return null; + } + if (obj instanceof GenericArrayData) { + return obj; + } + if (obj instanceof List) { + return new GenericArrayData(((List) obj).toArray()); + } + return new GenericArrayData(new Object[]{obj}); + } catch (RuntimeException e) { + if (isIgnoreError()) { + return null; + } + throw e; + } + } + + private static Object parseMap(Object obj) { + try { + if (obj == null) { + return null; + } + if (obj instanceof GenericMapData) { + return obj; + } + if (obj instanceof Map) { + return new GenericMapData((Map) obj); + } + Map mapObj = new HashMap<>(); + mapObj.put(obj, obj); + return new GenericMapData(mapObj); + } catch (RuntimeException e) { + if (isIgnoreError()) { + return null; + } + throw e; + } + } + + private static Object parseRow(Object obj) { + try { + if (obj == null) { + return null; + } + if (obj instanceof GenericRowData) { + return obj; + } + GenericRowData result = new GenericRowData(1); + result.setField(0, obj); + return result; + } catch (RuntimeException e) { + if (isIgnoreError()) { + return null; + } + throw e; + } + } } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java new file mode 100644 index 00000000000..a126f8d52dd --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestPb2RowDataProcessor.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.processor; + +import org.apache.inlong.common.pojo.sort.dataflow.field.format.ArrayFormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.BinaryFormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.LongFormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.MapFormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo; +import org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo; +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.PbSourceInfo; +import org.apache.inlong.sdk.transform.pojo.RowDataSinkInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestPb2RowDataProcessor extends AbstractProcessorTestBase { + + @Test + public void testPb2RowData() throws Exception { + String transformBase64 = this.getPbTestDescription(); + PbSourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs"); + String[] fieldNames = new String[]{"sid", "packageID", "msgTime", + "binaryMsg", "mapExtinfo", "structMsgItem", "listMsgs"}; + List sinkFields = this.getTestFieldList("sid", "packageID", "msgTime"); + // binaryMsg + FieldInfo binaryMsg = new FieldInfo("binaryMsg"); + BinaryFormatInfo binaryMsgFormat = new BinaryFormatInfo(Integer.MAX_VALUE); + binaryMsg.setFormatInfo(binaryMsgFormat); + sinkFields.add(binaryMsg); + // mapExtinfo + FieldInfo mapExtinfo = new FieldInfo("mapExtinfo"); + MapFormatInfo mapExtinfoFormat = new MapFormatInfo(new StringFormatInfo(), new StringFormatInfo()); + mapExtinfo.setFormatInfo(mapExtinfoFormat); + sinkFields.add(mapExtinfo); + // structMsgItem + FieldInfo structMsgItem = new FieldInfo("structMsgItem"); + String[] structMsgItemFields = new String[]{"msg", "msgTime", "extinfo"}; + FormatInfo[] structMsgItemFormats = new FormatInfo[]{ + new BinaryFormatInfo(Integer.MAX_VALUE), + new LongFormatInfo(), + new MapFormatInfo(new StringFormatInfo(), new StringFormatInfo()) + }; + RowFormatInfo structMsgItemFormat = new RowFormatInfo(structMsgItemFields, structMsgItemFormats); + structMsgItem.setFormatInfo(structMsgItemFormat); + sinkFields.add(structMsgItem); + // listMsgs + FieldInfo listMsgs = new FieldInfo("listMsgs"); + ArrayFormatInfo listMsgsFormat = new ArrayFormatInfo(structMsgItemFormat); + listMsgs.setFormatInfo(listMsgsFormat); + sinkFields.add(listMsgs); + // sink + RowDataSinkInfo rowSink = new RowDataSinkInfo("UTF-8", sinkFields); + // sql + String transformSql = "select $root.sid,$root.packageID,$child.msgTime" + + ",$child.msg as binaryMsg," + + "$child.extinfo as mapExtinfo," + + "$root.msgs(1) as structMsgItem," + + "$root.msgs as listMsgs from source"; + TransformConfig config = new TransformConfig(transformSql); + // case1 + TransformProcessor processor = TransformProcessor + .create(config, SourceDecoderFactory.createPbDecoder(pbSource), + SinkEncoderFactory.createRowEncoder(rowSink)); + byte[] srcBytes = this.getPbTestData(); + List output = processor.transformForBytes(srcBytes, new HashMap<>()); + Assert.assertEquals(2, output.size()); + // 0 + Assert.assertEquals(output.get(0).getString(0).toString(), "sid"); + Assert.assertEquals(output.get(0).getString(1).toString(), "1"); + Assert.assertEquals(output.get(0).getString(2).toString(), "1713243918000"); + Assert.assertEquals(new String(output.get(0).getBinary(3)), "msgValue4"); + Assert.assertEquals(((GenericMapData) output.get(0).getMap(4)).get("key"), "value"); + Assert.assertEquals(((GenericMapData) output.get(0).getMap(4)).get("value"), null); + Assert.assertEquals(new String(((GenericRowData) output.get(0).getRow(5, 3)).getBinary(0)), "msgValue42"); + Assert.assertEquals(((GenericRowData) output.get(0).getRow(5, 3)).getLong(1), 1713243918002L); + Assert.assertEquals(((GenericRowData) output.get(0).getRow(5, 3)).getMap(2).size(), 1); + Assert.assertEquals(output.get(0).getArray(6).size(), 2); + // 1 + Assert.assertEquals(output.get(1).getString(0).toString(), "sid"); + Assert.assertEquals(output.get(1).getString(1).toString(), "1"); + Assert.assertEquals(output.get(1).getString(2).toString(), "1713243918002"); + Assert.assertEquals(new String(output.get(1).getBinary(3)), "msgValue42"); + Assert.assertEquals(((GenericMapData) output.get(1).getMap(4)).get("key2"), "value2"); + Assert.assertEquals(((GenericMapData) output.get(1).getMap(4)).get("value"), null); + Assert.assertEquals(new String(((GenericRowData) output.get(1).getRow(5, 3)).getBinary(0)), "msgValue42"); + Assert.assertEquals(((GenericRowData) output.get(1).getRow(5, 3)).getLong(1), 1713243918002L); + Assert.assertEquals(((GenericRowData) output.get(1).getRow(5, 3)).getMap(2).size(), 1); + Assert.assertEquals(output.get(1).getArray(6).size(), 2); + } +}