Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions inlong-sdk/transform-sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public class JsonNode {
private int arrayIndex = -1;

public JsonNode(String nodeString) {
int beginIndex = nodeString.indexOf('[');
int beginIndex = nodeString.indexOf('(');
if (beginIndex < 0) {
this.name = nodeString;
} else {
this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
int endIndex = nodeString.lastIndexOf(']');
int endIndex = nodeString.lastIndexOf(')');
if (endIndex >= 0) {
this.isArray = true;
this.arrayIndex = NumberUtils.toInt(nodeString.substring(beginIndex + 1, endIndex), -1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
import lombok.Data;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;

/**
 * One segment of a dot-separated protobuf field path, resolved against the descriptor of
 * the message that contains it.
 *
 * <p>A segment is either a plain field name ({@code name}) or a field name followed by an
 * element index in parentheses ({@code name(2)}).
 */
@Data
public class PbNode {

    private String name;
    private FieldDescriptor fieldDesc;
    private Descriptors.Descriptor messageType;
    private boolean isArray = false;
    private int arrayIndex = -1;
    private boolean isLastNode = false;

    /**
     * Parses a single path segment and looks the field up in {@code messageDesc}.
     *
     * <p>If {@code messageDesc} has no field with the parsed name, {@code fieldDesc} and
     * {@code messageType} stay {@code null} instead of the constructor failing.
     *
     * @param messageDesc descriptor of the message that should contain the field
     * @param nodeString  path segment, e.g. {@code "user"} or {@code "items(3)"}
     * @param isLastNode  whether this segment is the final one of its path
     */
    public PbNode(Descriptors.Descriptor messageDesc, String nodeString, boolean isLastNode) {
        int beginIndex = nodeString.indexOf('(');
        if (beginIndex < 0) {
            this.name = nodeString;
        } else {
            this.name = StringUtils.trim(nodeString.substring(0, beginIndex));
            int endIndex = nodeString.lastIndexOf(')');
            // Only a ')' located after the '(' closes the index; otherwise substring() would throw.
            if (endIndex > beginIndex) {
                this.isArray = true;
                String indexText = StringUtils.trim(nodeString.substring(beginIndex + 1, endIndex));
                // A missing, non-numeric or negative index falls back to the first element.
                this.arrayIndex = Math.max(NumberUtils.toInt(indexText, -1), 0);
            }
        }
        this.fieldDesc = messageDesc.findFieldByName(this.name);
        if (this.fieldDesc != null && this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
            this.messageType = this.fieldDesc.getMessageType();
        }
        this.isLastNode = isLastNode;
    }

    /**
     * Splits {@code nodePath} on {@code '.'} and resolves every segment, descending into the
     * message type of each segment to resolve the next one.
     *
     * @param rootDesc descriptor the first segment is resolved against
     * @param nodePath dot-separated field path, e.g. {@code "body.items(0).name"}
     * @return the resolved nodes in path order, or {@code null} when the path is blank, names
     *         a field that does not exist, or descends through a field that is not a message
     */
    public static List<PbNode> parseNodePath(Descriptors.Descriptor rootDesc, String nodePath) {
        if (StringUtils.isBlank(nodePath)) {
            return null;
        }
        String[] nodeStrings = nodePath.split("\\.");
        int lastIndex = nodeStrings.length - 1;
        List<PbNode> nodes = new ArrayList<>(nodeStrings.length);
        Descriptors.Descriptor current = rootDesc;
        for (int i = 0; i <= lastIndex; i++) {
            if (current == null) {
                // The previous segment was not a message, so there is nothing to descend into.
                return null;
            }
            PbNode pbNode = new PbNode(current, nodeStrings[i], i == lastIndex);
            if (pbNode.fieldDesc == null) {
                // Unknown field name: report it like any other invalid path.
                return null;
            }
            current = pbNode.messageType;
            nodes.add(pbNode);
        }
        return nodes;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.decode;

import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * {@link SourceData} implementation backed by decoded protobuf {@link DynamicMessage}s.
 *
 * <p>Field names starting with {@link #ROOT_KEY} are resolved against the root message;
 * field names starting with {@link #CHILD_KEY} are resolved against the child message of
 * the requested row. The remainder of the name is a dot-separated path parsed by
 * {@link PbNode#parseNodePath}; parsed paths are cached in the column node map.
 */
public class PbSourceData implements SourceData {

    private static final Logger LOG = LoggerFactory.getLogger(PbSourceData.class);

    public static final String ROOT_KEY = "$root.";

    public static final String CHILD_KEY = "$child.";

    private final Descriptors.Descriptor rootDesc;

    private final Descriptors.Descriptor childDesc;

    private final Map<String, List<PbNode>> columnNodeMap;

    private final DynamicMessage root;

    private final List<DynamicMessage> childRoot;

    private final Charset srcCharset;

    /**
     * Creates source data with a root message and a list of child messages (one per row).
     *
     * @param root          decoded root message
     * @param childRoot     decoded child messages, may be {@code null}
     * @param rootDesc      descriptor used to resolve {@code $root.} paths
     * @param childDesc     descriptor used to resolve {@code $child.} paths, may be {@code null}
     * @param columnNodeMap cache of already parsed paths keyed by full field name; a new
     *                      empty cache is used when {@code null}
     * @param srcCharset    charset used to decode {@code bytes} fields
     */
    public PbSourceData(DynamicMessage root, List<DynamicMessage> childRoot,
            Descriptors.Descriptor rootDesc, Descriptors.Descriptor childDesc,
            Map<String, List<PbNode>> columnNodeMap,
            Charset srcCharset) {
        this.root = root;
        this.childRoot = childRoot;
        this.rootDesc = rootDesc;
        this.childDesc = childDesc;
        this.columnNodeMap = (columnNodeMap != null) ? columnNodeMap : new ConcurrentHashMap<>();
        this.srcCharset = srcCharset;
    }

    /**
     * Creates source data that only has a root message (no child rows).
     *
     * @param root          decoded root message
     * @param rootDesc      descriptor used to resolve {@code $root.} paths
     * @param columnNodeMap cache of already parsed paths keyed by full field name; a new
     *                      empty cache is used when {@code null}
     * @param srcCharset    charset used to decode {@code bytes} fields
     */
    public PbSourceData(DynamicMessage root,
            Descriptors.Descriptor rootDesc,
            Map<String, List<PbNode>> columnNodeMap,
            Charset srcCharset) {
        this(root, null, rootDesc, null, columnNodeMap, srcCharset);
    }

    /**
     * Returns the number of rows: the number of child messages, or 1 when there is no
     * child list.
     *
     * @return row count
     */
    @Override
    public int getRowCount() {
        return (this.childRoot == null) ? 1 : this.childRoot.size();
    }

    /**
     * Reads one field as a string.
     *
     * @param rowNum    row index, only used for {@code $child.} fields
     * @param fieldName full field name starting with {@link #ROOT_KEY} or {@link #CHILD_KEY}
     * @return the field value, or an empty string when the name has neither prefix, the row
     *         does not exist, the path cannot be resolved, or reading fails
     */
    @Override
    public String getField(int rowNum, String fieldName) {
        try {
            if (StringUtils.startsWith(fieldName, ROOT_KEY)) {
                return this.getRootField(fieldName);
            }
            if (StringUtils.startsWith(fieldName, CHILD_KEY)
                    && childRoot != null && rowNum >= 0 && rowNum < childRoot.size()) {
                return this.getChildField(rowNum, fieldName);
            }
        } catch (Exception e) {
            // Best effort: a broken path or message must not abort the whole transform.
            LOG.error("Failed to read field '{}' of row {}", fieldName, rowNum, e);
        }
        return "";
    }

    /**
     * Reads a {@code $root.} field from the root message.
     *
     * @param srcFieldName full field name including the {@link #ROOT_KEY} prefix
     * @return the field value, or an empty string when the path is invalid
     */
    private String getRootField(String srcFieldName) {
        List<PbNode> childNodes = this.resolveNodes(srcFieldName, ROOT_KEY, rootDesc);
        if (childNodes.isEmpty()) {
            // error config
            return "";
        }
        return this.getNodeValue(childNodes, root);
    }

    /**
     * Reads a {@code $child.} field from the child message of one row.
     *
     * @param rowNum       index into the child message list
     * @param srcFieldName full field name including the {@link #CHILD_KEY} prefix
     * @return the field value, or an empty string when there are no children or the path is
     *         invalid
     */
    private String getChildField(int rowNum, String srcFieldName) {
        if (this.childRoot == null || this.childDesc == null) {
            return "";
        }
        List<PbNode> childNodes = this.resolveNodes(srcFieldName, CHILD_KEY, childDesc);
        if (childNodes.isEmpty()) {
            // error config
            return "";
        }
        return this.getNodeValue(childNodes, childRoot.get(rowNum));
    }

    /**
     * Returns the parsed path for a field name, parsing and caching it on first use.
     * Invalid paths are cached as an empty list so they are not parsed again.
     *
     * @param srcFieldName full field name including its prefix
     * @param prefix       the prefix to strip before parsing
     * @param desc         descriptor the path is resolved against
     * @return the parsed nodes, never {@code null}
     */
    private List<PbNode> resolveNodes(String srcFieldName, String prefix, Descriptors.Descriptor desc) {
        List<PbNode> nodes = this.columnNodeMap.get(srcFieldName);
        if (nodes == null) {
            nodes = PbNode.parseNodePath(desc, srcFieldName.substring(prefix.length()));
            if (nodes == null) {
                nodes = new ArrayList<>();
            }
            this.columnNodeMap.put(srcFieldName, nodes);
        }
        return nodes;
    }

    /**
     * Walks the parsed path through the message tree and renders the final value.
     *
     * @param childNodes parsed path, in order
     * @param root       message the first node is read from
     * @return the rendered value, or an empty string when the data does not match the path
     */
    private String getNodeValue(List<PbNode> childNodes, DynamicMessage root) {
        DynamicMessage current = root;
        for (PbNode node : childNodes) {
            if (node.getFieldDesc() == null) {
                // error config
                return "";
            }
            Object nodeValue = current.getField(node.getFieldDesc());
            if (nodeValue == null) {
                // error data
                return "";
            }
            if (node.isLastNode()) {
                Object leaf = nodeValue;
                if (node.isArray() && nodeValue instanceof List) {
                    // Honour the requested index instead of rendering the whole list.
                    leaf = elementAt((List<?>) nodeValue, node.getArrayIndex());
                    if (leaf == null) {
                        // error data
                        return "";
                    }
                }
                return this.formatLeaf(node, leaf);
            }
            Object next = nodeValue;
            if (node.isArray()) {
                if (!(nodeValue instanceof List)) {
                    // error data
                    return "";
                }
                next = elementAt((List<?>) nodeValue, node.getArrayIndex());
            }
            if (!(next instanceof DynamicMessage)) {
                // error data
                return "";
            }
            current = (DynamicMessage) next;
        }
        return "";
    }

    /**
     * Returns the list element at {@code index}, or {@code null} when the index is out of
     * range.
     */
    private static Object elementAt(List<?> values, int index) {
        if (index < 0 || index >= values.size()) {
            return null;
        }
        return values.get(index);
    }

    /**
     * Renders a leaf value: {@code bytes} fields are decoded with the source charset, every
     * other type uses {@link String#valueOf(Object)}.
     */
    private String formatLeaf(PbNode node, Object value) {
        switch (node.getFieldDesc().getJavaType()) {
            case BYTE_STRING:
                return new String(((ByteString) value).toByteArray(), srcCharset);
            default:
                return String.valueOf(value);
        }
    }
}
Loading