
[CALCITE-2913] Adapter for Apache Kafka #1127

Merged: 1 commit into apache:master on May 24, 2019

Conversation

mingmxu

@mingmxu mingmxu commented Mar 22, 2019

Add an adapter to expose Kafka topics as STREAM tables.

KafkaTableFactory is used here, so end users need to specify the table-topic mapping one by one.

JIRA: https://issues.apache.org/jira/browse/CALCITE-2913

CC: @danny0405

@mingmxu mingmxu force-pushed the kafkaadapter branch 3 times, most recently from 4e9bf04 to 9c0bf4e on March 22, 2019
@zinking
Contributor

zinking commented Apr 3, 2019

I was hoping to see some SQL tests in this new adapter, though; could that be mocked?

@mingmxu
Author

mingmxu commented Apr 3, 2019

There are some test cases in KafkaAdapterTest using org.apache.kafka.clients.consumer.MockConsumer. Is that what you're referring to?
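For readers unfamiliar with the pattern: the test seeds canned records into a mock consumer and asserts on what comes out. The stub below is a tiny local stand-in for Kafka's MockConsumer so the sketch runs without kafka-clients on the classpath; the class, record keys, and values are all illustrative, not the adapter's actual test code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * Sketch of testing against a mocked consumer: buffer canned records, then
 * drain them and assert on the output, the same shape as a MockConsumer test.
 */
public class MockConsumerTestSketch {
  /** Minimal stand-in for org.apache.kafka.clients.consumer.MockConsumer. */
  static class MockConsumerStub {
    private final Queue<String[]> records = new ArrayDeque<>();

    void addRecord(String key, String value) {
      records.add(new String[] {key, value});
    }

    /** Drains everything buffered so far, like a single poll() call. */
    List<String[]> poll() {
      List<String[]> batch = new ArrayList<>(records);
      records.clear();
      return batch;
    }
  }

  public static void main(String[] args) {
    MockConsumerStub consumer = new MockConsumerStub();
    for (int idx = 0; idx < 10; ++idx) {
      consumer.addRecord("mykey" + idx, "myvalue" + idx);  // canned input
    }
    List<String[]> rows = consumer.poll();
    System.out.println(rows.size());      // 10
    System.out.println(rows.get(0)[0]);   // mykey0
  }
}
```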

@mingmxu mingmxu force-pushed the kafkaadapter branch 2 times, most recently from 93cd668 to 58c0ea8 on April 23, 2019
@asereda-gs asereda-gs self-requested a review May 10, 2019 23:11
kafka/pom.xml Outdated
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
Member

I realize this is a compile-time dependency, but I'm not sure we want to depend on AutoValue.

Generally we try to minimize dependencies.

If you really need it, we should discuss on the dev list what an appropriate codegen library for Calcite would be.

Author

auto-value is not necessary; removed.

consumer.subscribe(Collections.singletonList(tableOptions.topicName()));

return new KafkaMessageEnumerator(consumer, tableOptions.rowConverter());
} catch (Exception e) {
Member

Do we need to catch the coarse-grained Exception? Perhaps just KafkaException.

Author

No exception (besides RuntimeException) is actually thrown here; removed.

public abstract class KafkaBaseTable implements ScannableTable {
final KafkaTableOptions tableOptions;

public KafkaBaseTable(final KafkaTableOptions tableOptions) {
Member

Please make all constructors package-private for now. Until the API is stable, people should use KafkaTableFactory only.

Author

updated

private LinkedList<ConsumerRecord<K, V>> bufferedRecords = new LinkedList<>();
private ConsumerRecord<K, V> curRecord;

public KafkaMessageEnumerator(final Consumer consumer,
Member

package-private constructor

Author

updated

* @param <V>: type for Kafka message value,
* refer to {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG};
*/
public class KafkaMessageEnumerator<K, V> implements Enumerator {
Member

What is the type T for Enumerator&lt;T&gt;?

Author

Updated with Object[], which refers to an array of column values in a row.
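The Enumerator&lt;Object[]&gt; contract discussed here (moveNext() advances, current() returns one Object[] per row) can be sketched with a small self-contained example. The Enumerator interface below is a local simplification of Calcite's linq4j interface (the real one also has reset() and close()), and the buffered rows are illustrative:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Minimal sketch of iterating rows as Object[] in the style of Calcite's
 * linq4j Enumerator: moveNext() advances the cursor, current() returns the
 * row, with one array element per column.
 */
public class RowEnumeratorSketch {
  /** Local stand-in for org.apache.calcite.linq4j.Enumerator. */
  interface Enumerator<T> {
    boolean moveNext();
    T current();
  }

  /** Buffers rows (e.g. decoded Kafka records) and replays them one by one. */
  static class BufferedRowEnumerator implements Enumerator<Object[]> {
    private final Deque<Object[]> buffered = new ArrayDeque<>();
    private Object[] curRow;

    BufferedRowEnumerator(List<Object[]> rows) {
      buffered.addAll(rows);
    }

    @Override public boolean moveNext() {
      if (buffered.isEmpty()) {
        return false;           // a streaming impl would poll for more here
      }
      curRow = buffered.removeFirst();
      return true;
    }

    @Override public Object[] current() {
      return curRow;            // one array element per column of the row
    }
  }

  public static void main(String[] args) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[] {0, "mykey0", "myvalue0"});
    rows.add(new Object[] {1, "mykey1", "myvalue1"});
    Enumerator<Object[]> e = new BufferedRowEnumerator(rows);
    int count = 0;
    while (e.moveNext()) {
      count++;
    }
    System.out.println(count);  // 2
  }
}
```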


The Kafka adapter exposes a Kafka topic as a STREAM table, so it can be queried using
[Calcite Stream SQL]({{ site.baseurl }}/docs/stream.html). Note that the adapter will not attempt to scan all topics;
instead, users need to configure tables one by one.
Member

Can you mention that one topic is one table (a one-to-one mapping)?

Author

Added as:

Note that the adapter will not attempt to scan all topics;
instead, users need to configure tables manually, and each Kafka stream table maps to one Kafka topic.

"name": "TABLE_NAME",
"type": "custom",
"factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
"row.converter": "com.example.CustKafkaRowConverter",
Member

Note to myself: check whether attributes with a dot (.) are a good idea.
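For context, here is a sketch of how such a table definition might sit inside a complete Calcite model file. The schema name, bootstrap address, and topic name are illustrative, the converter class com.example.CustKafkaRowConverter comes from the fragment above, and the operand keys (bootstrap.servers, topic.name) are assumptions based on the parameter names discussed in this PR:

```json
{
  "version": "1.0",
  "defaultSchema": "KAFKA",
  "schemas": [
    {
      "name": "KAFKA",
      "tables": [
        {
          "name": "TABLE_NAME",
          "type": "custom",
          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
          "row.converter": "com.example.CustKafkaRowConverter",
          "operand": {
            "bootstrap.servers": "localhost:9092",
            "topic.name": "testtopic"
          }
        }
      ]
    }
  ]
}
```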

/**
* Parameter constants used to define a Kafka table.
*/
public class KafkaTableConstants {
Member

Make all constants package-private.
Use an interface instead of a class for defining constants.

Author

updated

* @param topicName, Kafka topic name;
* @return row type
*/
RelDataType rowDataType(String topicName);
Member

From the configuration I see that you define one converter per topic. If so, why do you need this method (rowDataType(String))?

Author

I don't put the row schema in the constructor as KafkaRowConverter is defined as an interface. Usually we use an external metadata system to manage Kafka message schemas, so calling rowDataType(String topic) would be clearer IMO.

Member

I feel like the current interface is both one-to-one (between topic and row) and one-to-many (between topic and RelDataType). It seems more logical to have the following:

interface KafkaRowConverter<K, V> {
   RelDataType rowDataType();
   Object[] toRow(ConsumerRecord<K, V> message);
}

I.e., have a separate KafkaRowConverter per topic.

Another option is to provide RelDataType as constructor argument to KafkaTable.

Pls let me know if you agree with such API.
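The proposed per-topic converter can be sketched in a self-contained way. ConsumerRecord below is a simplified local stand-in for Kafka's class, and the RelDataType return is replaced by a plain String schema description so the example runs without kafka-clients or Calcite on the classpath; the column layout (offset, key, value) is illustrative:

```java
import java.nio.charset.StandardCharsets;

/**
 * Sketch of the per-topic converter API proposed in the review: one converter
 * instance is bound to one topic, so neither method takes a topic argument.
 */
public class RowConverterSketch {
  /** Simplified stand-in for org.apache.kafka.clients.consumer.ConsumerRecord. */
  static class ConsumerRecord<K, V> {
    final K key;
    final V value;
    final long offset;

    ConsumerRecord(K key, V value, long offset) {
      this.key = key;
      this.value = value;
      this.offset = offset;
    }
  }

  interface KafkaRowConverter<K, V> {
    String rowDataType();                          // RelDataType in the real API
    Object[] toRow(ConsumerRecord<K, V> message);
  }

  /** Decodes byte[] key/value records into three-column rows. */
  static class BytesRowConverter implements KafkaRowConverter<byte[], byte[]> {
    @Override public String rowDataType() {
      return "(MSG_OFFSET BIGINT, KEY VARCHAR, VALUE VARCHAR)";
    }

    @Override public Object[] toRow(ConsumerRecord<byte[], byte[]> message) {
      return new Object[] {
          message.offset,
          new String(message.key, StandardCharsets.UTF_8),
          new String(message.value, StandardCharsets.UTF_8),
      };
    }
  }

  public static void main(String[] args) {
    BytesRowConverter converter = new BytesRowConverter();
    Object[] row = converter.toRow(new ConsumerRecord<>(
        "mykey0".getBytes(StandardCharsets.UTF_8),
        "myvalue0".getBytes(StandardCharsets.UTF_8), 0L));
    System.out.println(row.length);  // 3 columns: offset, key, value
  }
}
```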

Member

Traditionally we have been using a single Function1<Consumer<K,V>, Object[]> for such transformations.

Author

@asereda-gs sorry that I was lost in the thread; the proposed interface looks good to me. If it's not a blocking issue, I would like to close this PR and update it in a new task (CALCITE-3080), to handle user-specified columns together. In this way, there would be two provided implementations: 1) to handle user-specified columns, 2) to support external row metadata.

Member

OK if CALCITE-3080 addresses it separately.

(String) operand.get(KafkaTableConstants.SCHEMA_ROW_CONVERTER))
.newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new RuntimeException(e);
Member

Can you give more details in the exception message (e.g. Failed to create table $T in schema $S)? In the current context you have the table name and config.

Author

updated with detailed error message

}

curRecord = bufferedRecords.removeFirst();
return true;
Member

Probably it is also a good idea to check DataContext.Variable.CANCEL_FLAG in case the user explicitly unsubscribes.

See CsvStreamScannableTable for examples.
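The suggested cancel-flag check can be sketched self-contained. The AtomicBoolean below stands in for the flag that Calcite places in the DataContext under Variable.CANCEL_FLAG; the class name and buffered rows are illustrative:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch of honoring a cancellation flag inside moveNext(): once the flag is
 * set, the enumerator stops producing rows even if records remain buffered.
 */
public class CancelFlagSketch {
  private final AtomicBoolean cancelFlag;
  private final Deque<Object[]> bufferedRecords = new ArrayDeque<>();
  private Object[] curRecord;

  CancelFlagSketch(AtomicBoolean cancelFlag) {
    this.cancelFlag = cancelFlag;
  }

  void buffer(Object[] row) {
    bufferedRecords.add(row);
  }

  boolean moveNext() {
    if (cancelFlag.get()) {
      return false;            // stop promptly when the query is cancelled
    }
    if (bufferedRecords.isEmpty()) {
      return false;            // a real stream would poll the consumer here
    }
    curRecord = bufferedRecords.removeFirst();
    return true;
  }

  Object[] current() {
    return curRecord;
  }

  public static void main(String[] args) {
    AtomicBoolean cancel = new AtomicBoolean(false);
    CancelFlagSketch e = new CancelFlagSketch(cancel);
    e.buffer(new Object[] {"row0"});
    e.buffer(new Object[] {"row1"});
    System.out.println(e.moveNext());  // true: first row is consumed
    cancel.set(true);                  // user cancels the query
    System.out.println(e.moveNext());  // false: remaining rows are skipped
  }
}
```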

@asereda-gs
Member

Please squash and rebase all commits.
I will make one (or two) more code review passes.

@mingmxu
Author

mingmxu commented May 13, 2019

Sure; rebased from github/master and squashed into one commit.

);
} catch (ClassNotFoundException e) {
throw new RuntimeException(
String.format(Locale.getDefault(), "Class '%s' is not found.",
Member

What I meant here is more like

catch (CheckedException e) {
  final String details = String.format("Failed to create table %s (topic %s) in schema %s",
      name, topicName, schema.getName());
  throw new RuntimeException(details, e); // instead of just: throw new RuntimeException(e);
}

You don't need to catch each exception individually.
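A runnable version of this pattern, assuming illustrative table/topic names: ReflectiveOperationException is the single supertype covering ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, and InvocationTargetException, so one catch clause suffices, and Locale.ROOT matches the formatting advice elsewhere in this review:

```java
import java.util.Locale;

/**
 * Self-contained demo of wrapping reflective instantiation failures with
 * table/topic context instead of rethrowing the bare cause.
 */
public class WrapExceptionSketch {
  static Object instantiateConverter(String className, String tableName,
      String topicName) {
    try {
      return Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // One supertype covers all the reflective checked exceptions, so there
      // is no need to list them individually in a multi-catch.
      final String details = String.format(Locale.ROOT,
          "Failed to create table %s (topic %s): cannot instantiate %s",
          tableName, topicName, className);
      throw new RuntimeException(details, e);  // keep the cause attached
    }
  }

  public static void main(String[] args) {
    try {
      instantiateConverter("com.example.MissingConverter", "T1", "testtopic");
    } catch (RuntimeException e) {
      System.out.println(e.getMessage());
      System.out.println(e.getCause().getClass().getSimpleName());
    }
  }
}
```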

Author

ah I got you, let me update it

@mingmxu
Author

mingmxu commented May 14, 2019

Reformatted; please take another look. I will squash into one commit after review.

} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InstantiationException | InvocationTargetException e) {
final String details = String.format(
Locale.getDefault(),
Member

use Locale.ROOT

@asereda-gs
Member

Please mention (in the docs) that, currently, the Kafka Adapter is in beta (preview phase) and we might change the public API or even remove it. I would like to stay flexible for the first couple of versions.

@asereda-gs
Member

It would be nice to have examples with some JSON messages (using native Calcite JSON functions).

@mingmxu
Author

mingmxu commented May 14, 2019

I've updated the note in the document.

For JSON examples, I would defer to my next task, mostly to leverage RelDataType rowType. Examples showing JSON functions (over a STRING Kafka message field) are not necessary here IMO.

*/

/**
* Query provider that reads from files and web pages in various formats.
Member

This doesn't look like Kafka package description


for (int idx = 0; idx < 10; ++idx) {
addRecord(new ConsumerRecord<byte[], byte[]>("testtopic",
0, idx, ("mykey" + idx).getBytes(Charset.forName("UTF-8")),

@mingmxu
Author

mingmxu commented May 14, 2019

Back to your comment here, do you think a vote is required to add a new adapter? Ideally I don't want to add it today and remove it days later.

Kafka Adapter is in beta (preview phase) and we might change public API or even remove it.

@julianhyde
Contributor

I don't think a vote is required for a new adapter - it is just a code change, albeit a significant one. That said, I think we should get consensus on the dev list that it's a good idea. It will add to the work load of every future release manager.

I've added some high-level review comments to https://issues.apache.org/jira/browse/CALCITE-2913. I will encourage other committers to make high-level comments in JIRA also. @asereda-gs is doing an excellent job of reviewing line-by-line (thank you Andrei!)

sqlline Outdated
@@ -37,7 +37,7 @@ if [ ! -f target/fullclasspath.txt ]; then
fi

CP=
for module in core cassandra druid elasticsearch2 elasticsearch5 file mongodb server spark splunk geode example/csv example/function; do
for module in core cassandra druid elasticsearch2 elasticsearch5 file mongodb server spark splunk geode example/csv example/function kafka; do
Member

Julian's comment:

you've put kafka at the end, but it should be in alphabetical order

Contributor

Putting it at the end is, of course, the modest thing to do.

I learned a while ago that being modest (especially when editing files with lots of unit test methods) creates what DBMS folks call a hot-spot. Everyone adds at the end, so we get merge conflicts. Make your change in the most logical place (which is alphabetical if there is no other organizing principle).

kafka/pom.xml Outdated
<artifactId>calcite-kafka</artifactId>
<packaging>jar</packaging>
<name>calcite kafka</name>
<description>Calcite provider that reads from kafka topics</description>
Member

Please change to:

Kafka Adapter. Exposes kafka topic(s) as stream table(s).

if (tableOptions.getConsumerParams() != null) {
consumerConfig.putAll(tableOptions.getConsumerParams());
}
Consumer consumer = new KafkaConsumer<>(consumerConfig);
Member

Can we subscribe as late as possible (inside the Enumerator, on first next)?

Author

prepare would be the best place; next looks odd to me.

Member

Probably it is fine as is for now. It can be improved later on.

@mingmxu
Author

mingmxu commented May 14, 2019

There are so many small commits now. @asereda-gs, can you take a quick look? I will do a squash then.

@asereda-gs
Member

asereda-gs commented May 14, 2019

@xumingmin you can force-push a squashed commit (no need to do it separately). Just rebase locally.

@mingmxu
Author

mingmxu commented May 14, 2019

Is there a template for how to write a commit message? I can't find it at http://calcite.apache.org/develop/#contributing.

@asereda-gs
Member

asereda-gs commented May 14, 2019

Commit your change to your branch, and use a comment that starts with the JIRA case number, like this:
...
If you are not a committer, add your name in parentheses at the end of the message.

Also check history of existing commits

@mingmxu
Author

mingmxu commented May 14, 2019

Got you; I made a minor change:

[CALCITE-2913] Add Kafka Adapter (Mingmin Xu)
Expose an Apache Kafka topic as a stream table.

kafka consumer is not well-known.

@asereda-gs asereda-gs changed the title [CALCITE-2913] add a KafkaAdapter for Stream [CALCITE-2913] Add a Kafka Adapter May 14, 2019
@asereda-gs asereda-gs changed the title [CALCITE-2913] Add a Kafka Adapter [CALCITE-2913] Add Kafka Adapter May 14, 2019
@asereda-gs asereda-gs added the LGTM-will-merge-soon Overall PR looks OK. Only minor things left. label May 14, 2019
@asereda-gs
Member

@xumingmin can you please address Julian's comments in JIRA about proper attribution (especially in documentation):

Make sure to respect Kafka's branding. Each page in the documentation where Kafka is mentioned, the first mention should call it "Apache Kafka". I'd change the name of this case/commit to "Adapter for Apache Kafka".

@asereda-gs asereda-gs changed the title [CALCITE-2913] Add Kafka Adapter [CALCITE-2913] Add Apache Kafka Adapter May 14, 2019
return this;
}

public Map getConsumerParams() {
Member

What are the key/value types? Map&lt;String, String&gt;?

Author

Yes, it should be.

@xumingmin can you please address Julian's comments in JIRA about proper attribution (especially in documentation) :

Make sure to respect Kafka's branding. Each page in the documentation where Kafka is mentioned, the first mention should call it "Apache Kafka". I'd change the name of this case/commit to "Adapter for Apache Kafka".

I forgot some places; let me go through it again.

Author

What are key/value types ? Map<String, String> ?

Should be; let me specify it explicitly.

@mingmxu mingmxu changed the title [CALCITE-2913] Add Apache Kafka Adapter [CALCITE-2913] Adapter for Apache Kafka May 15, 2019
how to decode Kafka message to Calcite row. [KafkaRowConverterImpl]({{ site.apiRoot }}/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.html)
is used if not provided;

2. More consumer settings can be added in parameter `consumer.paras`;
Contributor

it should be consumer.params

Author

yes, let me update it.

bufferedRecords.add(record);
}

consumer.commitSync();
Contributor

If you use the commitSync() API, you should set enable.auto.commit to false before creating the KafkaConsumer instance.

Contributor

+1

Author

I would remove this line directly. If users need auto-commit, they can set it in consumer.params.
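The configuration point under discussion can be sketched with plain Properties: disable auto-commit up front (since the enumerator commits manually via commitSync()), while still letting the user's consumer.params override it. The keys are standard Kafka consumer property names; the bootstrap address and method names are illustrative:

```java
import java.util.Properties;

/**
 * Sketch of building the consumer configuration: manual commits via
 * commitSync() conflict with auto-commit, so auto-commit is disabled by
 * default, and user-supplied params are layered on top.
 */
public class ConsumerConfigSketch {
  static Properties baseConsumerConfig(Properties userParams) {
    Properties config = new Properties();
    config.setProperty("bootstrap.servers", "localhost:9092");
    // Turn auto-commit off by default; a user who wants it back can still
    // override it through consumer.params below.
    config.setProperty("enable.auto.commit", "false");
    if (userParams != null) {
      config.putAll(userParams);  // consumer.params from the table definition
    }
    return config;
  }

  public static void main(String[] args) {
    Properties overrides = new Properties();
    overrides.setProperty("enable.auto.commit", "true");  // user opts back in
    System.out.println(
        baseConsumerConfig(null).getProperty("enable.auto.commit"));      // false
    System.out.println(
        baseConsumerConfig(overrides).getProperty("enable.auto.commit")); // true
  }
}
```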

@mingmxu mingmxu force-pushed the kafkaadapter branch 2 times, most recently from bee7541 to 12d6d8c on May 16, 2019
@wangzzu
Contributor

wangzzu commented May 16, 2019

@xumingmin Is it necessary to support users consuming from a specific timestamp?

@mingmxu
Author

mingmxu commented May 16, 2019

@wangzzu I would like to add this feature in a follow-up task, to support both from/to timestamp/offset.

Created https://issues.apache.org/jira/browse/CALCITE-3073 for tracking.

@wangzzu
Contributor

wangzzu commented May 17, 2019

@xumingmin 👍

* @return fields in the row
*/
@Override public Object[] toRow(final ConsumerRecord<String, String> message) {
Object[] fields = new Object[4];


Why do you take an array of size 4, not 3?

Author

True; thanks for pointing that out.

*/
@Override public RelDataType rowDataType(final String topicName) {
final RelDataTypeFactory typeFactory =
new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);


topicName is unused; is there any reason to pass it to this method as a parameter?

Author

We may update the way to define the row schema; refer to the thread https://github.com/apache/calcite/pull/1127/files#r283079680. Please let me know if it's a blocking issue.

@mingmxu
Author

mingmxu commented May 23, 2019

@asereda-gs any idea when we can close this PR? I notice the Calcite 1.20.0 release is coming.

Btw, I doubt there's enough time to merge CALCITE-3080; it may be a better idea to make the change below now. Any thoughts?

interface KafkaRowConverter<K, V> {
  RelDataType rowDataType();
  Object[] toRow(ConsumerRecord<K, V> message);
}

@asereda-gs
Member

@xumingmin it is fine if we redefine the KafkaRowConverter interface after 1.20 (i.e. address CALCITE-3080).

That is the reason why I prefer to mention in the 1.20 release notes that the Kafka Adapter is currently in "preview mode" (some API might change).

Please rebase this PR (there are some conflicts).

@julianhyde
Contributor

I see high-level discussions about scheduling and dependency on CALCITE-3080 going on here. As I said previously, please have these discussions in JIRA not git.

Expose an Apache Kafka topic as a stream table.
@mingmxu
Author

mingmxu commented May 24, 2019

@asereda-gs rebased from apache/master.
@julianhyde , thanks for reminding, let me copy the thread to JIRA.

@asereda-gs asereda-gs merged commit ca6dc99 into apache:master May 24, 2019
@mingmxu mingmxu deleted the kafkaadapter branch September 28, 2020 08:18
Labels
LGTM-will-merge-soon Overall PR looks OK. Only minor things left.