Skip to content
Permalink
Browse files
support use char like \x01 in flink-doris-sink column & line delimite…
…r (#6937)

* support use char like \x01 in flink-doris-sink column & line delimiter

* extend imports

* add docs
  • Loading branch information
wunan1210 committed Oct 29, 2021
1 parent 340ac7e commit bbfb03c41f1a230728fb3827571ccd0c727c34fe
Showing 3 changed files with 44 additions and 7 deletions.
@@ -93,4 +93,8 @@ public String toString() {
}

}

public String getErrorURL() {
return ErrorURL;
}
}
@@ -38,11 +38,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.flink.table.data.RowData.createFieldGetter;

@@ -62,9 +65,11 @@
private static final String FORMAT_KEY = "format";
private static final String FORMAT_JSON_VALUE = "json";
private static final String NULL_VALUE = "\\N";
private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
private static final String ESCAPE_DELIMITERS_DEFAULT = "false";

private final String fieldDelimiter;
private final String lineDelimiter;
private String fieldDelimiter;
private String lineDelimiter;
private final String[] fieldNames;
private final boolean jsonFormat;
private DorisOptions options;
@@ -88,10 +93,26 @@ public DorisDynamicOutputFormat(DorisOptions option,
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,
FIELD_DELIMITER_DEFAULT);
this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT);

Properties streamLoadProp=executionOptions.getStreamLoadProp();

boolean ifEscape = Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
if (ifEscape) {
this.fieldDelimiter = escapeString(streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
FIELD_DELIMITER_DEFAULT));
this.lineDelimiter = escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT));

if (streamLoadProp.contains(ESCAPE_DELIMITERS_KEY)) {
streamLoadProp.remove(ESCAPE_DELIMITERS_KEY);
}
} else {
this.fieldDelimiter = streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
FIELD_DELIMITER_DEFAULT);
this.lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT);
}

this.fieldNames = fieldNames;
this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
@@ -100,6 +121,17 @@ public DorisDynamicOutputFormat(DorisOptions option,
}
}

private String escapeString( String s) {
Pattern p = Pattern.compile("\\\\x(\\d{2})");
Matcher m = p.matcher(s);

StringBuffer buf = new StringBuffer();
while (m.find()) {
m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1))));
}
m.appendTail(buf);
return buf.toString();
}

@Override
public void configure(Configuration configuration) {
@@ -94,7 +94,8 @@ public void load(String value) throws StreamLoadException {
try {
RespContent respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
throw new StreamLoadException("stream load error: " + respContent.getMessage());
String errMsg=String.format("stream load error: %s, see more in %s",respContent.getMessage(),respContent.getErrorURL());
throw new StreamLoadException(errMsg);
}
} catch (IOException e) {
throw new StreamLoadException(e);

0 comments on commit bbfb03c

Please sign in to comment.