Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class PbNode {
private boolean isArray = false;
private int arrayIndex = -1;
private boolean isMap = false;
private boolean isMapType = false;
private String mapKey = "";
private FieldDescriptor mapKeyDesc;
private FieldDescriptor mapValueDesc;
Expand All @@ -60,6 +61,9 @@ public PbNode(Descriptors.Descriptor messageDesc, String nodeString, boolean isL
this.fieldDesc = messageDesc.findFieldByName(name);
if (this.fieldDesc.getJavaType() == JavaType.MESSAGE) {
this.messageType = this.fieldDesc.getMessageType();
if (isMapDescriptor(messageType)) {
this.isMapType = true;
}
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@

import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -54,7 +59,7 @@ public class PbSourceData extends AbstractSourceData {

private List<DynamicMessage> childRoot;

private Charset srcCharset;
protected Charset srcCharset;

/**
* Constructor
Expand Down Expand Up @@ -105,8 +110,8 @@ public int getRowCount() {
* @return
*/
@Override
public String getField(int rowNum, String fieldName) {
String fieldValue = "";
public Object getField(int rowNum, String fieldName) {
Object fieldValue = "";
try {
if (isContextField(fieldName)) {
return getContextField(fieldName);
Expand All @@ -130,7 +135,7 @@ public String getField(int rowNum, String fieldName) {
* @param fieldName
* @return
*/
private String getRootField(String srcFieldName) {
private Object getRootField(String srcFieldName) {
List<PbNode> childNodes = this.columnNodeMap.get(srcFieldName);
if (childNodes == null) {
String fieldName = srcFieldName.substring(ROOT_KEY.length());
Expand All @@ -145,7 +150,7 @@ private String getRootField(String srcFieldName) {
return "";
}
// parse other node
String fieldValue = this.getNodeValue(childNodes, root);
Object fieldValue = this.getNodeValue(childNodes, root);
return fieldValue;
}

Expand All @@ -155,7 +160,7 @@ private String getRootField(String srcFieldName) {
* @param srcFieldName
* @return
*/
private String getChildField(int rowNum, String srcFieldName) {
private Object getChildField(int rowNum, String srcFieldName) {
if (this.childRoot == null || this.childDesc == null) {
return "";
}
Expand All @@ -174,7 +179,7 @@ private String getChildField(int rowNum, String srcFieldName) {
}
// parse other node
DynamicMessage child = childRoot.get(rowNum);
String fieldValue = this.getNodeValue(childNodes, child);
Object fieldValue = this.getNodeValue(childNodes, child);
return fieldValue;
}

Expand All @@ -184,9 +189,8 @@ private String getChildField(int rowNum, String srcFieldName) {
* @param root
* @return
*/
@SuppressWarnings("rawtypes")
private String getNodeValue(List<PbNode> childNodes, DynamicMessage root) {
String fieldValue = "";
@SuppressWarnings({"rawtypes", "unchecked"})
private Object getNodeValue(List<PbNode> childNodes, DynamicMessage root) {
DynamicMessage current = root;
for (int i = 0; i < childNodes.size(); i++) {
PbNode node = childNodes.get(i);
Expand All @@ -195,62 +199,120 @@ private String getNodeValue(List<PbNode> childNodes, DynamicMessage root) {
// error data
break;
}
if (node.isLastNode()) {
switch (node.getFieldDesc().getJavaType()) {
case STRING:
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case BOOLEAN:
fieldValue = String.valueOf(nodeValue);
break;
case BYTE_STRING:
ByteString byteString = (ByteString) nodeValue;
fieldValue = new String(byteString.toByteArray(), srcCharset);
break;
case ENUM:
fieldValue = String.valueOf(nodeValue);
break;
case MESSAGE:
if (node.isArray()) {
fieldValue = String.valueOf(((List) nodeValue).get(node.getArrayIndex()));
} else if (node.isMap()) {
List<DynamicMessage> nodeValueList = (List<DynamicMessage>) nodeValue;
for (DynamicMessage subnodeValue : nodeValueList) {
String keyValue = String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
if (StringUtils.equals(keyValue, node.getMapKey())) {
fieldValue = String.valueOf(subnodeValue.getField(node.getMapValueDesc()));
break;
}
}
} else {
fieldValue = String.valueOf(nodeValue);
if (!node.isLastNode()) {
if (node.isArray()) {
current = (DynamicMessage) ((List) nodeValue).get(node.getArrayIndex());
} else if (node.isMap()) {
List<DynamicMessage> nodeValueList = (List<DynamicMessage>) nodeValue;
DynamicMessage newCurrent = null;
for (DynamicMessage subnodeValue : nodeValueList) {
String keyValue = String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
if (StringUtils.equals(keyValue, node.getMapKey())) {
newCurrent = (DynamicMessage) subnodeValue.getField(node.getMapValueDesc());
break;
}
break;
}
if (newCurrent == null) {
return null;
}
current = newCurrent;
} else {
current = (DynamicMessage) nodeValue;
}
break;
continue;
}
// last node
if (node.isArray()) {
current = (DynamicMessage) ((List) nodeValue).get(node.getArrayIndex());
return buildStructData(node.getMessageType(), ((List) nodeValue).get(node.getArrayIndex()));
} else if (node.isMap()) {
List<DynamicMessage> nodeValueList = (List<DynamicMessage>) nodeValue;
DynamicMessage newCurrent = null;
Object fieldValue = null;
for (DynamicMessage subnodeValue : nodeValueList) {
String keyValue = String.valueOf(subnodeValue.getField(node.getMapKeyDesc()));
if (StringUtils.equals(keyValue, node.getMapKey())) {
newCurrent = (DynamicMessage) subnodeValue.getField(node.getMapValueDesc());
fieldValue = subnodeValue.getField(node.getMapValueDesc());
break;
}
}
if (newCurrent == null) {
return fieldValue;
return this.buildFieldValue(node.getFieldDesc(), fieldValue, false);
} else if (node.isMapType()) {
return this.buildStructData(node.getMessageType(), nodeValue);
} else if (node.getFieldDesc().isRepeated()) {
List<Object> valueList = (List) nodeValue;
List<Object> result = new ArrayList<>(valueList.size());
for (Object value : valueList) {
result.add(this.buildFieldValue(node.getFieldDesc(), value, false));
}
current = newCurrent;
return new GenericArrayData(result.toArray());
} else {
current = (DynamicMessage) nodeValue;
return this.buildFieldValue(node.getFieldDesc(), nodeValue, false);
}
}
return fieldValue;
return null;
}

@SuppressWarnings("unchecked")
private Object buildFieldValue(FieldDescriptor fieldDesc, Object nodeValue, boolean isRepeated) {
if (nodeValue == null) {
return null;
}
switch (fieldDesc.getJavaType()) {
case STRING:
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case BOOLEAN:
case ENUM:
return nodeValue;
case BYTE_STRING:
return ((ByteString) nodeValue).toByteArray();
case MESSAGE: {
if (!isRepeated) {
return this.buildStructData(fieldDesc.getMessageType(), nodeValue);
} else if (PbNode.isMapDescriptor(fieldDesc.getMessageType())) {
return this.buildStructData(fieldDesc.getMessageType(), nodeValue);
}
List<DynamicMessage> valueList = (List<DynamicMessage>) nodeValue;
List<Object> result = new ArrayList<>(valueList.size());
for (DynamicMessage value : valueList) {
result.add(this.buildStructData(fieldDesc.getMessageType(), value));
}
return new GenericArrayData(result.toArray());
}
default:
return String.valueOf(nodeValue);
}
}

@SuppressWarnings("unchecked")
protected Object buildStructData(Descriptors.Descriptor messageType, Object nodeValue) {
// map
if (PbNode.isMapDescriptor(messageType)) {
Descriptors.FieldDescriptor keyField = messageType.findFieldByNumber(1);
Descriptors.FieldDescriptor valueField = messageType.findFieldByNumber(2);
List<DynamicMessage> subNodeValueList = (List<DynamicMessage>) nodeValue;
Map<Object, Object> result = new HashMap<>();
for (DynamicMessage subnodeValue : subNodeValueList) {
Object keyValue = buildFieldValue(keyField, subnodeValue.getField(keyField), false);
Object valueValue = buildFieldValue(valueField, subnodeValue.getField(valueField), false);
result.put(keyValue, valueValue);
}
return new GenericMapData(result);
}
// struct
DynamicMessage msgObj = (DynamicMessage) nodeValue;
GenericRowData result = new GenericRowData(messageType.getFields().size());
int index = 0;
for (FieldDescriptor fieldDesc : messageType.getFields()) {
Object fieldValue = msgObj.getField(fieldDesc);
if (fieldValue == null) {
result.setField(index++, null);
continue;
}
Object fieldResult = this.buildFieldValue(fieldDesc, fieldValue, false);
result.setField(index++, fieldResult);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public String encode(SinkData sinkData, Context context) {
sinkData.keyList().forEach(k -> builder.append(sinkData.getField(k)).append(delimiter));
} else {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
String fieldValue = formatFieldValue(sinkData.getField(fieldName));
if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
builder.append(fieldValue);
} else {
Expand All @@ -78,7 +78,7 @@ public String encode(SinkData sinkData, Context context) {
} else {
for (FieldInfo field : fields) {
String fieldName = field.getName();
String fieldValue = sinkData.getField(fieldName);
String fieldValue = formatFieldValue(sinkData.getField(fieldName));
EscapeUtils.escapeContent(builder, delimiter, escapeChar, fieldValue);
builder.append(delimiter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,20 @@

/**
* DefaultSinkData
*
*/
@Data
public class DefaultSinkData implements SinkData {

private List<String> keyList = new ArrayList<>();
private Map<String, String> currentRow = new HashMap<>();
private Map<String, Object> currentRow = new HashMap<>();

/**
* addField
* @param fieldName
* @param fieldValue
*/
@Override
public void addField(String fieldName, String fieldValue) {
public void addField(String fieldName, Object fieldValue) {
this.keyList.add(fieldName);
this.currentRow.put(fieldName, fieldValue);
}
Expand All @@ -51,7 +50,7 @@ public void addField(String fieldName, String fieldValue) {
* @return
*/
@Override
public String getField(String fieldName) {
public Object getField(String fieldName) {
return this.currentRow.getOrDefault(fieldName, "");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public String encode(SinkData sinkData, Context context) {
builder.delete(0, builder.length());
if (fields == null || fields.size() == 0) {
for (String fieldName : sinkData.keyList()) {
String fieldValue = sinkData.getField(fieldName);
String fieldValue = formatFieldValue(sinkData.getField(fieldName));
if (StringUtils.equals(fieldName, ALL_SOURCE_FIELD_SIGN)) {
builder.append(fieldValue).append(entryDelimiter);
} else {
Expand All @@ -70,7 +70,7 @@ public String encode(SinkData sinkData, Context context) {
} else {
for (FieldInfo field : fields) {
String fieldName = field.getName();
String fieldValue = sinkData.getField(fieldName);
String fieldValue = formatFieldValue(sinkData.getField(fieldName));
builder.append(fieldName).append(kvDelimiter).append(fieldValue).append(entryDelimiter);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Map<String, Object> encode(SinkData sinkData, Context context) {
Map<String, Object> esMap = new HashMap<>();
for (FieldInfo fieldInfo : fields) {
String fieldName = fieldInfo.getName();
String strValue = sinkData.getField(fieldName);
String strValue = formatFieldValue(sinkData.getField(fieldName));
TypeConverter converter = converters.get(fieldName);
if (converter == null) {
esMap.put(fieldName, strValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ByteArrayOutputStream encode(SinkData sinkData, Context context) {
Object[] rowsInfo = new Object[size];
Arrays.fill(rowsInfo, "");
for (int i = 0; i < size; i++) {
String fieldData = sinkData.getField(this.fields.get(i).getName());
String fieldData = formatFieldValue(sinkData.getField(this.fields.get(i).getName()));
if (fieldData == null) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public byte[] encode(SinkData sinkData, Context context) {
for (String key : sinkData.keyList()) {
Descriptors.FieldDescriptor fieldDescriptor = dynamicDescriptor.findFieldByName(key);
if (fieldDescriptor != null) {
String fieldValue = sinkData.getField(key);
String fieldValue = formatFieldValue(sinkData.getField(key));
if (fieldValue != null) {
Object value = convertValue(fieldDescriptor, fieldValue);
dynamicBuilder.setField(fieldDescriptor, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ public RowDataSinkEncoder(RowDataSinkInfo sinkInfo) {
@Override
public RowData encode(SinkData sinkData, Context context) {
GenericRowData rowData = new GenericRowData(fieldToRowDataConverters.length);

for (int i = 0; i < fields.size(); i++) {
String fieldName = fields.get(i).getName();
String fieldValue = sinkData.getField(fieldName);
Object fieldValue = sinkData.getField(fieldName);
rowData.setField(i, fieldToRowDataConverters[i].convert(fieldValue));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
*/
public interface SinkData {

void addField(String fieldName, String fieldValue);
void addField(String fieldName, Object fieldValue);

String getField(String fieldName);
Object getField(String fieldName);

List<String> keyList();
}
Loading
Loading