diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 8b28810a318f7..8d9c564160ad4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1350,7 +1350,7 @@ private void startPackagesManagementService() throws IOException { .newProvider(config.getPackagesManagementStorageProvider()); DefaultPackagesStorageConfiguration storageConfiguration = new DefaultPackagesStorageConfiguration(); storageConfiguration.setProperty(config.getProperties()); - PackagesStorage storage = storageProvider.getStorage(new DefaultPackagesStorageConfiguration()); + PackagesStorage storage = storageProvider.getStorage(storageConfiguration); storage.initialize(); packagesManagement.initialize(storage); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPackages.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPackages.java index bb9e6669f184d..4c863e97a522e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPackages.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPackages.java @@ -36,7 +36,7 @@ class CmdPackages extends CmdBase { private final Packages packages; - CmdPackages(PulsarAdmin admin) { + public CmdPackages(PulsarAdmin admin) { super("packages", admin); this.packages = admin.packages(); @@ -133,7 +133,7 @@ private class ListPackageVersionsCmd extends CliCommand { @Override void run() throws Exception { - print(packages.listPackageVersions(packageName).toString()); + print(packages.listPackageVersions(packageName)); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java index 65281ccc3675a..514b1b822d66e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java @@ -142,6 +142,8 @@ public class PulsarAdminTool { // TODO eventually remove this commandMap.put("source", CmdSources.class); commandMap.put("sink", CmdSinks.class); + + commandMap.put("packages", CmdPackages.class); } private void setupCommands(Function adminFactory) { diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java index f0409b4d0aff5..0f8a2ee9dd605 100644 --- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java +++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java @@ -60,9 +60,9 @@ public void initialize() { DistributedLogConfiguration conf = new DistributedLogConfiguration() .setImmediateFlushEnabled(true) .setOutputBufferSize(0) - .setWriteQuorumSize(configuration.getNumReplicas()) - .setEnsembleSize(configuration.getNumReplicas()) - .setAckQuorumSize(configuration.getNumReplicas()) + .setWriteQuorumSize(configuration.getPackagesReplicas()) + .setEnsembleSize(configuration.getPackagesReplicas()) + .setAckQuorumSize(configuration.getPackagesReplicas()) .setLockTimeout(DistributedLogConstants.LOCK_IMMEDIATE); if (!Strings.isNullOrEmpty(configuration.getBookkeeperClientAuthenticationPlugin())) { conf.setProperty("bkc.clientAuthProviderFactoryClass", @@ -82,9 +82,11 @@ public void initialize() { } private URI initializeDlogNamespace() throws IOException { - BKDLConfig bkdlConfig = new BKDLConfig(configuration.getZkServers(), configuration.getLedgersRootPath()); + BKDLConfig bkdlConfig = new BKDLConfig(configuration.getZookeeperServers(), + configuration.getPackagesManagementLedgerRootPath()); DLMetadata dlMetadata = DLMetadata.create(bkdlConfig); - URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages", configuration.getZkServers())); + URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages", + configuration.getZookeeperServers())); try { dlMetadata.create(dlogURI); } catch (ZKException e) { diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java index 10b372149b956..226b80abeaa30 100644 --- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java +++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java @@ -34,16 +34,16 @@ public class BookKeeperPackagesStorageConfiguration implements PackagesStorageCo this.configuration = configuration; } - int getNumReplicas() { - return Integer.parseInt(getProperty("numReplicas")); + int getPackagesReplicas() { + return Integer.parseInt(getProperty("packagesReplicas")); } - String getZkServers() { - return getProperty("zkServers"); + String getZookeeperServers() { + return getProperty("zookeeperServers"); } - String getLedgersRootPath() { - return getProperty("ledgerRootPath"); + String getPackagesManagementLedgerRootPath() { + return getProperty("packagesManagementLedgerRootPath"); } String getBookkeeperClientAuthenticationPlugin() { diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java index 5e0954aadc9d5..859c485c3e081 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java @@ -53,9 +53,9 @@ public void setup() throws Exception { PackagesStorageProvider provider = PackagesStorageProvider .newProvider(BookKeeperPackagesStorageProvider.class.getName()); DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration(); - configuration.setProperty("zkServers", zkUtil.getZooKeeperConnectString()); - configuration.setProperty("numReplicas", "1"); - configuration.setProperty("ledgerRootPath", "/ledgers"); + configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString()); + configuration.setProperty("packagesReplicas", "1"); + configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers"); storage = provider.getStorage(configuration); storage.initialize(); } @@ -71,9 +71,9 @@ public void teardown() throws Exception { public void testConfiguration() { assertTrue(storage instanceof BookKeeperPackagesStorage); BookKeeperPackagesStorage bkStorage = (BookKeeperPackagesStorage) storage; - assertEquals(bkStorage.configuration.getZkServers(), zkUtil.getZooKeeperConnectString()); - assertEquals(bkStorage.configuration.getNumReplicas(), 1); - assertEquals(bkStorage.configuration.getLedgersRootPath(), "/ledgers"); + assertEquals(bkStorage.configuration.getZookeeperServers(), zkUtil.getZooKeeperConnectString()); + assertEquals(bkStorage.configuration.getPackagesReplicas(), 1); + assertEquals(bkStorage.configuration.getPackagesManagementLedgerRootPath(), "/ledgers"); } @Test(timeOut = 60000) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java new file mode 100644 index 0000000000000..3792a12b19e67 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.cli; + +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +public class PackagesCliTest { + + private final static String clusterNamePrefix = "packages-service"; + private PulsarCluster pulsarCluster; + + @BeforeClass + public void setup() throws Exception { + PulsarClusterSpec spec = PulsarClusterSpec.builder() + .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6))) + .brokerEnvs(getPackagesManagementServiceEnvs()) + .build(); + pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + } + + @AfterClass + public void teardown() { + if (pulsarCluster != null) { + pulsarCluster.stop(); + pulsarCluster = null; + } + } + + private Map getPackagesManagementServiceEnvs() { + Map envs = new HashMap<>(); + envs.put("enablePackagesManagement", "true"); + return envs; + } + + @Test(timeOut = 60000 * 10) + public void testPackagesOperationsWithoutUploadingPackages() throws Exception { + ContainerExecResult result = runPackagesCommand("list", "--type", "function", "public/default"); + assertEquals(result.getExitCode(), 0); + assertTrue(result.getStdout().isEmpty()); + + result = runPackagesCommand("list-versions", "function://public/default/test"); + assertEquals(result.getExitCode(), 0); + assertTrue(result.getStdout().isEmpty()); + + try { + result = runPackagesCommand("download", "function://public/default/test@v1", "--path", "test-admin"); + fail("this command should be failed"); + } catch (Exception e) { + // expected exception + } + } + + // TODO: the upload command has some problem when uploading packages, enable this tests when issue + // https://github.com/apache/pulsar/issues/8874 is fixed. + public void testPackagesOperationsWithUploadingPackages() throws Exception { + String testPackageName = "function://public/default/test@v1"; + ContainerExecResult result = runPackagesCommand("upload", "--description", "a test package", + "--path", PulsarCluster.ADMIN_SCRIPT, testPackageName); + assertEquals(result.getExitCode(), 0); + + result = runPackagesCommand("get-metadata", testPackageName); + assertEquals(result.getExitCode(), 0); + assertFalse(result.getStdout().isEmpty()); + assertTrue(result.getStdout().contains("a test package")); + + result = runPackagesCommand("list", "--type", "function", "public/default"); + assertEquals(result.getExitCode(), 0); + assertFalse(result.getStdout().isEmpty()); + assertTrue(result.getStdout().contains(testPackageName)); + + result = runPackagesCommand("list-versions", "--type", "function://public/default/test"); + assertEquals(result.getExitCode(), 0); + assertFalse(result.getStdout().isEmpty()); + assertTrue(result.getStdout().contains("v1")); + + String contact = "test@apache.org"; + result = runPackagesCommand("update-metadata", "--contact", contact, "-PpropertyA=A", testPackageName); + assertEquals(result.getExitCode(), 0); + + result = runPackagesCommand("get-metadata", testPackageName); + assertEquals(result.getExitCode(), 0); + assertFalse(result.getStdout().isEmpty()); + assertTrue(result.getStdout().contains("a test package")); + assertTrue(result.getStdout().contains(contact)); + assertTrue(result.getStdout().contains("propertyA")); + + result = runPackagesCommand("delete", testPackageName); + assertEquals(result.getExitCode(), 0); + + result = runPackagesCommand("list", "--type", "functions", "public/default"); + assertEquals(result.getExitCode(), 0); + assertTrue(result.getStdout().isEmpty()); + } + + private ContainerExecResult runPackagesCommand(String... commands) throws Exception { + String[] cmds = new String[commands.length + 1]; + cmds[0] = "packages"; + System.arraycopy(commands, 0, cmds, 1, commands.length); + return pulsarCluster.runAdminCommandOnAnyBroker(cmds); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index c75bd59e5d6b7..8d835dd59002e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -171,6 +171,9 @@ private PulsarCluster(PulsarClusterSpec spec) { brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10"); brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false"); } + if (spec.brokerEnvs != null) { + brokerContainer.withEnv(spec.brokerEnvs); + } return brokerContainer; } )); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java index 8d48fb26c396b..b0c49bdf29042 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java @@ -137,4 +137,9 @@ public class PulsarClusterSpec { * Specify envs for proxy. */ Map proxyEnvs; + + /** + * Specify envs for broker. + */ + Map brokerEnvs; } diff --git a/tests/integration/src/test/resources/pulsar-cli.xml b/tests/integration/src/test/resources/pulsar-cli.xml index 1be3a1d4c4094..d6a9a15e1e24d 100644 --- a/tests/integration/src/test/resources/pulsar-cli.xml +++ b/tests/integration/src/test/resources/pulsar-cli.xml @@ -28,6 +28,7 @@ +