From 6f334b81fd17d80a46d62797e918a098406cf6a0 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Tue, 31 May 2016 18:50:35 +0200 Subject: [PATCH] [hotfix] Fix JSONDeserializationSchema for Kafka; ParameterTool usability --- .../org/apache/flink/api/java/utils/ParameterTool.java | 2 +- .../util/serialization/JSONDeserializationSchema.java | 10 ++-------- .../kafka/JSONDeserializationSchemaTest.java | 2 +- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java index 6be78e2b50cba..46c73879fb3df 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java @@ -152,7 +152,7 @@ else if(arg.startsWith("-")) { public static ParameterTool fromPropertiesFile(String path) throws IOException { File propertiesFile = new File(path); if(!propertiesFile.exists()) { - throw new FileNotFoundException("Properties file "+path+" does not exist"); + throw new FileNotFoundException("Properties file " + propertiesFile.getAbsolutePath() + " does not exist"); } Properties props = new Properties(); FileInputStream fis = new FileInputStream(propertiesFile); diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java index 49e9da85e69ae..d17005835fb58 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java @@ -18,22 +18,20 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.api.common.typeinfo.TypeInformation; import java.io.IOException; -import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; /** * DeserializationSchema that deserializes a JSON String into an ObjectNode. *

* Fields can be accessed by calling objectNode.get(<name>).as(<type>) */ -public class JSONDeserializationSchema implements KeyedDeserializationSchema { +public class JSONDeserializationSchema extends AbstractDeserializationSchema { private ObjectMapper mapper; @Override - public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + public ObjectNode deserialize(byte[] message) throws IOException { if (mapper == null) { mapper = new ObjectMapper(); } @@ -45,8 +43,4 @@ public boolean isEndOfStream(ObjectNode nextElement) { return false; } - @Override - public TypeInformation getProducedType() { - return getForClass(ObjectNode.class); - } } diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java index f8e3fd1be8a2d..1882a7ed67ed9 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java @@ -33,7 +33,7 @@ public void testDeserialize() throws IOException { byte[] serializedValue = mapper.writeValueAsBytes(initialValue); JSONDeserializationSchema schema = new JSONDeserializationSchema(); - ObjectNode deserializedValue = schema.deserialize(null, serializedValue, "", 0, 0); + ObjectNode deserializedValue = schema.deserialize(serializedValue); Assert.assertEquals(4, deserializedValue.get("key").asInt()); Assert.assertEquals("world", deserializedValue.get("value").asText());