diff --git a/kafka/pom.xml b/kafka/pom.xml
new file mode 100644
index 000000000000..98658b7a7747
--- /dev/null
+++ b/kafka/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.calcite</groupId>
+    <artifactId>calcite</artifactId>
+    <version>1.20.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>calcite-kafka</artifactId>
+  <packaging>jar</packaging>
+  <name>calcite kafka</name>
+  <description>Kafka Adapter. Exposes kafka topic(s) as stream table(s).</description>
+
+  <properties>
+    <top.dir>${project.basedir}/..</top.dir>
+    <build.timestamp>${maven.build.timestamp}</build.timestamp>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <artifactId>commons-compiler</artifactId>
+      <groupId>org.codehaus.janino</groupId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-linq4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>${maven-dependency-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/dependencies/</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaMessageEnumerator.java b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaMessageEnumerator.java
new file mode 100644
index 000000000000..fb9fb0333fc4
--- /dev/null
+++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaMessageEnumerator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.linq4j.Enumerator;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Enumerator that reads data from a {@link Consumer} and converts each message
+ * into a SQL row with {@link KafkaRowConverter}.
+ *
+ * @param <K> type of the Kafka message key,
+ * refer to {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG}
+ * @param <V> type of the Kafka message value,
+ * refer to {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+public class KafkaMessageEnumerator<K, V> implements Enumerator<Object[]> {
+  final Consumer<K, V> consumer;
+  final KafkaRowConverter<K, V> rowConverter;
+  private final AtomicBoolean cancelFlag;
+
+  // runtime
+  private final LinkedList<ConsumerRecord<K, V>> bufferedRecords = new LinkedList<>();
+  private ConsumerRecord<K, V> curRecord;
+
+  KafkaMessageEnumerator(final Consumer<K, V> consumer,
+      final KafkaRowConverter<K, V> rowConverter,
+      final AtomicBoolean cancelFlag) {
+    this.consumer = consumer;
+    this.rowConverter = rowConverter;
+    this.cancelFlag = cancelFlag;
+  }
+
+  /**
+   * Returns an array of objects, in which each element represents one field of the current row.
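+   * Per the {@link Enumerator} contract, {@link #moveNext()} must have been called and
+   * returned {@code true} before the first call to this method.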
+   */
+  @Override public Object[] current() {
+    return rowConverter.toRow(curRecord);
+  }
+
+  @Override public boolean moveNext() {
+    if (cancelFlag.get()) {
+      return false;
+    }
+
+    while (bufferedRecords.isEmpty()) {
+      pullRecords();
+    }
+
+    curRecord = bufferedRecords.removeFirst();
+    return true;
+  }
+
+  private void pullRecords() {
+    ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
+    for (ConsumerRecord<K, V> record : records) {
+      bufferedRecords.add(record);
+    }
+  }
+
+  @Override public void reset() {
+    this.bufferedRecords.clear();
+    pullRecords();
+  }
+
+  @Override public void close() {
+    consumer.close();
+  }
+}
+// End KafkaMessageEnumerator.java
diff --git a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverter.java b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverter.java
new file mode 100644
index 000000000000..0f35cdff5527
--- /dev/null
+++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Interface that handles conversion between a Kafka message and a Calcite row.
+ *
+ * @param <K> type of the Kafka message key,
+ * refer to {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG}
+ * @param <V> type of the Kafka message value,
+ * refer to {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+public interface KafkaRowConverter<K, V> {
+
+  /**
+   * Generates the row type for a given Kafka topic.
+   *
+   * @param topicName Kafka topic name
+   * @return row type
+   */
+  RelDataType rowDataType(String topicName);
+
+  /**
+   * Parses and reformats a Kafka message from the consumer,
+   * to align with the row type defined by {@link #rowDataType(String)}.
+   *
+   * @param message the raw Kafka message record
+   * @return fields in the row
+   */
+  Object[] toRow(ConsumerRecord<K, V> message);
+}
+// End KafkaRowConverter.java
diff --git a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.java b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.java
new file mode 100644
index 000000000000..a659e73da586
--- /dev/null
+++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Default implementation of {@link KafkaRowConverter}; both key and value are byte[].
+ */
+public class KafkaRowConverterImpl implements KafkaRowConverter<byte[], byte[]> {
+  /**
+   * Generates the row schema for a given Kafka topic.
+   *
+   * @param topicName Kafka topic name
+   * @return row type
+   */
+  @Override public RelDataType rowDataType(final String topicName) {
+    final RelDataTypeFactory typeFactory =
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
+    fieldInfo.add("MSG_PARTITION", typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
+    fieldInfo.add("MSG_TIMESTAMP", typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
+    fieldInfo.add("MSG_OFFSET", typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
+    fieldInfo.add("MSG_KEY_BYTES", typeFactory.createSqlType(SqlTypeName.VARBINARY)).nullable(true);
+    fieldInfo.add("MSG_VALUE_BYTES", typeFactory.createSqlType(SqlTypeName.VARBINARY))
+        .nullable(false);
+
+    return fieldInfo.build();
+  }
+
+  /**
+   * Parses and reformats a Kafka message from the consumer, to align with the row schema
+   * defined by {@link #rowDataType(String)}.
+   *
+   * @param message the raw Kafka message record
+   * @return fields in the row
+   */
+  @Override public Object[] toRow(final ConsumerRecord<byte[], byte[]> message) {
+    Object[] fields = new Object[5];
+    fields[0] = message.partition();
+    fields[1] = message.timestamp();
+    fields[2] = message.offset();
+    fields[3] = message.key();
+    fields[4] = message.value();
+
+    return fields;
+  }
+}
+// End KafkaRowConverterImpl.java
diff --git a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaStreamTable.java b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaStreamTable.java
new file mode 100644
index 000000000000..ddacfc1ea00a
--- /dev/null
+++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaStreamTable.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A table that maps to an Apache Kafka topic. Currently only a STREAM table,
+ * {@link KafkaStreamTable}, is implemented.
+ */
+public class KafkaStreamTable implements ScannableTable, StreamableTable {
+  final KafkaTableOptions tableOptions;
+
+  KafkaStreamTable(final KafkaTableOptions tableOptions) {
+    this.tableOptions = tableOptions;
+  }
+
+  @Override public Enumerable<Object[]> scan(final DataContext root) {
+    final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
+    return new AbstractEnumerable<Object[]>() {
+      public Enumerator<Object[]> enumerator() {
+        if (tableOptions.getConsumer() != null) {
+          return new KafkaMessageEnumerator(tableOptions.getConsumer(),
+              tableOptions.getRowConverter(), cancelFlag);
+        }
+
+        Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+            tableOptions.getBootstrapServers());
+        // by default, both key and value are deserialized as byte[]
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+        if (tableOptions.getConsumerParams() != null) {
+          consumerConfig.putAll(tableOptions.getConsumerParams());
+        }
+        Consumer consumer = new KafkaConsumer<>(consumerConfig);
+        consumer.subscribe(Collections.singletonList(tableOptions.getTopicName()));
+
+        return new KafkaMessageEnumerator(consumer, tableOptions.getRowConverter(), cancelFlag);
+      }
+    };
+  }
+
+  @Override public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
+    return tableOptions.getRowConverter().rowDataType(tableOptions.getTopicName());
+  }
+
+  /** Returns a provider of statistics about this table.
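+   * The statistic is a fixed placeholder: an estimated row count of 100, no unique keys,
+   * and a collation on the first field.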
*/ + @Override public Statistic getStatistic() { + return Statistics.of(100d, ImmutableList.of(), + RelCollations.createSingleton(0)); + } + + @Override public boolean isRolledUp(final String column) { + return false; + } + + @Override public boolean rolledUpColumnValidInsideAgg(final String column, final SqlCall call, + final SqlNode parent, + final CalciteConnectionConfig config) { + return false; + } + + @Override public Table stream() { + return this; + } + + /** Type of table. */ + @Override public Schema.TableType getJdbcTableType() { + return Schema.TableType.STREAM; + } +} +// End KafkaStreamTable.java diff --git a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableConstants.java b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableConstants.java new file mode 100644 index 000000000000..aa0582fc6233 --- /dev/null +++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableConstants.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.kafka; + +/** + * Parameter constants used to define a Kafka table. + */ +interface KafkaTableConstants { + String SCHEMA_TOPIC_NAME = "topic.name"; + String SCHEMA_BOOTSTRAP_SERVERS = "bootstrap.servers"; + String SCHEMA_ROW_CONVERTER = "row.converter"; + String SCHEMA_CUST_CONSUMER = "consumer.cust"; + String SCHEMA_CONSUMER_PARAMS = "consumer.params"; +} +// End KafkaTableConstants.java diff --git a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableFactory.java b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableFactory.java new file mode 100644 index 000000000000..1f1252da40f5 --- /dev/null +++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.kafka; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.TableFactory; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; + +import java.lang.reflect.InvocationTargetException; +import java.util.Locale; +import java.util.Map; + +/** + * Implementation of {@link TableFactory} for Apache Kafka. Currently an Apache Kafka + * topic is mapping to a STREAM table. + */ +public class KafkaTableFactory implements TableFactory { + public KafkaTableFactory() { + } + + /** Creates a Table. + * @param schema Schema this table belongs to + * @param name Name of this table + * @param operand The "operand" JSON property + * @param rowType Row type. Specified if the "columns" JSON property. + */ + @Override public KafkaStreamTable create(SchemaPlus schema, + String name, + Map operand, + RelDataType rowType) { + KafkaTableOptions tableOptionBuilder = new KafkaTableOptions(); + + tableOptionBuilder.setBootstrapServers( + (String) operand.getOrDefault(KafkaTableConstants.SCHEMA_BOOTSTRAP_SERVERS, null)); + tableOptionBuilder.setTopicName( + (String) operand.getOrDefault(KafkaTableConstants.SCHEMA_TOPIC_NAME, null)); + + KafkaRowConverter rowConverter = null; + if (operand.containsKey(KafkaTableConstants.SCHEMA_ROW_CONVERTER)) { + String rowConverterClass = (String) operand.get(KafkaTableConstants.SCHEMA_ROW_CONVERTER); + try { + rowConverter = (KafkaRowConverter) Class.forName(rowConverterClass).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + final String details = String.format( + Locale.ROOT, + "Fail to create table '%s' with configuration: \n" + + "'%s'\n" + + "KafkaRowConverter '%s' is invalid", + name, operand, rowConverterClass); + throw new RuntimeException(details, e); + } + } else { + rowConverter = new KafkaRowConverterImpl(); + } + tableOptionBuilder.setRowConverter(rowConverter); + + if (operand.containsKey(KafkaTableConstants.SCHEMA_CONSUMER_PARAMS)) { + tableOptionBuilder.setConsumerParams((Map) operand.get( + KafkaTableConstants.SCHEMA_CONSUMER_PARAMS)); + } + if (operand.containsKey(KafkaTableConstants.SCHEMA_CUST_CONSUMER)) { + String custConsumerClass = (String) operand.get(KafkaTableConstants.SCHEMA_CUST_CONSUMER); + try { + tableOptionBuilder.setConsumer( + (Consumer) Class.forName(custConsumerClass) + .getConstructor(OffsetResetStrategy.class) + .newInstance(OffsetResetStrategy.NONE) + ); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException + | InstantiationException | InvocationTargetException e) { + final String details = String.format( + Locale.ROOT, + "Fail to create table '%s' with configuration: \n" + + "'%s'\n" + + "KafkaCustConsumer '%s' is invalid", + name, operand, custConsumerClass); + throw new RuntimeException(details, e); + } + } + + return new KafkaStreamTable(tableOptionBuilder); + } +} +// End KafkaTableFactory.java diff --git a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableOptions.java b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableOptions.java new file mode 100644 index 000000000000..230689e97965 --- /dev/null +++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableOptions.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.kafka; + +import org.apache.kafka.clients.consumer.Consumer; + +import java.util.Map; + +/** + * Available options for {@link KafkaStreamTable}. + */ +public final class KafkaTableOptions { + private String bootstrapServers; + private String topicName; + private KafkaRowConverter rowConverter; + private Map consumerParams; + //added to inject MockConsumer for testing. + private Consumer consumer; + + public String getBootstrapServers() { + return bootstrapServers; + } + + public KafkaTableOptions setBootstrapServers(final String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + return this; + } + + public String getTopicName() { + return topicName; + } + + public KafkaTableOptions setTopicName(final String topicName) { + this.topicName = topicName; + return this; + } + + public KafkaRowConverter getRowConverter() { + return rowConverter; + } + + public KafkaTableOptions setRowConverter( + final KafkaRowConverter rowConverter) { + this.rowConverter = rowConverter; + return this; + } + + public Map getConsumerParams() { + return consumerParams; + } + + public KafkaTableOptions setConsumerParams(final Map consumerParams) { + this.consumerParams = consumerParams; + return this; + } + + public Consumer getConsumer() { + return consumer; + } + + public KafkaTableOptions setConsumer(final Consumer consumer) { + this.consumer = consumer; + return this; + } +} +// End KafkaTableOptions.java diff --git a/kafka/src/main/java/org/apache/calcite/adapter/kafka/package-info.java b/kafka/src/main/java/org/apache/calcite/adapter/kafka/package-info.java new file mode 100644 index 000000000000..a303c7062d45 --- /dev/null +++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Kafka query provider. + * + *

One Kafka topic maps to one STREAM table.
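Tables are defined in a JSON model via {@link org.apache.calcite.adapter.kafka.KafkaTableFactory},
and messages are decoded into rows by a {@link org.apache.calcite.adapter.kafka.KafkaRowConverter}.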

+ */ +@PackageMarker +package org.apache.calcite.adapter.kafka; + +import org.apache.calcite.avatica.util.PackageMarker; + +// End package-info.java diff --git a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java new file mode 100644 index 000000000000..79709cc08ea2 --- /dev/null +++ b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.kafka; + +import org.apache.calcite.test.CalciteAssert; + +import com.google.common.io.Resources; + +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +/** + * Unit test cases for Kafka adapter. + */ +public class KafkaAdapterTest { + protected static final URL MODEL = KafkaAdapterTest.class.getResource("/kafka.model.json"); + + private CalciteAssert.AssertThat assertModel(String model) { + // ensure that Schema from this instance is being used + model = model.replace(KafkaAdapterTest.class.getName(), KafkaAdapterTest.class.getName()); + + return CalciteAssert.that() + .withModel(model); + } + + private CalciteAssert.AssertThat assertModel(URL url) { + Objects.requireNonNull(url, "url"); + try { + return assertModel(Resources.toString(url, StandardCharsets.UTF_8)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Test public void testSelect() { + assertModel(MODEL) + .query("SELECT STREAM * FROM KAFKA.MOCKTABLE") + .limit(2) + + .typeIs("[MSG_PARTITION INTEGER NOT NULL" + + ", MSG_TIMESTAMP BIGINT NOT NULL" + + ", MSG_OFFSET BIGINT NOT NULL" + + ", MSG_KEY_BYTES VARBINARY" + + ", MSG_VALUE_BYTES VARBINARY NOT NULL]") + + .returnsUnordered( + "MSG_PARTITION=0; MSG_TIMESTAMP=-1; MSG_OFFSET=0; MSG_KEY_BYTES=mykey0; MSG_VALUE_BYTES=myvalue0", + "MSG_PARTITION=0; MSG_TIMESTAMP=-1; MSG_OFFSET=1" + + "; MSG_KEY_BYTES=mykey1; MSG_VALUE_BYTES=myvalue1") + + .explainContains("PLAN=EnumerableInterpreter\n" + + " BindableTableScan(table=[[KAFKA, MOCKTABLE, (STREAM)]])\n"); + } + + @Test public void testFilterWithProject() { + assertModel(MODEL) + .query("SELECT STREAM MSG_PARTITION,MSG_OFFSET,MSG_VALUE_BYTES FROM KAFKA.MOCKTABLE" + + " WHERE MSG_OFFSET>0") + .limit(1) + + .returnsUnordered( + "MSG_PARTITION=0; MSG_OFFSET=1; MSG_VALUE_BYTES=myvalue1") + .explainContains( + "PLAN=EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[>($t2, $t5)], MSG_PARTITION=[$t0], MSG_OFFSET=[$t2], MSG_VALUE_BYTES=[$t4], $condition=[$t6])\n" + + " EnumerableInterpreter\n" + + " BindableTableScan(table=[[KAFKA, MOCKTABLE, (STREAM)]])"); + } + + @Test 
public void testCustRowConverter() { + assertModel(MODEL) + .query("SELECT STREAM * FROM KAFKA.MOCKTABLE_CUST_ROW_CONVERTER") + .limit(2) + + .typeIs("[TOPIC_NAME VARCHAR NOT NULL" + + ", PARTITION_ID INTEGER NOT NULL" + + ", TIMESTAMP_TYPE VARCHAR]") + + .returnsUnordered( + "TOPIC_NAME=testtopic; PARTITION_ID=0; TIMESTAMP_TYPE=NoTimestampType", + "TOPIC_NAME=testtopic; PARTITION_ID=0; TIMESTAMP_TYPE=NoTimestampType") + + .explainContains("PLAN=EnumerableInterpreter\n" + + " BindableTableScan(table=[[KAFKA, MOCKTABLE_CUST_ROW_CONVERTER, (STREAM)]])\n"); + } + + + @Test public void testAsBatch() { + assertModel(MODEL) + .query("SELECT * FROM KAFKA.MOCKTABLE") + .failsAtValidation("Cannot convert stream 'MOCKTABLE' to relation"); + } +} +// End KafkaAdapterTest.java diff --git a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaMockConsumer.java b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaMockConsumer.java new file mode 100644 index 000000000000..cccbcfd0dcd9 --- /dev/null +++ b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaMockConsumer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; + +/** + * A mock consumer to test Kafka adapter. + */ +public class KafkaMockConsumer extends MockConsumer { + public KafkaMockConsumer(final OffsetResetStrategy offsetResetStrategy) { + super(OffsetResetStrategy.EARLIEST); + + assign(Arrays.asList(new TopicPartition("testtopic", 0))); + + HashMap beginningOffsets = new HashMap<>(); + beginningOffsets.put(new TopicPartition("testtopic", 0), 0L); + updateBeginningOffsets(beginningOffsets); + + for (int idx = 0; idx < 10; ++idx) { + addRecord(new ConsumerRecord("testtopic", + 0, idx, ("mykey" + idx).getBytes(StandardCharsets.UTF_8), + ("myvalue" + idx).getBytes(StandardCharsets.UTF_8))); + } + } +} +// End KafkaMockConsumer.java diff --git a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaRowConverterTest.java b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaRowConverterTest.java new file mode 100644 index 000000000000..8bbebb487bd5 --- /dev/null +++ b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaRowConverterTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Implementation of {@link KafkaRowConverter} for testing; both key and value are
+ * saved as byte[].
+ */
+public class KafkaRowConverterTest implements KafkaRowConverter<byte[], byte[]> {
+  /**
+   * Generates the row schema for a given Kafka topic.
+   *
+   * @param topicName Kafka topic name
+   * @return row type
+   */
+  @Override public RelDataType rowDataType(final String topicName) {
+    final RelDataTypeFactory typeFactory =
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
+    fieldInfo.add("TOPIC_NAME", typeFactory.createSqlType(SqlTypeName.VARCHAR)).nullable(false);
+    fieldInfo.add("PARTITION_ID", typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
+    fieldInfo.add("TIMESTAMP_TYPE", typeFactory.createSqlType(SqlTypeName.VARCHAR)).nullable(true);
+
+    return fieldInfo.build();
+  }
+
+  /**
+   * Parses and reformats a Kafka message from the consumer, to fit the row schema
+   * defined by {@link #rowDataType(String)}.
+   *
+   * @param message the raw Kafka message record
+   * @return fields in the row
+   */
+  @Override public Object[] toRow(final ConsumerRecord<byte[], byte[]> message) {
+    // The row type above has three fields, so size the array accordingly.
+    Object[] fields = new Object[3];
+    fields[0] = message.topic();
+    fields[1] = message.partition();
+    fields[2] = message.timestampType().name;
+
+    return fields;
+  }
+}
+// End KafkaRowConverterTest.java
diff --git a/kafka/src/test/resources/kafka.model.json b/kafka/src/test/resources/kafka.model.json
new file mode 100644
index 000000000000..a37d499bcc1b
--- /dev/null
+++ b/kafka/src/test/resources/kafka.model.json
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "version": "1.0",
+  "defaultSchema": "KAFKA",
+  "schemas": [
+    {
+      "name": "KAFKA",
+      "tables": [
+        {
+          "name": "MOCKTABLE",
+          "type": "custom",
+          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
+          "operand": {
+            "consumer.cust": "org.apache.calcite.adapter.kafka.KafkaMockConsumer"
+          }
+        }
+        ,{
+          "name": "MOCKTABLE_CUST_ROW_CONVERTER",
+          "type": "custom",
+          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
+          "operand": {
+            "consumer.cust": "org.apache.calcite.adapter.kafka.KafkaMockConsumer",
+            "row.converter": "org.apache.calcite.adapter.kafka.KafkaRowConverterTest",
+            "consumer.params": {
+              "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
+              "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
+            }
+          }
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b5f6161cd650..5e9890099996 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,6 +143,8 @@ limitations under the License.
     0.9.16
     2.4.0
+    2.0.0
+    -html5
+
@@ -188,6 +190,7 @@ limitations under the License.
     spark
     splunk
     ubenchmark
+    kafka
+
+**Note**:
+
+KafkaAdapter is an experimental feature; changes in public API and usage are expected.
+
+For instructions on downloading and building Calcite, start with the
+[tutorial]({{ site.baseurl }}/docs/tutorial.html).
+
+The Kafka adapter exposes an Apache Kafka topic as a STREAM table, so it can be queried using
+[Calcite Stream SQL]({{ site.baseurl }}/docs/stream.html). Note that the adapter does not attempt
+to scan all topics; instead, users configure tables manually, and each Kafka stream table maps to
+one Kafka topic.
+
+A basic example of a model file is given below:
+
+{% highlight json %}
+{
+  "version": "1.0",
+  "defaultSchema": "KAFKA",
+  "schemas": [
+    {
+      "name": "KAFKA",
+      "tables": [
+        {
+          "name": "TABLE_NAME",
+          "type": "custom",
+          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
+          "operand": {
+            "bootstrap.servers": "host1:port,host2:port",
+            "topic.name": "kafka.topic.name",
+            "row.converter": "com.example.CustKafkaRowConverter",
+            "consumer.params": {
+              "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+              "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            }
+          }
+        }
+      ]
+    }
+  ]
+}
+{% endhighlight %}
+
+Note that:
+
+1. As Kafka messages are schemaless, a [KafkaRowConverter]({{ site.apiRoot }}/org/apache/calcite/adapter/kafka/KafkaRowConverter.html)
+is required to specify the row schema explicitly (with parameter `row.converter` inside `operand`)
+and how to decode a Kafka message to a Calcite row;
+[KafkaRowConverterImpl]({{ site.apiRoot }}/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.html)
+is used if none is provided (a sketch of a custom converter follows the connection example below);
+
+2. Additional consumer settings can be passed in with parameter `consumer.params`.
+
+Assuming this file is stored as `kafka.model.json`, you can connect to Kafka via
+[`sqlline`](https://github.com/julianhyde/sqlline) as follows:
+
+{% highlight bash %}
+$ ./sqlline
+sqlline> !connect jdbc:calcite:model=kafka.model.json admin admin
+{% endhighlight %}
+
+`sqlline` will now accept SQL queries which access your Kafka topics.
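+
+For reference, here is a minimal sketch of what `com.example.CustKafkaRowConverter` (a
+hypothetical class; the name comes from the example model above) might look like, assuming
+message values are UTF-8 strings:
+
+{% highlight java %}
+package com.example;
+
+import org.apache.calcite.adapter.kafka.KafkaRowConverter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Sketch of a custom converter: exposes each message as a
+ * (MSG_OFFSET, MSG_VALUE) row, decoding the value as a UTF-8 string.
+ */
+public class CustKafkaRowConverter implements KafkaRowConverter<byte[], byte[]> {
+  @Override public RelDataType rowDataType(final String topicName) {
+    final RelDataTypeFactory typeFactory =
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
+    fieldInfo.add("MSG_OFFSET", typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
+    fieldInfo.add("MSG_VALUE", typeFactory.createSqlType(SqlTypeName.VARCHAR)).nullable(true);
+    return fieldInfo.build();
+  }
+
+  @Override public Object[] toRow(final ConsumerRecord<byte[], byte[]> message) {
+    // Two fields, matching the row type declared above.
+    return new Object[] {
+        message.offset(),
+        message.value() == null
+            ? null
+            : new String(message.value(), StandardCharsets.UTF_8)
+    };
+  }
+}
+{% endhighlight %}
+
+The factory instantiates the converter by reflection, so it needs a public no-argument
+constructor.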
+
+With the Kafka table configured in the above model, we can run a simple query to fetch messages:
+
+{% highlight sql %}
+sqlline> SELECT STREAM *
+         FROM KAFKA.TABLE_NAME;
++---------------+---------------------+---------------------+---------------+-----------------+
+| MSG_PARTITION |    MSG_TIMESTAMP    |     MSG_OFFSET      | MSG_KEY_BYTES | MSG_VALUE_BYTES |
++---------------+---------------------+---------------------+---------------+-----------------+
+| 0             | -1                  | 0                   | mykey0        | myvalue0        |
+| 0             | -1                  | 1                   | mykey1        | myvalue1        |
++---------------+---------------------+---------------------+---------------+-----------------+
+{% endhighlight %}
+
+A Kafka table is a streaming table, which runs continuously; a `LIMIT` clause can be added so
+that the query returns promptly, as below:
+
+{% highlight sql %}
+sqlline> SELECT STREAM *
+         FROM KAFKA.TABLE_NAME
+         LIMIT 5;
+{% endhighlight %}
diff --git a/sqlline b/sqlline
index c26f001ba77e..dfabbd03f3b4 100755
--- a/sqlline
+++ b/sqlline
@@ -37,7 +37,7 @@ if [ ! -f target/fullclasspath.txt ]; then
 fi
 
 CP=
-for module in core cassandra druid elasticsearch2 elasticsearch5 file mongodb server spark splunk geode example/csv example/function; do
+for module in core cassandra druid elasticsearch2 elasticsearch5 file kafka mongodb server spark splunk geode example/csv example/function; do
   CP=${CP}${module}/target/classes:
   CP=${CP}${module}/target/test-classes:
 done
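
For programmatic access, the same model file works through Calcite's JDBC driver. A minimal
sketch (assuming the default row converter and that `kafka.model.json` is in the working
directory):

{% highlight java %}
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class KafkaStreamQuery {
  public static void main(String[] args) throws Exception {
    Properties info = new Properties();
    // Equivalent to the sqlline "!connect jdbc:calcite:model=..." command above.
    info.setProperty("model", "kafka.model.json");
    try (Connection connection =
             DriverManager.getConnection("jdbc:calcite:", info);
         Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery(
             "SELECT STREAM MSG_OFFSET, MSG_VALUE_BYTES "
                 + "FROM KAFKA.TABLE_NAME LIMIT 5")) {
      while (resultSet.next()) {
        // MSG_VALUE_BYTES is VARBINARY; decode it for display.
        System.out.println(resultSet.getLong(1) + ": "
            + new String(resultSet.getBytes(2), StandardCharsets.UTF_8));
      }
    }
  }
}
{% endhighlight %}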