From 945762030ba8e268ea79e34011d81c48f0f704bb Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Thu, 5 Jul 2018 11:28:35 +0200 Subject: [PATCH] [hotfix][filesystem] Remove incorrect equals methods in StreamWriters --- .../connectors/fs/AvroKeyValueSinkWriter.java | 25 +------- .../connectors/fs/SequenceFileWriter.java | 36 ++++------- .../connectors/fs/StreamWriterBase.java | 22 +------ .../streaming/connectors/fs/StringWriter.java | 25 +------- .../fs/AvroKeyValueSinkWriterTest.java | 8 +-- .../connectors/fs/RollingSinkITCase.java | 2 +- .../connectors/fs/SequenceFileWriterTest.java | 8 +-- .../fs/StreamWriterBaseComparator.java | 60 +++++++++++++++++++ .../connectors/fs/StringWriterTest.java | 8 +-- .../fs/bucketing/BucketingSinkTest.java | 3 +- 10 files changed, 93 insertions(+), 104 deletions(-) create mode 100644 flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java index 6b2f7d625a110..0f73e8c5c0bf3 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java @@ -40,7 +40,6 @@ import java.io.OutputStream; import java.util.Arrays; import java.util.Map; -import java.util.Objects; /** * Implementation of AvroKeyValue writer that can be used in Sink. @@ -204,7 +203,7 @@ public void setInputType(TypeInformation type, ExecutionConfig executionConfi } @Override - public Writer> duplicate() { + public AvroKeyValueSinkWriter duplicate() { return new AvroKeyValueSinkWriter<>(this); } @@ -335,25 +334,7 @@ public static Schema getSchema(Schema keySchema, Schema valueSchema) { } } - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), properties); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other == null) { - return false; - } - if (getClass() != other.getClass()) { - return false; - } - AvroKeyValueSinkWriter writer = (AvroKeyValueSinkWriter) other; - // field comparison - return Objects.equals(properties, writer.properties) - && super.equals(other); + Map getProperties() { + return properties; } } diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java index 2f42ef74c5211..17b16dd8b4611 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java @@ -34,7 +34,6 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory; import java.io.IOException; -import java.util.Objects; /** * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}. @@ -152,32 +151,23 @@ public void setInputType(TypeInformation type, ExecutionConfig executionConfi } @Override - public Writer> duplicate() { + public SequenceFileWriter duplicate() { return new SequenceFileWriter<>(this); } - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), compressionCodecName, compressionType, keyClass, valueClass); + String getCompressionCodecName() { + return compressionCodecName; } - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other == null) { - return false; - } - if (getClass() != other.getClass()) { - return false; - } - SequenceFileWriter writer = (SequenceFileWriter) other; - // field comparison - return Objects.equals(compressionCodecName, writer.compressionCodecName) - && Objects.equals(compressionType, writer.compressionType) - && Objects.equals(keyClass, writer.keyClass) - && Objects.equals(valueClass, writer.valueClass) - && super.equals(other); + SequenceFile.CompressionType getCompressionType() { + return compressionType; + } + + Class getKeyClass() { + return keyClass; + } + + Class getValueClass() { + return valueClass; } } diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java index f625ef38905c7..d3035a56622ed 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; -import java.util.Objects; /** * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}. @@ -102,24 +101,7 @@ public void close() throws IOException { } } - @Override - public int hashCode() { - return Boolean.hashCode(syncOnFlush); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other == null) { - return false; - } - if (getClass() != other.getClass()) { - return false; - } - StreamWriterBase writer = (StreamWriterBase) other; - // field comparison - return Objects.equals(syncOnFlush, writer.syncOnFlush); + public boolean isSyncOnFlush() { + return syncOnFlush; } } diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java index 5c81b15bf1a0e..122bc7fc195c8 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java @@ -26,7 +26,6 @@ import java.nio.charset.Charset; import java.nio.charset.IllegalCharsetNameException; import java.nio.charset.UnsupportedCharsetException; -import java.util.Objects; /** * A {@link Writer} that uses {@code toString()} on the input elements and writes them to @@ -87,29 +86,11 @@ public void write(T element) throws IOException { } @Override - public Writer duplicate() { + public StringWriter duplicate() { return new StringWriter<>(this); } - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), charsetName); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other == null) { - return false; - } - if (getClass() != other.getClass()) { - return false; - } - StringWriter writer = (StringWriter) other; - // field comparison - return Objects.equals(charsetName, writer.charsetName) - && super.equals(other); + String getCharsetName() { + return charsetName; } } diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java index 019e56d66bae4..864d9c1b6fa57 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java @@ -18,8 +18,6 @@ package org.apache.flink.streaming.connectors.fs; -import org.apache.flink.api.java.tuple.Tuple2; - import org.apache.avro.Schema; import org.apache.avro.file.DataFileConstants; import org.junit.Test; @@ -47,11 +45,11 @@ public void testDuplicate() { AvroKeyValueSinkWriter writer = new AvroKeyValueSinkWriter(properties); writer.setSyncOnFlush(true); - Writer> other = writer.duplicate(); + AvroKeyValueSinkWriter other = writer.duplicate(); - assertTrue(writer.equals(other)); + assertTrue(StreamWriterBaseComparator.equals(writer, other)); writer.setSyncOnFlush(false); - assertFalse(writer.equals(other)); + assertFalse(StreamWriterBaseComparator.equals(writer, other)); } } diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index 93f6d5225927a..86821a5dd2f6c 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -930,7 +930,7 @@ public void open(FileSystem fs, Path path) throws IOException { } @Override - public Writer duplicate() { + public StreamWriterWithConfigCheck duplicate() { return new StreamWriterWithConfigCheck<>(key, expect); } } diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java index 7ea22649d4970..44716d3b4fe38 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java @@ -18,8 +18,6 @@ package org.apache.flink.streaming.connectors.fs; -import org.apache.flink.api.java.tuple.Tuple2; - import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -36,11 +34,11 @@ public class SequenceFileWriterTest { public void testDuplicate() { SequenceFileWriter writer = new SequenceFileWriter("BZ", SequenceFile.CompressionType.BLOCK); writer.setSyncOnFlush(true); - Writer> other = writer.duplicate(); + SequenceFileWriter other = writer.duplicate(); - assertTrue(writer.equals(other)); + assertTrue(StreamWriterBaseComparator.equals(writer, other)); writer.setSyncOnFlush(false); - assertFalse(writer.equals(other)); + assertFalse(StreamWriterBaseComparator.equals(writer, other)); } } diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java new file mode 100644 index 0000000000000..9472c29ee454a --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.fs; + +import org.apache.hadoop.io.Writable; + +import java.util.Objects; + +/** + * Helper class to perform partial comparisons of {@link StreamWriterBase} instances. During comparisons + * it ignores changes in underlying output streams. + */ +public class StreamWriterBaseComparator { + + public static boolean equals( + StreamWriterBase writer1, + StreamWriterBase writer2) { + return Objects.equals(writer1.isSyncOnFlush(), writer2.isSyncOnFlush()); + } + + public static boolean equals( + AvroKeyValueSinkWriter writer1, + AvroKeyValueSinkWriter writer2) { + return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) && + Objects.equals(writer1.getProperties(), writer2.getProperties()); + } + + public static boolean equals( + SequenceFileWriter writer1, + SequenceFileWriter writer2) { + return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) && + Objects.equals(writer1.getCompressionCodecName(), writer2.getCompressionCodecName()) && + Objects.equals(writer1.getCompressionType(), writer2.getCompressionType()) && + Objects.equals(writer1.getKeyClass(), writer2.getKeyClass()) && + Objects.equals(writer1.getValueClass(), writer2.getValueClass()); + } + + public static boolean equals( + StringWriter writer1, + StringWriter writer2) { + return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) && + Objects.equals(writer1.getCharsetName(), writer2.getCharsetName()); + } +} diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java index 488f860c32773..7009d9467fb89 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java @@ -34,12 +34,12 @@ public class StringWriterTest { public void testDuplicate() { StringWriter writer = new StringWriter(StandardCharsets.UTF_16.name()); writer.setSyncOnFlush(true); - Writer other = writer.duplicate(); + StringWriter other = writer.duplicate(); - assertTrue(writer.equals(other)); + assertTrue(StreamWriterBaseComparator.equals(writer, other)); writer.setSyncOnFlush(false); - assertFalse(writer.equals(other)); - assertFalse(writer.equals(new StringWriter<>())); + assertFalse(StreamWriterBaseComparator.equals(writer, other)); + assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>())); } } diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index 362c07812deec..dc84846c96c26 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -28,7 +28,6 @@ import org.apache.flink.streaming.connectors.fs.Clock; import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; -import org.apache.flink.streaming.connectors.fs.Writer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -918,7 +917,7 @@ public void open(FileSystem fs, Path path) throws IOException { } @Override - public Writer> duplicate() { + public StreamWriterWithConfigCheck duplicate() { return new StreamWriterWithConfigCheck<>(properties, key, expect); } }