From 772480190af29aefa0f8495deee140c1a26ca71a Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 1 Dec 2017 12:15:10 +0100 Subject: [PATCH 1/2] [hotfix] [tests] Let MarshallingTestBases read the value from String value --- .../rest/messages/RestRequestMarshallingTestBase.java | 5 ++--- .../rest/messages/RestResponseMarshallingTestBase.java | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java index 460cebfbc1467..eacf201508f31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java @@ -21,7 +21,6 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Assert; @@ -54,9 +53,9 @@ public void testJsonMarshalling() throws Exception { final R expected = getTestRequestInstance(); ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); - JsonNode json = objectMapper.valueToTree(expected); + final String marshalled = objectMapper.writeValueAsString(expected); - final R unmarshalled = objectMapper.treeToValue(json, getTestRequestClass()); + final R unmarshalled = objectMapper.readValue(marshalled, getTestRequestClass()); Assert.assertEquals(expected, unmarshalled); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java index 2604bd13183d8..6e1b532b4ee09 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java @@ -21,7 +21,6 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Assert; @@ -54,9 +53,9 @@ public void testJsonMarshalling() throws Exception { final R expected = getTestResponseInstance(); ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); - JsonNode json = objectMapper.valueToTree(expected); + final String marshalled = objectMapper.writeValueAsString(expected); - final R unmarshalled = objectMapper.treeToValue(json, getTestResponseClass()); + final R unmarshalled = objectMapper.readValue(marshalled, getTestResponseClass()); assertOriginalEqualsToUnmarshalled(expected, unmarshalled); } From e23c7a1d535a783a0d1386688a16609938d26692 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 1 Dec 2017 12:17:50 +0100 Subject: [PATCH 2/2] [FLINK-8141] [flip6] Fix JsonPlan serialization in JobDetailsInfo The JsonPlan in JobDetailInfo must be serialized as a raw value to make it parsable for downstream applications. --- .../rest/handler/job/JobDetailsHandler.java | 2 +- .../rest/messages/job/JobDetailsInfo.java | 5 ++- .../messages/json/RawJsonDeserializer.java | 45 +++++++++++++++++++ .../rest/messages/job/JobDetailsInfoTest.java | 2 +- 4 files changed, 51 insertions(+), 3 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/RawJsonDeserializer.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index 0c0ee18cc2103..647763a2c269f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -126,7 +126,7 @@ protected JobDetailsInfo handleRequest( executionGraph.getJsonPlan()); } - public static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo( + private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo( AccessExecutionJobVertex ejv, long now, JobID jobId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java index e4d04d5e1777b..551913f621dab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java @@ -27,10 +27,12 @@ import org.apache.flink.runtime.rest.messages.json.JobIDSerializer; import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer; import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer; +import org.apache.flink.runtime.rest.messages.json.RawJsonDeserializer; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonRawValue; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; @@ -103,6 +105,7 @@ public class JobDetailsInfo implements ResponseBody { private final Map jobVerticesPerState; @JsonProperty(FIELD_NAME_JSON_PLAN) + @JsonRawValue private final String jsonPlan; @JsonCreator @@ -118,7 +121,7 @@ public JobDetailsInfo( @JsonProperty(FIELD_NAME_TIMESTAMPS) Map timestamps, @JsonProperty(FIELD_NAME_JOB_VERTEX_INFOS) Collection jobVertexInfos, @JsonProperty(FIELD_NAME_JOB_VERTICES_PER_STATE) Map jobVerticesPerState, - @JsonProperty(FIELD_NAME_JSON_PLAN) String jsonPlan) { + @JsonProperty(FIELD_NAME_JSON_PLAN) @JsonDeserialize(using = RawJsonDeserializer.class) String jsonPlan) { this.jobId = Preconditions.checkNotNull(jobId); this.name = Preconditions.checkNotNull(name); this.isStoppable = isStoppable; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/RawJsonDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/RawJsonDeserializer.java new file mode 100644 index 0000000000000..c42391718c92f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/RawJsonDeserializer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +import java.io.IOException; + +/** + * Json deserializer which deserializes raw json. + */ +public final class RawJsonDeserializer extends StdDeserializer { + + private static final long serialVersionUID = -4089499607872996396L; + + protected RawJsonDeserializer() { + super(String.class); + } + + @Override + public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + final JsonNode jsonNode = ctxt.readValue(p, JsonNode.class); + + return jsonNode.toString(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java index 5e2e09d64ace7..aec86747d2285 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java @@ -44,7 +44,7 @@ protected Class getTestResponseClass() { protected JobDetailsInfo getTestResponseInstance() throws Exception { final Random random = new Random(); final int numJobVertexDetailsInfos = 4; - final String jsonPlan = "{id: \"1234\"}"; + final String jsonPlan = "{\"id\":\"1234\"}"; final Map timestamps = new HashMap<>(JobStatus.values().length); final Collection jobVertexInfos = new ArrayList<>(numJobVertexDetailsInfos);