diff --git a/plugins/cloud-gce/pom.xml b/plugins/cloud-gce/pom.xml index 7ad404d05292f..45d4e49b5724e 100644 --- a/plugins/cloud-gce/pom.xml +++ b/plugins/cloud-gce/pom.xml @@ -23,12 +23,15 @@ governing permissions and limitations under the License. --> cloud-gce Plugin: Cloud: Google Compute Engine - The Google Compute Engine (GCE) Cloud plugin allows to use GCE API for the unicast discovery mechanism. + The Google Compute Engine (GCE) Cloud plugin allows to use GCE API for the unicast discovery + mechanism. + - org.elasticsearch.plugin.cloud.gce.CloudGcePlugin + org.elasticsearch.plugin.cloud.gce.CloudGcePlugin + v1-rev144-1.21.0 - 1.0.0 + 1.0.0 cloud_gce false @@ -39,21 +42,21 @@ governing permissions and limitations under the License. --> - com.google.apis - google-api-services-compute - ${google.gce.version} - - - com.google.guava - guava-jdk5 - - + com.google.apis + google-api-services-compute + ${google.gce.version} + + + com.google.guava + guava-jdk5 + + + + + com.google.cloud + google-cloud-storage + ${google.gcs.version} - - com.google.cloud - google-cloud-storage - ${google.gcs.version} - @@ -65,7 +68,7 @@ governing permissions and limitations under the License. --> - + de.thetaphi forbiddenapis @@ -117,8 +120,8 @@ governing permissions and limitations under the License. --> - - + + true @@ -133,8 +136,8 @@ governing permissions and limitations under the License. --> - - + + true @@ -165,28 +168,29 @@ governing permissions and limitations under the License. --> maven-assembly-plugin - org.apache.maven.plugins - maven-antrun-plugin - - - - certificate-setup - generate-test-resources - - run - - - - - - - - - - ${skip.unit.tests} - - - + org.apache.maven.plugins + maven-antrun-plugin + + + + certificate-setup + generate-test-resources + + run + + + + + + + + + + ${skip.unit.tests} + + + + diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeService.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeService.java index 3ba69fe496685..f1c57ae8c7c88 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeService.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeService.java @@ -42,6 +42,7 @@ final class Fields { /** * Return a collection of running instances within the same GCE project + * * @return a collection of running instances within the same GCE project */ Collection instances(); @@ -54,10 +55,11 @@ final class Fields { *
* <li>`hostname` when we need to resolve the host name</li>
* <li>`network-interfaces/0/ip` when we need to resolve private IP</li>
  • * - * @see org.elasticsearch.cloud.gce.network.GceNameResolver for bindings + * * @param metadataPath path to metadata information * @return extracted information (for example a hostname or an IP address) * @throws IOException in case metadata URL is not accessible + * @see org.elasticsearch.cloud.gce.network.GceNameResolver for bindings */ String metadata(String metadataPath) throws IOException; } diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeServiceImpl.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeServiceImpl.java index 0615c013b4709..e76f78f4efda9 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeServiceImpl.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeServiceImpl.java @@ -55,6 +55,7 @@ import java.util.List; import static org.elasticsearch.common.util.CollectionUtils.eagerTransform; + import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.*; @@ -83,47 +84,47 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponent instances() { - logger.debug("get instances for project [{}], zones [{}]", project, zones); - - List> instanceListByZone = eagerTransform(zones, new Function>() { - @Override - public List apply(final String zoneId) { - try { - // hack around code messiness in GCE code - // TODO: get this fixed - SecurityManager sm = System.getSecurityManager(); - if (sm != null) { - sm.checkPermission(new SpecialPermission()); - } - InstanceList instanceList = AccessController.doPrivileged(new PrivilegedExceptionAction() { - @Override - public InstanceList run() throws Exception { - Compute.Instances.List list = client().instances().list(project, zoneId); - return list.execute(); - } - }); - if (instanceList.isEmpty() || instanceList.getItems() == null) { - return Collections.EMPTY_LIST; + logger.debug("get instances for project [{}], zones [{}]", project, zones); + + List> instanceListByZone = eagerTransform(zones, new Function>() { + @Override + public List apply(final String zoneId) { + try { + // hack around code messiness in GCE code + // TODO: get this fixed + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkPermission(new SpecialPermission()); + } + InstanceList instanceList = AccessController.doPrivileged(new PrivilegedExceptionAction() { + @Override + public InstanceList run() throws Exception { + Compute.Instances.List list = client().instances().list(project, zoneId); + return list.execute(); } - - return instanceList.getItems(); - } catch (PrivilegedActionException e) { - logger.warn("Problem fetching instance list for zone {}", zoneId); - logger.debug("Full exception:", e); - + }); + if (instanceList.isEmpty() || instanceList.getItems() == null) { return Collections.EMPTY_LIST; } - } - }); - // Collapse instances from all zones into one neat list - List instanceList = CollectionUtils.iterableAsArrayList(Iterables.concat(instanceListByZone)); + return instanceList.getItems(); + } catch (PrivilegedActionException e) { + logger.warn("Problem fetching instance list for zone {}", zoneId); + logger.debug("Full exception:", e); - if (instanceList.size() == 0) { - logger.warn("disabling GCE discovery. 
Can not get list of nodes"); + return Collections.EMPTY_LIST; + } } + }); + + // Collapse instances from all zones into one neat list + List instanceList = CollectionUtils.iterableAsArrayList(Iterables.concat(instanceListByZone)); + + if (instanceList.size() == 0) { + logger.warn("disabling GCE discovery. Can not get list of nodes"); + } - return instanceList; + return instanceList; } @Override @@ -146,9 +147,9 @@ public HttpHeaders run() throws IOException { headers.put("Metadata-Flavor", "Google"); HttpResponse response; response = getGceHttpTransport().createRequestFactory() - .buildGetRequest(new GenericUrl(url)) - .setHeaders(headers) - .execute(); + .buildGetRequest(new GenericUrl(url)) + .setHeaders(headers) + .execute(); String metadata = response.parseAsString(); logger.debug("metadata found [{}]", metadata); return metadata; @@ -161,10 +162,14 @@ public HttpHeaders run() throws IOException { private TimeValue refreshInterval = null; private long lastRefresh; - /** Global instance of the HTTP transport. */ + /** + * Global instance of the HTTP transport. + */ private HttpTransport gceHttpTransport; - /** Global instance of the JSON factory. */ + /** + * Global instance of the JSON factory. + */ private JsonFactory gceJsonFactory; @Inject @@ -176,7 +181,7 @@ public GceComputeServiceImpl(Settings settings, NetworkService networkService) { networkService.addCustomNameResolver(new GceNameResolver(settings, this)); this.gceHost = settings.get("cloud.gce.host", DEFAULT_GCE_HOST); - this.metaDataUrl = gceHost + "/computeMetadata/v1/instance"; + this.metaDataUrl = gceHost + "/computeMetadata/v1/instance"; this.gceRootUrl = settings.get("cloud.gce.root_url", DEFAULT_GCE_ROOT_URL); this.tokenServerEncodedUrl = metaDataUrl + "/service-accounts/default/token"; this.validateCerts = settings.getAsBoolean("cloud.gce.validate_certificates", true); @@ -197,7 +202,7 @@ protected synchronized HttpTransport getGceHttpTransport() throws GeneralSecurit public synchronized Compute client() { if (refreshInterval != null && refreshInterval.millis() != 0) { if (client != null && - (refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) { + (refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) { if (logger.isTraceEnabled()) logger.trace("using cache to retrieve client"); return client; } @@ -209,8 +214,8 @@ public synchronized Compute client() { logger.info("starting GCE discovery service"); final ComputeCredential credential = new ComputeCredential.Builder(getGceHttpTransport(), gceJsonFactory) - .setTokenServerEncodedUrl(this.tokenServerEncodedUrl) - .build(); + .setTokenServerEncodedUrl(this.tokenServerEncodedUrl) + .build(); // hack around code messiness in GCE code // TODO: get this fixed @@ -228,12 +233,13 @@ public Void run() throws IOException { logger.debug("token [{}] will expire in [{}] s", credential.getAccessToken(), credential.getExpiresInSeconds()); if (credential.getExpiresInSeconds() != null) { - refreshInterval = TimeValue.timeValueSeconds(credential.getExpiresInSeconds()-1); + refreshInterval = TimeValue.timeValueSeconds(credential.getExpiresInSeconds() - 1); } final boolean ifRetry = settings.getAsBoolean(Fields.RETRY, true); final Compute.Builder builder = new Compute.Builder(getGceHttpTransport(), gceJsonFactory, null) - .setApplicationName(Fields.VERSION).setRootUrl(gceRootUrl);; + .setApplicationName(Fields.VERSION).setRootUrl(gceRootUrl); + ; if (ifRetry) { int maxWait = 
settings.getAsInt(Fields.MAXWAIT, -1); diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/network/GceNameResolver.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/network/GceNameResolver.java index 22d79fb1614ae..a0cfa1d9d79fe 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/network/GceNameResolver.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gce/network/GceNameResolver.java @@ -101,7 +101,7 @@ private InetAddress[] resolve(String value) throws IOException { gceMetadataPath = Strings.replace(GceAddressResolverType.PRIVATE_IP.gceName, "{{network}}", network); } else { throw new IllegalArgumentException("[" + value + "] is not one of the supported GCE network.host setting. " + - "Expecting _gce_, _gce:privateIp:X_, _gce:hostname_"); + "Expecting _gce_, _gce:privateIp:X_, _gce:hostname_"); } try { @@ -110,7 +110,7 @@ private InetAddress[] resolve(String value) throws IOException { throw new IOException("no gce metadata returned from [" + gceMetadataPath + "] for [" + value + "]"); } // only one address: because we explicitly ask for only one via the GceHostnameType - return new InetAddress[] { InetAddress.getByName(metadataResult) }; + return new InetAddress[]{InetAddress.getByName(metadataResult)}; } catch (IOException e) { throw new IOException("IOException caught when fetching InetAddress from [" + gceMetadataPath + "]", e); } diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSModule.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSModule.java index ad51e2fcf8799..6cead5b49cf88 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSModule.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSModule.java @@ -22,16 +22,16 @@ import org.elasticsearch.common.inject.AbstractModule; public class GCSModule extends AbstractModule { - - static Class storageServiceImpl = GCSServiceImpl.class; - public static Class getStorageServiceImpl() { - return storageServiceImpl; + static Class storageServiceImpl = GCSServiceImpl.class; + + public static Class getStorageServiceImpl() { + return storageServiceImpl; } - @Override - protected void configure() { - bind(GCSService.class).to(storageServiceImpl).asEagerSingleton(); - } + @Override + protected void configure() { + bind(GCSService.class).to(storageServiceImpl).asEagerSingleton(); + } -} \ No newline at end of file +} diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSService.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSService.java index 2f88a8c7d0264..4773d6e66535e 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSService.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSService.java @@ -29,17 +29,19 @@ public interface GCSService extends LifecycleComponent { - final class RepositoryGCS { - private RepositoryGCS() {} - public static final String BUCKET = "repositories.gcs.bucket"; - public static final String BASE_PATH = "repositories.gcs.base_path"; - public static final String APPLICATION_NAME = "repositories.gcs.application_name"; - public static final String SERVICE_ACCOUNT = "repositories.gcs.service_account"; - public static final String HTTP_READ_TIMEOUT = "repositories.gcs.http.read_timeout"; - public static final String HTTP_CONNECT_TIMEOUT = "repositories.gcs.http.connect_timeout"; + final class RepositoryGCS { + private RepositoryGCS() { + } + + public static final String 
BUCKET = "repositories.gcs.bucket"; + public static final String BASE_PATH = "repositories.gcs.base_path"; + public static final String APPLICATION_NAME = "repositories.gcs.application_name"; + public static final String SERVICE_ACCOUNT = "repositories.gcs.service_account"; + public static final String HTTP_READ_TIMEOUT = "repositories.gcs.http.read_timeout"; + public static final String HTTP_CONNECT_TIMEOUT = "repositories.gcs.http.connect_timeout"; public static final String CHUNK_SIZE = "repositories.gcs.chunk_size"; - public static final String COMPRESS = "repositories.gcs.compress"; - } - - Storage createClient(String serviceAccount,String application,TimeValue connectTimeout, TimeValue readTimeout) throws IOException, GeneralSecurityException; + public static final String COMPRESS = "repositories.gcs.compress"; + } + + Storage createClient(String serviceAccount, String application, TimeValue connectTimeout, TimeValue readTimeout) throws IOException, GeneralSecurityException; } diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSServiceImpl.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSServiceImpl.java index b8f627eeebb86..c5c259b3416c7 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSServiceImpl.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/GCSServiceImpl.java @@ -46,108 +46,108 @@ public class GCSServiceImpl extends AbstractLifecycleComponent implements GCSService { - private Environment env; - - @Inject - public GCSServiceImpl(Environment env) { - super(env.settings()); - this.env = env; - } - - @Override - public Storage createClient(final String serviceAccount, final String application, final TimeValue connectTimeout, - final TimeValue readTimeout) throws IOException, GeneralSecurityException { - - final NetHttpTransport.Builder builder = new NetHttpTransport.Builder(); - builder.trustCertificates(GoogleUtils.getCertificateTrustStore()); - final HttpTransport httpTransport = builder.build(); - - final HttpTransportOptions httpTransportOptions = HttpTransportOptions.newBuilder() - .setConnectTimeout(RepoUtil.toTimeout(connectTimeout)) - .setReadTimeout(RepoUtil.toTimeout(readTimeout)) - .setHttpTransportFactory(new HttpTransportFactory() { - @Override - public HttpTransport create() { - return httpTransport; - } - }) - .build(); - - final ServiceAccountCredentials serviceAccountCredentials = loadCredentials(serviceAccount); - - - final String projectId = getProjectId(serviceAccountCredentials); - - logger.debug("Project ID: [{}]", projectId); - logger.debug("initializing client with service account [{}]", serviceAccountCredentials.toString()); - - return StorageOptions.newBuilder() - .setCredentials(serviceAccountCredentials) - .setTransportOptions(httpTransportOptions) - .setProjectId(projectId) - .build() - .getService(); - } - - /** - * Retrieve the cloud project Id from the service account - * @param serviceAccountCredentials - * @return Project Id - */ - private String getProjectId(ServiceAccountCredentials serviceAccountCredentials) { - - //Assuming that the client email is from the same cloud project - - String[] emailParts = serviceAccountCredentials.getClientEmail().split("@"); - String[] domainPart = emailParts[1].split("\\."); - return domainPart[0]; - } - - /** - * HTTP request initializer that loads credentials from the service account file - * and manages authentication for HTTP requests - */ - private ServiceAccountCredentials loadCredentials(String serviceAccount) 
throws IOException { - if (serviceAccount == null) { - throw new ElasticsearchException("Cannot load Google Cloud Storage service account file from a null path"); - } - - Path account = env.configFile().resolve(serviceAccount); - if (!Files.exists(account)) { - throw new ElasticsearchException( - "Unable to find service account file [" + serviceAccount + "] defined for repository"); - } - else { - logger.info("Found service account: [{}]", account.toAbsolutePath()); - } - - try (InputStream is = Files.newInputStream(account)) { - return RepoUtil.doPrivileged(new PrivilegedExceptionAction() { - @Override - public ServiceAccountCredentials run() throws Exception { - final Collection scopes = Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL); - final ServiceAccountCredentials credentials = ServiceAccountCredentials.fromStream(is); - if (credentials.createScopedRequired()) { - return (ServiceAccountCredentials) credentials.createScoped(scopes); - } - return credentials; - } - }); - } - } - - @Override - protected void doStart() { - logger.debug("Started Component GCSServiceImpl"); - } - - @Override - protected void doStop() { - logger.debug("Stopped Component GCSServiceImpl"); - } - - @Override - protected void doClose() { - logger.debug("Closed Component GCSServiceImpl"); - } + private Environment env; + + @Inject + public GCSServiceImpl(Environment env) { + super(env.settings()); + this.env = env; + } + + @Override + public Storage createClient(final String serviceAccount, final String application, final TimeValue connectTimeout, + final TimeValue readTimeout) throws IOException, GeneralSecurityException { + + final NetHttpTransport.Builder builder = new NetHttpTransport.Builder(); + builder.trustCertificates(GoogleUtils.getCertificateTrustStore()); + final HttpTransport httpTransport = builder.build(); + + final HttpTransportOptions httpTransportOptions = HttpTransportOptions.newBuilder() + .setConnectTimeout(RepoUtil.toTimeout(connectTimeout)) + .setReadTimeout(RepoUtil.toTimeout(readTimeout)) + .setHttpTransportFactory(new HttpTransportFactory() { + @Override + public HttpTransport create() { + return httpTransport; + } + }) + .build(); + + final ServiceAccountCredentials serviceAccountCredentials = loadCredentials(serviceAccount); + + + final String projectId = getProjectId(serviceAccountCredentials); + + logger.debug("Project ID: [{}]", projectId); + logger.debug("initializing client with service account [{}]", serviceAccountCredentials.toString()); + + return StorageOptions.newBuilder() + .setCredentials(serviceAccountCredentials) + .setTransportOptions(httpTransportOptions) + .setProjectId(projectId) + .build() + .getService(); + } + + /** + * Retrieve the cloud project Id from the service account + * + * @param serviceAccountCredentials + * @return Project Id + */ + private String getProjectId(ServiceAccountCredentials serviceAccountCredentials) { + + //Assuming that the client email is from the same cloud project + + String[] emailParts = serviceAccountCredentials.getClientEmail().split("@"); + String[] domainPart = emailParts[1].split("\\."); + return domainPart[0]; + } + + /** + * HTTP request initializer that loads credentials from the service account file + * and manages authentication for HTTP requests + */ + private ServiceAccountCredentials loadCredentials(String serviceAccount) throws IOException { + if (serviceAccount == null) { + throw new ElasticsearchException("Cannot load Google Cloud Storage service account file from a null path"); + } + + Path account = 
env.configFile().resolve(serviceAccount); + if (!Files.exists(account)) { + throw new ElasticsearchException( + "Unable to find service account file [" + serviceAccount + "] defined for repository"); + } else { + logger.info("Found service account: [{}]", account.toAbsolutePath()); + } + + try (InputStream is = Files.newInputStream(account)) { + return RepoUtil.doPrivileged(new PrivilegedExceptionAction() { + @Override + public ServiceAccountCredentials run() throws Exception { + final Collection scopes = Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL); + final ServiceAccountCredentials credentials = ServiceAccountCredentials.fromStream(is); + if (credentials.createScopedRequired()) { + return (ServiceAccountCredentials) credentials.createScoped(scopes); + } + return credentials; + } + }); + } + } + + @Override + protected void doStart() { + logger.debug("Started Component GCSServiceImpl"); + } + + @Override + protected void doStop() { + logger.debug("Stopped Component GCSServiceImpl"); + } + + @Override + protected void doClose() { + logger.debug("Closed Component GCSServiceImpl"); + } } diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/blobstore/GCSBlobContainer.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/blobstore/GCSBlobContainer.java index 59979bb6a851a..4dc7ff418ef39 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/blobstore/GCSBlobContainer.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/blobstore/GCSBlobContainer.java @@ -34,84 +34,84 @@ public class GCSBlobContainer extends AbstractBlobContainer { - private final ESLogger logger = Loggers.getLogger(getClass()); - private final GCSBlobStore blobStore; - private final String path; - - protected GCSBlobContainer(BlobPath path, GCSBlobStore blobStore) { - super(path); - this.blobStore = blobStore; - String keyPath = path.buildAsString("/"); - if (!keyPath.isEmpty()) { + private final ESLogger logger = Loggers.getLogger(getClass()); + private final GCSBlobStore blobStore; + private final String path; + + protected GCSBlobContainer(BlobPath path, GCSBlobStore blobStore) { + super(path); + this.blobStore = blobStore; + String keyPath = path.buildAsString("/"); + if (!keyPath.isEmpty()) { keyPath = keyPath + "/"; } this.path = keyPath; - + logger.debug("Path: {}", this.path); - } - - @Override - public boolean blobExists(String blobName) { - logger.debug("Check if blob {} exists", blobName); - try { - return blobStore.blobExists(buildKey(blobName)); - } catch (Exception e) { - throw new BlobStoreException("Failed to check if blob [" + blobName + "] exists", e); - } - } - - @Override - public Map listBlobs() throws IOException { - logger.debug("List all blobs"); - return blobStore.listBlobs(path); - } - - @Override - public Map listBlobsByPrefix(String prefix) throws IOException { - logger.debug("List all blobs by prefix: {}", prefix); - return blobStore.listBlobsByPrefix(path, prefix); - } - - @Override - public InputStream readBlob(String blobName) throws IOException { - logger.debug("Read blob {}", blobName); - return blobStore.readBlob(buildKey(blobName)); - } - - @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { - logger.debug("InputStream - Write to blob {}", blobName); - - if (blobExists(blobName)) { - throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); - } - blobStore.writeBlob(buildKey(blobName), inputStream, blobSize); 
- } - - @Override - public void writeBlob(String blobName, BytesReference bytes) throws IOException { - logger.debug("BytesReference - Write to blob {}", blobName); - - if (blobExists(blobName)) { - throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); - } - blobStore.writeBlob(buildKey(blobName), bytes); - } - - @Override - public void deleteBlob(String blobName) throws IOException { - logger.debug("Delete to blob {}", blobName); - blobStore.deleteBlob(buildKey(blobName)); - } - - @Override - public void move(String sourceBlobName, String targetBlobName) throws IOException { - logger.debug("Move blob from {} to {}", sourceBlobName,targetBlobName); - blobStore.moveBlob(buildKey(sourceBlobName), buildKey(targetBlobName)); - } - - protected String buildKey(String blobName) { - assert blobName != null; - return path + blobName; - } + } + + @Override + public boolean blobExists(String blobName) { + logger.debug("Check if blob {} exists", blobName); + try { + return blobStore.blobExists(buildKey(blobName)); + } catch (Exception e) { + throw new BlobStoreException("Failed to check if blob [" + blobName + "] exists", e); + } + } + + @Override + public Map listBlobs() throws IOException { + logger.debug("List all blobs"); + return blobStore.listBlobs(path); + } + + @Override + public Map listBlobsByPrefix(String prefix) throws IOException { + logger.debug("List all blobs by prefix: {}", prefix); + return blobStore.listBlobsByPrefix(path, prefix); + } + + @Override + public InputStream readBlob(String blobName) throws IOException { + logger.debug("Read blob {}", blobName); + return blobStore.readBlob(buildKey(blobName)); + } + + @Override + public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + logger.debug("InputStream - Write to blob {}", blobName); + + if (blobExists(blobName)) { + throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); + } + blobStore.writeBlob(buildKey(blobName), inputStream, blobSize); + } + + @Override + public void writeBlob(String blobName, BytesReference bytes) throws IOException { + logger.debug("BytesReference - Write to blob {}", blobName); + + if (blobExists(blobName)) { + throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); + } + blobStore.writeBlob(buildKey(blobName), bytes); + } + + @Override + public void deleteBlob(String blobName) throws IOException { + logger.debug("Delete to blob {}", blobName); + blobStore.deleteBlob(buildKey(blobName)); + } + + @Override + public void move(String sourceBlobName, String targetBlobName) throws IOException { + logger.debug("Move blob from {} to {}", sourceBlobName, targetBlobName); + blobStore.moveBlob(buildKey(sourceBlobName), buildKey(targetBlobName)); + } + + protected String buildKey(String blobName) { + assert blobName != null; + return path + blobName; + } } diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/blobstore/GCSBlobStore.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/blobstore/GCSBlobStore.java index 8e09d2d5e8106..7ffbbb0d8fafb 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/blobstore/GCSBlobStore.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/cloud/gcs/blobstore/GCSBlobStore.java @@ -19,6 +19,23 @@ package org.elasticsearch.cloud.gcs.blobstore; +import com.google.api.gax.paging.Page; +import com.google.cloud.ReadChannel; +import 
com.google.cloud.WriteChannel; +import com.google.cloud.storage.*; +import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.Storage.CopyRequest; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.blobstore.*; +import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.gcs.RepoUtil; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -28,433 +45,432 @@ import java.nio.channels.WritableByteChannel; import java.nio.file.NoSuchFileException; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.blobstore.BlobMetaData; -import org.elasticsearch.common.blobstore.BlobStoreException; -import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.repositories.gcs.RepoUtil; -import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.io.Streams; - -import com.google.cloud.ReadChannel; -import com.google.cloud.WriteChannel; -import com.google.cloud.storage.Blob; -import com.google.cloud.storage.BlobId; -import com.google.cloud.storage.BlobInfo; -import com.google.cloud.storage.Storage; -import com.google.cloud.storage.Storage.BlobListOption; -import com.google.cloud.storage.Storage.CopyRequest; -import com.google.cloud.storage.StorageException; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; public class GCSBlobStore extends AbstractComponent implements BlobStore { - - // The recommended maximum size of a blob that should be uploaded in a single - // request. 
Larger files should be uploaded over multiple requests (this is - // called "resumable upload") - // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload - private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024; - - private final Storage storage; - private final String bucket; - - public GCSBlobStore(Settings settings, String bucket, Storage storage) { - super(settings); - this.bucket = bucket; - this.storage = storage; - - if (!doesBucketExist(bucket)) { - throw new BlobStoreException("Bucket [" + bucket + "] does not exist"); - } - } - - @Override - public BlobContainer blobContainer(BlobPath path) { - logger.info("Set blob container with path: {}", path); - return new GCSBlobContainer(path, this); - } - - @Override - public void delete(BlobPath path) throws IOException { - logger.debug("Delete path: {}", path.toString()); - String keyPath = path.buildAsString("/"); - if (!keyPath.isEmpty()) { - keyPath = keyPath + "/"; - } - logger.debug("Delete path: {}", keyPath); - deleteBlobsByPrefix(keyPath); - } - - @Override - public void close() { - logger.debug("Closed GCSBlobStore"); - } - - /** - * Return true if the given bucket exists - * - * @param bucketName name of the bucket - * @return true if the bucket exists, false otherwise - */ - boolean doesBucketExist(final String bucketName) { - logger.debug("Checking if bucekt exists: {}", bucketName); - try { - return RepoUtil.doPrivileged(new PrivilegedExceptionAction() { - @Override - public Boolean run() throws Exception { - return storage.get(bucketName) != null; - } - }); - } catch (IOException e) { - logger.error("Error checking if bucket exists: {}", e, bucketName); - throw new BlobStoreException("Unable to check if bucket [" + bucketName + "] exists", e); - } - } - - /** - * List all blobs in the bucket - * - * @param path base path of the blobs to list - * @return a map of blob names and their metadata - */ - Map listBlobs(final String path) throws IOException { - return listBlobsByPrefix(path, ""); - } - - /** - * List all blobs in the bucket which have a prefix - * - * @param path base path of the blobs to list - * @param prefix prefix of the blobs to list - * @return a map of blob names and their metadata - */ - Map listBlobsByPrefix(final String path, final String prefix) throws IOException { - logger.debug("List blobs for path: {}, prefix: {}", path, prefix); - final String pathPrefix = buildKey(path, prefix); - logger.debug("Path prefix: {}", pathPrefix); - final MapBuilder mapBuilder = MapBuilder.newMapBuilder(); - - return RepoUtil.doPrivileged(new PrivilegedExceptionAction>() { - @Override - public Map run() throws Exception { - Iterable iterable = storage.get(bucket).list(BlobListOption.prefix(pathPrefix)).iterateAll(); - for (Blob blob : iterable) { - logger.debug("Blob: {}", blob.toString()); - assert blob.getName().startsWith(path); - final String suffixName = blob.getName().substring(path.length()); - logger.debug("Suffix: {}", suffixName); - mapBuilder.put(suffixName, new PlainBlobMetaData(suffixName, blob.getSize())); - } - - return mapBuilder.immutableMap(); - } - }); - } - - private Blob getBlob(final BlobId blobId) throws IOException { - return RepoUtil.doPrivileged(new PrivilegedExceptionAction() { - @Override - public Blob run() throws Exception { - return storage.get(blobId); - } - }); - } - - /** - * Returns true if the blob exists in the bucket - * - * @param blobName name of the blob - * @return true if the blob exists, false otherwise - */ - boolean blobExists(String 
blobName) throws IOException { - final BlobId blobId = BlobId.of(bucket, blobName); - logger.debug("Blob id: [{}]", blobId); - - Blob blob = getBlob(blobId); - - return blob != null; - } - - /** - * Returns an {@link java.io.InputStream} for a given blob - * - * @param blobName name of the blob - * @return an InputStream - */ - InputStream readBlob(final String blobName) throws IOException { - final BlobId blobId = BlobId.of(bucket, blobName); - logger.debug("Blob id: [{}]", blobId); - final Blob blob = getBlob(blobId); - - if (blob == null) { - throw new NoSuchFileException("Blob [" + blobName + "] does not exit"); - } - - final ReadChannel readChannel = RepoUtil.doPrivileged(new PrivilegedExceptionAction() { - @Override - public ReadChannel run() throws Exception { - return blob.reader(); - } - }); - - return Channels.newInputStream(new ReadableByteChannel() { - @Override - public boolean isOpen() { - return readChannel.isOpen(); - } - - @Override - public void close() throws IOException { - RepoUtil.doPrivilegedVoid(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - readChannel.close(); - return null; - } - }); - } - - @Override - public int read(final ByteBuffer dst) throws IOException { - return RepoUtil.doPrivileged(new PrivilegedExceptionAction() { - @Override - public Integer run() throws Exception { - return readChannel.read(dst); - } - }); - } - }); - } - - /** - * Writes a blob in the bucket. - * - * @param blobName name of the blob - * @param inputStream content of the blob to be written - * @param blobSize expected size of the blob to be written - */ - void writeBlob(final String blobName, final InputStream inputStream, final long blobSize) throws IOException { - - final BlobInfo blobInfo = BlobInfo.newBuilder(bucket, blobName).build(); - - logger.debug("Blob info: {}, size: {}, size threshold: {}", blobInfo, blobSize, LARGE_BLOB_THRESHOLD_BYTE_SIZE); - - if (blobSize > LARGE_BLOB_THRESHOLD_BYTE_SIZE) { - writeBlobResumable(blobInfo, inputStream); - } else { - writeBlobMultipart(blobInfo, inputStream, blobSize); - } - } - - /** - * Uploads a blob using the "resumable upload" method (multiple requests, which - * can be independently retried in case of failure, see - * https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload - * - * @param blobInfo the info for the blob to be uploaded - * @param inputStream the stream containing the blob data - */ - private void writeBlobResumable(final BlobInfo blobInfo, InputStream inputStream) throws IOException { - logger.info("Running resumable blob write"); - final WriteChannel writeChannel = RepoUtil.doPrivileged(new PrivilegedExceptionAction() { - @Override - public WriteChannel run() throws Exception { - return storage.writer(blobInfo); - } - }); - - Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { - @Override - public boolean isOpen() { - return writeChannel.isOpen(); - } - - @Override - public void close() throws IOException { - RepoUtil.doPrivilegedVoid(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - writeChannel.close(); - return null; - } - }); - } - - @SuppressForbidden(reason = "Channel is based of a socket not a file") - @Override - public int write(final ByteBuffer src) throws IOException { - return RepoUtil.doPrivileged(new PrivilegedExceptionAction() { - @Override - public Integer run() throws Exception { - return writeChannel.write(src); - } - }); - } - })); - } - - /** - * Uploads a blob using the 
"multipart upload" method (a single - * 'multipart/related' request containing both data and metadata. The request is - * gziped), see: - * https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload - * - * @param blobInfo the info for the blob to be uploaded - * @param inputStream the stream containing the blob data - * @param blobSize the size - */ - private void writeBlobMultipart(final BlobInfo blobInfo, InputStream inputStream, long blobSize) - throws IOException { - logger.info("Running multipart blob write"); - assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method"; - - final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) blobSize); - Streams.copy(inputStream, baos); - - Blob blob = RepoUtil.doPrivileged(new PrivilegedExceptionAction() { - @Override - public Blob run() throws Exception { - try { - return storage.create(blobInfo, baos.toByteArray()); - } catch(final StorageException se) { - logger.error("Failed to upload data to bucket: {}, Error: {}", se, blobInfo.getBlobId().getName(),se.getMessage()); - - throw new IOException(se.getCause()); - } - } - }); - - logger.info("Created Blob: [{}]", blob); - } - - /** - * - * @param blobName name of the blob - * @param bytes - * @throws IOException - */ - public void writeBlob(final String blobName, final BytesReference bytes) throws IOException { - StreamInput input = StreamInput.wrap(bytes); - writeBlob(blobName, input, bytes.length()); - } - - /** - * Deletes a blob in the bucket - * - * @param blobName name of the blob - */ - void deleteBlob(final String blobName) throws IOException { - final BlobId blobId = BlobId.of(bucket, blobName); - logger.debug("Blob id: {}", blobId); - - final boolean deleted = RepoUtil.doPrivileged(new PrivilegedExceptionAction() { - @Override - public Boolean run() throws Exception { - return storage.delete(blobId); - } - }); - - if (!deleted) { - throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); - } - } - - /** - * Deletes multiple blobs in the bucket that have a given prefix - * - * @param prefix prefix of the buckets to delete - */ - void deleteBlobsByPrefix(final String prefix) throws IOException { - deleteBlobs(listBlobsByPrefix("", prefix).keySet()); - } - - /** - * Deletes multiple blobs in the given bucket (uses a batch request to perform - * this) - * - * @param blobNames names of the bucket to delete - */ - void deleteBlobs(final Collection blobNames) throws IOException { - if (blobNames == null || blobNames.isEmpty()) { - return; - } - - if (blobNames.size() == 1) { - deleteBlob(blobNames.iterator().next()); - return; - } - - final List blobIdsToDelete = new ArrayList<>(); - for(String blobName: blobNames) { - blobIdsToDelete.add(BlobId.of(bucket, blobName)); - } - - final List deletedStatuses = RepoUtil.doPrivileged(new PrivilegedExceptionAction>() { - @Override - public List run() throws Exception { - return storage.delete(blobIdsToDelete); - } - }); - - assert blobIdsToDelete.size() == deletedStatuses.size(); - boolean failed = false; - for (int i = 0; i < blobIdsToDelete.size(); i++) { - if (Boolean.FALSE.equals(deletedStatuses.get(i))) { - logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucket); - failed = true; - } - } - if (failed) { - throw new IOException("Failed to delete all [" + blobIdsToDelete.size() + "] blobs"); - } - } - - /** - * Moves a blob within the same bucket - * - * @param sourceBlob name of the blob to move - * @param 
targetBlob new name of the blob in the target bucket - */ - void moveBlob(final String sourceBlobName, final String targetBlobName) throws IOException { - logger.info("Moving Blobs from: {}, to: {}", sourceBlobName, targetBlobName); - final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName); - final BlobId targetBlobId = BlobId.of(bucket, targetBlobName); - - final CopyRequest request = CopyRequest.newBuilder().setSource(sourceBlobId).setTarget(targetBlobId).build(); - - RepoUtil.doPrivilegedVoid(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - // There's no atomic "move" in GCS so we need to copy and delete - Blob copiedBlob = storage.copy(request).getResult(); - logger.info("Copied Blob: [{}]", copiedBlob.toString()); - final boolean deleted = storage.delete(sourceBlobId); - if (!deleted) { - throw new IOException( - "Failed to move source [" + sourceBlobName + "] to target [" + targetBlobName + "]"); - } - else { - logger.info("Deleted source blob: {}", sourceBlobId); - } - - return null; - } - }); - } - - private String buildKey(String keyPath, String s) { - assert s != null; - return keyPath + s; - } + + // The recommended maximum size of a blob that should be uploaded in a single + // request. Larger files should be uploaded over multiple requests (this is + // called "resumable upload") + // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload + private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024; + + private final Storage storage; + private final String bucket; + + public GCSBlobStore(Settings settings, String bucket, Storage storage) { + super(settings); + this.bucket = bucket; + this.storage = storage; + + if (!doesBucketExist(bucket)) { + throw new BlobStoreException("Bucket [" + bucket + "] does not exist"); + } + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + logger.info("Set blob container with path: {}", path); + return new GCSBlobContainer(path, this); + } + + @Override + public void delete(BlobPath path) throws IOException { + logger.debug("Delete path: {}", path.toString()); + String keyPath = path.buildAsString("/"); + if (!keyPath.isEmpty()) { + keyPath = keyPath + "/"; + } + logger.debug("Delete path: {}", keyPath); + deleteBlobsByPrefix(keyPath); + } + + @Override + public void close() { + logger.debug("Closed GCSBlobStore"); + } + + /** + * Return true if the given bucket exists + * + * @param bucketName name of the bucket + * @return true if the bucket exists, false otherwise + */ + boolean doesBucketExist(final String bucketName) { + logger.debug("Checking if bucekt exists: {}", bucketName); + try { + return RepoUtil.doPrivileged(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws Exception { + return storage.get(bucketName) != null; + } + }); + } catch (IOException e) { + logger.error("Error checking if bucket exists: {}", e, bucketName); + throw new BlobStoreException("Unable to check if bucket [" + bucketName + "] exists", e); + } + } + + /** + * List all blobs in the bucket + * + * @param path base path of the blobs to list + * @return a map of blob names and their metadata + */ + Map listBlobs(final String path) throws IOException { + return listBlobsByPrefix(path, ""); + } + + /** + * List all blobs in the bucket which have a prefix + * + * @param path base path of the blobs to list + * @param prefix prefix of the blobs to list + * @return a map of blob names and their metadata + */ + Map listBlobsByPrefix(final String path, 
final String prefix) throws IOException { + logger.debug("List blobs for path: {}, prefix: {}", path, prefix); + final String pathPrefix = buildKey(path, prefix); + logger.debug("Path prefix: {}", pathPrefix); + final MapBuilder mapBuilder = MapBuilder.newMapBuilder(); + + final AtomicLong counter = new AtomicLong(); + + try { + return RepoUtil.doPrivileged(new PrivilegedExceptionAction>() { + @Override + public Map run() throws Exception { + Page page = storage.get(bucket).list(BlobListOption.prefix(pathPrefix)); + + String nextPageToken = page.getNextPageToken(); + Iterator currentPageIterator = page.getValues().iterator(); + counter.addAndGet(processIterator(currentPageIterator, path, mapBuilder)); + + while (nextPageToken != null) { + page = storage.get(bucket) + .list(Storage.BlobListOption.prefix(pathPrefix), + Storage.BlobListOption.pageToken(nextPageToken)); + currentPageIterator = page.getValues().iterator(); + nextPageToken = page.getNextPageToken(); + counter.addAndGet(processIterator(currentPageIterator, path, mapBuilder)); + } + + return mapBuilder.immutableMap(); + } + }); + } catch (Exception e) { + logger.info("Failed after {}", counter.get()); + logger.error("Error on prefix {}: {}", e, pathPrefix, e); + throw e; + } + } + + private long processIterator(Iterator iterator, String path, MapBuilder mapBuilder) { + long counter = 0; + while (iterator.hasNext()) { + Blob blob = iterator.next(); + logger.debug("Blob: {}", blob.toString()); + assert blob.getName().startsWith(path); + final String suffixName = blob.getName().substring(path.length()); + logger.debug("Suffix: {}", suffixName); + mapBuilder.put(suffixName, new PlainBlobMetaData(suffixName, blob.getSize())); + counter++; + } + return counter; + } + + private Blob getBlob(final BlobId blobId) throws IOException { + return RepoUtil.doPrivileged(new PrivilegedExceptionAction() { + @Override + public Blob run() throws Exception { + return storage.get(blobId); + } + }); + } + + /** + * Returns true if the blob exists in the bucket + * + * @param blobName name of the blob + * @return true if the blob exists, false otherwise + */ + boolean blobExists(String blobName) throws IOException { + final BlobId blobId = BlobId.of(bucket, blobName); + logger.debug("Blob id: [{}]", blobId); + + Blob blob = getBlob(blobId); + + return blob != null; + } + + /** + * Returns an {@link java.io.InputStream} for a given blob + * + * @param blobName name of the blob + * @return an InputStream + */ + InputStream readBlob(final String blobName) throws IOException { + final BlobId blobId = BlobId.of(bucket, blobName); + logger.debug("Blob id: [{}]", blobId); + final Blob blob = getBlob(blobId); + + if (blob == null) { + throw new NoSuchFileException("Blob [" + blobName + "] does not exit"); + } + + final ReadChannel readChannel = RepoUtil.doPrivileged(new PrivilegedExceptionAction() { + @Override + public ReadChannel run() throws Exception { + return blob.reader(); + } + }); + + return Channels.newInputStream(new ReadableByteChannel() { + @Override + public boolean isOpen() { + return readChannel.isOpen(); + } + + @Override + public void close() throws IOException { + RepoUtil.doPrivilegedVoid(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + readChannel.close(); + return null; + } + }); + } + + @Override + public int read(final ByteBuffer dst) throws IOException { + return RepoUtil.doPrivileged(new PrivilegedExceptionAction() { + @Override + public Integer run() throws Exception { + return 
readChannel.read(dst); + } + }); + } + }); + } + + /** + * Writes a blob in the bucket. + * + * @param blobName name of the blob + * @param inputStream content of the blob to be written + * @param blobSize expected size of the blob to be written + */ + void writeBlob(final String blobName, final InputStream inputStream, final long blobSize) throws IOException { + + final BlobInfo blobInfo = BlobInfo.newBuilder(bucket, blobName).build(); + + logger.debug("Blob info: {}, size: {}, size threshold: {}", blobInfo, blobSize, LARGE_BLOB_THRESHOLD_BYTE_SIZE); + + if (blobSize > LARGE_BLOB_THRESHOLD_BYTE_SIZE) { + writeBlobResumable(blobInfo, inputStream); + } else { + writeBlobMultipart(blobInfo, inputStream, blobSize); + } + } + + /** + * Uploads a blob using the "resumable upload" method (multiple requests, which + * can be independently retried in case of failure, see + * https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload + * + * @param blobInfo the info for the blob to be uploaded + * @param inputStream the stream containing the blob data + */ + private void writeBlobResumable(final BlobInfo blobInfo, InputStream inputStream) throws IOException { + logger.info("Running resumable blob write"); + final WriteChannel writeChannel = RepoUtil.doPrivileged(new PrivilegedExceptionAction() { + @Override + public WriteChannel run() throws Exception { + return storage.writer(blobInfo); + } + }); + + Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { + @Override + public boolean isOpen() { + return writeChannel.isOpen(); + } + + @Override + public void close() throws IOException { + RepoUtil.doPrivilegedVoid(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + writeChannel.close(); + return null; + } + }); + } + + @SuppressForbidden(reason = "Channel is based of a socket not a file") + @Override + public int write(final ByteBuffer src) throws IOException { + return RepoUtil.doPrivileged(new PrivilegedExceptionAction() { + @Override + public Integer run() throws Exception { + return writeChannel.write(src); + } + }); + } + })); + } + + /** + * Uploads a blob using the "multipart upload" method (a single + * 'multipart/related' request containing both data and metadata. 
The request is + * gziped), see: + * https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload + * + * @param blobInfo the info for the blob to be uploaded + * @param inputStream the stream containing the blob data + * @param blobSize the size + */ + private void writeBlobMultipart(final BlobInfo blobInfo, InputStream inputStream, long blobSize) + throws IOException { + logger.info("Running multipart blob write"); + assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method"; + + final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) blobSize); + Streams.copy(inputStream, baos); + + Blob blob = RepoUtil.doPrivileged(new PrivilegedExceptionAction() { + @Override + public Blob run() throws Exception { + try { + return storage.create(blobInfo, baos.toByteArray()); + } catch (final StorageException se) { + logger.error("Failed to upload data to bucket: {}, Error: {}", se, blobInfo.getBlobId().getName(), se.getMessage()); + + throw new IOException(se.getCause()); + } + } + }); + + logger.info("Created Blob: [{}]", blob); + } + + /** + * @param blobName name of the blob + * @param bytes + * @throws IOException + */ + public void writeBlob(final String blobName, final BytesReference bytes) throws IOException { + StreamInput input = StreamInput.wrap(bytes); + writeBlob(blobName, input, bytes.length()); + } + + /** + * Deletes a blob in the bucket + * + * @param blobName name of the blob + */ + void deleteBlob(final String blobName) throws IOException { + final BlobId blobId = BlobId.of(bucket, blobName); + logger.debug("Blob id: {}", blobId); + + final boolean deleted = RepoUtil.doPrivileged(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws Exception { + return storage.delete(blobId); + } + }); + + if (!deleted) { + throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); + } + } + + /** + * Deletes multiple blobs in the bucket that have a given prefix + * + * @param prefix prefix of the buckets to delete + */ + void deleteBlobsByPrefix(final String prefix) throws IOException { + deleteBlobs(listBlobsByPrefix("", prefix).keySet()); + } + + /** + * Deletes multiple blobs in the given bucket (uses a batch request to perform + * this) + * + * @param blobNames names of the bucket to delete + */ + void deleteBlobs(final Collection blobNames) throws IOException { + if (blobNames == null || blobNames.isEmpty()) { + return; + } + + if (blobNames.size() == 1) { + deleteBlob(blobNames.iterator().next()); + return; + } + + final List blobIdsToDelete = new ArrayList<>(); + for (String blobName : blobNames) { + blobIdsToDelete.add(BlobId.of(bucket, blobName)); + } + + final List deletedStatuses = RepoUtil.doPrivileged(new PrivilegedExceptionAction>() { + @Override + public List run() throws Exception { + return storage.delete(blobIdsToDelete); + } + }); + + assert blobIdsToDelete.size() == deletedStatuses.size(); + boolean failed = false; + for (int i = 0; i < blobIdsToDelete.size(); i++) { + if (Boolean.FALSE.equals(deletedStatuses.get(i))) { + logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucket); + failed = true; + } + } + if (failed) { + throw new IOException("Failed to delete all [" + blobIdsToDelete.size() + "] blobs"); + } + } + + /** + * Moves a blob within the same bucket + * + * @param sourceBlob name of the blob to move + * @param targetBlob new name of the blob in the target bucket + */ + void moveBlob(final String sourceBlobName, 
final String targetBlobName) throws IOException { + logger.info("Moving Blobs from: {}, to: {}", sourceBlobName, targetBlobName); + final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName); + final BlobId targetBlobId = BlobId.of(bucket, targetBlobName); + + final CopyRequest request = CopyRequest.newBuilder().setSource(sourceBlobId).setTarget(targetBlobId).build(); + + RepoUtil.doPrivilegedVoid(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + // There's no atomic "move" in GCS so we need to copy and delete + Blob copiedBlob = storage.copy(request).getResult(); + logger.info("Copied Blob: [{}]", copiedBlob.toString()); + final boolean deleted = storage.delete(sourceBlobId); + if (!deleted) { + throw new IOException( + "Failed to move source [" + sourceBlobName + "] to target [" + targetBlobName + "]"); + } else { + logger.info("Deleted source blob: {}", sourceBlobId); + } + + return null; + } + }); + } + + private String buildKey(String keyPath, String s) { + assert s != null; + return keyPath + s; + } } diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceDiscovery.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceDiscovery.java index f20d1c74f833f..d2f5e3b5096f5 100755 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceDiscovery.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceDiscovery.java @@ -44,6 +44,6 @@ public GceDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa DiscoverySettings discoverySettings, ElectMasterService electMasterService) { super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService, - pingService, electMasterService, discoverySettings); + pingService, electMasterService, discoverySettings); } } diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java index c5997c36301c4..2c6c44ef3f62c 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java @@ -70,9 +70,9 @@ static final class Status { @Inject public GceUnicastHostsProvider(Settings settings, GceComputeService gceComputeService, - TransportService transportService, - NetworkService networkService, - Version version) { + TransportService transportService, + NetworkService networkService, + Version version) { super(settings); this.gceComputeService = gceComputeService; this.transportService = transportService; @@ -99,7 +99,7 @@ public GceUnicastHostsProvider(Settings settings, GceComputeService gceComputeSe public List buildDynamicNodes() { if (refreshInterval.millis() != 0) { if (cachedDiscoNodes != null && - (refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) { + (refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) { if (logger.isTraceEnabled()) logger.trace("using cache to retrieve node list"); return cachedDiscoNodes; } @@ -146,7 +146,7 @@ public List buildDynamicNodes() { if (tags.length > 0) { logger.trace("start filtering instance {} with tags {}.", name, tags); if (instance.getTags() == null || instance.getTags().isEmpty() - || instance.getTags().getItems() == null || instance.getTags().getItems().isEmpty()) { + || 
instance.getTags().getItems() == null || instance.getTags().getItems().isEmpty()) { // If this instance have no tag, we filter it logger.trace("no tags for this instance but we asked for tags. {} won't be part of the cluster.", name); filterByTag = true; @@ -170,7 +170,7 @@ public List buildDynamicNodes() { } if (filterByTag) { logger.trace("filtering out instance {} based tags {}, not part of {}", name, tags, - instance.getTags() == null || instance.getTags().getItems() == null ? "" : instance.getTags()); + instance.getTags() == null || instance.getTags().getItems() == null ? "" : instance.getTags()); continue; } else { logger.trace("instance {} with tags {} is added to discovery", name, tags); @@ -228,7 +228,7 @@ public List buildDynamicNodes() { for (TransportAddress transportAddress : addresses) { logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type, - ip_private, transportAddress, status); + ip_private, transportAddress, status); cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + name + "-" + 0, transportAddress, version.minimumCompatibilityVersion())); } } diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/RetryHttpInitializerWrapper.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/RetryHttpInitializerWrapper.java index 8ebbaec087a3b..289f7d72c5f04 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/RetryHttpInitializerWrapper.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/RetryHttpInitializerWrapper.java @@ -34,7 +34,7 @@ public class RetryHttpInitializerWrapper implements HttpRequestInitializer { private int maxWait; private static final ESLogger logger = - ESLoggerFactory.getLogger(RetryHttpInitializerWrapper.class.getName()); + ESLoggerFactory.getLogger(RetryHttpInitializerWrapper.class.getName()); // Intercepts the request for filling in the "Authorization" // header field, as well as recovering from certain unsuccessful @@ -55,7 +55,7 @@ public RetryHttpInitializerWrapper(Credential wrappedCredential, int maxWait) { // Use only for testing. RetryHttpInitializerWrapper( - Credential wrappedCredential, Sleeper sleeper, int maxWait) { + Credential wrappedCredential, Sleeper sleeper, int maxWait) { this.wrappedCredential = Objects.requireNonNull(wrappedCredential); this.sleeper = sleeper; this.maxWait = maxWait; @@ -64,43 +64,43 @@ public RetryHttpInitializerWrapper(Credential wrappedCredential, int maxWait) { @Override public void initialize(HttpRequest httpRequest) { final HttpUnsuccessfulResponseHandler backoffHandler = - new HttpBackOffUnsuccessfulResponseHandler( - new ExponentialBackOff.Builder() - .setMaxElapsedTimeMillis(maxWait) - .build()) - .setSleeper(sleeper); + new HttpBackOffUnsuccessfulResponseHandler( + new ExponentialBackOff.Builder() + .setMaxElapsedTimeMillis(maxWait) + .build()) + .setSleeper(sleeper); httpRequest.setInterceptor(wrappedCredential); httpRequest.setUnsuccessfulResponseHandler( - new HttpUnsuccessfulResponseHandler() { - int retry = 0; + new HttpUnsuccessfulResponseHandler() { + int retry = 0; - @Override - public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry) throws IOException { - if (wrappedCredential.handleResponse( - request, response, supportsRetry)) { - // If credential decides it can handle it, - // the return code or message indicated - // something specific to authentication, - // and no backoff is desired. 
diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/RetryHttpInitializerWrapper.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/RetryHttpInitializerWrapper.java index 8ebbaec087a3b..289f7d72c5f04 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/RetryHttpInitializerWrapper.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/RetryHttpInitializerWrapper.java @@ -34,7 +34,7 @@ public class RetryHttpInitializerWrapper implements HttpRequestInitializer { private int maxWait; private static final ESLogger logger = - ESLoggerFactory.getLogger(RetryHttpInitializerWrapper.class.getName()); + ESLoggerFactory.getLogger(RetryHttpInitializerWrapper.class.getName()); // Intercepts the request for filling in the "Authorization" // header field, as well as recovering from certain unsuccessful @@ -55,7 +55,7 @@ public RetryHttpInitializerWrapper(Credential wrappedCredential, int maxWait) { // Use only for testing. RetryHttpInitializerWrapper( - Credential wrappedCredential, Sleeper sleeper, int maxWait) { + Credential wrappedCredential, Sleeper sleeper, int maxWait) { this.wrappedCredential = Objects.requireNonNull(wrappedCredential); this.sleeper = sleeper; this.maxWait = maxWait; @@ -64,43 +64,43 @@ public RetryHttpInitializerWrapper(Credential wrappedCredential, int maxWait) { @Override public void initialize(HttpRequest httpRequest) { final HttpUnsuccessfulResponseHandler backoffHandler = - new HttpBackOffUnsuccessfulResponseHandler( - new ExponentialBackOff.Builder() - .setMaxElapsedTimeMillis(maxWait) - .build()) - .setSleeper(sleeper); + new HttpBackOffUnsuccessfulResponseHandler( + new ExponentialBackOff.Builder() + .setMaxElapsedTimeMillis(maxWait) + .build()) + .setSleeper(sleeper); httpRequest.setInterceptor(wrappedCredential); httpRequest.setUnsuccessfulResponseHandler( - new HttpUnsuccessfulResponseHandler() { - int retry = 0; + new HttpUnsuccessfulResponseHandler() { + int retry = 0; - @Override - public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry) throws IOException { - if (wrappedCredential.handleResponse( - request, response, supportsRetry)) { - // If credential decides it can handle it, - // the return code or message indicated - // something specific to authentication, - // and no backoff is desired. - return true; - } else if (backoffHandler.handleResponse( - request, response, supportsRetry)) { - // Otherwise, we defer to the judgement of - // our internal backoff handler. - logger.debug("Retrying [{}] times : [{}]", retry, request.getUrl()); - return true; - } else { - return false; - } + @Override + public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry) throws IOException { + if (wrappedCredential.handleResponse( + request, response, supportsRetry)) { + // If credential decides it can handle it, + // the return code or message indicated + // something specific to authentication, + // and no backoff is desired. + return true; + } else if (backoffHandler.handleResponse( + request, response, supportsRetry)) { + // Otherwise, we defer to the judgement of + // our internal backoff handler. + logger.debug("Retrying [{}] times : [{}]", ++retry, request.getUrl()); + return true; + } else { + return false; } - }); + } + }); httpRequest.setIOExceptionHandler( - new HttpBackOffIOExceptionHandler( - new ExponentialBackOff.Builder() - .setMaxElapsedTimeMillis(maxWait) - .build()) - .setSleeper(sleeper) + new HttpBackOffIOExceptionHandler( + new ExponentialBackOff.Builder() + .setMaxElapsedTimeMillis(maxWait) + .build()) + .setSleeper(sleeper) ); } -} \ No newline at end of file +}
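RetryHttpInitializerWrapper gives every outgoing request two recovery paths: the wrapped Credential gets the first chance to handle an unsuccessful response (typically refreshing an OAuth token on a 401), and only if it declines does the exponential-backoff handler retry, with the same backoff applied to plain I/O errors. A sketch of wiring the wrapper into a Compute client; the credential source, application name, and one-minute cap are assumptions, and the wrapper is assumed to be on the classpath:

    import com.google.api.client.auth.oauth2.Credential;
    import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
    import com.google.api.client.json.jackson2.JacksonFactory;
    import com.google.api.services.compute.Compute;

    public class ComputeClientSketch {
        public static Compute build(Credential credential) throws Exception {
            // Retry transient failures for up to one minute before giving up (assumed cap).
            RetryHttpInitializerWrapper initializer = new RetryHttpInitializerWrapper(credential, 60 * 1000);
            return new Compute.Builder(GoogleNetHttpTransport.newTrustedTransport(), new JacksonFactory(), null)
                    .setApplicationName("cloud-gce")        // hypothetical application name
                    .setHttpRequestInitializer(initializer) // installs the auth + retry behavior on every request
                    .build();
        }
    }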
diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/plugin/cloud/gce/CloudGcePlugin.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/plugin/cloud/gce/CloudGcePlugin.java index 819294ee4040d..b332f77b083d2 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/plugin/cloud/gce/CloudGcePlugin.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/plugin/cloud/gce/CloudGcePlugin.java @@ -96,7 +96,7 @@ public Collection<Module> nodeModules() { if (isDiscoveryAlive(settings, logger)) { modules.add(new GceModule()); } - + modules.add(new GCSModule()); return modules; @@ -108,9 +108,9 @@ public Collection<Class<? extends LifecycleComponent>> nodeServices() { if (isDiscoveryAlive(settings, logger)) { services.add(GceModule.getComputeServiceImpl()); } - + services.add(GCSModule.getStorageServiceImpl()); - + return services; } @@ -121,10 +121,10 @@ public void onModule(DiscoveryModule discoveryModule) { } } - public void onModule(RepositoriesModule repositoriesModule) { - repositoriesModule.registerRepository(GCSRepository.TYPE, GCSRepository.class, BlobStoreIndexShardRepository.class); + public void onModule(RepositoriesModule repositoriesModule) { + repositoriesModule.registerRepository(GCSRepository.TYPE, GCSRepository.class, BlobStoreIndexShardRepository.class); } - + /** * Check if discovery is meant to start * @@ -138,11 +138,11 @@ public static boolean isDiscoveryAlive(Settings settings, ESLogger logger) { } if (!checkProperty(GceComputeService.Fields.PROJECT, settings.get(GceComputeService.Fields.PROJECT), logger) || - !checkProperty(GceComputeService.Fields.ZONE, settings.getAsArray(GceComputeService.Fields.ZONE), logger)) { logger.debug("one or more gce discovery settings are missing. " + - "Check elasticsearch.yml file. Should have [{}] and [{}].", + GceComputeService.Fields.PROJECT, + GceComputeService.Fields.ZONE); return false; }
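CloudGcePlugin wires the GCE discovery module only when isDiscoveryAlive() confirms that GCE discovery is configured and the project/zone settings are present, while the GCS repository module is registered unconditionally. A sketch of exercising that gate; the "discovery.type" key and the 2.x settings builder are assumptions, and the project/zone values are hypothetical:

    import org.elasticsearch.common.logging.ESLogger;
    import org.elasticsearch.common.settings.Settings;

    public class DiscoveryGateSketch {
        // Mirrors the gate nodeModules()/nodeServices() consult above.
        static boolean wouldEnableGceDiscovery(ESLogger logger) {
            Settings settings = Settings.settingsBuilder()
                    .put("discovery.type", "gce")                               // assumed setting key
                    .put(GceComputeService.Fields.PROJECT, "my-gce-project")    // hypothetical project
                    .putArray(GceComputeService.Fields.ZONE, "europe-west1-a")  // hypothetical zone
                    .build();
            return CloudGcePlugin.isDiscoveryAlive(settings, logger);
        }
    }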
diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/repositories/gcs/GCSRepository.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/repositories/gcs/GCSRepository.java index 0ebcd096b581f..d5521f1891ffb 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/repositories/gcs/GCSRepository.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/repositories/gcs/GCSRepository.java @@ -42,99 +42,99 @@ import java.security.GeneralSecurityException; public class GCSRepository extends BlobStoreRepository { - - public static final String TYPE = "gcs"; + + public static final String TYPE = "gcs"; public static final ByteSizeValue MAX_CHUNK_SIZE = new ByteSizeValue(100, ByteSizeUnit.MB); - public static final TimeValue NO_TIMEOUT = timeValueMillis(-1); - - private final GCSBlobStore blobStore; - - private final BlobPath basePath; - - private ByteSizeValue chunkSize; - - private boolean compress; - - @Inject - public GCSRepository(RepositoryName name, RepositorySettings repositorySettings, - IndexShardRepository indexShardRepository, GCSService gcsService) throws IOException, GeneralSecurityException { - super(name.getName(), repositorySettings, indexShardRepository); - - String bucket = repositorySettings.settings().get("bucket", settings.get(RepositoryGCS.BUCKET)); - String application = repositorySettings.settings().get("application_name", - settings.get(RepositoryGCS.APPLICATION_NAME, "repository-gcs")); - String serviceAccount = repositorySettings.settings().get("service_account", - settings.get(RepositoryGCS.SERVICE_ACCOUNT)); - - String configPath = repositorySettings.settings().get("base_path", settings.get(RepositoryGCS.BASE_PATH)); - if (Strings.hasLength(configPath)) { - BlobPath path = new BlobPath(); - for (String elem : Strings.splitStringToArray(configPath, '/')) { - path = path.add(elem); - } - this.basePath = path; - } else { - this.basePath = BlobPath.cleanPath(); - } - - this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", - settings.getAsBytesSize(RepositoryGCS.CHUNK_SIZE, MAX_CHUNK_SIZE)); - this.compress = repositorySettings.settings().getAsBoolean("compress", - settings.getAsBoolean(RepositoryGCS.COMPRESS, false)); - - - TimeValue connectTimeout = null; - TimeValue readTimeout = null; - - TimeValue timeout = repositorySettings.settings().getAsTime("read_timeout", - settings.getAsTime(RepositoryGCS.HTTP_READ_TIMEOUT, NO_TIMEOUT)); - if ((timeout != null) && (timeout.millis() != NO_TIMEOUT.millis())) { - connectTimeout = timeout; - } - - timeout = repositorySettings.settings().getAsTime("connect_timeout", - settings.getAsTime(RepositoryGCS.HTTP_CONNECT_TIMEOUT, NO_TIMEOUT)); - if ((timeout != null) && (timeout.millis() != NO_TIMEOUT.millis())) { - readTimeout = timeout; - } - - logger.info("Repo settings, bucket: [{}], base path: [{}], application name: [{}], service account: [{}], read timeout: [{}], connect timeout: [{}], compressz; [{}], chunk size: [{}]", - bucket,this.basePath,application,serviceAccount,readTimeout, connectTimeout, this.compress, this.chunkSize); - - Storage client = gcsService.createClient(serviceAccount, application, connectTimeout, readTimeout); - - logger.info("Created storage client"); - - this.blobStore = new GCSBlobStore(settings, bucket, client); - } - - /** - * {@inheritDoc} - */ - @Override - protected BlobStore blobStore() { - return blobStore; - } - - @Override - protected BlobPath basePath() { - return basePath; - } - - /** - * {@inheritDoc} - */ - @Override - protected boolean isCompress() { - return compress; - } - - /** - * {@inheritDoc} - */ - @Override - protected ByteSizeValue chunkSize() { - return chunkSize; - } + public static final TimeValue NO_TIMEOUT = timeValueMillis(-1); + + private final GCSBlobStore blobStore; + + private final BlobPath basePath; + + private ByteSizeValue chunkSize; + + private boolean compress; + + @Inject + public GCSRepository(RepositoryName name, RepositorySettings repositorySettings, + IndexShardRepository indexShardRepository, GCSService gcsService) throws IOException, GeneralSecurityException { + super(name.getName(), repositorySettings, indexShardRepository); + + String bucket = repositorySettings.settings().get("bucket", settings.get(RepositoryGCS.BUCKET)); + String application = repositorySettings.settings().get("application_name", + settings.get(RepositoryGCS.APPLICATION_NAME, "repository-gcs")); + String serviceAccount = repositorySettings.settings().get("service_account", + settings.get(RepositoryGCS.SERVICE_ACCOUNT)); + + String configPath = repositorySettings.settings().get("base_path", settings.get(RepositoryGCS.BASE_PATH)); + if (Strings.hasLength(configPath)) { + BlobPath path = new BlobPath(); + for (String elem : Strings.splitStringToArray(configPath, '/')) { + path = path.add(elem); + } + this.basePath = path; + } else { + this.basePath = BlobPath.cleanPath(); + } + + this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", + settings.getAsBytesSize(RepositoryGCS.CHUNK_SIZE, MAX_CHUNK_SIZE)); + this.compress = repositorySettings.settings().getAsBoolean("compress", + settings.getAsBoolean(RepositoryGCS.COMPRESS, false)); + + + TimeValue connectTimeout = null; + TimeValue readTimeout = null; + + TimeValue timeout = repositorySettings.settings().getAsTime("read_timeout", + settings.getAsTime(RepositoryGCS.HTTP_READ_TIMEOUT, NO_TIMEOUT)); + if ((timeout != null) && (timeout.millis() != NO_TIMEOUT.millis())) { + readTimeout = timeout; + } + + timeout = repositorySettings.settings().getAsTime("connect_timeout", + settings.getAsTime(RepositoryGCS.HTTP_CONNECT_TIMEOUT, NO_TIMEOUT)); + if ((timeout != null) && (timeout.millis() != NO_TIMEOUT.millis())) { + connectTimeout = timeout; + } + + logger.info("Repo settings, bucket: [{}], base path: [{}], application name: [{}], service account: [{}], read timeout: [{}], connect timeout: [{}], compress: [{}], chunk size: [{}]", + bucket, this.basePath, application, serviceAccount, readTimeout, connectTimeout, this.compress, this.chunkSize); + + Storage client = gcsService.createClient(serviceAccount, application, connectTimeout, readTimeout); + + logger.info("Created storage client"); + + this.blobStore = new GCSBlobStore(settings, bucket, client); + } + + /** + * {@inheritDoc} + */ + @Override + protected BlobStore blobStore() { + return blobStore; + } + + @Override + protected BlobPath basePath() { + return basePath; + } + + /** + * {@inheritDoc} + */ + @Override + protected boolean isCompress() { + return compress; + } + + /** + * {@inheritDoc} + */ + @Override + protected ByteSizeValue chunkSize() { + return chunkSize; + } }
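As the constructor above shows, each per-repository setting falls back to a node-level repositories.gcs.* default, so values supplied at registration time always win. A sketch of registering such a repository through the Java admin client; the repository name and all values are hypothetical, and the setting keys follow the constructor above (service_account, application_name, and the timeouts can be supplied the same way):

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.Settings;

    public class RepoRegistrationSketch {
        // Hypothetical registration of a "gcs" snapshot repository.
        static void register(Client client) {
            client.admin().cluster().preparePutRepository("my_gcs_backup")
                    .setType(GCSRepository.TYPE) // "gcs"
                    .setSettings(Settings.settingsBuilder()
                            .put("bucket", "my-snapshot-bucket")  // required
                            .put("base_path", "prod/cluster-one") // optional prefix inside the bucket
                            .put("compress", true)                // overrides repositories.gcs.compress
                            .put("chunk_size", "64mb")            // bounded by MAX_CHUNK_SIZE (100mb)
                            .build())
                    .get();
        }
    }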
diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/repositories/gcs/RepoUtil.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/repositories/gcs/RepoUtil.java index ca4d2d74941f5..9a25601f5a348 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/repositories/gcs/RepoUtil.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/repositories/gcs/RepoUtil.java @@ -10,53 +10,54 @@ public final class RepoUtil { - private RepoUtil() {} - + private RepoUtil() { + } + + /** + * Executes a {@link PrivilegedExceptionAction} with privileges enabled. + */ + public static <T> T doPrivileged(PrivilegedExceptionAction<T> operation) throws IOException { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkPermission(new SpecialPermission()); + } + try { + return AccessController.doPrivileged(operation); + } catch (PrivilegedActionException e) { + throw (IOException) e.getException(); + } + } + + /** + * Executes a {@link PrivilegedExceptionAction} with privileges enabled. + */ + public static void doPrivilegedVoid(PrivilegedExceptionAction operation) throws IOException { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkPermission(new SpecialPermission()); + } + try { + AccessController.doPrivileged(operation); + } catch (PrivilegedActionException e) { + throw (IOException) e.getException(); + } + } + /** - * Executes a {@link PrivilegedExceptionAction} with privileges enabled. - */ - public static <T> T doPrivileged(PrivilegedExceptionAction<T> operation) throws IOException { - SecurityManager sm = System.getSecurityManager(); - if (sm != null) { - sm.checkPermission(new SpecialPermission()); - } - try { - return AccessController.doPrivileged(operation); - } catch (PrivilegedActionException e) { - throw (IOException) e.getException(); - } - } - - /** - * Executes a {@link PrivilegedExceptionAction} with privileges enabled. - */ - public static void doPrivilegedVoid(PrivilegedExceptionAction operation) throws IOException { - SecurityManager sm = System.getSecurityManager(); - if (sm != null) { - sm.checkPermission(new SpecialPermission()); - } - try { - AccessController.doPrivileged(operation); - } catch (PrivilegedActionException e) { - throw (IOException) e.getException(); - } - } - - /** - * Converts timeout values from the settings to a timeout value for the Google - * Cloud SDK - **/ - public static Integer toTimeout(final TimeValue timeout) { - // Null or zero in settings means the default timeout - if (timeout == null || TimeValue.timeValueMillis(0).equals(timeout)) { - // negative value means using the default value - return -1; - } - // -1 means infinite timeout - if (TimeValue.timeValueMillis(-1).equals(timeout)) { - // 0 is the infinite timeout expected by Google Cloud SDK - return 0; - } - return (int) timeout.getMillis(); - } + * Converts timeout values from the settings to a timeout value for the Google + * Cloud SDK + **/ + public static Integer toTimeout(final TimeValue timeout) { + // Null or zero in settings means the default timeout + if (timeout == null || TimeValue.timeValueMillis(0).equals(timeout)) { + // negative value means using the default value + return -1; + } + // -1 means infinite timeout + if (TimeValue.timeValueMillis(-1).equals(timeout)) { + // 0 is the infinite timeout expected by Google Cloud SDK + return 0; + } + return (int) timeout.getMillis(); + } }
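toTimeout() reconciles two sentinel conventions that disagree with each other: in the Elasticsearch settings, null or 0 means "use the default" and -1 means "infinite", while the Google Cloud SDK expects -1 for "default" and 0 for "infinite". A few worked cases, stated as assertions for illustration:

    import org.elasticsearch.common.unit.TimeValue;

    public class TimeoutSketch {
        public static void main(String[] args) {
            assert RepoUtil.toTimeout(null) == -1;                            // unset -> SDK default (-1)
            assert RepoUtil.toTimeout(TimeValue.timeValueMillis(0)) == -1;    // 0 in settings -> SDK default
            assert RepoUtil.toTimeout(TimeValue.timeValueMillis(-1)) == 0;    // -1 (infinite) -> SDK infinite (0)
            assert RepoUtil.toTimeout(TimeValue.timeValueSeconds(5)) == 5000; // anything else -> plain millis
        }
    }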
diff --git a/plugins/cloud-gce/src/main/plugin-metadata/plugin-security.policy b/plugins/cloud-gce/src/main/plugin-metadata/plugin-security.policy index fffe6cbbc0f24..ae7d67a49d0e6 100644 --- a/plugins/cloud-gce/src/main/plugin-metadata/plugin-security.policy +++ b/plugins/cloud-gce/src/main/plugin-metadata/plugin-security.policy @@ -27,4 +27,6 @@ grant { // gcs client opens socket connections for to access repository permission java.net.SocketPermission "*", "connect"; + permission java.net.NetPermission "getNetworkInformation"; + permission java.net.URLPermission "*"; };
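These grants attach to the plugin's own protection domain, so they only take effect when the GCS client is invoked inside a doPrivileged block; that is why GCSBlobStore routes every storage call through RepoUtil instead of calling the SDK directly. A minimal sketch of the pattern, assuming a Storage client is already in scope; the blob coordinates are hypothetical:

    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.Storage;
    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    public class PrivilegedReadSketch {
        // The SDK's network calls succeed only when run with the plugin's privileges.
        static Blob fetch(final Storage storage) throws IOException {
            return RepoUtil.doPrivileged(new PrivilegedExceptionAction<Blob>() {
                @Override
                public Blob run() throws Exception {
                    // Runs with the grants above (SocketPermission, NetPermission, URLPermission).
                    return storage.get(BlobId.of("my-bucket", "path/to/blob")); // hypothetical names
                }
            });
        }
    }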