From c4e74b57c80fb5f2d22756535eeac60e428f1645 Mon Sep 17 00:00:00 2001 From: yanghua Date: Sat, 13 Oct 2018 11:48:51 +0800 Subject: [PATCH] [FLINK-9697] Rename KafkaTableSink to KafkaTableSinkBase --- .../connectors/kafka/Kafka010JsonTableSink.java | 6 +++--- .../connectors/kafka/Kafka010TableSink.java | 2 +- .../kafka/Kafka010JsonTableSinkTest.java | 4 ++-- .../connectors/kafka/Kafka011TableSink.java | 2 +- .../connectors/kafka/Kafka08JsonTableSink.java | 8 ++++---- .../connectors/kafka/Kafka08TableSink.java | 2 +- .../kafka/Kafka08JsonTableSinkTest.java | 4 ++-- .../connectors/kafka/Kafka09JsonTableSink.java | 8 ++++---- .../connectors/kafka/Kafka09TableSink.java | 2 +- .../kafka/Kafka09JsonTableSinkTest.java | 4 ++-- .../connectors/kafka/KafkaJsonTableSink.java | 4 ++-- ...fkaTableSink.java => KafkaTableSinkBase.java} | 16 ++++++++-------- ...Base.java => KafkaTableSinkBaseTestBase.java} | 14 +++++++------- 13 files changed, 38 insertions(+), 38 deletions(-) rename flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/{KafkaTableSink.java => KafkaTableSinkBase.java} (94%) rename flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/{KafkaTableSinkTestBase.java => KafkaTableSinkBaseTestBase.java} (90%) diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java index 8471908a9cf9a..b9a5350e61870 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java @@ -28,7 +28,7 @@ import java.util.Properties; /** - * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format. + * Kafka 0.10 {@link KafkaTableSinkBase} that serializes data in JSON format. * * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for @@ -39,7 +39,7 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink { /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10 * topic with fixed partition assignment. * *

Each parallel TableSink instance will write its rows to a single Kafka partition.

@@ -60,7 +60,7 @@ public Kafka010JsonTableSink(String topic, Properties properties) { } /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10 * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java index 1d408b8ab5289..79aad7c98005c 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java @@ -31,7 +31,7 @@ * Kafka 0.10 table sink for writing data into Kafka. */ @Internal -public class Kafka010TableSink extends KafkaTableSink { +public class Kafka010TableSink extends KafkaTableSinkBase { public Kafka010TableSink( TableSchema schema, diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java index 9208f6583b877..4575f8763df61 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java @@ -32,10 +32,10 @@ * drop support for format-specific table sinks. */ @Deprecated -public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase { +public class Kafka010JsonTableSinkTest extends KafkaTableSinkBaseTestBase { @Override - protected KafkaTableSink createTableSink( + protected KafkaTableSinkBase createTableSink( String topic, Properties properties, FlinkKafkaPartitioner partitioner) { diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java index 8d81a5b59a1c3..304b26ba10597 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java @@ -33,7 +33,7 @@ * Kafka 0.11 table sink for writing data into Kafka. */ @Internal -public class Kafka011TableSink extends KafkaTableSink { +public class Kafka011TableSink extends KafkaTableSinkBase { public Kafka011TableSink( TableSchema schema, diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index 189a9fdf46bb9..b7474e23651f2 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -30,7 +30,7 @@ import java.util.Properties; /** - * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format. + * Kafka 0.8 {@link KafkaTableSinkBase} that serializes data in JSON format. * * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for @@ -41,7 +41,7 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8 * topic with fixed partition assignment. * *

Each parallel TableSink instance will write its rows to a single Kafka partition.

@@ -62,7 +62,7 @@ public Kafka08JsonTableSink(String topic, Properties properties) { } /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8 * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written @@ -76,7 +76,7 @@ public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaParti } /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8 * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java index 146cfc907390d..90c42588eaf9a 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java @@ -31,7 +31,7 @@ * Kafka 0.8 table sink for writing data into Kafka. */ @Internal -public class Kafka08TableSink extends KafkaTableSink { +public class Kafka08TableSink extends KafkaTableSinkBase { public Kafka08TableSink( TableSchema schema, diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index fc46ad4c6ee50..aa5fa1609bd9c 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -32,10 +32,10 @@ * drop support for format-specific table sinks. */ @Deprecated -public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { +public class Kafka08JsonTableSinkTest extends KafkaTableSinkBaseTestBase { @Override - protected KafkaTableSink createTableSink( + protected KafkaTableSinkBase createTableSink( String topic, Properties properties, FlinkKafkaPartitioner partitioner) { diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index 336345900615e..cd27f8371c265 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -30,7 +30,7 @@ import java.util.Properties; /** - * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format. + * Kafka 0.9 {@link KafkaTableSinkBase} that serializes data in JSON format. * * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for @@ -41,7 +41,7 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink { /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9 * topic with fixed partition assignment. * *

Each parallel TableSink instance will write its rows to a single Kafka partition.

@@ -62,7 +62,7 @@ public Kafka09JsonTableSink(String topic, Properties properties) { } /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9 * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written @@ -76,7 +76,7 @@ public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaParti } /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9 * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java index 6e38aad1a3939..657d75a1c02fd 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java @@ -31,7 +31,7 @@ * Kafka 0.9 table sink for writing data into Kafka. */ @Internal -public class Kafka09TableSink extends KafkaTableSink { +public class Kafka09TableSink extends KafkaTableSinkBase { public Kafka09TableSink( TableSchema schema, diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index 97b5c7d88a2c9..29cfa93ef2ed4 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -32,10 +32,10 @@ * drop support for format-specific table sinks. */ @Deprecated -public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { +public class Kafka09JsonTableSinkTest extends KafkaTableSinkBaseTestBase { @Override - protected KafkaTableSink createTableSink( + protected KafkaTableSinkBase createTableSink( String topic, Properties properties, FlinkKafkaPartitioner partitioner) { diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index 231edddb311cb..d84eb89104454 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -28,13 +28,13 @@ import java.util.Properties; /** - * Base class for {@link KafkaTableSink} that serializes data in JSON format. + * Base class for {@link KafkaTableSinkBase} that serializes data in JSON format. * * @deprecated Use table descriptors instead of implementation-specific classes. */ @Deprecated @Internal -public abstract class KafkaJsonTableSink extends KafkaTableSink { +public abstract class KafkaJsonTableSink extends KafkaTableSinkBase { /** * Creates KafkaJsonTableSink. diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java similarity index 94% rename from flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java rename to flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java index a85d536eac99b..acd10cc88551b 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java @@ -43,7 +43,7 @@ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}. */ @Internal -public abstract class KafkaTableSink implements AppendStreamTableSink { +public abstract class KafkaTableSinkBase implements AppendStreamTableSink { // TODO make all attributes final and mandatory once we drop support for format-specific table sinks @@ -66,7 +66,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink { protected String[] fieldNames; protected TypeInformation[] fieldTypes; - protected KafkaTableSink( + protected KafkaTableSinkBase( TableSchema schema, String topic, Properties properties, @@ -81,7 +81,7 @@ protected KafkaTableSink( } /** - * Creates KafkaTableSink. + * Creates KafkaTableSinkBase. * * @param topic Kafka topic to write to. * @param properties Properties for the Kafka producer. @@ -89,7 +89,7 @@ protected KafkaTableSink( * @deprecated Use table descriptors instead of implementation-specific classes. */ @Deprecated - public KafkaTableSink( + public KafkaTableSinkBase( String topic, Properties properties, FlinkKafkaPartitioner partitioner) { @@ -133,7 +133,7 @@ protected SerializationSchema createSerializationSchema(RowTypeInfo rowSche * @return Deep copy of this sink */ @Deprecated - protected KafkaTableSink createCopy() { + protected KafkaTableSinkBase createCopy() { throw new UnsupportedOperationException("This method only exists for backwards compatibility."); } @@ -164,14 +164,14 @@ public TypeInformation[] getFieldTypes() { } @Override - public KafkaTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { + public KafkaTableSinkBase configure(String[] fieldNames, TypeInformation[] fieldTypes) { if (schema.isPresent()) { // a fixed schema is defined so reconfiguration is not supported throw new UnsupportedOperationException("Reconfiguration of this sink is not supported."); } // legacy code - KafkaTableSink copy = createCopy(); + KafkaTableSinkBase copy = createCopy(); copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); Preconditions.checkArgument(fieldNames.length == fieldTypes.length, @@ -191,7 +191,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - KafkaTableSink that = (KafkaTableSink) o; + KafkaTableSinkBase that = (KafkaTableSinkBase) o; return Objects.equals(schema, that.schema) && Objects.equals(topic, that.topic) && Objects.equals(properties, that.properties) && diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java similarity index 90% rename from flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java rename to flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java index b4bb89dc04883..dbbb10f51bf9a 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java @@ -50,7 +50,7 @@ * drop support for format-specific table sinks. */ @Deprecated -public abstract class KafkaTableSinkTestBase { +public abstract class KafkaTableSinkBaseTestBase { private static final String TOPIC = "testTopic"; private static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; @@ -64,7 +64,7 @@ public void testKafkaTableSink() { DataStream dataStream = mock(DataStream.class); when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class)); - KafkaTableSink kafkaTableSink = spy(createTableSink()); + KafkaTableSinkBase kafkaTableSink = spy(createTableSink()); kafkaTableSink.emitDataStream(dataStream); // verify correct producer class @@ -80,8 +80,8 @@ public void testKafkaTableSink() { @Test public void testConfiguration() { - KafkaTableSink kafkaTableSink = createTableSink(); - KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); + KafkaTableSinkBase kafkaTableSink = createTableSink(); + KafkaTableSinkBase newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); assertNotSame(kafkaTableSink, newKafkaTableSink); assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames()); @@ -89,7 +89,7 @@ public void testConfiguration() { assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); } - protected abstract KafkaTableSink createTableSink( + protected abstract KafkaTableSinkBase createTableSink( String topic, Properties properties, FlinkKafkaPartitioner partitioner); @@ -98,8 +98,8 @@ protected abstract KafkaTableSink createTableSink( protected abstract Class getProducerClass(); - private KafkaTableSink createTableSink() { - KafkaTableSink sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER); + private KafkaTableSinkBase createTableSink() { + KafkaTableSinkBase sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER); return sink.configure(FIELD_NAMES, FIELD_TYPES); }