Skip to content

Commit

Permalink
[INLONG-10323][Sort] Support Kv deserialization format in sort module
Browse files Browse the repository at this point in the history
  • Loading branch information
EMsnap committed Jun 4, 2024
1 parent ca0983f commit 88a122d
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
@JsonSubTypes.Type(value = CanalJsonFormat.class, name = "canalJsonFormat"),
@JsonSubTypes.Type(value = CsvFormat.class, name = "csvFormat"),
@JsonSubTypes.Type(value = InLongMsgFormat.class, name = "inLongMsgFormat"),
@JsonSubTypes.Type(value = RawFormat.class, name = "rawFormat")
@JsonSubTypes.Type(value = RawFormat.class, name = "rawFormat"),
@JsonSubTypes.Type(value = KvFormat.class, name = "kvFormat")
})
public interface Format extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.node.format;

import lombok.Data;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;

import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_CHARSET;
import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ESCAPE_CHARACTER;
import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_ENTRY_DELIMITER;
import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;

/**
 * Describes the key-value ("kv") text format, registered under the JSON type name
 * {@code kvFormat} and exposed to the table layer under the identifier {@code inlong-kv}.
 *
 * <p>The entry delimiter and the key-value delimiter are always emitted by
 * {@link #generateOptions()}; every other setting is emitted only when it is non-empty.
 */
@JsonTypeName("kvFormat")
@Data
public class KvFormat implements Format {

    private static final String IDENTIFIER = "inlong-kv";

    /** Fallback used when no entry delimiter is supplied. */
    private static final String DEFAULT_ENTRY_DELIMITER = "&";

    /** Fallback used when no key-value delimiter is supplied. */
    private static final String DEFAULT_KV_DELIMITER = "=";

    @JsonProperty(value = "entryDelimiter", defaultValue = "&")
    private final String entryDelimiter;

    @JsonProperty(value = "kvDelimiter", defaultValue = "=")
    private final String kvDelimiter;

    @JsonProperty(value = "ignoreParseErrors", defaultValue = "false")
    private final String ignoreParseErrors;

    @JsonProperty(value = "escapeChar")
    private final String escapeChar;

    @JsonProperty(value = "charset")
    private final String charset;

    // Instance fields use lowerCamelCase so the Lombok-generated getters line up with the
    // JSON property names declared here (constant-style names produced getNULL_LITERAL()
    // and getQUOTE_CHARACTER(), which do not map onto "nullLiteral" / "quoteCharacter").
    @JsonProperty(value = "nullLiteral")
    private final String nullLiteral;

    @JsonProperty(value = "quoteCharacter")
    private final String quoteCharacter;

    /**
     * Creates a kv format description.
     *
     * @param entryDelimiter    separator between entries; {@code "&"} is used when {@code null}
     * @param kvDelimiter       separator between a key and its value; {@code "="} is used when {@code null}
     * @param escapeChar        escape character, may be {@code null}
     * @param ignoreParseErrors textual flag forwarded as the ignore-errors option, may be {@code null}
     * @param charset           charset name, may be {@code null}
     * @param nullLiteral       literal representing a null value, may be {@code null}
     * @param quoteCharacter    quote character, may be {@code null}
     */
    @JsonCreator
    public KvFormat(@Nullable @JsonProperty(value = "entryDelimiter") String entryDelimiter,
            @Nullable @JsonProperty(value = "kvDelimiter") String kvDelimiter,
            @Nullable @JsonProperty(value = "escapeChar") String escapeChar,
            @Nullable @JsonProperty(value = "ignoreParseErrors", defaultValue = "false") String ignoreParseErrors,
            @Nullable @JsonProperty(value = "charset") String charset,
            @Nullable @JsonProperty(value = "nullLiteral") String nullLiteral,
            @Nullable @JsonProperty(value = "quoteCharacter") String quoteCharacter) {
        // JsonProperty#defaultValue is descriptive metadata only, so the documented defaults
        // are applied here; otherwise generateOptions() would publish null delimiter values.
        this.entryDelimiter = ObjectUtils.defaultIfNull(entryDelimiter, DEFAULT_ENTRY_DELIMITER);
        this.kvDelimiter = ObjectUtils.defaultIfNull(kvDelimiter, DEFAULT_KV_DELIMITER);
        this.escapeChar = escapeChar;
        this.ignoreParseErrors = ignoreParseErrors;
        this.charset = charset;
        this.nullLiteral = nullLiteral;
        this.quoteCharacter = quoteCharacter;
    }

    /**
     * Legacy accessor kept for callers compiled against the former field name.
     *
     * @return the null literal, possibly {@code null}
     * @deprecated use {@code getNullLiteral()} instead
     */
    @Deprecated
    @JsonIgnore
    public String getNULL_LITERAL() {
        return nullLiteral;
    }

