[FLINK-16627][json]Support ignore null fields when serializing into JSON #24430

Closed
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/formats/debezium.md
@@ -424,6 +424,13 @@ Flink provides two formats, `debezium-avro-confluent` and `debezium-json`, to
<td>Boolean</td>
<td>Keep all DECIMAL values as they are, without using scientific notation. For example, <code>0.000000027</code> is written as <code>2.7E-8</code> by default; when this option is set to true, it is written as <code>0.000000027</code>.</td>
</tr>
<tr>
<td><h5>debezium-json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Serialize only non-null columns. By default, all columns are serialized whether or not they are null.</td>
</tr>
</tbody>
</table>

7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/formats/json.md
@@ -135,6 +135,13 @@ Format Options
<td>Boolean</td>
<td>Keep all DECIMAL values as they are, without using scientific notation. For example, <code>0.000000027</code> is written as <code>2.7E-8</code> by default; when this option is set to true, it is written as <code>0.000000027</code>.</td>
</tr>
<tr>
<td><h5>json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
Review comment (Member): I think this option can be forwarded.

<td>Boolean</td>
<td>Serialize only non-null columns. By default, all columns are serialized whether or not they are null.</td>
</tr>
<tr>
<td><h5>decode.json-parser.enabled</h5></td>
<td>optional</td>
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if this option is set to true.</td>
</tr>
<tr>
<td><h5>maxwell-json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Encode only non-null fields. By default, all fields will be included.</td>
</tr>
</tbody>
</table>

9 changes: 8 additions & 1 deletion docs/content.zh/docs/connectors/table/formats/ogg.md
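@@ -216,7 +216,7 @@ Format Options
<td>Whether to skip the current field or row when a parse error occurs instead of failing (false by default, meaning an error is thrown). If parse errors are ignored, the affected field is set to null.</td>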
</tr>
<tr>
- <td><h5>debezium-json.timestamp-format.standard</h5></td>
+ <td><h5>ogg-json.timestamp-format.standard</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
@@ -247,6 +247,13 @@ Format Options
<td>String</td>
<td>Specify the string literal that replaces null keys in a Map when <code>'ogg-json.map-null-key.mode'</code> is LITERAL.</td>
</tr>
<tr>
<td><h5>ogg-json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Serialize only non-null columns. By default, all columns are serialized whether or not they are null.</td>
</tr>
</tbody>
</table>

7 changes: 7 additions & 0 deletions docs/content/docs/connectors/table/formats/debezium.md
@@ -445,6 +445,13 @@ Use format `debezium-avro-confluent` to interpret Debezium Avro messages and for
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if this option is set to true.</td>
</tr>
<tr>
<td><h5>debezium-json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Encode only non-null fields. By default, all fields will be included.</td>
</tr>
</tbody>
</table>

8 changes: 8 additions & 0 deletions docs/content/docs/connectors/table/formats/json.md
@@ -146,6 +146,14 @@ Format Options
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if this option is set to true.</td>
</tr>
<tr>
<td><h5>json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Encode only non-null fields. By default, all fields will be included.</td>
</tr>
<tr>
<td><h5>decode.json-parser.enabled</h5></td>
<td>optional</td>
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
<td>Boolean</td>
<td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if this option is set to true.</td>
</tr>
<tr>
<td><h5>maxwell-json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Encode only non-null fields. By default, all fields will be included.</td>
</tr>
</tbody>
</table>

7 changes: 7 additions & 0 deletions docs/content/docs/connectors/table/formats/ogg.md
@@ -260,6 +260,13 @@ Format Options
<td>String</td>
<td>Specify string literal to replace null key when <code>'ogg-json.map-null-key.mode'</code> is LITERAL.</td>
</tr>
<tr>
<td><h5>ogg-json.encode.ignore-null-fields</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Encode only non-null fields. By default, all fields will be included.</td>
</tr>
</tbody>
</table>
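
To make the documented behavior of `encode.ignore-null-fields` concrete, here is a minimal sketch of the difference in output. It is a plain-JDK stand-in and not Flink's encoder: the class `IgnoreNullFieldsDemo`, its methods, and the sample row are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical stand-in for a JSON row encoder; not Flink's implementation. */
final class IgnoreNullFieldsDemo {

    /** Encodes a flat row (field name -> Number, String, or null) as a JSON object. */
    static String encode(Map<String, Object> row, boolean ignoreNullFields) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            Object value = entry.getValue();
            if (value == null && ignoreNullFields) {
                continue; // drop the key entirely instead of writing "key":null
            }
            json.add("\"" + entry.getKey() + "\":" + render(value));
        }
        return json.toString();
    }

    private static String render(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Number) {
            return value.toString();
        }
        return "\"" + value + "\""; // no escaping: the demo inputs are plain ASCII
    }

    /** Sample row with one null field. */
    static Map<String, Object> sampleRow() {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", null);
        row.put("city", "Berlin");
        return row;
    }

    public static void main(String[] args) {
        System.out.println(encode(sampleRow(), false)); // {"id":1,"name":null,"city":"Berlin"}
        System.out.println(encode(sampleRow(), true));  // {"id":1,"city":"Berlin"}
    }
}
```

With the flag off, the null field is written explicitly; with it on, the key is absent from the output, so consumers must treat a missing key and a null value as equivalent.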

@@ -108,7 +108,7 @@ public class ThriftObjectConversions {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
new RowDataToJsonConverters(
- TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
+ TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", false);
private static final Map<String, TableKind> TABLE_TYPE_MAPPINGS = buildTableTypeMapping();

// --------------------------------------------------------------------------------------------
@@ -46,6 +46,7 @@

import static org.apache.flink.formats.json.JsonFormatOptions.DECODE_JSON_PARSER_ENABLED;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.JsonFormatOptions.FAIL_ON_MISSING_FIELD;
import static org.apache.flink.formats.json.JsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.JsonFormatOptions.MAP_NULL_KEY_LITERAL;
@@ -147,6 +148,7 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(

final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);

return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
@@ -158,7 +160,8 @@ public SerializationSchema<RowData> createRuntimeEncoder(
timestampOption,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}

@Override
@@ -187,6 +190,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(MAP_NULL_KEY_MODE);
options.add(MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}

@@ -197,6 +201,7 @@ public Set<ConfigOption<?>> forwardOptions() {
options.add(MAP_NULL_KEY_MODE);
options.add(MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}
}
@@ -73,6 +73,13 @@ public class JsonFormatOptions {
.withDescription(
"Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default.");

public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS =
ConfigOptions.key("encode.ignore-null-fields")
.booleanType()
.defaultValue(false)
.withDescription(
"Optional flag to specify whether to ignore null fields when encoding, false by default.");

public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED =
ConfigOptions.key("decode.json-parser.enabled")
.booleanType()
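
The new `encode.ignore-null-fields` option is a boolean with a default of false, so existing tables keep their current output unless the option is set explicitly. The sketch below models that lookup with a plain `Map`; it does not use Flink's `ConfigOption` API, and `BooleanOptionDemo` and its `get` helper are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a boolean option with a default; not Flink's ConfigOption. */
final class BooleanOptionDemo {

    /** Returns the configured value for key, or defaultValue when the key is absent. */
    static boolean get(Map<String, String> options, String key, boolean defaultValue) {
        String raw = options.get(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        // Option not set: the default (false) applies and null fields are still encoded.
        System.out.println(get(options, "encode.ignore-null-fields", false)); // false

        options.put("encode.ignore-null-fields", "true");
        System.out.println(get(options, "encode.ignore-null-fields", false)); // true
    }
}
```

In table DDL the key appears with the format identifier as a prefix, as in the documentation rows of this change (for example `json.encode.ignore-null-fields`).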
@@ -68,19 +68,28 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
/** Flag indicating whether to serialize all decimals as plain numbers. */
private final boolean encodeDecimalAsPlainNumber;

/** Flag indicating whether to ignore null fields. */
private final boolean ignoreNullFields;

public JsonRowDataSerializationSchema(
RowType rowType,
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
this.rowType = rowType;
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.ignoreNullFields = ignoreNullFields;
this.runtimeConverter =
- new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
+ new RowDataToJsonConverters(
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ ignoreNullFields)
.createConverter(rowType);
}

@@ -95,7 +104,7 @@ public void open(InitializationContext context) throws Exception {

@Override
public byte[] serialize(RowData row) {
- if (node == null) {
+ if (node == null || ignoreNullFields) {
node = mapper.createObjectNode();
}
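
The changed condition creates a fresh node for every row when `ignoreNullFields` is set. A plausible reason, inferred from the diff rather than stated in it, is that a reused node would keep a field from an earlier row whenever the current row skips that field as null. The sketch below models this with a `Map` in place of Jackson's `ObjectNode`; `ReusedNodeDemo` and the `freshNodeWhenIgnoring` switch are hypothetical and exist only to contrast the two behaviors.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of node reuse; a Map stands in for Jackson's ObjectNode. */
final class ReusedNodeDemo {
    private final boolean ignoreNullFields;
    private final boolean freshNodeWhenIgnoring;
    private Map<String, Object> node;

    ReusedNodeDemo(boolean ignoreNullFields, boolean freshNodeWhenIgnoring) {
        this.ignoreNullFields = ignoreNullFields;
        this.freshNodeWhenIgnoring = freshNodeWhenIgnoring;
    }

    /** Writes one row into the (possibly reused) node and returns a snapshot of it. */
    Map<String, Object> serialize(String[] names, Object[] values) {
        // With freshNodeWhenIgnoring == true this mirrors `node == null || ignoreNullFields`.
        if (node == null || (ignoreNullFields && freshNodeWhenIgnoring)) {
            node = new LinkedHashMap<>();
        }
        for (int i = 0; i < names.length; i++) {
            if (values[i] != null || !ignoreNullFields) {
                node.put(names[i], values[i]);
            }
        }
        return new LinkedHashMap<>(node); // snapshot, like writing the node out as bytes
    }

    public static void main(String[] args) {
        String[] names = {"id", "name"};

        ReusedNodeDemo reused = new ReusedNodeDemo(true, false);
        reused.serialize(names, new Object[] {1, "a"});
        // The stale "name" from the first row leaks into the second row.
        System.out.println(reused.serialize(names, new Object[] {2, null})); // {id=2, name=a}

        ReusedNodeDemo fresh = new ReusedNodeDemo(true, true);
        fresh.serialize(names, new Object[] {1, "a"});
        System.out.println(fresh.serialize(names, new Object[] {2, null})); // {id=2}
    }
}
```

The trade-off is one extra node allocation per row when the option is enabled; the default path still reuses the node.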

@@ -120,7 +129,8 @@ public boolean equals(Object o) {
&& timestampFormat.equals(that.timestampFormat)
&& mapNullKeyMode.equals(that.mapNullKeyMode)
&& mapNullKeyLiteral.equals(that.mapNullKeyLiteral)
- && encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber;
+ && encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber
+ && ignoreNullFields == that.ignoreNullFields;
}

@Override
@@ -130,6 +140,7 @@ public int hashCode() {
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
}
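
Adding `ignoreNullFields` to both `equals` and `hashCode` keeps the two methods consistent and ensures that two schemas differing only in this flag do not compare equal. A reduced sketch of the pattern, using a hypothetical `EncoderSettingsDemo` class with just two of the flags:

```java
import java.util.Objects;

/** Hypothetical two-flag value class; illustrates the equals/hashCode pattern only. */
final class EncoderSettingsDemo {
    private final boolean encodeDecimalAsPlainNumber;
    private final boolean ignoreNullFields;

    EncoderSettingsDemo(boolean encodeDecimalAsPlainNumber, boolean ignoreNullFields) {
        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
        this.ignoreNullFields = ignoreNullFields;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        EncoderSettingsDemo that = (EncoderSettingsDemo) o;
        // Every field that affects the output takes part in the comparison.
        return encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber
                && ignoreNullFields == that.ignoreNullFields;
    }

    @Override
    public int hashCode() {
        // Same fields as equals, so equal objects always share a hash code.
        return Objects.hash(encodeDecimalAsPlainNumber, ignoreNullFields);
    }
}
```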
@@ -68,13 +68,18 @@ public class RowDataToJsonConverters implements Serializable {
/** The string literal used when the handling mode for map null keys is LITERAL. */
private final String mapNullKeyLiteral;

/** Flag indicating whether to ignore null fields. */
private final boolean ignoreNullFields;

public RowDataToJsonConverters(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
- String mapNullKeyLiteral) {
+ String mapNullKeyLiteral,
+ boolean ignoreNullFields) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.ignoreNullFields = ignoreNullFields;
}

/**
@@ -331,9 +336,11 @@ private RowDataToJsonConverter createRowConverter(RowType type) {
String fieldName = fieldNames[i];
try {
Object field = fieldGetters[i].getFieldOrNull(row);
- node.set(
- fieldName,
- fieldConverters[i].convert(mapper, node.get(fieldName), field));
+ if (field != null || !ignoreNullFields) {
+ node.set(
+ fieldName,
+ fieldConverters[i].convert(mapper, node.get(fieldName), field));
+ }
} catch (Throwable t) {
throw new RuntimeException(
String.format("Fail to serialize at field: %s.", fieldName), t);
@@ -44,6 +44,7 @@
import java.util.Set;

import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.canal.CanalJsonFormatOptions.DATABASE_INCLUDE;
import static org.apache.flink.formats.json.canal.CanalJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.canal.CanalJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
@@ -91,6 +92,8 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);

final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);

return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public ChangelogMode getChangelogMode() {
@@ -111,7 +114,8 @@ public SerializationSchema<RowData> createRuntimeEncoder(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
};
}
@@ -136,6 +140,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}

@@ -59,14 +59,16 @@ public CanalJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}

@Override
@@ -45,6 +45,7 @@
import java.util.Set;

import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -92,6 +93,7 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(

final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);

return new EncodingFormat<SerializationSchema<RowData>>() {

@@ -114,7 +116,8 @@ public SerializationSchema<RowData> createRuntimeEncoder(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
};
}
@@ -138,6 +141,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(JSON_MAP_NULL_KEY_MODE);
options.add(JSON_MAP_NULL_KEY_LITERAL);
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
options.add(ENCODE_IGNORE_NULL_FIELDS);
return options;
}

@@ -56,14 +56,16 @@ public DebeziumJsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}

@Override