From ce73a8b9fc4ee13620d07c1f621a36a7e327a872 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 13 Sep 2016 11:40:13 +0900 Subject: [PATCH 1/2] STORM-2089 Replace Consumer of ISqlTridentDataSource with SqlTridentConsumer * SqlTridentConsumer contains StateFactory and StateUpdater which is needed to store tuples to State via batch * Apply the change to storm-sql-kafka * move out JsonScheme and JsonSerializer to runtime * it will be used from other external sql modules * add javadoc to ISqlTridentDataSource --- .../trident/TridentLogicalPlanCompiler.java | 9 +- .../backends/trident/TestPlanCompiler.java | 33 +++-- .../sql/kafka/KafkaDataSourcesProvider.java | 61 +++----- .../kafka/TestKafkaDataSourcesProvider.java | 26 +++- external/sql/storm-sql-runtime/pom.xml | 4 + .../apache/storm/sql/runtime/FieldInfo.java | 4 +- .../sql/runtime/ISqlTridentDataSource.java | 54 ++++++- .../sql/runtime/serde/json}/JsonScheme.java | 4 +- .../runtime/serde/json}/JsonSerializer.java | 9 +- .../storm/sql}/TestJsonRepresentation.java | 4 +- .../test/org/apache/storm/sql/TestUtils.java | 137 +++++++++--------- 11 files changed, 198 insertions(+), 147 deletions(-) rename external/sql/{storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka => storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json}/JsonScheme.java (95%) rename external/sql/{storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka => storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json}/JsonSerializer.java (88%) rename external/sql/{storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka => storm-sql-runtime/src/test/org/apache/storm/sql}/TestJsonRepresentation.java (92%) diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java index 688b164ad86..a0bfa21d893 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/TridentLogicalPlanCompiler.java @@ -131,8 +131,13 @@ public IAggregatableStream visitTableModify(TableModify modify, List inputFields = input.getRowType().getFieldNames(); List outputFields = modify.getRowType().getFieldNames(); - return inputStream.each(new Fields(inputFields), sources.get(tableName).getConsumer(), new Fields(outputFields)) - .name(stageName); + ISqlTridentDataSource.SqlTridentConsumer consumer = sources.get(tableName).getConsumer(); + + // In fact this is normally the end of stream, but to match the return type we open new streams based on State values + return inputStream + .partitionPersist(consumer.getStateFactory(), new Fields(inputFields), consumer.getStateUpdater(), + new Fields(outputFields)) + .newValuesStream().name(stageName); } @Override diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java index 35d1364c086..85510a0d6ab 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java @@ -29,7 +29,6 @@ import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.sql.TestUtils; -import org.apache.storm.sql.TestUtils.MockSqlTridentDataSource.CollectDataFunction; import org.apache.storm.sql.compiler.TestCompilerUtils; import org.apache.storm.sql.runtime.ISqlTridentDataSource; import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor; @@ -46,7 +45,7 @@ import java.util.Map; import java.util.concurrent.Callable; -import static org.apache.storm.sql.TestUtils.MockSqlTridentDataSource.CollectDataFunction.getCollectedValues; +import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues; public class TestPlanCompiler { private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl( @@ -68,7 +67,8 @@ public void testCompile() throws Exception { final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, getCollectedValues().toArray()); } @@ -84,7 +84,8 @@ public void testCompileGroupByExp() throws Exception { final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); Assert.assertArrayEquals(new Values[] { new Values(0, 5L, 5, 1, 3, 4)}, getCollectedValues().toArray()); @@ -101,7 +102,8 @@ public void testCompileGroupByExpWithExprInAggCall() throws Exception { final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); Assert.assertArrayEquals(new Values[] { new Values(0, 5L, 39)}, getCollectedValues().toArray()); @@ -119,7 +121,8 @@ public void testCompileEquiJoinAndGroupBy() throws Exception { final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); assertListsAreEqualIgnoringOrder(Lists.newArrayList(new Values(1, 2L), new Values(0, 2L)), getCollectedValues()); @@ -137,7 +140,8 @@ public void testCompileEquiJoinWithLeftOuterJoin() throws Exception { final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); assertListsAreEqualIgnoringOrder(Lists.newArrayList(new Values(2, null), new Values(3, null), new Values(4, null)), getCollectedValues()); @@ -155,7 +159,8 @@ public void testCompileEquiJoinWithRightOuterJoin() throws Exception { final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); assertListsAreEqualIgnoringOrder(Lists.newArrayList(new Values(2, null), new Values(3, null), new Values(4, null)), getCollectedValues()); @@ -173,7 +178,8 @@ public void testCompileEquiJoinWithFullOuterJoin() throws Exception { final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); assertListsAreEqualIgnoringOrder(Lists.newArrayList(new Values(null, "dept-2"), new Values(null, "dept-3"), new Values(null, "dept-4"), @@ -207,7 +213,8 @@ public void testLogicalExpr() throws Exception { AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); Assert.assertArrayEquals(new Values[] { new Values(true, false, true) }, getCollectedValues().toArray()); } @@ -225,7 +232,8 @@ public void testUdf() throws Exception { AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); Assert.assertArrayEquals(new Values[] { new Values(5) }, getCollectedValues().toArray()); } @@ -241,7 +249,8 @@ public void testUdaf() throws Exception { AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); Fields f = proc.outputStream().getOutputFields(); - proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); Assert.assertArrayEquals(new Values[] { new Values(0, 5L, 15L, 15L) }, getCollectedValues().toArray()); } diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java index 9f8eeac4f96..d19e65d48aa 100644 --- a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java +++ b/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java @@ -17,6 +17,8 @@ */ package org.apache.storm.sql.kafka; +import org.apache.storm.kafka.trident.TridentKafkaStateFactory; +import org.apache.storm.kafka.trident.TridentKafkaUpdater; import org.apache.storm.spout.SchemeAsMultiScheme; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; @@ -24,14 +26,10 @@ import org.apache.storm.kafka.ZkHosts; import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout; import org.apache.storm.kafka.trident.TridentKafkaConfig; -import org.apache.storm.kafka.trident.TridentKafkaState; import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; import org.apache.storm.kafka.trident.selector.DefaultTopicSelector; -import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; -import org.apache.storm.trident.operation.BaseFunction; -import org.apache.storm.trident.operation.Function; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.sql.runtime.serde.json.JsonScheme; +import org.apache.storm.sql.runtime.serde.json.JsonSerializer; import org.apache.storm.trident.spout.ITridentDataSource; import org.apache.storm.trident.tuple.TridentTuple; @@ -68,42 +66,6 @@ public ByteBuffer getMessageFromTuple(TridentTuple tuple) { } } - static class KafkaTridentSink extends BaseFunction { - private transient TridentKafkaState state; - private final String topic; - private final int primaryKeyIndex; - private final Properties producerProperties; - private final List fieldNames; - - private KafkaTridentSink(String topic, int primaryKeyIndex, Properties producerProperties, - List fieldNames) { - this.topic = topic; - this.primaryKeyIndex = primaryKeyIndex; - this.producerProperties = producerProperties; - this.fieldNames = fieldNames; - } - - @Override - public void cleanup() { - super.cleanup(); - } - - @Override - public void prepare(Map conf, TridentOperationContext context) { - JsonSerializer serializer = new JsonSerializer(fieldNames); - SqlKafkaMapper m = new SqlKafkaMapper(primaryKeyIndex, serializer); - state = new TridentKafkaState() - .withKafkaTopicSelector(new DefaultTopicSelector(topic)) - .withTridentTupleToKafkaMapper(m); - state.prepare(producerProperties); - } - - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - state.updateState(Collections.singletonList(tuple), collector); - } - } - private static class KafkaTridentDataSource implements ISqlTridentDataSource { private final TridentKafkaConfig conf; private final String topic; @@ -125,7 +87,7 @@ public ITridentDataSource getProducer() { } @Override - public Function getConsumer() { + public SqlTridentConsumer getConsumer() { Preconditions.checkNotNull(producerProperties, "Writable Kafka Table " + topic + " must contain producer config"); Properties props = new Properties(); @@ -141,7 +103,18 @@ public Function getConsumer() { } Preconditions.checkState(props.containsKey("bootstrap.servers"), "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config"); - return new KafkaTridentSink(topic, primaryKeyIndex, props, fields); + + JsonSerializer serializer = new JsonSerializer(fields); + SqlKafkaMapper mapper = new SqlKafkaMapper(primaryKeyIndex, serializer); + + TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() + .withKafkaTopicSelector(new DefaultTopicSelector(topic)) + .withProducerProperties(props) + .withTridentTupleToKafkaMapper(mapper); + + TridentKafkaUpdater stateUpdater = new TridentKafkaUpdater(); + + return new SqlTridentConsumer(stateFactory, stateUpdater); } } diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java index f6e75abb020..2725893864e 100644 --- a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java +++ b/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java @@ -22,16 +22,18 @@ import com.google.common.collect.Lists; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.storm.sql.kafka.KafkaDataSourcesProvider.KafkaTridentSink; +import org.apache.storm.kafka.trident.TridentKafkaState; +import org.apache.storm.kafka.trident.TridentKafkaStateFactory; +import org.apache.storm.kafka.trident.TridentKafkaUpdater; import org.apache.storm.sql.runtime.DataSourcesRegistry; import org.apache.storm.sql.runtime.FieldInfo; import org.apache.storm.sql.runtime.ISqlTridentDataSource; +import org.apache.storm.sql.runtime.serde.json.JsonSerializer; +import org.apache.storm.trident.tuple.TridentTuple; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.internal.util.reflection.Whitebox; -import org.apache.storm.kafka.trident.TridentKafkaState; -import org.apache.storm.trident.tuple.TridentTuple; import java.net.URI; import java.nio.ByteBuffer; @@ -40,7 +42,12 @@ import java.util.List; import java.util.concurrent.Future; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; public class TestKafkaDataSourcesProvider { private static final List FIELDS = ImmutableList.of( @@ -63,12 +70,17 @@ public void testKafkaSink() { ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); - KafkaTridentSink sink = (KafkaTridentSink) ds.getConsumer(); - sink.prepare(null, null); - TridentKafkaState state = (TridentKafkaState) Whitebox.getInternalState(sink, "state"); + + ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); + + Assert.assertEquals(TridentKafkaStateFactory.class, consumer.getStateFactory().getClass()); + Assert.assertEquals(TridentKafkaUpdater.class, consumer.getStateUpdater().getClass()); + + TridentKafkaState state = (TridentKafkaState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); KafkaProducer producer = mock(KafkaProducer.class); doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class)); Whitebox.setInternalState(state, "producer", producer); + List tupleList = mockTupleList(); for (TridentTuple t : tupleList) { state.updateState(Collections.singletonList(t), null); diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml index 0f02be209d6..d911f9b1032 100644 --- a/external/sql/storm-sql-runtime/pom.xml +++ b/external/sql/storm-sql-runtime/pom.xml @@ -73,6 +73,10 @@ + + com.fasterxml.jackson.core + jackson-databind + org.codehaus.janino diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java index cb1176bfa07..03b030b6631 100644 --- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java +++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java @@ -17,10 +17,12 @@ */ package org.apache.storm.sql.runtime; +import java.io.Serializable; + /** * Describe each column of the field */ -public class FieldInfo { +public class FieldInfo implements Serializable { private final String name; private final Class type; private final boolean isPrimary; diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java index 92961dcca16..544ed6c0cbb 100644 --- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java +++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java @@ -17,14 +17,62 @@ */ package org.apache.storm.sql.runtime; -import org.apache.storm.trident.operation.Function; -import org.apache.storm.trident.spout.IBatchSpout; import org.apache.storm.trident.spout.ITridentDataSource; +import org.apache.storm.trident.state.StateFactory; +import org.apache.storm.trident.state.StateUpdater; /** * A ISqlTridentDataSource specifies how an external data source produces and consumes data. */ public interface ISqlTridentDataSource { + /** + * SqlTridentConsumer is a data structure containing StateFactory and StateUpdater for consuming tuples with State. + * + * Please note that StateFactory and StateUpdater should use same class which implements State. + * + * @see org.apache.storm.trident.state.StateFactory + * @see org.apache.storm.trident.state.StateUpdater + */ + class SqlTridentConsumer { + private StateFactory stateFactory; + private StateUpdater stateUpdater; + + public SqlTridentConsumer(StateFactory stateFactory, StateUpdater stateUpdater) { + this.stateFactory = stateFactory; + this.stateUpdater = stateUpdater; + } + + public StateFactory getStateFactory() { + return stateFactory; + } + + public StateUpdater getStateUpdater() { + return stateUpdater; + } + } + + /** + * Provides instance of ITridentDataSource which can be used as producer in Trident. + * + * Since ITridentDataSource is a marker interface for Trident Spout interfaces, this method should effectively + * return an instance of one of these interfaces (can be changed if Trident API evolves) or descendants: + * - IBatchSpout + * - ITridentSpout + * - IPartitionedTridentSpout + * - IOpaquePartitionedTridentSpout + * + * @see org.apache.storm.trident.spout.ITridentDataSource + * @see org.apache.storm.trident.spout.IBatchSpout + * @see org.apache.storm.trident.spout.ITridentSpout + * @see org.apache.storm.trident.spout.IPartitionedTridentSpout + * @see org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout + */ ITridentDataSource getProducer(); - Function getConsumer(); + + /** + * Provides instance of SqlTridentConsumer which can be used as consumer (State) in Trident. + * + * @see SqlTridentConsumer + */ + SqlTridentConsumer getConsumer(); } diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java similarity index 95% rename from external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java rename to external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java index eed128237e7..3d4de0d5dff 100644 --- a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java +++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonScheme.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.sql.kafka; +package org.apache.storm.sql.runtime.serde.json; import org.apache.storm.spout.Scheme; import org.apache.storm.tuple.Fields; @@ -31,7 +31,7 @@ public class JsonScheme implements Scheme { private final List fields; - JsonScheme(List fields) { + public JsonScheme(List fields) { this.fields = fields; } diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java similarity index 88% rename from external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java rename to external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java index e3d5d01d303..1e825c4dfb2 100644 --- a/external/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java +++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/serde/json/JsonSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.sql.kafka; +package org.apache.storm.sql.runtime.serde.json; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; @@ -23,16 +23,17 @@ import org.apache.storm.sql.runtime.IOutputSerializer; import java.io.IOException; +import java.io.Serializable; import java.io.StringWriter; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; -class JsonSerializer implements IOutputSerializer { +public class JsonSerializer implements IOutputSerializer, Serializable { private final List fieldNames; - private transient final JsonFactory jsonFactory; + private final JsonFactory jsonFactory; - JsonSerializer(List fieldNames) { + public JsonSerializer(List fieldNames) { this.fieldNames = fieldNames; jsonFactory = new JsonFactory(); } diff --git a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java similarity index 92% rename from external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java rename to external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java index 7e8541037ba..6ca18772ae9 100644 --- a/external/sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java +++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestJsonRepresentation.java @@ -15,8 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.sql.kafka; +package org.apache.storm.sql; +import org.apache.storm.sql.runtime.serde.json.JsonScheme; +import org.apache.storm.sql.runtime.serde.json.JsonSerializer; import org.apache.storm.utils.Utils; import com.google.common.collect.Lists; import org.junit.Test; diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java index 5d0384aac9c..1536db70fa9 100644 --- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java +++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java @@ -21,7 +21,12 @@ import org.apache.storm.ILocalCluster; import org.apache.storm.LocalCluster; +import org.apache.storm.task.IMetricsContext; import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; +import org.apache.storm.trident.state.StateUpdater; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.sql.runtime.ChannelContext; @@ -160,31 +165,70 @@ public void open(ChannelContext ctx) { } } - public static class MockSqlTridentDataSource implements ISqlTridentDataSource { - @Override - public IBatchSpout getProducer() { - return new MockSpout(); + public static class MockState implements State { + /** + * Collect all values in a static variable as the instance will go through serialization and deserialization. + * NOTE: This should be cleared before or after running each test. + */ + private transient static final List > VALUES = new ArrayList<>(); + + public static List> getCollectedValues() { + return VALUES; } @Override - public Function getConsumer() { - return new CollectDataFunction(); + public void beginCommit(Long txid) { + // NOOP } - public static class CollectDataFunction extends BaseFunction { - /** - * Collect all values in a static variable as the instance will go through serialization and deserialization. - */ - private transient static final List > VALUES = new ArrayList<>(); - public static List> getCollectedValues() { - return VALUES; - } + @Override + public void commit(Long txid) { + // NOOP + } - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { + public void updateState(List tuples, TridentCollector collector) { + for (TridentTuple tuple : tuples) { VALUES.add(tuple.getValues()); } } + } + + public static class MockStateFactory implements StateFactory { + + @Override + public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + return new MockState(); + } + } + + public static class MockStateUpdater implements StateUpdater { + + @Override + public void updateState(MockState state, List tuples, TridentCollector collector) { + state.updateState(tuples, collector); + } + + @Override + public void prepare(Map conf, TridentOperationContext context) { + // NOOP + } + + @Override + public void cleanup() { + // NOOP + } + } + + public static class MockSqlTridentDataSource implements ISqlTridentDataSource { + @Override + public IBatchSpout getProducer() { + return new MockSpout(); + } + + @Override + public SqlTridentConsumer getConsumer() { + return new SqlTridentConsumer(new MockStateFactory(), new MockStateUpdater()); + } private static class MockSpout implements IBatchSpout { private final ArrayList RECORDS = new ArrayList<>(); @@ -241,23 +285,8 @@ public IBatchSpout getProducer() { } @Override - public Function getConsumer() { - return new CollectDataFunction(); - } - - public static class CollectDataFunction extends BaseFunction { - /** - * Collect all values in a static variable as the instance will go through serialization and deserialization. - */ - private transient static final List > VALUES = new ArrayList<>(); - public static List> getCollectedValues() { - return VALUES; - } - - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - VALUES.add(tuple.getValues()); - } + public SqlTridentConsumer getConsumer() { + return new SqlTridentConsumer(new MockStateFactory(), new MockStateUpdater()); } private static class MockGroupedSpout implements IBatchSpout { @@ -315,23 +344,8 @@ public IBatchSpout getProducer() { } @Override - public Function getConsumer() { - return new CollectDataFunction(); - } - - public static class CollectDataFunction extends BaseFunction { - /** - * Collect all values in a static variable as the instance will go through serialization and deserialization. - */ - private transient static final List > VALUES = new ArrayList<>(); - public static List> getCollectedValues() { - return VALUES; - } - - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - VALUES.add(tuple.getValues()); - } + public SqlTridentConsumer getConsumer() { + return new SqlTridentConsumer(new MockStateFactory(), new MockStateUpdater()); } private static class MockSpout implements IBatchSpout { @@ -392,23 +406,8 @@ public IBatchSpout getProducer() { } @Override - public Function getConsumer() { - return new CollectDataFunction(); - } - - public static class CollectDataFunction extends BaseFunction { - /** - * Collect all values in a static variable as the instance will go through serialization and deserialization. - */ - private transient static final List > VALUES = new ArrayList<>(); - public static List> getCollectedValues() { - return VALUES; - } - - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - VALUES.add(tuple.getValues()); - } + public SqlTridentConsumer getConsumer() { + return new SqlTridentConsumer(new MockStateFactory(), new MockStateUpdater()); } private static class MockSpout implements IBatchSpout { @@ -490,8 +489,4 @@ public static long monotonicNow() { final long NANOSECONDS_PER_MILLISECOND = 1000000; return System.nanoTime() / NANOSECONDS_PER_MILLISECOND; } - - public static ILocalCluster newLocalCluster() { - return new LocalCluster(); - } } From 3e050569e06276446e2bcec42dff80bfd659e8f4 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 22 Sep 2016 15:38:44 +0900 Subject: [PATCH 2/2] STORM-2111 [Storm SQL] support 'LIKE' and 'SIMILAR TO' * associate 'LIKE' and 'SIMILAR TO' to its implementation Calcite is providing * NOTE: SIMILAR TO takes SQL regex, not Java regex * add unit tests --- .../storm/sql/compiler/ExprCompiler.java | 9 +- .../backends/trident/TestPlanCompiler.java | 86 ++++++++++++++++++- .../test/org/apache/storm/sql/TestUtils.java | 8 +- 3 files changed, 96 insertions(+), 7 deletions(-) diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java index 4e1c127e9fd..1e5c3dfa6c4 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java @@ -196,6 +196,8 @@ private ImpTable() { .put(builtInMethod(CHAR_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT)) .put(builtInMethod(CONCAT, BuiltInMethod.STRING_CONCAT, NullPolicy.STRICT)) .put(builtInMethod(ITEM, BuiltInMethod.ANY_ITEM, NullPolicy.STRICT)) + .put(builtInMethod(LIKE, BuiltInMethod.LIKE, NullPolicy.STRICT)) + .put(builtInMethod(SIMILAR_TO, BuiltInMethod.SIMILAR, NullPolicy.STRICT)) .put(infixBinary(LESS_THAN, "<", "lt")) .put(infixBinary(LESS_THAN_OR_EQUAL, "<=", "le")) .put(infixBinary(GREATER_THAN, ">", "gt")) @@ -405,7 +407,7 @@ public String translate( String s; if (rhsNullable) { s = foldNullExpr( - String.format("(%2$s != null && !(%2$s)) ? Boolean.FALSE : ((%1$s == null || %2$s == null) ? null : Boolean.TRUE)", + String.format("(%2$s != null && !(%2$s)) ? Boolean.FALSE : ((%1$s == null || %2$s == null) ? ((Boolean) null) : Boolean.TRUE)", lhs, rhs), "null", op1); } else { s = String.format("!(%2$s) ? Boolean.FALSE : %1$s", lhs, rhs); @@ -446,7 +448,7 @@ public String translate( String s; if (rhsNullable) { s = foldNullExpr( - String.format("(%2$s != null && %2$s) ? Boolean.TRUE : ((%1$s == null || %2$s == null) ? null : Boolean.FALSE)", + String.format("(%2$s != null && %2$s) ? Boolean.TRUE : ((%1$s == null || %2$s == null) ? ((Boolean) null) : Boolean.FALSE)", lhs, rhs), "null", op1); } else { @@ -463,6 +465,7 @@ public String translate( @Override public String translate( ExprCompiler compiler, RexCall call) { + Boolean b = new Boolean(false); String val = compiler.reserveName(); PrintWriter pw = compiler.pw; RexNode op = call.getOperands().get(0); @@ -474,7 +477,7 @@ public String translate( pw.print(String.format("%1$s = !(%2$s);\n", val, lhs)); } else { String s = foldNullExpr( - String.format("%1$s == null ? null : !(%1$s)", lhs), "null", op); + String.format("(%1$s == null) ? ((%2$s) null) : !(%1$s)", lhs, compiler.javaTypeName(call)), "null", op); pw.print(String.format("%1$s = %2$s;\n", val, s)); } return val; diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java index 85510a0d6ab..ac0290e3ca0 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java @@ -199,7 +199,7 @@ public void testInsert() throws Exception { final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); final TridentTopology topo = proc.build(data); runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); - Assert.assertArrayEquals(new Values[] { new Values(4, "x", "y")}, getCollectedValues().toArray()); + Assert.assertArrayEquals(new Values[] { new Values(4, "abcde", "y")}, getCollectedValues().toArray()); } @Test @@ -255,6 +255,90 @@ public void testUdaf() throws Exception { Assert.assertArrayEquals(new Values[] { new Values(0, 5L, 15L, 15L) }, getCollectedValues().toArray()); } + @Test + public void testLike() throws Exception { + int EXPECTED_VALUE_SIZE = 2; + + // 'abcd', 'abcde' matched + String sql = "SELECT ID FROM FOO WHERE NAME LIKE '%c_%'"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); + + final Map data = new HashMap<>(); + data.put("FOO", new TestUtils.MockSqlTridentDataSource()); + PlanCompiler compiler = new PlanCompiler(data, typeFactory); + final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); + final TridentTopology topo = proc.build(data); + Fields f = proc.outputStream().getOutputFields(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); + runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + + Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4) }, getCollectedValues().toArray()); + } + + @Test + public void testSimilar() throws Exception { + int EXPECTED_VALUE_SIZE = 2; + + // 'abc' and 'abcd' matched + String sql = "SELECT ID FROM FOO WHERE NAME SIMILAR TO '[a-zA-Z]+[cd]{1}'"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); + + final Map data = new HashMap<>(); + data.put("FOO", new TestUtils.MockSqlTridentDataSource()); + PlanCompiler compiler = new PlanCompiler(data, typeFactory); + final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); + final TridentTopology topo = proc.build(data); + Fields f = proc.outputStream().getOutputFields(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); + runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + + Assert.assertArrayEquals(new Values[] { new Values(2), new Values(3) }, getCollectedValues().toArray()); + } + + @Test + public void testNotLike() throws Exception { + int EXPECTED_VALUE_SIZE = 3; + + // 'a', 'ab', 'abc' matched + String sql = "SELECT ID FROM FOO WHERE NAME NOT LIKE '%c_%'"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); + + final Map data = new HashMap<>(); + data.put("FOO", new TestUtils.MockSqlTridentDataSource()); + PlanCompiler compiler = new PlanCompiler(data, typeFactory); + final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); + final TridentTopology topo = proc.build(data); + Fields f = proc.outputStream().getOutputFields(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); + runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + + Assert.assertArrayEquals(new Values[] { new Values(0), new Values(1), new Values(2) }, getCollectedValues().toArray()); + } + + @Test + public void testNotSimilar() throws Exception { + int EXPECTED_VALUE_SIZE = 3; + + // 'a', 'ab', 'abcde' matched + String sql = "SELECT ID FROM FOO WHERE NAME NOT SIMILAR TO '[a-zA-Z]+[cd]{1}'"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); + + final Map data = new HashMap<>(); + data.put("FOO", new TestUtils.MockSqlTridentDataSource()); + PlanCompiler compiler = new PlanCompiler(data, typeFactory); + final AbstractTridentProcessor proc = compiler.compileForTest(state.tree()); + final TridentTopology topo = proc.build(data); + Fields f = proc.outputStream().getOutputFields(); + proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), + f, new TestUtils.MockStateUpdater(), new Fields()); + runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + + Assert.assertArrayEquals(new Values[] { new Values(0), new Values(1), new Values(4) }, getCollectedValues().toArray()); + } + private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc, TridentTopology topo) throws Exception { final Config conf = new Config(); diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java index 1536db70fa9..c3c91c39d0a 100644 --- a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java +++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java @@ -235,9 +235,11 @@ private static class MockSpout implements IBatchSpout { private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR"); public MockSpout() { - for (int i = 0; i < 5; ++i) { - RECORDS.add(new Values(i, "x", "y")); - } + RECORDS.add(new Values(0, "a", "y")); + RECORDS.add(new Values(1, "ab", "y")); + RECORDS.add(new Values(2, "abc", "y")); + RECORDS.add(new Values(3, "abcd", "y")); + RECORDS.add(new Values(4, "abcde", "y")); } private boolean emitted = false;