Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10119] [table] Add failure handlers for JsonRowDeserializationSchema #6571

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.descriptors.JsonValidator;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -62,6 +63,9 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
/** Flag indicating whether to fail on a missing field. */
private boolean failOnMissingField;

/** Name of the failure handler applied when deserialization fails; {@code null} means the error is rethrown. */
private String failureHandler;

/**
* Creates a JSON deserialization schema for the given type information.
*
Expand Down Expand Up @@ -94,7 +98,21 @@ public Row deserialize(byte[] message) throws IOException {
final JsonNode root = objectMapper.readTree(message);
return convertRow(root, (RowTypeInfo) typeInfo);
} catch (Throwable t) {
throw new IOException("Failed to deserialize JSON object.", t);
// No handler configured, or the explicit "fail" handler: surface the error to the caller.
if (failureHandler == null || failureHandler.equals(JsonValidator.FAILURE_HANDLER_FAIL)) {
	throw new IOException("Failed to deserialize JSON object.", t);
}
switch (failureHandler) {
	case JsonValidator.FAILURE_HANDLER_IGNORE:
		// Return null instead of a row for the message that could not be deserialized.
		return null;
	case JsonValidator.FAILURE_HANDLER_ERROR_FIELD:
		// Build a row with every regular field left null and one extra trailing field
		// that describes the failure.
		final int arity = typeInfo.getArity();
		final Object[] fields = new Object[arity + 1];
		// getMessage() may be null (e.g. for a bare NullPointerException); fall back to
		// toString() so the error field always carries a description.
		fields[arity] = t.getMessage() != null ? t.getMessage() : t.toString();
		return Row.of(fields);
	default:
		// Keep the original failure as the cause so it is not lost when the configured
		// handler name is unknown.
		throw new IOException("Unable to recognize format.failure-handler: " + failureHandler, t);
}
}
}

Expand All @@ -119,6 +137,13 @@ public void setFailOnMissingField(boolean failOnMissingField) {
this.failOnMissingField = failOnMissingField;
}

/**
* Sets the failure handler that is consulted when deserialization of a message fails.
*
* <p>The value is stored as given; it is not validated by this method.
*
* @param failureHandler name of the failure handler to use
*/
public void setFailureHandler(String failureHandler) {
this.failureHandler = failureHandler;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Expand Up @@ -50,6 +50,7 @@ protected List<String> supportedFormatProperties() {
properties.add(JsonValidator.FORMAT_JSON_SCHEMA);
properties.add(JsonValidator.FORMAT_SCHEMA);
properties.add(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD);
properties.add(JsonValidator.FORMAT_FAILURE_HANDLER);
return properties;
}

Expand All @@ -62,6 +63,8 @@ public DeserializationSchema<Row> createDeserializationSchema(Map<String, String

descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
.ifPresent(schema::setFailOnMissingField);
descriptorProperties.getOptionalString(JsonValidator.FORMAT_FAILURE_HANDLER)
.ifPresent(schema::setFailureHandler);

return schema;
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;

import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAILURE_HANDLER;
import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_JSON_SCHEMA;
import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA;
Expand All @@ -37,6 +38,7 @@
public class Json extends FormatDescriptor {

private Boolean failOnMissingField;
private String failureHandler;
private Boolean deriveSchema;
private String jsonSchema;
private String schema;
Expand All @@ -59,6 +61,15 @@ public Json failOnMissingField(boolean failOnMissingField) {
return this;
}

/**
* Sets the failure handler to use when deserializing JSON messages fails.
*
* @param failureHandler name of the failure handler; {@link JsonValidator} declares
* {@code "fail"}, {@code "ignore"} and {@code "error-field"} as the accepted values
* @return this descriptor, so that calls can be chained
*/
public Json failureHandler(String failureHandler) {
this.failureHandler = failureHandler;
return this;
}

/**
* Sets the JSON schema string with field names and the types according to the JSON schema
* specification [[http://json-schema.org/specification.html]].
Expand Down Expand Up @@ -128,6 +139,10 @@ protected Map<String, String> toFormatProperties() {
properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField);
}

if (failureHandler != null) {
properties.putString(FORMAT_FAILURE_HANDLER, failureHandler);
}

return properties.asMap();
}
}
Expand Up @@ -21,6 +21,9 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;

import java.util.Arrays;
import java.util.List;

/**
* Validator for {@link Json}.
*/
Expand All @@ -31,6 +34,16 @@ public class JsonValidator extends FormatDescriptorValidator {
public static final String FORMAT_SCHEMA = "format.schema";
public static final String FORMAT_JSON_SCHEMA = "format.json-schema";
public static final String FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field";
public static final String FORMAT_FAILURE_HANDLER = "format.failure-handler";

public static final String FAILURE_HANDLER_FAIL = "fail";
public static final String FAILURE_HANDLER_IGNORE = "ignore";
public static final String FAILURE_HANDLER_ERROR_FIELD = "error-field";

private final List<String> failureHandlers = Arrays.asList(
FAILURE_HANDLER_FAIL,
FAILURE_HANDLER_IGNORE,
FAILURE_HANDLER_ERROR_FIELD);

@Override
public void validate(DescriptorProperties properties) {
Expand All @@ -52,6 +65,7 @@ public void validate(DescriptorProperties properties) {
properties.validateString(FORMAT_JSON_SCHEMA, false, 1);
}

properties.validateEnumValues(FORMAT_FAILURE_HANDLER, true, failureHandlers);
properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, true);
}
}
Expand Up @@ -36,6 +36,8 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

/**
* Tests for the {@link JsonRowDeserializationSchema}.
Expand Down Expand Up @@ -179,6 +181,41 @@ public void testMissingNode() throws Exception {
}
}

/**
 * Verifies that the "ignore" failure handler makes deserialization return {@code null}
 * for input that is not valid JSON.
 */
@Test
public void testFailureHandlerIgnore() throws IOException {
	final String[] fieldNames = new String[] { "name" };
	final JsonRowDeserializationSchema schema =
		new JsonRowDeserializationSchema(Types.ROW_NAMED(fieldNames, Types.STRING));
	schema.setFailureHandler("ignore");

	final byte[] malformed = "zxvdd".getBytes();

	assertNull(schema.deserialize(malformed));
}

/**
 * Verifies that the "error-field" failure handler turns input that is not valid JSON into a
 * row whose declared fields are all null and whose extra trailing field is populated.
 */
@Test
public void testFailureHandlerErrorField() throws IOException {
	final JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(
		Types.ROW_NAMED(new String[] { "name", "age" }, Types.STRING, Types.INT));
	schema.setFailureHandler("error-field");

	final Row result = schema.deserialize("zxvdd".getBytes());

	// Two declared fields plus the trailing error field.
	assertEquals(3, result.getArity());
	// The declared fields stay empty because nothing could be parsed.
	for (int i = 0; i < 2; i++) {
		assertNull(result.getField(i));
	}
	// The trailing field carries the error description.
	assertNotNull(result.getField(2));
}

/**
* Tests that number of field names and types has to match.
*/
Expand Down
Expand Up @@ -84,7 +84,8 @@ public void testJsonSchema() {
final Map<String, String> properties = toMap(
new Json()
.jsonSchema(JSON_SCHEMA)
.failOnMissingField(true));
.failOnMissingField(true)
.failureHandler("fail"));

testJsonSchemaSerializationSchema(properties);

Expand Down
Expand Up @@ -70,6 +70,11 @@ public void testDuplicateSchema() {
addPropertyAndVerify(descriptors().get(0), "format.schema", "DDD");
}

/**
 * Expects a {@link ValidationException} when "format.failure-handler" is set to "ABC"
 * on the first descriptor.
 */
@Test(expected = ValidationException.class)
public void testUnknownFailureHandler() {
	final Descriptor firstDescriptor = descriptors().get(0);
	addPropertyAndVerify(firstDescriptor, "format.failure-handler", "ABC");
}

// --------------------------------------------------------------------------------------------

@Override
Expand All @@ -83,7 +88,8 @@ public List<Descriptor> descriptors() {
Types.ROW(
new String[]{"test1", "test2"},
new TypeInformation[]{Types.STRING(), Types.SQL_TIMESTAMP()}))
.failOnMissingField(true);
.failOnMissingField(true)
.failureHandler("error-field");

final Descriptor desc4 = new Json().deriveSchema();

Expand All @@ -108,6 +114,7 @@ public List<Map<String, String>> properties() {
props3.put("format.property-version", "1");
props3.put("format.schema", "ROW<test1 VARCHAR, test2 TIMESTAMP>");
props3.put("format.fail-on-missing-field", "true");
props3.put("format.failure-handler", "error-field");

final Map<String, String> props4 = new HashMap<>();
props4.put("format.type", "json");
Expand Down