    /**
     * Legacy accessor kept for callers compiled against the former field name.
     *
     * @return the quote character, possibly {@code null}
     * @deprecated use {@code getQuoteCharacter()} instead
     */
    @Deprecated
    @JsonIgnore
    public String getQUOTE_CHARACTER() {
        return quoteCharacter;
    }

    /**
     * @return the format name, {@code inlong-kv}
     */
    @Override
    public String getFormat() {
        return IDENTIFIER;
    }

    /**
     * @return the format identifier, {@code inlong-kv}
     */
    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    /**
     * Builds the option map for this format.
     *
     * @return a new mutable map holding the {@code format} entry, both delimiters and every
     *         optional setting that is non-empty
     */
    @Override
    public Map<String, String> generateOptions() {
        Map<String, String> options = new HashMap<>(16);

        options.put("format", getFormat());
        options.put(FORMAT_KV_DELIMITER, this.kvDelimiter);
        options.put(FORMAT_KV_ENTRY_DELIMITER, this.entryDelimiter);

        if (ObjectUtils.isNotEmpty(this.charset)) {
            options.put(FORMAT_CHARSET, this.charset);
        }
        if (ObjectUtils.isNotEmpty(this.nullLiteral)) {
            options.put(FORMAT_NULL_LITERAL, this.nullLiteral);
        }
        if (ObjectUtils.isNotEmpty(this.quoteCharacter)) {
            options.put(FORMAT_QUOTE_CHARACTER, this.quoteCharacter);
        }
        if (ObjectUtils.isNotEmpty(this.escapeChar)) {
            options.put(FORMAT_ESCAPE_CHARACTER, this.escapeChar);
        }
        if (ObjectUtils.isNotEmpty(this.ignoreParseErrors)) {
            options.put(FORMAT_IGNORE_ERRORS, this.ignoreParseErrors);
        }

        return options;
    }

}
2 changes: 1 addition & 1 deletion inlong-sort/sort-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-format-kv</artifactId>
<artifactId>sort-format-rowdata-kv</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,22 @@ public static RowFormatInfo deriveRowFormatInfo(
return new RowFormatInfo(rowType.getFieldNames().toArray(new String[1]), fieldFormatInfos);
}

/**
 * Derives a {@link RowFormatInfo} from the logical type of the given data type.
 *
 * <p>The logical type is cast to {@link RowType} without a prior check, so a data type whose
 * logical type is not a row fails with a {@link ClassCastException}. Each field's format
 * info is produced by {@code deriveFormatInfo}.
 *
 * @param dataType data type whose logical type is expected to be a {@link RowType}
 * @return row format info holding the field names and the per-field format infos, in field order
 */
public static RowFormatInfo deriveRowFormatInfo(DataType dataType) {
    RowType rowType = (RowType) dataType.getLogicalType();

    // The name list does not change during the loop, so fetch it once rather than
    // asking the row type for it again on every iteration.
    String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
    FormatInfo[] fieldFormatInfos = new FormatInfo[fieldNames.length];

    for (int i = 0; i < fieldNames.length; i++) {
        LogicalType fieldType = rowType.getTypeAt(i);
        fieldFormatInfos[i] = deriveFormatInfo(fieldType);
    }

    return new RowFormatInfo(fieldNames, fieldFormatInfos);
}

public static RowFormatInfo deserializeRowFormatInfo(String rowFormatInfoStr) {
try {
FormatInfo formatInfo = FormatUtils.demarshall(rowFormatInfoStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
import org.apache.inlong.sort.formats.base.TableFormatOptions;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.base.TextFormatOptions;

import org.apache.commons.lang3.StringEscapeUtils;
Expand Down Expand Up @@ -104,10 +105,9 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context,
DataType dataType) {

KvRowDataDeserializationSchema.Builder schemaBuilder =
new KvRowDataDeserializationSchema.Builder(
deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO)),
TableFormatUtils.deriveRowFormatInfo(dataType),
context.createTypeInformation(dataType));
configureDeserializationSchema(formatOptions, schemaBuilder);
return schemaBuilder.build();
Expand All @@ -128,7 +128,6 @@ public String factoryIdentifier() {
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(ROW_FORMAT_INFO);
options.add(TextFormatOptions.KV_ENTRY_DELIMITER);
options.add(TextFormatOptions.KV_DELIMITER);
options.add(TextFormatOptions.CHARSET);
Expand All @@ -138,6 +137,7 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(ROW_FORMAT_INFO);
options.add(TextFormatOptions.ESCAPE_CHARACTER);
options.add(TextFormatOptions.QUOTE_CHARACTER);
options.add(TextFormatOptions.NULL_LITERAL);
Expand Down

0 comments on commit 88a122d

Please sign in to comment.