Skip to content

Commit

Permalink
[FLINK-8630] [formats] Add proper support for JSON formats
Browse files Browse the repository at this point in the history
This closes #5491.
  • Loading branch information
twalthr committed Feb 15, 2018
1 parent 942649e commit 6c09c05
Show file tree
Hide file tree
Showing 33 changed files with 1,751 additions and 437 deletions.
16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-kafka-0.10/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ under the License.
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project won't depend on flink-json. -->
<optional>true</optional>
</dependency>

<!-- test dependencies -->

<dependency>
Expand All @@ -100,6 +108,14 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

import java.util.Properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

/**
Expand Down
16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-kafka-0.11/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ under the License.
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project won't depend on flink-json. -->
<optional>true</optional>
</dependency>

<!-- test dependencies -->

<dependency>
Expand All @@ -109,6 +117,14 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

/**
Expand Down
16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-kafka-0.8/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ under the License.
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project won't depend on flink-json. -->
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -143,6 +151,14 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

import java.util.Properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

/**
Expand Down
16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-kafka-0.9/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ under the License.
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project won't depend on flink-json. -->
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand All @@ -96,6 +104,14 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

import java.util.Properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

/**
Expand Down
16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-kafka-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ under the License.
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project won't depend on flink-json. -->
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -182,6 +190,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

import java.util.Properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.StreamTableSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,17 @@
package org.apache.flink.streaming.util.serialization;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.IOException;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;

/**
* DeserializationSchema that deserializes a JSON String into an ObjectNode.
*
* <p>Fields can be accessed by calling objectNode.get(&lt;name&gt;).as(&lt;type&gt;).
*
* @deprecated Please use {@link JsonNodeDeserializationSchema} in the "flink-json" module.
*/
@PublicEvolving
public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {

private static final long serialVersionUID = -1699854177598621044L;

private ObjectMapper mapper;

@Override
public ObjectNode deserialize(byte[] message) throws IOException {
if (mapper == null) {
mapper = new ObjectMapper();
}
return mapper.readValue(message, ObjectNode.class);
}

@Override
public boolean isEndOfStream(ObjectNode nextElement) {
return false;
}

@Deprecated
public class JSONDeserializationSchema extends JsonNodeDeserializationSchema {
// delegate everything to the parent class
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,8 @@
package org.apache.flink.streaming.util.serialization;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/**
* Deserialization schema from JSON to {@link Row}.
Expand All @@ -37,89 +29,25 @@
* the specified fields.
*
* <p>Failures during deserialization are forwarded as wrapped IOExceptions.
*
* @deprecated Please use {@link org.apache.flink.formats.json.JsonRowDeserializationSchema} in
* the "flink-json" module.
*/
@PublicEvolving
public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {

private static final long serialVersionUID = -228294330688809195L;

/** Type information describing the result type. */
private final TypeInformation<Row> typeInfo;

/** Field names to parse. Indices match fieldTypes indices. */
private final String[] fieldNames;

/** Types to parse fields as. Indices match fieldNames indices. */
private final TypeInformation<?>[] fieldTypes;

/** Object mapper for parsing the JSON. */
private final ObjectMapper objectMapper = new ObjectMapper();

/** Flag indicating whether to fail on a missing field. */
private boolean failOnMissingField;
@Deprecated
public class JsonRowDeserializationSchema extends org.apache.flink.formats.json.JsonRowDeserializationSchema {

/**
* Creates a JSON deserialization schema for the given fields and types.
*
* @param typeInfo Type information describing the result type. Both the field names and
* the field types are used to parse the JSON input.
*/
public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
Preconditions.checkNotNull(typeInfo, "Type information");
this.typeInfo = typeInfo;

this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
}

@Override
public Row deserialize(byte[] message) throws IOException {
try {
JsonNode root = objectMapper.readTree(message);

Row row = new Row(fieldNames.length);
for (int i = 0; i < fieldNames.length; i++) {
JsonNode node = root.get(fieldNames[i]);

if (node == null) {
if (failOnMissingField) {
throw new IllegalStateException("Failed to find field with name '"
+ fieldNames[i] + "'.");
} else {
row.setField(i, null);
}
} else {
// Read the value as specified type
Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
row.setField(i, value);
}
}

return row;
} catch (Throwable t) {
throw new IOException("Failed to deserialize JSON object.", t);
}
}

@Override
public boolean isEndOfStream(Row nextElement) {
return false;
}

@Override
public TypeInformation<Row> getProducedType() {
return typeInfo;
}

/**
* Configures the failure behaviour if a JSON field is missing.
*
* <p>By default, a missing field is ignored and the field is set to null.
*
* @param failOnMissingField Flag indicating whether to fail or not on a missing field.
* @deprecated Please use {@link org.apache.flink.formats.json.JsonRowDeserializationSchema} in
* the "flink-json" module.
*/
public void setFailOnMissingField(boolean failOnMissingField) {
this.failOnMissingField = failOnMissingField;
@Deprecated
public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
super(typeInfo);
}

}
Loading

0 comments on commit 6c09c05

Please sign in to comment.