diff --git a/.github/workflows/ci-integration-backwards-compatibility.yaml b/.github/workflows/ci-integration-backwards-compatibility.yaml index f3f78b3a18ca4..47fc5f6215f61 100644 --- a/.github/workflows/ci-integration-backwards-compatibility.yaml +++ b/.github/workflows/ci-integration-backwards-compatibility.yaml @@ -41,11 +41,20 @@ jobs: - name: checkout uses: actions/checkout@v1 + - name: run install by skip tests + run: mvn clean install -DskipTests + - name: build artifacts and docker image - run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests + run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests - - name: clean up + - name: clean docker container run: docker system prune -f + - name: remove docker node image + run: docker rmi -f node:10 && docker rmi -f node:12 && docker rmi -f buildpack-deps:stretch + + - name: remove docker builder and microsoft image + run: docker rmi -f jekyll/builder:latest && docker rmi -f mcr.microsoft.com/azure-pipelines/node8-typescript:latest + - name: run integration tests run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-backwards-compatibility.xml -DintegrationTests -DredirectTestOutputToFile=false diff --git a/.github/workflows/ci-integration-process.yaml b/.github/workflows/ci-integration-process.yaml index f8f2f722b5049..4fda27149fe6a 100644 --- a/.github/workflows/ci-integration-process.yaml +++ b/.github/workflows/ci-integration-process.yaml @@ -41,8 +41,26 @@ jobs: - name: checkout uses: actions/checkout@v1 + - name: run install by skip tests + run: mvn clean install -DskipTests + - name: build artifacts and docker image - run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests + run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests + + - name: clean docker container + run: docker system prune -f + + - name: remove docker node image + run: docker rmi -f node:10 && docker rmi -f node:12 && docker rmi -f buildpack-deps:stretch + + - name: remove docker builder and microsoft image + run: docker rmi -f jekyll/builder:latest && docker rmi -f mcr.microsoft.com/azure-pipelines/node8-typescript:latest + + - name: run integration function + run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-process.xml -DintegrationTests -DredirectTestOutputToFile=false -Dgroups=function + + - name: run integration source + run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-process.xml -DintegrationTests -DredirectTestOutputToFile=false -Dgroups=source - - name: run integration tests - run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-process.xml -DintegrationTests -DredirectTestOutputToFile=false + - name: run integraion sink + run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-process.xml -DintegrationTests -DredirectTestOutputToFile=false -Dgroups=sink diff --git a/.github/workflows/ci-integration-thread.yaml b/.github/workflows/ci-integration-thread.yaml index 8c2405f55bb93..282d44a959738 100644 --- a/.github/workflows/ci-integration-thread.yaml +++ b/.github/workflows/ci-integration-thread.yaml @@ -41,8 +41,26 @@ jobs: - name: checkout uses: actions/checkout@v1 + - name: run install by skip tests + run: mvn clean install -DskipTests + - name: build artifacts and docker image - run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests + run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests + + - name: clean docker container + run: docker system prune -f + + - name: remove docker node image + run: docker rmi -f node:10 && docker rmi -f node:12 && docker rmi -f buildpack-deps:stretch + + - name: remove docker builder and microsoft image + run: docker rmi -f jekyll/builder:latest && docker rmi -f mcr.microsoft.com/azure-pipelines/node8-typescript:latest + + - name: run integration function + run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-thread.xml -DintegrationTests -DredirectTestOutputToFile=false -Dgroups=function + + - name: run integration source + run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-thread.xml -DintegrationTests -DredirectTestOutputToFile=false -Dgroups=source - - name: run integration tests - run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-thread.xml -DintegrationTests -DredirectTestOutputToFile=false + - name: run integraion sink + run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-thread.xml -DintegrationTests -DredirectTestOutputToFile=false -Dgroups=sink diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index fe03fbb34dae5..e112474f73593 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -81,10 +81,10 @@ @Slf4j public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { - final Duration ONE_MINUTE = Duration.ofMinutes(1); - final Duration TEN_SECONDS = Duration.ofSeconds(10); + final Duration ONE_MINUTE = Duration.ofMinutes(1); + final Duration TEN_SECONDS = Duration.ofSeconds(10); - final RetryPolicy statusRetryPolicy = new RetryPolicy() + final RetryPolicy statusRetryPolicy = new RetryPolicy() .withMaxDuration(ONE_MINUTE) .withDelay(TEN_SECONDS) .onRetry(e -> log.error("Retry ... ")); @@ -93,59 +93,63 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { super(functionRuntimeType); } - @Test + @Test(groups = "sink") public void testKafkaSink() throws Exception { final String kafkaContainerName = "kafka-" + randomName(8); testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName)); } - @Test(enabled = false) + @Test(enabled = false, groups = "sink") public void testCassandraSink() throws Exception { testSink(CassandraSinkTester.createTester(true), true); } - @Test(enabled = false) + @Test(enabled = false, groups = "sink") public void testCassandraArchiveSink() throws Exception { testSink(CassandraSinkTester.createTester(false), false); } - @Test(enabled = false) + @Test(enabled = false, groups = "sink") public void testHdfsSink() throws Exception { testSink(new HdfsSinkTester(), false); } - @Test + @Test(groups = "sink") public void testJdbcSink() throws Exception { testSink(new JdbcSinkTester(), true); } - @Test(enabled = false) + @Test(enabled = false, groups = "sink") public void testElasticSearchSink() throws Exception { testSink(new ElasticSearchSinkTester(), true); } - @Test + @Test(groups = "sink") public void testRabbitMQSink() throws Exception { final String containerName = "rabbitmq-" + randomName(8); testSink(new RabbitMQSinkTester(containerName), true, new RabbitMQSourceTester(containerName)); } - @Test + @Test(groups = "source") public void testDebeziumMySqlSource() throws Exception { testDebeziumMySqlConnect(); } - @Test + @Test(groups = "source") public void testDebeziumPostgreSqlSource() throws Exception { testDebeziumPostgreSqlConnect(); } - @Test + @Test(groups = "source") public void testDebeziumMongoDbSource() throws Exception{ testDebeziumMongoDbConnect(); } private void testSink(SinkTester tester, boolean builtin) throws Exception { + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } tester.startServiceContainer(pulsarCluster); try { runSinkTester(tester, builtin); @@ -159,6 +163,10 @@ private void testSink(SinkTester sourceTester) throws Exception { + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster); try { runSinkTester(sinkTester, builtinSink); @@ -830,12 +838,12 @@ protected void getSourceInfoNotFound(String tenant, String namespace, String sou } } - @Test + @Test(groups = "function") public void testPythonFunctionLocalRun() throws Exception { testFunctionLocalRun(Runtime.PYTHON); } - @Test + @Test(groups = "function") public void testJavaFunctionLocalRun() throws Exception { testFunctionLocalRun(Runtime.JAVA); } @@ -845,6 +853,11 @@ public void testFunctionLocalRun(Runtime runtime) throws Exception { return; } + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } + String inputTopicName = "persistent://public/default/test-function-local-run-" + runtime + "-input-" + randomName(8); String outputTopicName = "test-function-local-run-" + runtime + "-output-" + randomName(8); @@ -961,6 +974,10 @@ public void testWindowFunction(String type, String[] expectedResults) throws Exc String inputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-input-" + randomName(8); String outputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-output-" + randomName(8); + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) { admin.topics().createNonPartitionedTopic(inputTopicName); admin.topics().createNonPartitionedTopic(outputTopicName); @@ -1053,7 +1070,7 @@ public void testWindowFunction(String type, String[] expectedResults) throws Exc getFunctionInfoNotFound(functionName); } - @Test + @Test(groups="function") public void testSlidingCountWindowTest() throws Exception { String[] EXPECTED_RESULTS = { "0,1,2,3,4", @@ -1081,7 +1098,7 @@ public void testSlidingCountWindowTest() throws Exception { testWindowFunction("sliding", EXPECTED_RESULTS); } - @Test + @Test(groups = "function") public void testTumblingCountWindowTest() throws Exception { String[] EXPECTED_RESULTS = { "0,1,2,3,4,5,6,7,8,9", @@ -1103,12 +1120,12 @@ public void testTumblingCountWindowTest() throws Exception { // Test CRUD functions on different runtimes. // - @Test + @Test(groups = "function") public void testPythonFunctionNegAck() throws Exception { testFunctionNegAck(Runtime.PYTHON); } - @Test + @Test(groups = "function") public void testJavaFunctionNegAck() throws Exception { testFunctionNegAck(Runtime.JAVA); } @@ -1118,6 +1135,11 @@ private void testFunctionNegAck(Runtime runtime) throws Exception { return; } + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } + Schema schema; if (Runtime.JAVA == runtime) { schema = Schema.STRING; @@ -1284,12 +1306,12 @@ private void testFunctionNegAck(Runtime runtime) throws Exception { checkSubscriptionsCleanup(inputTopicName); } - @Test + @Test(groups = "function") public void testPythonPublishFunction() throws Exception { testPublishFunction(Runtime.PYTHON); } - @Test + @Test(groups = "function") public void testJavaPublishFunction() throws Exception { testPublishFunction(Runtime.JAVA); } @@ -1306,6 +1328,11 @@ private void testPublishFunction(Runtime runtime) throws Exception { schema = Schema.BYTES; } + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } + String inputTopicName = "persistent://public/default/test-publish-" + runtime + "-input-" + randomName(8); String outputTopicName = "test-publish-" + runtime + "-output-" + randomName(8); try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) { @@ -1405,7 +1432,7 @@ private void testPublishFunction(Runtime runtime) throws Exception { checkSubscriptionsCleanup(inputTopicName); } - @Test + @Test(groups = "function") public void testSerdeFunction() throws Exception { testCustomSerdeFunction(); } @@ -1415,6 +1442,11 @@ private void testCustomSerdeFunction() throws Exception { return; } + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } + String inputTopicName = "persistent://public/default/test-serde-java-input-" + randomName(8); String outputTopicName = "test-publish-serde-output-" + randomName(8); try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) { @@ -1447,32 +1479,32 @@ private void testCustomSerdeFunction() throws Exception { assertEquals(functionStatus.getInstances().get(0).getStatus().isRunning(), true); } - @Test + @Test(groups = "function") public void testPythonExclamationFunction() throws Exception { testExclamationFunction(Runtime.PYTHON, false, false, false); } - @Test + @Test(groups = "function") public void testPythonExclamationFunctionWithExtraDeps() throws Exception { testExclamationFunction(Runtime.PYTHON, false, false, true); } - @Test + @Test(groups = "function") public void testPythonExclamationZipFunction() throws Exception { testExclamationFunction(Runtime.PYTHON, false, true, false); } - @Test + @Test(groups = "function") public void testPythonExclamationTopicPatternFunction() throws Exception { testExclamationFunction(Runtime.PYTHON, true, false, false); } - @Test + @Test(groups = "function") public void testJavaExclamationFunction() throws Exception { testExclamationFunction(Runtime.JAVA, false, false, false); } - @Test + @Test(groups = "function") public void testJavaExclamationTopicPatternFunction() throws Exception { testExclamationFunction(Runtime.JAVA, true, false, false); } @@ -1486,6 +1518,11 @@ private void testExclamationFunction(Runtime runtime, return; } + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } + Schema schema; if (Runtime.JAVA == runtime) { schema = Schema.STRING; @@ -1839,7 +1876,7 @@ private static void checkSubscriptionsCleanup(String topic) throws Exception { PulsarCluster.ADMIN_SCRIPT, "topics", "stats", - topic); + topic); TopicStats topicStats = new Gson().fromJson(result.getStdout(), TopicStats.class); assertEquals(topicStats.subscriptions.size(), 0); @@ -1994,13 +2031,18 @@ private static void deleteFunction(String functionName) throws Exception { assertTrue(result.getStderr().isEmpty()); } - @Test + @Test(groups = "function") public void testAutoSchemaFunction() throws Exception { String inputTopicName = "test-autoschema-input-" + randomName(8); String outputTopicName = "test-autoshcema-output-" + randomName(8); String functionName = "test-autoschema-fn-" + randomName(8); final int numMessages = 10; + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } + // submit the exclamation function submitFunction( Runtime.JAVA, @@ -2072,6 +2114,11 @@ private void testDebeziumMySqlConnect() // This is the binlog count that contained in mysql container. final int numMessages = 47; + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } + @Cleanup PulsarClient client = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) @@ -2162,6 +2209,11 @@ private void testDebeziumPostgreSqlConnect() throws Exception { // This is the binlog count that contained in postgresql container. final int numMessages = 26; + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } + @Cleanup PulsarClient client = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) @@ -2252,6 +2304,11 @@ private void testDebeziumMongoDbConnect() throws Exception { // This is the binlog count that contained in mongodb container. final int numMessages = 17; + if (pulsarCluster == null) { + super.setupCluster(); + super.setupFunctionWorkers(); + } + @Cleanup PulsarClient client = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl())