[FLINK-8630] [table] To support JSON schema to TypeInformation conversion #5491

Closed · wants to merge 2 commits

16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -90,6 +90,14 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<!-- Projects depending on this project, won't depend on flink-json. -->
+			<optional>true</optional>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
@@ -100,6 +108,14 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;
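
For downstream code, the move only changes the import; construction stays the same. A minimal sketch, assuming the relocated JsonRowSerializationSchema keeps the TypeInformation<Row> constructor of the class it replaces — the field names, types, and values below are illustrative:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
// New home of the schema after this PR: the flink-json module.
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

public class JsonRowSerializationExample {
	public static void main(String[] args) throws Exception {
		// Illustrative row type; the names and field types are assumptions.
		TypeInformation<Row> rowType = Types.ROW_NAMED(
			new String[] {"id", "name"},
			Types.LONG, Types.STRING);

		JsonRowSerializationSchema schema = new JsonRowSerializationSchema(rowType);

		Row row = new Row(2);
		row.setField(0, 42L);
		row.setField(1, "flink");

		// Serializes the row to JSON bytes, roughly {"id":42,"name":"flink"}.
		byte[] json = schema.serialize(row);
		System.out.println(new String(json, StandardCharsets.UTF_8));
	}
}
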
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.formats.json.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
 /**
16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -90,6 +90,14 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<!-- Projects depending on this project, won't depend on flink-json. -->
+			<optional>true</optional>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
@@ -109,6 +117,14 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.formats.json.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
 /**
16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -81,6 +81,14 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<!-- Projects depending on this project, won't depend on flink-json. -->
+			<optional>true</optional>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_${scala.binary.version}</artifactId>
@@ -143,6 +151,14 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-test</artifactId>
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.formats.json.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
 /**
16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -80,6 +80,14 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<!-- Projects depending on this project, won't depend on flink-json. -->
+			<optional>true</optional>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-clients</artifactId>
@@ -96,6 +104,14 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.formats.json.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
 /**
16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-kafka-base/pom.xml
@@ -74,6 +74,14 @@ under the License.
 			<optional>true</optional>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<!-- Projects depending on this project, won't depend on flink-json. -->
+			<optional>true</optional>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_${scala.binary.version}</artifactId>
@@ -182,6 +190,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-minikdc</artifactId>
@@ -21,8 +21,8 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
 
 import java.util.Properties;
@@ -21,7 +21,7 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.formats.json.JsonRowDeserializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.DefinedFieldMapping;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -18,36 +18,17 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
-import java.io.IOException;
+import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
 
 /**
  * DeserializationSchema that deserializes a JSON String into an ObjectNode.
  *
  * <p>Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
+ *
+ * @deprecated Please use {@link JsonNodeDeserializationSchema} in the "flink-json" module.
  */
 @PublicEvolving
-public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
-
-	private static final long serialVersionUID = -1699854177598621044L;
-
-	private ObjectMapper mapper;
-
-	@Override
-	public ObjectNode deserialize(byte[] message) throws IOException {
-		if (mapper == null) {
-			mapper = new ObjectMapper();
-		}
-		return mapper.readValue(message, ObjectNode.class);
-	}
-
-	@Override
-	public boolean isEndOfStream(ObjectNode nextElement) {
-		return false;
-	}
-
+@Deprecated
+public class JSONDeserializationSchema extends JsonNodeDeserializationSchema {
+	// delegate everything to the parent class
 }
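
Since the deprecated class above now only delegates, migrating callers just switch to the flink-json class directly. A minimal sketch of the replacement path; the payload and field names are invented, and the accessor pattern follows the javadoc above:

import java.nio.charset.StandardCharsets;

import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

public class JsonNodeSchemaExample {
	public static void main(String[] args) throws Exception {
		// Drop-in replacement for the deprecated JSONDeserializationSchema.
		JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema();

		byte[] message = "{\"user\":\"bob\",\"visits\":3}".getBytes(StandardCharsets.UTF_8);
		ObjectNode node = schema.deserialize(message);

		// Fields are read as objectNode.get(<name>).as(<type>), as the javadoc describes.
		System.out.println(node.get("user").asText() + " -> " + node.get("visits").asInt());
	}
}
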
@@ -19,16 +19,8 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
 
 /**
  * Deserialization schema from JSON to {@link Row}.
@@ -37,89 +29,25 @@
  * the specified fields.
 *
 * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
+ *
+ * @deprecated Please use {@link org.apache.flink.formats.json.JsonRowDeserializationSchema} in
+ *     the "flink-json" module.
 */
 @PublicEvolving
-public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
-
-	private static final long serialVersionUID = -228294330688809195L;
-
-	/** Type information describing the result type. */
-	private final TypeInformation<Row> typeInfo;
-
-	/** Field names to parse. Indices match fieldTypes indices. */
-	private final String[] fieldNames;
-
-	/** Types to parse fields as. Indices match fieldNames indices. */
-	private final TypeInformation<?>[] fieldTypes;
-
-	/** Object mapper for parsing the JSON. */
-	private final ObjectMapper objectMapper = new ObjectMapper();
-
-	/** Flag indicating whether to fail on a missing field. */
-	private boolean failOnMissingField;
+@Deprecated
+public class JsonRowDeserializationSchema extends org.apache.flink.formats.json.JsonRowDeserializationSchema {
 
 	/**
 	 * Creates a JSON deserialization schema for the given fields and types.
 	 *
 	 * @param typeInfo Type information describing the result type. The field names are used
 	 *     to parse the JSON file and so are the types.
+	 * @deprecated Please use {@link org.apache.flink.formats.json.JsonRowDeserializationSchema} in
+	 *     the "flink-json" module.
 	 */
-	public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
-		Preconditions.checkNotNull(typeInfo, "Type information");
-		this.typeInfo = typeInfo;
-
-		this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
-		this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
-	}
-
-	@Override
-	public Row deserialize(byte[] message) throws IOException {
-		try {
-			JsonNode root = objectMapper.readTree(message);
-
-			Row row = new Row(fieldNames.length);
-			for (int i = 0; i < fieldNames.length; i++) {
-				JsonNode node = root.get(fieldNames[i]);
-
-				if (node == null) {
-					if (failOnMissingField) {
-						throw new IllegalStateException("Failed to find field with name '"
-							+ fieldNames[i] + "'.");
-					} else {
-						row.setField(i, null);
-					}
-				} else {
-					// Read the value as specified type
-					Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
-					row.setField(i, value);
-				}
-			}
-
-			return row;
-		} catch (Throwable t) {
-			throw new IOException("Failed to deserialize JSON object.", t);
-		}
-	}
-
-	@Override
-	public boolean isEndOfStream(Row nextElement) {
-		return false;
-	}
-
-	@Override
-	public TypeInformation<Row> getProducedType() {
-		return typeInfo;
-	}
-
-	/**
-	 * Configures the failure behaviour if a JSON field is missing.
-	 *
-	 * <p>By default, a missing field is ignored and the field is set to null.
-	 *
-	 * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
-	 */
-	public void setFailOnMissingField(boolean failOnMissingField) {
-		this.failOnMissingField = failOnMissingField;
+	@Deprecated
+	public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
+		super(typeInfo);
 	}
 
 }
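
The same pattern applies to row-typed JSON: the deprecated wrapper forwards its TypeInformation<Row> constructor to the flink-json class. A minimal usage sketch, assuming the relocated class also keeps the setFailOnMissingField(boolean) toggle of the class it replaces; the row fields and payload below are invented:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

public class JsonRowDeserializationExample {
	public static void main(String[] args) throws Exception {
		// The field names and types drive the JSON parsing (see the constructor javadoc above).
		TypeInformation<Row> rowType = Types.ROW_NAMED(
			new String[] {"id", "name"},
			Types.LONG, Types.STRING);

		JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(rowType);

		// By default a missing field is ignored and set to null; opt into failing instead.
		schema.setFailOnMissingField(true);

		byte[] message = "{\"id\":42,\"name\":\"flink\"}".getBytes(StandardCharsets.UTF_8);
		Row row = schema.deserialize(message);
		System.out.println(row); // prints: 42,flink
	}
}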