[PIP-50] Package management implementation (part-1)#5613
[PIP-50] Package management implementation (part-1)#5613zymap wants to merge 5 commits intoapache:masterfrom
Conversation
--- *Motivation* Implementation of package management. https://github.com/apache/pulsar/wiki/PIP-50%3A-Package-Management *Modifications* - Implement package management - Abstraction storage
|
@sijie @jiazhai @wolfstudy PTAL |
|
run java8 tests |
pulsar-package-management/src/main/java/pkg/management/PackageStorageProvider.java
Outdated
Show resolved
Hide resolved
pulsar-package-management/src/main/java/pkg/management/PkgMetadata.java
Outdated
Show resolved
Hide resolved
pulsar-package-management/src/main/java/pkg/management/exception/PackageNotFoundException.java
Outdated
Show resolved
Hide resolved
pulsar-package-management/src/main/java/pkg/management/storage/bk/BKStorage.java
Outdated
Show resolved
Hide resolved
pulsar-package-management/src/main/java/pkg/management/storage/bk/BKStorage.java
Outdated
Show resolved
Hide resolved
pulsar-package-management/src/main/java/pkg/management/storage/bk/BKStorage.java
Outdated
Show resolved
Hide resolved
pulsar-package-management/src/main/java/pkg/management/storage/bk/BKStorage.java
Outdated
Show resolved
Hide resolved
pulsar-package-management/src/main/java/pkg/management/storage/bk/BKStorage.java
Outdated
Show resolved
Hide resolved
ac36499 to
d8c6ce8
Compare
d8c6ce8 to
2dca4a2
Compare
abb5121 to
30232be
Compare
|
@sijie PTAL. Thanks. |
sijie
left a comment
There was a problem hiding this comment.
Please create an issue to track adding documentation about this new package management system.
pulsar-package-manager/src/main/java/org/apache/pulsar/packages/manager/PackageStorage.java
Outdated
Show resolved
Hide resolved
...package-manager/src/main/java/org/apache/pulsar/packages/manager/PackageStorageProvider.java
Outdated
Show resolved
Hide resolved
| /** | ||
| * Thrown package metadata already existAsync exceptions. | ||
| */ | ||
| public class PackageMetaAlreadyExistsException extends PackageException { |
There was a problem hiding this comment.
why do we have two AlreadyExistsException? Does it really happen that the package exists but the metadata doesn't exist?
There was a problem hiding this comment.
Because the metadata and package are two files, delete a package that needs to delete metadata as well. Delete package and metadata are two operations and I can not promise after deleting package the metadata will delete successfully.
If one of the delete operations was failed and the user ignores the error and uploads the same package again, the metadata or the package may be already existent.
Do you have some advice?
| /** | ||
| * Thrown package metadata not found exceptions. | ||
| */ | ||
| public class PackageMetaNotFoundException extends PackageException { |
There was a problem hiding this comment.
Same comments similar to AlreadyExistsException.
pulsar-package-manager/src/main/java/org/apache/pulsar/packages/manager/impl/PackageImpl.java
Outdated
Show resolved
Hide resolved
| return packageStorage.existAsync(metadataPath) | ||
| .thenCompose(metaExists -> metaExists | ||
| ? packageStorage.deleteAsync(metadataPath) | ||
| : FutureUtil.failedFuture(new PackageMetaNotFoundException("Package metadata does not exists"))) |
There was a problem hiding this comment.
data can still exists when the metadata file doesn't exist. e.g. a delete operation failed in between deleting metadat and data.
pulsar-package-manager/src/main/java/org/apache/pulsar/packages/manager/impl/PackageImpl.java
Outdated
Show resolved
Hide resolved
| .thenApply(names -> names.stream().map(PackageName::get).collect(Collectors.toList())); | ||
| } | ||
|
|
||
| private String getMetadataStoragePath(PackageName packageName) { |
pulsar-package-manager/src/main/java/org/apache/pulsar/packages/manager/impl/PackageImpl.java
Outdated
Show resolved
Hide resolved
pulsar-package-manager/src/main/java/org/apache/pulsar/packages/manager/impl/PackageImpl.java
Outdated
Show resolved
Hide resolved
| return get(pkgName); | ||
| } | ||
|
|
||
| public static PackageName get(String type, String name, String version) { |
There was a problem hiding this comment.
don't you need to ensure the name is a validate "topic" name?
| try { | ||
| return cache.get(packageName); | ||
| } catch (ExecutionException e) { | ||
| throw (RuntimeException) e.getCause(); |
There was a problem hiding this comment.
Are you sure it is a safe cast to RuntimeException?
| this.namespaceName = NamespaceName.get(parts.get(0), parts.get(1)); | ||
|
|
||
| rest = parts.get(2); | ||
| parts = Splitter.on("@").splitToList(rest); |
There was a problem hiding this comment.
don't you need to check the length first? e.g. if people don't specify the version "@".
Also if people don't specify the version, should we make the default version to latest?
| return e; | ||
| } | ||
| } | ||
| throw new IllegalArgumentException("Invalid topic domain: '" + value + "'"); |
| CompletableFuture<Boolean> future = new CompletableFuture<>(); | ||
| CompletableFuture.runAsync(() -> { | ||
| try { | ||
| future.complete(namespace.logExists(path)); |
There was a problem hiding this comment.
namespace.getNamespaceDriver().getLogMetadataStore().getLogLocation() would return a completable future.
There was a problem hiding this comment.
getLogLocation() seems not the log path. It's a URI. I find the namespace.getNamespaceDriver().getLogStreamMetadataStore(NamespaceDriver.Role.WRITER).logExists(uriOptional.get(), path) can reture a completable future.
| List<CompletableFuture<DLOutputStream>> writeFuture = new ArrayList<>(); | ||
| byte[] bytes = new byte[1024]; | ||
| try { | ||
| while (inputStream.read(bytes) != -1) { |
There was a problem hiding this comment.
inputStream.read is a synchronous operation. you have to call this in a separate thread to not blocking the thread calling writeAsync.
| import org.testng.annotations.BeforeMethod; | ||
| import org.testng.annotations.Test; | ||
|
|
||
| public class PackagesImplTest { |
There was a problem hiding this comment.
The test needs to cover different test cases.
| import org.testng.annotations.BeforeMethod; | ||
| import org.testng.annotations.Test; | ||
|
|
||
| public class BKStorageTest { |
There was a problem hiding this comment.
Rename this to BKStorageMockTest and add one test for interacting with a real BK cluster.
| /** | ||
| * Unit test of {@link DLInputStream}. | ||
| */ | ||
| public class DLInputStreamTest { |
There was a problem hiding this comment.
Please all add test cases for error cases.
| /** | ||
| * Unit test of {@link DLOutputStream}. | ||
| */ | ||
| public class DLOutputStreamTest { |
There was a problem hiding this comment.
Please all add test cases for error cases.
063b0a7 to
c656f2a
Compare
|
@sijie I can not retrigger the test. So please take a look when you have time. |
Motivation
Implementation of package management.
https://github.com/apache/pulsar/wiki/PIP-50%3A-Package-Management
Modifications