From 03b3a26e53e788adbcbbda8c77cc0bd4b8eccac6 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Sat, 13 Mar 2021 18:41:27 -0800 Subject: [PATCH] Support writing general records to Pulsar sink (#9590) --- ...tate.yaml => ci-integration-function.yaml} | 4 +- build/run_integration_group.sh | 4 +- .../pulsar/functions/sink/PulsarSink.java | 46 +++- .../pulsar/functions/sink/PulsarSinkTest.java | 166 +++++++++++++ .../integration/io/GenericRecordSource.java | 88 +++++++ .../io/GenericRecordSourceTest.java | 234 ++++++++++++++++++ ...function-state.xml => pulsar-function.xml} | 5 +- .../integration/src/test/resources/pulsar.xml | 2 +- 8 files changed, 531 insertions(+), 18 deletions(-) rename .github/workflows/{ci-integration-function-state.yaml => ci-integration-function.yaml} (96%) create mode 100644 tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/GenericRecordSourceTest.java rename tests/integration/src/test/resources/{pulsar-function-state.xml => pulsar-function.xml} (81%) diff --git a/.github/workflows/ci-integration-function-state.yaml b/.github/workflows/ci-integration-function.yaml similarity index 96% rename from .github/workflows/ci-integration-function-state.yaml rename to .github/workflows/ci-integration-function.yaml index 3150b20bd36ccd..ca7d79ab752e1e 100644 --- a/.github/workflows/ci-integration-function-state.yaml +++ b/.github/workflows/ci-integration-function.yaml @@ -17,7 +17,7 @@ # under the License. # -name: CI - Integration - Function State +name: CI - Integration - Function & IO on: pull_request: branches: @@ -93,4 +93,4 @@ jobs: - name: run integration tests if: steps.docs.outputs.changed_only == 'no' - run: ./build/run_integration_group.sh FUNCTION_STATE \ No newline at end of file + run: ./build/run_integration_group.sh FUNCTION diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index ea9c455c8e9ad3..b3fa5a099f6861 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -67,8 +67,8 @@ test_group_cli() { mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-auth.xml -DintegrationTests } -test_group_function_state() { - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-function-state.xml -DintegrationTests +test_group_function() { + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-function.xml -DintegrationTests } test_group_messaging() { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 50c8d8a90fde29..80d82a12df60eb 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -24,7 +24,6 @@ import lombok.Builder; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import lombok.val; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.CompressionType; @@ -39,12 +38,15 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.CryptoConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.FunctionResultRouter; import org.apache.pulsar.functions.instance.SinkRecord; @@ -93,10 +95,10 @@ private interface PulsarSinkProcessor { void close() throws Exception; } - private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor { + abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor { protected Map> publishProducers = new ConcurrentHashMap<>(); protected Schema schema; - protected Crypto crypto; + protected Crypto crypto; protected PulsarSinkProcessorBase(Schema schema, Crypto crypto) { this.schema = schema; @@ -153,11 +155,16 @@ protected Producer getProducer(String destinationTopic, Schema schema) { protected Producer getProducer(String producerId, String producerName, String topicName, Schema schema) { return publishProducers.computeIfAbsent(producerId, s -> { try { - return createProducer( + log.info("Initializing producer {} on topic {} with schema {}", + producerName, topicName, schema); + Producer producer = createProducer( client, topicName, producerName, schema != null ? schema : this.schema); + log.info("Initialized producer {} on topic {} with schema {}: {} -> {}", + producerName, topicName, schema, producerId, producer); + return producer; } catch (PulsarClientException e) { log.error("Failed to create Producer while doing user publish", e); throw new RuntimeException(e); @@ -209,13 +216,21 @@ public Function getPublishErrorHandler(SinkRecord record, bo class PulsarSinkAtMostOnceProcessor extends PulsarSinkProcessorBase { public PulsarSinkAtMostOnceProcessor(Schema schema, Crypto crypto) { super(schema, crypto); - // initialize default topic - try { - publishProducers.put(pulsarSinkConfig.getTopic(), + if (!(schema instanceof AutoConsumeSchema)) { + // initialize default topic + try { + publishProducers.put(pulsarSinkConfig.getTopic(), createProducer(client, pulsarSinkConfig.getTopic(), null, schema)); - } catch (PulsarClientException e) { - log.error("Failed to create Producer while doing user publish", e); - throw new RuntimeException(e); } + } catch (PulsarClientException e) { + log.error("Failed to create Producer while doing user publish", e); + throw new RuntimeException(e); + } + } else { + if (log.isDebugEnabled()) { + log.debug("The Pulsar producer is not initialized until the first record is" + + " published for `AUTO_CONSUME` schema."); + } + } } @Override @@ -400,7 +415,16 @@ Schema initializeSchema() throws ClassNotFoundException { ConsumerConfig consumerConfig = new ConsumerConfig(); consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties()); if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) { - consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType()); + if (GenericRecord.class.isAssignableFrom(typeArg)) { + consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString()); + SchemaType configuredSchemaType = SchemaType.valueOf(pulsarSinkConfig.getSchemaType()); + if (SchemaType.AUTO_CONSUME != configuredSchemaType) { + log.info("The configured schema type {} is not able to write GenericRecords." + + " So overwrite the schema type to be {}", configuredSchemaType, SchemaType.AUTO_CONSUME); + } + } else { + consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType()); + } return (Schema) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg, consumerConfig, false); } else { diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index a61df129bceed7..8848d412937d08 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -29,12 +29,14 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.IOException; import java.util.HashMap; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -51,12 +53,21 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericRecordBuilder; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; +import org.apache.pulsar.client.api.schema.SchemaBuilder; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.instance.SinkRecord; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; +import org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessorBase; import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.io.core.SinkContext; import org.testng.Assert; @@ -252,6 +263,62 @@ public void testComplexOuputType() throws PulsarClientException { } } + @Test + public void testInitializeSchema() throws Exception { + PulsarClient pulsarClient = getPulsarClient(); + + // generic record type (no serde and no schema type) + PulsarSinkConfig pulsarSinkConfig = getPulsarConfigs(); + pulsarSinkConfig.setSerdeClassName(null); + pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName()); + PulsarSink sink = new PulsarSink( + pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), + Thread.currentThread().getContextClassLoader()); + Schema schema = sink.initializeSchema(); + assertTrue(schema instanceof AutoConsumeSchema); + + // generic record type (default serde and no schema type) + pulsarSinkConfig = getPulsarConfigs(); + pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName()); + sink = new PulsarSink( + pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), + Thread.currentThread().getContextClassLoader()); + schema = sink.initializeSchema(); + assertTrue(schema instanceof AutoConsumeSchema); + + // generic record type (no serde and wrong schema type) + pulsarSinkConfig = getPulsarConfigs(); + pulsarSinkConfig.setSerdeClassName(null); + pulsarSinkConfig.setSchemaType(SchemaType.AVRO.toString()); + pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName()); + sink = new PulsarSink( + pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), + Thread.currentThread().getContextClassLoader()); + schema = sink.initializeSchema(); + assertTrue(schema instanceof AutoConsumeSchema); + + // generic record type (no serde and AUTO_CONSUME schema type) + pulsarSinkConfig = getPulsarConfigs(); + pulsarSinkConfig.setSerdeClassName(null); + pulsarSinkConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString()); + pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName()); + sink = new PulsarSink( + pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), + Thread.currentThread().getContextClassLoader()); + schema = sink.initializeSchema(); + assertTrue(schema instanceof AutoConsumeSchema); + + // generic record type (default serde and AUTO_CONSUME schema type) + pulsarSinkConfig = getPulsarConfigs(); + pulsarSinkConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString()); + pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName()); + sink = new PulsarSink( + pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), + Thread.currentThread().getContextClassLoader()); + schema = sink.initializeSchema(); + assertTrue(schema instanceof AutoConsumeSchema); + } + @Test public void testSinkAndMessageRouting() throws Exception { @@ -415,6 +482,105 @@ public Optional getRecordSequence() { } } + @Test + public void testWriteGenericRecordsAtMostOnce() throws Exception { + testWriteGenericRecords(ProcessingGuarantees.ATMOST_ONCE); + } + + @Test + public void testWriteGenericRecordsAtLeastOnce() throws Exception { + testWriteGenericRecords(ProcessingGuarantees.ATLEAST_ONCE); + } + + @Test + public void testWriteGenericRecordsEOS() throws Exception { + testWriteGenericRecords(ProcessingGuarantees.EFFECTIVELY_ONCE); + } + + private void testWriteGenericRecords(ProcessingGuarantees guarantees) throws Exception { + String defaultTopic = "default"; + + PulsarSinkConfig sinkConfig = getPulsarConfigs(); + sinkConfig.setTopic(defaultTopic); + sinkConfig.setTypeClassName(GenericRecord.class.getName()); + sinkConfig.setProcessingGuarantees(guarantees); + + PulsarClient client = getPulsarClient(); + PulsarSink pulsarSink = new PulsarSink( + client, sinkConfig, new HashMap<>(), mock(ComponentStatsManager.class), + Thread.currentThread().getContextClassLoader()); + + pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); + + if (ProcessingGuarantees.ATMOST_ONCE == guarantees) { + assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkAtMostOnceProcessor); + } else if (ProcessingGuarantees.ATLEAST_ONCE == guarantees) { + assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkAtLeastOnceProcessor); + } else { + assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkEffectivelyOnceProcessor); + } + PulsarSinkProcessorBase processor = (PulsarSinkProcessorBase) pulsarSink.pulsarSinkProcessor; + assertFalse(processor.publishProducers.containsKey(defaultTopic)); + + String[] topics = { "topic-1", "topic-2", "topic-3" }; + for (String topic : topics) { + + RecordSchemaBuilder builder = SchemaBuilder.record("MyRecord"); + builder.field("number").type(SchemaType.INT32); + builder.field("text").type(SchemaType.STRING); + GenericSchema schema = Schema.generic(builder.build(SchemaType.AVRO)); + + GenericRecordBuilder recordBuilder = schema.newRecordBuilder(); + recordBuilder.set("number", 1); + recordBuilder.set("text", topic); + + GenericRecord genericRecord = recordBuilder.build(); + + SinkRecord record = new SinkRecord<>(new Record() { + + @Override + public Optional getDestinationTopic() { + return Optional.of(topic); + } + + @Override + public Schema getSchema() { + return schema; + } + + @Override + public GenericRecord getValue() { + return genericRecord; + } + + @Override + public Optional getPartitionId() { + return Optional.of(topic + "-id-1"); + } + + @Override + public Optional getRecordSequence() { + return Optional.of(1L); + } + }, genericRecord); + + pulsarSink.write(record); + + if (ProcessingGuarantees.EFFECTIVELY_ONCE == guarantees) { + assertTrue(processor.publishProducers.containsKey(String.format("%s-%s-id-1", topic, topic))); + } else { + assertTrue(processor.publishProducers.containsKey(topic)); + } + verify(client.newProducer(), times(1)) + .topic(argThat( + otherTopic -> topic != null ? topic.equals(otherTopic) : defaultTopic.equals(otherTopic))); + + verify(client, times(1)) + .newProducer(argThat( + otherSchema -> Objects.equals(otherSchema, schema))); + } + } + private Optional getTopicOptional(String topic) { if (topic != null) { return Optional.of(topic); diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java new file mode 100644 index 00000000000000..f36b3df0c82cda --- /dev/null +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; +import org.apache.pulsar.client.api.schema.SchemaBuilder; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Source; +import org.apache.pulsar.io.core.SourceContext; + +/** + * A source that generates {@link GenericRecord}s. + */ +@Slf4j +public class GenericRecordSource implements Source { + + private RecordSchemaBuilder recordSchemaBuilder; + private GenericSchema schema; + private List fields; + private AtomicInteger count = new AtomicInteger(); + + @Override + public void open(Map config, SourceContext sourceContext) throws Exception { + this.recordSchemaBuilder = SchemaBuilder.record("MyBean"); + this.recordSchemaBuilder.field("number").type(SchemaType.INT32); + this.recordSchemaBuilder.field("text").type(SchemaType.STRING); + schema = Schema.generic(this.recordSchemaBuilder.build(SchemaType.AVRO)); + fields = Arrays.asList(new Field("number", 0), + new Field("text", 1)); + log.info("created source, schema {}", new String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8)); + } + + @Override + public Record read() throws Exception { + // slow down the production of values + Thread.sleep(20); + + int value = count.incrementAndGet(); + GenericRecord record = schema.newRecordBuilder() + .set("number", value) + .set("text", "value-" + value) + .build(); + log.info("produced {}", record); + return new Record() { + @Override + public GenericRecord getValue() { + return record; + } + + @Override + public Schema getSchema() { + return schema; + } + }; + } + + @Override + public void close() { + // no-op + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/GenericRecordSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/GenericRecordSourceTest.java new file mode 100644 index 00000000000000..1b591ec8e2ff41 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/GenericRecordSourceTest.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR; +import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.Data; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.policies.data.SourceStatus; +import org.apache.pulsar.tests.integration.containers.StandaloneContainer; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.testng.annotations.Test; + +/** + * This tests demonstrates how a Source can create messages using GenericRecord API + * and the consumer is able to consume it as AVRO messages, with GenericRecord and with Java Model + */ +@Slf4j +public class GenericRecordSourceTest extends PulsarStandaloneTestSuite { + + @Test(groups = {"source"}) + public void testGenericRecordSource() throws Exception { + String outputTopicName = "test-state-source-output-" + randomName(8); + String sourceName = "test-state-source-" + randomName(8); + int numMessages = 10; + + submitSourceConnector( + sourceName, + outputTopicName, + "org.apache.pulsar.tests.integration.io.GenericRecordSource", JAVAJAR); + + // get source info + getSourceInfoSuccess(container, sourceName); + + // get source status + getSourceStatus(container, sourceName); + + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { + + retryStrategically((test) -> { + try { + SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); + return status.getInstances().size() > 0 + && status.getInstances().get(0).getStatus().numWritten >= 10; + } catch (PulsarAdminException e) { + return false; + } + }, 10, 200); + + SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); + assertEquals(status.getInstances().size(), 1); + assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10); + } + + consumeMessages(container, outputTopicName, numMessages); + + // delete source + deleteSource(container, sourceName); + + getSourceInfoNotFound(container, sourceName); + + } + + private void submitSourceConnector(String sourceName, + String outputTopicName, + String className, + String archive) throws Exception { + String[] commands = { + PulsarCluster.ADMIN_SCRIPT, + "sources", "create", + "--name", sourceName, + "--destinationTopicName", outputTopicName, + "--archive", archive, + "--classname", className + }; + log.info("Run command : {}", StringUtils.join(commands, ' ')); + ContainerExecResult result = container.execCmd(commands); + assertTrue( + result.getStdout().contains("\"Created successfully\""), + result.getStdout()); + } + + private static void getSourceInfoSuccess(StandaloneContainer container, String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\"")); + } + + private static void getSourceStatus(StandaloneContainer container,String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "status", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("\"running\" : true")); + } + + private static void consumeMessages(StandaloneContainer container, String outputTopic, + int numMessages) throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(container.getPlainTextServiceUrl()) + .build(); + + // read using Pulsar GenericRecord abstraction + @Cleanup + Consumer consumer = client.newConsumer(Schema.AUTO_CONSUME()) + .topic(outputTopic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .startMessageIdInclusive() + .subscribe(); + + for (int i = 0; i < numMessages; i++) { + Message msg = consumer.receive(10, TimeUnit.SECONDS); + if (msg == null) { + fail("message "+i+" not received in time"); + return; + } + log.info("received {}", msg.getValue()); + msg.getValue().getFields().forEach( f -> { + log.info("field {} {}", f, msg.getValue().getField(f)); + }); + String text = (String) msg.getValue().getField("text"); + int number = (Integer) msg.getValue().getField("number"); + + assertEquals(text, "value-" + number); + } + + @Cleanup + Consumer typedConsumer = client.newConsumer(Schema.AVRO(MyBean.class)) + .topic(outputTopic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub-typed") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .startMessageIdInclusive() + .subscribe(); + + for (int i = 0; i < numMessages; i++) { + Message msg = typedConsumer.receive(10, TimeUnit.SECONDS); + if (msg == null) { + fail("message "+i+" not received in time"); + return; + } + log.info("received {}", msg.getValue()); + String text = msg.getValue().getText(); + int number = msg.getValue().getNumber(); + assertEquals(text, "value-" + number); + } + + } + + @Data + @ToString + public static class MyBean { + String text; + int number; + } + + private static void deleteSource(StandaloneContainer container, String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "delete", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("Delete source successfully")); + assertTrue(result.getStderr().isEmpty()); + } + + private static void getSourceInfoNotFound(StandaloneContainer container, String sourceName) throws Exception { + try { + container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName); + fail("Command should have exited with non-zero"); + } catch (ContainerExecException e) { + assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist")); + } + } + +} \ No newline at end of file diff --git a/tests/integration/src/test/resources/pulsar-function-state.xml b/tests/integration/src/test/resources/pulsar-function.xml similarity index 81% rename from tests/integration/src/test/resources/pulsar-function-state.xml rename to tests/integration/src/test/resources/pulsar-function.xml index 56ff01b8a169d0..026b77fe9dfeb3 100644 --- a/tests/integration/src/test/resources/pulsar-function-state.xml +++ b/tests/integration/src/test/resources/pulsar-function.xml @@ -19,10 +19,11 @@ --> - - + + + diff --git a/tests/integration/src/test/resources/pulsar.xml b/tests/integration/src/test/resources/pulsar.xml index debfed202c7098..ac7de1f315c1e2 100644 --- a/tests/integration/src/test/resources/pulsar.xml +++ b/tests/integration/src/test/resources/pulsar.xml @@ -30,7 +30,7 @@ - +