From 85f855418f0b1998237bde4eee81ebdbbaea6041 Mon Sep 17 00:00:00 2001 From: djhworld Date: Fri, 8 Sep 2017 22:09:14 +0100 Subject: [PATCH 1/7] BEAM-2807 -> Fixed NPE error on CoderTypeSerializerConfigSnapshot serialization --- .../types/CoderTypeSerializer.java | 3 +- .../types/CoderTypeSerializerTest.java | 46 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index ecfd3fb4f66e..bf203fcfb293 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -161,7 +161,8 @@ public CoderTypeSerializerConfigSnapshot() { } public CoderTypeSerializerConfigSnapshot(Coder coder) { - this.coderName = coder.getClass().getCanonicalName(); + this.coderName = coder.getClass().getName(); + System.out.println(this.coderName); } @Override diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java new file mode 100644 index 000000000000..51f18c8c422b --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java @@ -0,0 +1,46 @@ +package org.apache.beam.runners.flink.translation.types; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class CoderTypeSerializerTest { + + @Test + public void shouldBeAbleToWriteSnapshotForAnonymousClassCoder() throws Exception { + AtomicCoder anonymousClassCoder = new AtomicCoder() { + + @Override public void encode(String value, OutputStream outStream) + throws CoderException, IOException { + + } + + @Override public String decode(InputStream inStream) throws CoderException, IOException { + return ""; + } + }; + + CoderTypeSerializer serializer = new CoderTypeSerializer<>(anonymousClassCoder); + + TypeSerializerConfigSnapshot configSnapshot = serializer.snapshotConfiguration(); + configSnapshot.write(new ComparatorTestBase.TestOutputView()); + } + + @Test + public void shouldBeAbleToWriteSnapshotForConcreteClassCoder() throws Exception { //passes + Coder concreteClassCoder = StringUtf8Coder.of(); + CoderTypeSerializer coderTypeSerializer = new CoderTypeSerializer<>(concreteClassCoder); + TypeSerializerConfigSnapshot typeSerializerConfigSnapshot = coderTypeSerializer + .snapshotConfiguration(); + typeSerializerConfigSnapshot.write(new ComparatorTestBase.TestOutputView()); + } +} + From adbb5d328743c45e9581b583e054390f611e7f98 Mon Sep 17 00:00:00 2001 From: djhworld Date: Fri, 8 Sep 2017 22:13:00 +0100 Subject: [PATCH 2/7] BEAM-2807 -> Removed system.out --- .../runners/flink/translation/types/CoderTypeSerializer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index bf203fcfb293..c8dbac49d68c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -162,7 +162,6 @@ public CoderTypeSerializerConfigSnapshot() { public CoderTypeSerializerConfigSnapshot(Coder coder) { this.coderName = coder.getClass().getName(); - System.out.println(this.coderName); } @Override From 9f344676d9a6d7dba4a22de119286bf3cad115fc Mon Sep 17 00:00:00 2001 From: djhworld Date: Fri, 8 Sep 2017 22:18:33 +0100 Subject: [PATCH 3/7] BEAM-2807 -> Removed comment --- .../flink/translation/types/CoderTypeSerializerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java index 51f18c8c422b..9d765137cc98 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java @@ -35,7 +35,7 @@ public void shouldBeAbleToWriteSnapshotForAnonymousClassCoder() throws Exception } @Test - public void shouldBeAbleToWriteSnapshotForConcreteClassCoder() throws Exception { //passes + public void shouldBeAbleToWriteSnapshotForConcreteClassCoder() throws Exception { Coder concreteClassCoder = StringUtf8Coder.of(); CoderTypeSerializer coderTypeSerializer = new CoderTypeSerializer<>(concreteClassCoder); TypeSerializerConfigSnapshot typeSerializerConfigSnapshot = coderTypeSerializer From d3f0ca6724e714f6eccd105e53657578bab2c48c Mon Sep 17 00:00:00 2001 From: djhworld Date: Fri, 8 Sep 2017 22:26:54 +0100 Subject: [PATCH 4/7] Added license to top of test file --- .../types/CoderTypeSerializerTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java index 9d765137cc98..25a42b7a5c5d 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.flink.translation.types; import org.apache.beam.sdk.coders.AtomicCoder; From 87d1af0296fa98d595cfeb6d50d7050c3eabfa6e Mon Sep 17 00:00:00 2001 From: djhworld Date: Fri, 8 Sep 2017 23:32:23 +0100 Subject: [PATCH 5/7] BEAM-2807 -> Fixed checkstyle violations --- .../types/CoderTypeSerializerTest.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java index 25a42b7a5c5d..af65aa366594 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java @@ -17,6 +17,10 @@ */ package org.apache.beam.runners.flink.translation.types; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -25,14 +29,13 @@ import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.junit.Test; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +/** + * Tests CoderTypeSerializer. + */ public class CoderTypeSerializerTest { - @Test - public void shouldBeAbleToWriteSnapshotForAnonymousClassCoder() throws Exception { + @Test public void shouldBeAbleToWriteSnapshotForAnonymousClassCoder() throws Exception { AtomicCoder anonymousClassCoder = new AtomicCoder() { @Override public void encode(String value, OutputStream outStream) @@ -51,8 +54,7 @@ public void shouldBeAbleToWriteSnapshotForAnonymousClassCoder() throws Exception configSnapshot.write(new ComparatorTestBase.TestOutputView()); } - @Test - public void shouldBeAbleToWriteSnapshotForConcreteClassCoder() throws Exception { + @Test public void shouldBeAbleToWriteSnapshotForConcreteClassCoder() throws Exception { Coder concreteClassCoder = StringUtf8Coder.of(); CoderTypeSerializer coderTypeSerializer = new CoderTypeSerializer<>(concreteClassCoder); TypeSerializerConfigSnapshot typeSerializerConfigSnapshot = coderTypeSerializer From 9e337da5b89aadf32f4f85925b597d72995dd551 Mon Sep 17 00:00:00 2001 From: djhworld Date: Sat, 9 Sep 2017 07:38:23 +0100 Subject: [PATCH 6/7] Added link in Javadocs --- .../flink/translation/types/CoderTypeSerializerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java index af65aa366594..661c6ab48ff4 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java @@ -31,7 +31,7 @@ /** - * Tests CoderTypeSerializer. + * Tests {@link CoderTypeSerializer}. */ public class CoderTypeSerializerTest { From 1a6457afc7479a7f8a7daec7f4efe5c896c62f4b Mon Sep 17 00:00:00 2001 From: djhworld Date: Sat, 9 Sep 2017 12:20:02 +0100 Subject: [PATCH 7/7] Refactored test to test write and read snapshots - Additionally implemented code review fixes --- .../types/CoderTypeSerializerTest.java | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java index 661c6ab48ff4..b0c40dee79e4 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java @@ -17,10 +17,14 @@ */ package org.apache.beam.runners.flink.translation.types; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.CoderTypeSerializerConfigSnapshot; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -29,37 +33,47 @@ import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.junit.Test; - /** * Tests {@link CoderTypeSerializer}. */ public class CoderTypeSerializerTest { - @Test public void shouldBeAbleToWriteSnapshotForAnonymousClassCoder() throws Exception { + @Test + public void shouldWriteAndReadSnapshotForAnonymousClassCoder() throws Exception { AtomicCoder anonymousClassCoder = new AtomicCoder() { - @Override public void encode(String value, OutputStream outStream) + @Override + public void encode(String value, OutputStream outStream) throws CoderException, IOException { } - @Override public String decode(InputStream inStream) throws CoderException, IOException { + @Override + public String decode(InputStream inStream) throws CoderException, IOException { return ""; } }; - CoderTypeSerializer serializer = new CoderTypeSerializer<>(anonymousClassCoder); - - TypeSerializerConfigSnapshot configSnapshot = serializer.snapshotConfiguration(); - configSnapshot.write(new ComparatorTestBase.TestOutputView()); + testWriteAndReadConfigSnapshot(anonymousClassCoder); } - @Test public void shouldBeAbleToWriteSnapshotForConcreteClassCoder() throws Exception { + @Test + public void shouldWriteAndReadSnapshotForConcreteClassCoder() throws Exception { Coder concreteClassCoder = StringUtf8Coder.of(); - CoderTypeSerializer coderTypeSerializer = new CoderTypeSerializer<>(concreteClassCoder); - TypeSerializerConfigSnapshot typeSerializerConfigSnapshot = coderTypeSerializer - .snapshotConfiguration(); - typeSerializerConfigSnapshot.write(new ComparatorTestBase.TestOutputView()); + testWriteAndReadConfigSnapshot(concreteClassCoder); + } + + private void testWriteAndReadConfigSnapshot(Coder coder) throws IOException { + CoderTypeSerializer serializer = new CoderTypeSerializer<>(coder); + + TypeSerializerConfigSnapshot writtenSnapshot = serializer.snapshotConfiguration(); + ComparatorTestBase.TestOutputView outView = new ComparatorTestBase.TestOutputView(); + writtenSnapshot.write(outView); + + TypeSerializerConfigSnapshot readSnapshot = new CoderTypeSerializerConfigSnapshot<>(); + readSnapshot.read(outView.getInputView()); + + assertThat(readSnapshot, is(writtenSnapshot)); } }