[FLINK-8558] [table] Add unified format interfaces and separate formats from connectors #6264

Closed · wants to merge 8 commits
2 changes: 2 additions & 0 deletions docs/dev/table/sqlClient.md
@@ -229,6 +229,8 @@ The SQL Client does not require to setup a Java project using Maven or SBT. Inst
| Name | Version | Download |
| :---------------- | :------------ | :--------------------- |
| Filesystem | | Built-in |
| Apache Kafka | 0.9 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.10 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |
| Apache Kafka | 0.11 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) |

#### Formats
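Each sql-jar in the table above bundles one connector version together with its shaded dependencies, so the SQL Client can use it without a Maven or SBT project. In programs, the same connectors are meant to be declared through the unified descriptor API this PR introduces. Below is a minimal sketch; the TableEnvironment#from(ConnectorDescriptor) entry point is the one cited by the deprecation javadocs in this PR, while the individual descriptor method names and the chaining are assumptions along the lines of the final API.

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class DescriptorSketch {

    // Registers a Kafka 0.10 topic as a table. The format (Json) is declared
    // independently of the connector (Kafka), which is the separation this
    // PR's title refers to.
    public static void register(TableEnvironment tableEnv) {
        tableEnv
            .from(                                   // entry point referenced by the javadocs; chaining is assumed
                new Kafka()
                    .version("0.10")                 // matches the 0.10 sql-jar listed above
                    .topic("user-events")            // hypothetical topic
                    .property("bootstrap.servers", "localhost:9092"))
            .withFormat(
                new Json().deriveSchema())           // derive the JSON fields from the schema below
            .withSchema(
                new Schema()
                    .field("user", "VARCHAR")
                    .field("cnt", "BIGINT"))
            .inAppendMode()
            .registerTableSource("UserEvents");
    }
}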
53 changes: 53 additions & 0 deletions flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -202,6 +202,59 @@ under the License.

</dependencies>

<profiles>
<profile>
<!-- Create SQL Client uber jars for releases -->
<id>release</id>
<activation>
<property>
<name>release</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>sql-jar</shadedClassifierName>
<artifactSet>
<includes combine.children="append">
<include>org.apache.kafka:*</include>
<include>org.apache.flink:flink-connector-kafka-base_${scala.binary.version}</include>
<include>org.apache.flink:flink-connector-kafka-0.9_${scala.binary.version}</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>kafka/kafka-version.properties</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>org.apache.flink.kafka010.shaded.org.apache.kafka</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<build>
<plugins>
<plugin>
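The relocation rule above rewrites Kafka's packages inside the sql-jar, so a user's own Kafka dependency cannot clash with the classes the connector ships. A quick sanity check, assuming a sql-jar built by this release profile is on the classpath; the class name is simply Kafka's real consumer class with the shadedPattern prefix applied:

public class RelocationCheck {

    public static void main(String[] args) throws ClassNotFoundException {
        // org.apache.kafka.clients.consumer.KafkaConsumer after the
        // <relocation> rule from the profile above has been applied.
        Class<?> shaded = Class.forName(
            "org.apache.flink.kafka010.shaded.org.apache.kafka.clients.consumer.KafkaConsumer");
        System.out.println("Found relocated consumer: " + shaded.getName());
    }
}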
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -18,9 +18,9 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -35,8 +35,13 @@

/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@PublicEvolving
@Deprecated
public class Kafka010AvroTableSource extends KafkaAvroTableSource {

/**
@@ -46,7 +51,9 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param record Avro specific record.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
Review comment from a Contributor:
Hmm, why do you deprecate individual methods after already deprecating the whole class? Does it solve some problem?

If not, I would revert those additional @Deprecated notes.

@twalthr (Contributor, Author) replied on Jul 6, 2018:

Reply from the Contributor:

Yes, I assumed that it works the way you described, and it seemed to me that deprecating the class should suffice. But you are right, it doesn't hurt (except for a larger commit to review).

public Kafka010AvroTableSource(
String topic,
Properties properties,
@@ -69,7 +76,9 @@ public Kafka010AvroTableSource(
* the value to the field of the Avro record.</p>
*
* @param fieldMapping A mapping from schema fields to Avro fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
@@ -79,7 +88,9 @@ public void setFieldMapping(Map<String, String> fieldMapping) {
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
@@ -89,7 +100,9 @@ public void setProctimeAttribute(String proctimeAttribute) {
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -102,15 +115,27 @@ protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properti

/**
* Returns a builder to configure and create a {@link Kafka010AvroTableSource}.
*
* @return A builder to configure and create a {@link Kafka010AvroTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@Deprecated
public static Builder builder() {
return new Builder();
}

/**
* A builder to configure and create a {@link Kafka010AvroTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaAvroTableSource.Builder<Kafka010AvroTableSource, Kafka010AvroTableSource.Builder> {

@Override
@@ -127,7 +152,9 @@ protected Kafka010AvroTableSource.Builder builder() {
* Builds and configures a {@link Kafka010AvroTableSource}.
*
* @return A configured {@link Kafka010AvroTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public Kafka010AvroTableSource build() {
Kafka010AvroTableSource tableSource = new Kafka010AvroTableSource(
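The deprecated setRowtimeAttributeDescriptor call above accepts a single RowtimeAttributeDescriptor and wraps it into a singleton list. For reference, a sketch of how such a descriptor is typically built; ExistingField and BoundedOutOfOrderTimestamps are assumed from the flink-table timestamp-extractor and watermark-strategy packages:

import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

public class RowtimeSketch {

    public static RowtimeAttributeDescriptor eventTimeFromField() {
        return new RowtimeAttributeDescriptor(
            "eventTime",                             // rowtime attribute in the table schema
            new ExistingField("ts"),                 // read the timestamp from existing field "ts"
            new BoundedOutOfOrderTimestamps(5000L)); // watermarks trail the max timestamp by 5s
    }
}

The returned descriptor would be passed to setRowtimeAttributeDescriptor on the deprecated table source above.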
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -18,9 +18,9 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -32,8 +32,13 @@

/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@PublicEvolving
@Deprecated
public class Kafka010JsonTableSource extends KafkaJsonTableSource {

/**
@@ -43,7 +48,9 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public Kafka010JsonTableSource(
String topic,
Properties properties,
@@ -58,7 +65,9 @@ public Kafka010JsonTableSource(
* TableSource will fail for missing fields if set to true. If set to false, the missing field is set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFailOnMissingField(boolean failOnMissingField) {
super.setFailOnMissingField(failOnMissingField);
@@ -68,7 +77,9 @@ public void setFailOnMissingField(boolean failOnMissingField) {
* Sets the mapping from table schema fields to JSON schema fields.
*
* @param fieldMapping The mapping from table schema fields to JSON schema fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
@@ -78,7 +89,9 @@ public void setFieldMapping(Map<String, String> fieldMapping) {
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
@@ -88,7 +101,9 @@ public void setProctimeAttribute(String proctimeAttribute) {
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
@@ -101,15 +116,27 @@ protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properti

/**
* Returns a builder to configure and create a {@link Kafka010JsonTableSource}.
*
* @return A builder to configure and create a {@link Kafka010JsonTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@Deprecated
public static Kafka010JsonTableSource.Builder builder() {
return new Kafka010JsonTableSource.Builder();
}

/**
* A builder to configure and create a {@link Kafka010JsonTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#from(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaJsonTableSource.Builder<Kafka010JsonTableSource, Kafka010JsonTableSource.Builder> {

@Override
@@ -126,6 +153,7 @@ protected Kafka010JsonTableSource.Builder builder() {
* Builds and configures a {@link Kafka010JsonTableSource}.
*
* @return A configured {@link Kafka010JsonTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Override
public Kafka010JsonTableSource build() {
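The setters deprecated above also show what separating formats from connectors means in practice: failOnMissingField is a property of the JSON format, not of Kafka, so it moves from the connector-specific table source onto the format descriptor. A sketch, assuming a failOnMissingField option on the Json descriptor:

import org.apache.flink.table.descriptors.Json;

public class JsonFormatSketch {

    public static Json strictJsonFormat() {
        // Declared on the format, the missing-field behavior can be reused
        // with any connector, not only Kafka. failOnMissingField on Json is
        // an assumption here.
        return new Json()
            .failOnMissingField(true)
            .deriveSchema();
    }
}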
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -18,48 +18,79 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
@PublicEvolving
public abstract class Kafka010TableSource extends KafkaTableSource {

// The deserialization schema for the Kafka records
private final DeserializationSchema<Row> deserializationSchema;
@Internal
public class Kafka010TableSource extends KafkaTableSource {

/**
* Creates a Kafka 0.10 {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param typeInfo Type information describing the result type. The field names are used
* to parse the JSON file and so are the types.
* @param schema Schema of the produced table.
* @param proctimeAttribute Field name of the processing time attribute, null if no
* processing time field is defined.
* @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
* @param fieldMapping Mapping for the fields of the table schema to
* fields of the physical returned type or null.
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema for decoding records from Kafka.
* @param startupMode Startup mode for the contained consumer.
* @param specificStartupOffsets Specific startup offsets; only relevant when startup
* mode is {@link StartupMode#SPECIFIC_OFFSETS}.
*/
public Kafka010TableSource(
TableSchema schema,
String proctimeAttribute,
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
Map<String, String> fieldMapping,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
TableSchema schema,
TypeInformation<Row> typeInfo) {
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets) {

super(topic, properties, schema, typeInfo);

this.deserializationSchema = deserializationSchema;
super(
schema,
proctimeAttribute,
rowtimeAttributeDescriptors,
fieldMapping,
topic,
properties,
deserializationSchema,
startupMode,
specificStartupOffsets);
}

@Override
public DeserializationSchema<Row> getDeserializationSchema() {
return this.deserializationSchema;
/**
* Creates a Kafka 0.10 {@link StreamTableSource}.
*
* @param schema Schema of the produced table.
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema for decoding records from Kafka.
*/
public Kafka010TableSource(
TableSchema schema,
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema) {

super(schema, topic, properties, deserializationSchema);
}

@Override
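With the rewrite above, Kafka010TableSource is a concrete @Internal class with explicit constructors instead of an abstract builder target. A sketch of direct construction based on the signatures shown; the topic, the offsets, and the toy DeserializationSchema are illustrative only:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.Kafka010TableSource;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

public class Kafka010TableSourceSketch {

    public static Kafka010TableSource create() {
        TableSchema schema = new TableSchema(
            new String[] {"user", "cnt"},
            new TypeInformation<?>[] {Types.STRING, Types.LONG});

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "sketch");

        // Explicit startup offsets, as consumed by the SPECIFIC_OFFSETS mode
        // taken by the full constructor above.
        Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new KafkaTopicPartition("user-events", 0), 42L);

        // Parameter order follows the constructor signature in the diff above.
        return new Kafka010TableSource(
            schema,
            null,                     // no proctime attribute
            Collections.emptyList(),  // no rowtime attributes
            null,                     // no field mapping
            "user-events",
            props,
            new Utf8CountSchema(),
            StartupMode.SPECIFIC_OFFSETS,
            offsets);
    }

    /** Toy schema: every record becomes (message text, 1L). */
    private static final class Utf8CountSchema implements DeserializationSchema<Row> {

        @Override
        public Row deserialize(byte[] message) throws IOException {
            Row row = new Row(2);
            row.setField(0, new String(message, StandardCharsets.UTF_8));
            row.setField(1, 1L);
            return row;
        }

        @Override
        public boolean isEndOfStream(Row nextElement) {
            return false;
        }

        @Override
        public TypeInformation<Row> getProducedType() {
            return Types.ROW(Types.STRING, Types.LONG);
        }
    }
}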