[FLINK-9697] Rename KafkaTableSink to KafkaTableSinkBase
yanghua authored and aljoscha committed Oct 16, 2018
1 parent 3a727c0 commit c4e74b5
Showing 13 changed files with 38 additions and 38 deletions.
Kafka010JsonTableSink.java
@@ -28,7 +28,7 @@
 import java.util.Properties;
 
 /**
- * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ * Kafka 0.10 {@link KafkaTableSinkBase} that serializes data in JSON format.
  *
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
  * with descriptors for schema and format instead. Descriptors allow for
@@ -39,7 +39,7 @@
 public class Kafka010JsonTableSink extends KafkaJsonTableSink {
 
 	/**
-	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10
 	 * topic with fixed partition assignment.
 	 *
 	 * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
@@ -60,7 +60,7 @@ public Kafka010JsonTableSink(String topic, Properties properties) {
 	}
 
 	/**
-	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10
 	 * topic with custom partition assignment.
 	 *
 	 * @param topic topic in Kafka to which table is written
Kafka010TableSink.java
@@ -31,7 +31,7 @@
  * Kafka 0.10 table sink for writing data into Kafka.
  */
 @Internal
-public class Kafka010TableSink extends KafkaTableSink {
+public class Kafka010TableSink extends KafkaTableSinkBase {
 
 	public Kafka010TableSink(
 			TableSchema schema,
Kafka010JsonTableSinkTest.java
@@ -32,10 +32,10 @@
  * drop support for format-specific table sinks.
  */
 @Deprecated
-public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
+public class Kafka010JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
 
 	@Override
-	protected KafkaTableSink createTableSink(
+	protected KafkaTableSinkBase createTableSink(
 			String topic,
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
Kafka011TableSink.java
@@ -33,7 +33,7 @@
  * Kafka 0.11 table sink for writing data into Kafka.
  */
 @Internal
-public class Kafka011TableSink extends KafkaTableSink {
+public class Kafka011TableSink extends KafkaTableSinkBase {
 
 	public Kafka011TableSink(
 			TableSchema schema,
Kafka08JsonTableSink.java
@@ -30,7 +30,7 @@
 import java.util.Properties;
 
 /**
- * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
+ * Kafka 0.8 {@link KafkaTableSinkBase} that serializes data in JSON format.
  *
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
  * with descriptors for schema and format instead. Descriptors allow for
@@ -41,7 +41,7 @@
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 
 	/**
-	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
 	 * topic with fixed partition assignment.
 	 *
 	 * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
@@ -62,7 +62,7 @@ public Kafka08JsonTableSink(String topic, Properties properties) {
 	}
 
 	/**
-	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
 	 * topic with custom partition assignment.
 	 *
 	 * @param topic topic in Kafka to which table is written
@@ -76,7 +76,7 @@ public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaParti
 	}
 
 	/**
-	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
	 * topic with custom partition assignment.
 	 *
 	 * @param topic topic in Kafka to which table is written
Kafka08TableSink.java
@@ -31,7 +31,7 @@
  * Kafka 0.8 table sink for writing data into Kafka.
  */
 @Internal
-public class Kafka08TableSink extends KafkaTableSink {
+public class Kafka08TableSink extends KafkaTableSinkBase {
 
 	public Kafka08TableSink(
 			TableSchema schema,
Kafka08JsonTableSinkTest.java
@@ -32,10 +32,10 @@
  * drop support for format-specific table sinks.
  */
 @Deprecated
-public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
+public class Kafka08JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
 
 	@Override
-	protected KafkaTableSink createTableSink(
+	protected KafkaTableSinkBase createTableSink(
 			String topic,
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
Kafka09JsonTableSink.java
@@ -30,7 +30,7 @@
 import java.util.Properties;
 
 /**
- * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
+ * Kafka 0.9 {@link KafkaTableSinkBase} that serializes data in JSON format.
  *
  * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
  * with descriptors for schema and format instead. Descriptors allow for
@@ -41,7 +41,7 @@
 public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 
 	/**
-	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
 	 * topic with fixed partition assignment.
 	 *
 	 * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
@@ -62,7 +62,7 @@ public Kafka09JsonTableSink(String topic, Properties properties) {
 	}
 
 	/**
-	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
 	 * topic with custom partition assignment.
 	 *
 	 * @param topic topic in Kafka to which table is written
@@ -76,7 +76,7 @@ public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaParti
 	}
 
 	/**
-	 * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+	 * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
 	 * topic with custom partition assignment.
 	 *
 	 * @param topic topic in Kafka to which table is written
Kafka09TableSink.java
@@ -31,7 +31,7 @@
  * Kafka 0.9 table sink for writing data into Kafka.
  */
 @Internal
-public class Kafka09TableSink extends KafkaTableSink {
+public class Kafka09TableSink extends KafkaTableSinkBase {
 
 	public Kafka09TableSink(
 			TableSchema schema,
Kafka09JsonTableSinkTest.java
@@ -32,10 +32,10 @@
  * drop support for format-specific table sinks.
  */
 @Deprecated
-public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
+public class Kafka09JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
 
 	@Override
-	protected KafkaTableSink createTableSink(
+	protected KafkaTableSinkBase createTableSink(
 			String topic,
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
KafkaJsonTableSink.java
@@ -28,13 +28,13 @@
 import java.util.Properties;
 
 /**
- * Base class for {@link KafkaTableSink} that serializes data in JSON format.
+ * Base class for {@link KafkaTableSinkBase} that serializes data in JSON format.
  *
  * @deprecated Use table descriptors instead of implementation-specific classes.
  */
 @Deprecated
 @Internal
-public abstract class KafkaJsonTableSink extends KafkaTableSink {
+public abstract class KafkaJsonTableSink extends KafkaTableSinkBase {
 
 	/**
 	 * Creates KafkaJsonTableSink.
KafkaTableSink.java → KafkaTableSinkBase.java
@@ -43,7 +43,7 @@
  * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}.
  */
 @Internal
-public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
+public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> {
 
 	// TODO make all attributes final and mandatory once we drop support for format-specific table sinks
 
@@ -66,7 +66,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	protected String[] fieldNames;
 	protected TypeInformation[] fieldTypes;
 
-	protected KafkaTableSink(
+	protected KafkaTableSinkBase(
 			TableSchema schema,
 			String topic,
 			Properties properties,
@@ -81,15 +81,15 @@ protected KafkaTableSink(
 	}
 
 	/**
-	 * Creates KafkaTableSink.
+	 * Creates KafkaTableSinkBase.
 	 *
 	 * @param topic Kafka topic to write to.
 	 * @param properties Properties for the Kafka producer.
 	 * @param partitioner Partitioner to select Kafka partition for each item
 	 * @deprecated Use table descriptors instead of implementation-specific classes.
 	 */
 	@Deprecated
-	public KafkaTableSink(
+	public KafkaTableSinkBase(
 			String topic,
 			Properties properties,
 			FlinkKafkaPartitioner<Row> partitioner) {
@@ -133,7 +133,7 @@ protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSche
 	 * @return Deep copy of this sink
 	 */
 	@Deprecated
-	protected KafkaTableSink createCopy() {
+	protected KafkaTableSinkBase createCopy() {
 		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
 	}
 
@@ -164,14 +164,14 @@ public TypeInformation<?>[] getFieldTypes() {
 	}
 
 	@Override
-	public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+	public KafkaTableSinkBase configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
 		if (schema.isPresent()) {
 			// a fixed schema is defined so reconfiguration is not supported
 			throw new UnsupportedOperationException("Reconfiguration of this sink is not supported.");
 		}
 
 		// legacy code
-		KafkaTableSink copy = createCopy();
+		KafkaTableSinkBase copy = createCopy();
 		copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
 		copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
 		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
@@ -191,7 +191,7 @@ public boolean equals(Object o) {
 		if (o == null || getClass() != o.getClass()) {
 			return false;
 		}
-		KafkaTableSink that = (KafkaTableSink) o;
+		KafkaTableSinkBase that = (KafkaTableSinkBase) o;
 		return Objects.equals(schema, that.schema) &&
 			Objects.equals(topic, that.topic) &&
 			Objects.equals(properties, that.properties) &&
KafkaTableSinkTestBase.java → KafkaTableSinkBaseTestBase.java
@@ -50,7 +50,7 @@
  * drop support for format-specific table sinks.
  */
 @Deprecated
-public abstract class KafkaTableSinkTestBase {
+public abstract class KafkaTableSinkBaseTestBase {
 
 	private static final String TOPIC = "testTopic";
 	private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
@@ -64,7 +64,7 @@ public void testKafkaTableSink() {
 		DataStream dataStream = mock(DataStream.class);
 		when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class));
 
-		KafkaTableSink kafkaTableSink = spy(createTableSink());
+		KafkaTableSinkBase kafkaTableSink = spy(createTableSink());
 		kafkaTableSink.emitDataStream(dataStream);
 
 		// verify correct producer class
@@ -80,16 +80,16 @@
 
 	@Test
 	public void testConfiguration() {
-		KafkaTableSink kafkaTableSink = createTableSink();
-		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+		KafkaTableSinkBase kafkaTableSink = createTableSink();
+		KafkaTableSinkBase newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
 		assertNotSame(kafkaTableSink, newKafkaTableSink);
 
 		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
 		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
 		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
 	}
 
-	protected abstract KafkaTableSink createTableSink(
+	protected abstract KafkaTableSinkBase createTableSink(
 		String topic,
 		Properties properties,
 		FlinkKafkaPartitioner<Row> partitioner);
@@ -98,8 +98,8 @@ protected abstract KafkaTableSink createTableSink(
 
 	protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
 
-	private KafkaTableSink createTableSink() {
-		KafkaTableSink sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER);
+	private KafkaTableSinkBase createTableSink() {
+		KafkaTableSinkBase sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER);
 		return sink.configure(FIELD_NAMES, FIELD_TYPES);
 	}
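
Note: the @deprecated Javadocs in this diff point to the descriptor-based API as the replacement for these implementation-specific sink classes. For context, a minimal sketch of that style follows, assuming the Flink 1.6/1.7-era connect()/descriptor API; the class name, topic, field names, and broker address below are illustrative and not part of this commit:

// Sketch only: registering a Kafka JSON table sink via descriptors instead of
// instantiating a versioned sink class such as Kafka010JsonTableSink directly.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class DescriptorBasedKafkaSink {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

		tableEnv.connect(
				new Kafka()
					.version("0.10")                                  // connector version replaces the versioned sink class
					.topic("testTopic")                               // illustrative topic name
					.property("bootstrap.servers", "localhost:9092")) // producer properties move into the descriptor
			.withFormat(new Json().deriveSchema())                    // JSON format derived from the schema
			.withSchema(new Schema()
				.field("field1", Types.STRING())
				.field("field2", Types.INT()))
			.inAppendMode()                                           // matches AppendStreamTableSink semantics
			.registerTableSink("kafkaSink");
	}
}

With this pattern the version-specific classes become internal implementations behind the descriptor, which is why the renamed KafkaTableSinkBase stays @Internal while the JSON-specific constructors are deprecated.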
