From cca9ba71ea595b79c16842180029e8475bbcd096 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Mon, 8 May 2017 12:14:25 +0900
Subject: [PATCH] STORM-2496 Dependency artifacts should be uploaded to blobstore with READ permission for all
* When uploading dependencies, set ACL properly so that it can be shared to other users as well
* but allows only READ so that it can't be deleted from others
---
 .../storm/dependency/DependencyUploader.java | 13 ++++++++++---
 .../storm/dependency/DependencyUploaderTest.java | 15 ++++++++++++++-
 2 files changed, 24 insertions(+), 4 deletions(-)
diff --git a/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java b/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
index 636f45440a0..16a3f61cd5d 100644
--- a/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
+++ b/storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java
@@ -20,8 +20,10 @@
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.blobstore.AtomicOutputStream;
 import org.apache.storm.blobstore.BlobStoreUtils;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
 import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyAlreadyExistsException;
 import org.apache.storm.generated.KeyNotFoundException;
@@ -146,9 +148,14 @@ private boolean uploadDependencyToBlobStore(String key, File dependency)
 // as a workaround, we call getBlobMeta() for all keys
 getBlobStore().getBlobMeta(key);
 } catch (KeyNotFoundException e) {
- // TODO: do we want to add ACL here?
- AtomicOutputStream blob = getBlobStore()
- .createBlob(key, new SettableBlobMeta(new ArrayList()));
+ // set acl to below so that it can be shared by other users as well, but allows only read
+ List acls = new ArrayList<>();
+ acls.add(new AccessControl(AccessControlType.USER,
+ BlobStoreAclHandler.READ | BlobStoreAclHandler.WRITE | BlobStoreAclHandler.ADMIN));
+ acls.add(new AccessControl(AccessControlType.OTHER,
+ BlobStoreAclHandler.READ));
+
+ AtomicOutputStream blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls));
 Files.copy(dependency.toPath(), blob);
 blob.close();
diff --git a/storm-core/test/jvm/org/apache/storm/dependency/DependencyUploaderTest.java b/storm-core/test/jvm/org/apache/storm/dependency/DependencyUploaderTest.java
index 526e758690c..e5d9fafcec6 100644
--- a/storm-core/test/jvm/org/apache/storm/dependency/DependencyUploaderTest.java
+++ b/storm-core/test/jvm/org/apache/storm/dependency/DependencyUploaderTest.java
@@ -20,14 +20,17 @@
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import org.apache.storm.blobstore.AtomicOutputStream;
-import org.apache.storm.blobstore.BlobStoreUtils;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
 import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AccessControlType;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.ReadableBlobMeta;
 import org.apache.storm.generated.SettableBlobMeta;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
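Review bot: Granting `AccessControlType.OTHER` read access in `uploadDependencyToBlobStore` (DependencyUploader.java) makes a blob created by one user visible to every other user's `getBlobMeta(key)` probe, so a later submitter skips the upload and runs with whatever content the first uploader stored under that key. How `key` is derived is outside this hunk; if it comes from a file name or artifact coordinates rather than a content digest, the reuse path should compare a digest of the local file against the stored blob before trusting it. The new comment should also state the confidentiality side, for example `// OTHER gets READ only: every cluster user can download this artifact, so it must not embed secrets`. The USER entry is built without a name, so which principal receives READ | WRITE | ADMIN presumably depends on blobstore-side normalisation, which is worth confirming. The method body starts and ends outside the hunk.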
@@ -171,6 +174,16 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
 assertTrue(counter.get() > 0);
 verify(mockOutputStream).close();
+
+ ArgumentCaptor blobMetaArgumentCaptor = ArgumentCaptor.forClass(SettableBlobMeta.class);
+ verify(mockBlobStore).createBlob(anyString(), blobMetaArgumentCaptor.capture());
+
+ SettableBlobMeta actualBlobMeta = blobMetaArgumentCaptor.getValue();
+ List actualAcls = actualBlobMeta.get_acl();
+ assertTrue(actualAcls.contains(new AccessControl(AccessControlType.USER,
+ BlobStoreAclHandler.READ | BlobStoreAclHandler.WRITE | BlobStoreAclHandler.ADMIN)));
+ assertTrue(actualAcls.contains(new AccessControl(AccessControlType.OTHER,
+ BlobStoreAclHandler.READ)));
 }
 @Test(expected = FileNotAvailableException.class)
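Review bot: The two `assertTrue(actualAcls.contains(...))` checks at the end of this hunk only test membership, so they still pass if the uploader also attaches a broader entry such as OTHER with WRITE or ADMIN. Since the commit's stated goal is that other users get READ and nothing more, the test should pin the list exactly by adding `assertEquals(2, actualAcls.size());` before the two checks, with the matching static import if the file lacks it. The test method starts outside the hunk.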