Skip to content

Commit

Permalink
Add Ability to List Child Containers to BlobContainer (#42653) (#43903)
Browse files Browse the repository at this point in the history
* Add Ability to List Child Containers to BlobContainer (#42653)

* Add Ability to List Child Containers to BlobContainer
* This is a prerequisite of #42189
  • Loading branch information
original-brownbear committed Jul 3, 2019
1 parent 9077c44 commit 455b12a
Show file tree
Hide file tree
Showing 23 changed files with 411 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.common.blobstore.url;

import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
Expand Down Expand Up @@ -74,6 +75,11 @@ public Map<String, BlobMetaData> listBlobs() throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

@Override
public Map<String, BlobContainer> children() throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

/**
* This operation is not supported by URLBlobContainer
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ protected BlobStore getBlobStore() {
}

@Override
protected BlobPath basePath() {
public BlobPath basePath() {
return basePath;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
Expand Down Expand Up @@ -169,6 +170,16 @@ public Map<String, BlobMetaData> listBlobs() throws IOException {
return listBlobsByPrefix(null);
}

@Override
public Map<String, BlobContainer> children() throws IOException {
final BlobPath path = path();
try {
return blobStore.children(path);
} catch (URISyntaxException | StorageException e) {
throw new IOException("Failed to list children in path [" + path.buildAsString() + "].", e);
}
}

protected String buildKey(String blobName) {
return keyPath + (blobName == null ? "" : blobName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;

Expand Down Expand Up @@ -97,6 +100,11 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
}

public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
}

public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected AzureBlobStore createBlobStore() {
}

@Override
protected BlobPath basePath() {
public BlobPath basePath() {
return basePath;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
Expand All @@ -39,6 +41,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
Expand All @@ -54,8 +57,11 @@
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.security.InvalidKeyException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -209,15 +215,40 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
final String blobPath = uri.getPath().substring(1 + container.length() + 1);
final BlobProperties properties = ((CloudBlockBlob) blobItem).getProperties();
final String name = blobPath.substring(keyPath.length());
logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
if (blobItem instanceof CloudBlob) {
final BlobProperties properties = ((CloudBlob) blobItem).getProperties();
final String name = blobPath.substring(keyPath.length());
logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
}
}
});
return blobsBuilder.immutableMap();
}

public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException {
final Set<String> blobsBuilder = new HashSet<>();
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final String keyPath = path.buildAsString();
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);

SocketAccess.doPrivilegedVoidException(() -> {
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, client.v2().get())) {
if (blobItem instanceof CloudBlobDirectory) {
final URI uri = blobItem.getUri();
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
// Lastly, we add the length of keyPath to the offset to strip this container's path.
final String uriPath = uri.getPath();
blobsBuilder.add(uriPath.substring(1 + container.length() + 1 + keyPath.length(), uriPath.length() - 1));
}
}
});
return Collections.unmodifiableSet(blobsBuilder);
}

public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.repositories.gcs;

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
Expand Down Expand Up @@ -55,6 +56,11 @@ public Map<String, BlobMetaData> listBlobs() throws IOException {
return blobStore.listBlobs(path);
}

@Override
public Map<String, BlobContainer> children() throws IOException {
return blobStore.listChildren(path());
}

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(String prefix) throws IOException {
return blobStore.listBlobsByPrefix(path, prefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,23 @@ Map<String, BlobMetaData> listBlobsByPrefix(String path, String prefix) throws I
return mapBuilder.immutableMap();
}

Map<String, BlobContainer> listChildren(BlobPath path) throws IOException {
final String pathStr = path.buildAsString();
final MapBuilder<String, BlobContainer> mapBuilder = MapBuilder.newMapBuilder();
SocketAccess.doPrivilegedVoidIOException
(() -> client().get(bucketName).list(BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr)).iterateAll().forEach(
blob -> {
if (blob.isDirectory()) {
assert blob.getName().startsWith(pathStr);
final String suffixName = blob.getName().substring(pathStr.length());
if (suffixName.isEmpty() == false) {
mapBuilder.put(suffixName, new GoogleCloudStorageBlobContainer(path.add(suffixName), this));
}
}
}));
return mapBuilder.immutableMap();
}

/**
* Returns true if the blob exists in the specific bucket
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
}

@Override
protected BlobPath basePath() {
public BlobPath basePath() {
return basePath;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
Expand Down Expand Up @@ -137,11 +138,13 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,
path -> prefix == null || path.getName().startsWith(prefix))));
Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path,
path -> prefix == null || path.getName().startsWith(prefix)));
Map<String, BlobMetaData> map = new LinkedHashMap<>();
for (FileStatus file : files) {
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
if (file.isFile()) {
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
}
}
return Collections.unmodifiableMap(map);
}
Expand All @@ -151,6 +154,19 @@ public Map<String, BlobMetaData> listBlobs() throws IOException {
return listBlobsByPrefix(null);
}

@Override
public Map<String, BlobContainer> children() throws IOException {
FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path));
Map<String, BlobContainer> map = new LinkedHashMap<>();
for (FileStatus file : files) {
if (file.isDirectory()) {
final String name = file.getPath().getName();
map.put(name, new HdfsBlobContainer(path().add(name), store, new Path(path, name), bufferSize, securityContext));
}
}
return Collections.unmodifiableMap(map);
}

/**
* Exists to wrap underlying InputStream methods that might make socket connections in
* doPrivileged blocks. This is due to the way that hdfs client libraries might open
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected HdfsBlobStore createBlobStore() {
}

@Override
protected BlobPath basePath() {
public BlobPath basePath() {
return basePath;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;

import java.util.Collection;

import static org.hamcrest.Matchers.equalTo;

@ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class)
public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(HdfsPlugin.class);
}

@Override
protected SecureSettings credentials() {
return new MockSecureSettings();
}

@Override
protected void createRepository(String repoName) {
assumeFalse("https://github.com/elastic/elasticsearch/issues/31498", JavaVersion.current().equals(JavaVersion.parse("11")));
AcknowledgedResponse putRepositoryResponse = client().admin().cluster().preparePutRepository(repoName)
.setType("hdfs")
.setSettings(Settings.builder()
.put("uri", "hdfs:///")
.put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName())
.put("path", "foo")
.put("chunk_size", randomIntBetween(100, 1000) + "k")
.put("compress", randomBoolean())
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}
}
2 changes: 1 addition & 1 deletion plugins/repository-s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ if (useFixture) {
def minioAddress = {
int minioPort = postProcessFixture.ext."test.fixtures.minio-fixture.tcp.9000"
assert minioPort > 0
return 'http://127.0.0.1:' + minioPort
'http://127.0.0.1:' + minioPort
}

File minioAddressFile = new File(project.buildDir, 'generated-resources/s3Fixture.address')
Expand Down
Loading

0 comments on commit 455b12a

Please sign in to comment.