From 6ce0d67f8275d5f3030622778563050a6fabaeab Mon Sep 17 00:00:00 2001 From: Hussain Towaileb Date: Mon, 7 Mar 2022 14:44:47 +0300 Subject: [PATCH] Refactor ExternalDataUtils/Constants Change-Id: Ie1f1499f13968d421bee43ec7352aea0c2749423 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15644 Tested-by: Jenkins Integration-Tests: Jenkins Reviewed-by: Hussain Towaileb Reviewed-by: Michael Blow --- .../apache/asterix/utils/RedactionUtil.java | 2 +- .../record/reader/aws/AwsS3InputStream.java | 7 +- .../reader/aws/AwsS3InputStreamFactory.java | 4 +- .../parquet/AwsS3ParquetReaderFactory.java | 9 +- .../azure/blob/AzureBlobInputStream.java | 4 +- .../blob/AzureBlobInputStreamFactory.java | 9 +- .../datalake/AzureDataLakeInputStream.java | 4 +- .../AzureDataLakeInputStreamFactory.java | 9 +- .../AzureBlobParquetReaderFactory.java | 15 +- .../AzureDataLakeParquetReaderFactory.java | 16 +- .../record/reader/gcs/GCSInputStream.java | 4 +- .../reader/gcs/GCSInputStreamFactory.java | 3 +- .../external/util/ExternalDataConstants.java | 103 -- .../external/util/ExternalDataUtils.java | 1104 +---------------- .../external/util/aws/s3/S3Constants.java | 63 + .../asterix/external/util/aws/s3/S3Utils.java | 436 +++++++ .../azure/blob_storage/AzureConstants.java | 65 + .../util/azure/blob_storage/AzureUtils.java | 636 ++++++++++ .../util/google/gcs/GCSConstants.java | 27 + .../external/util/google/gcs/GCSUtils.java | 119 ++ .../input/record/reader/awss3/AwsS3Test.java | 4 +- 21 files changed, 1409 insertions(+), 1234 deletions(-) create mode 100644 asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java create mode 100644 asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java create mode 100644 asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureConstants.java create mode 100644 asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java create mode 100644 asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java create mode 100644 asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java index 156b78ae932..48cf5115acc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java @@ -20,7 +20,7 @@ import static java.util.regex.Pattern.CASE_INSENSITIVE; import static java.util.regex.Pattern.DOTALL; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME; +import static org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME; import java.util.regex.Pattern; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index f14af53bc77..bbcf9cd6503 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.external.input.record.reader.aws; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.IOException; @@ -32,7 +31,7 @@ import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.aws.s3.S3Utils; import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.CleanupUtils; @@ -109,7 +108,7 @@ private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataExc } private boolean shouldRetry(String errorCode, int currentRetry) { - return currentRetry < MAX_RETRIES && AwsS3.isRetryableError(errorCode); + return currentRetry < MAX_RETRIES && S3Utils.isRetryableError(errorCode); } @Override @@ -134,7 +133,7 @@ public boolean stop() { private S3Client buildAwsS3Client(Map configuration) throws HyracksDataException { try { - return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration); + return S3Utils.buildAwsS3Client(configuration); } catch (CompilationException ex) { throw HyracksDataException.create(ex); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java index 89ea39eeb92..a2413546b64 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java @@ -26,6 +26,7 @@ import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.aws.s3.S3Utils; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -53,8 +54,7 @@ public void configure(IServiceContext ctx, Map configuration, IW IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); //Get a list of S3 objects - List filesOnly = - ExternalDataUtils.AwsS3.listS3Objects(configuration, includeExcludeMatcher, warningCollector); + List filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector); // Distribute work load amongst the partitions distributeWorkLoad(filesOnly, getPartitionsCount()); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java index 803e657dd8b..ff93a464607 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java @@ -28,6 +28,8 @@ import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.aws.s3.S3Constants; +import org.apache.asterix.external.util.aws.s3.S3Utils; import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; @@ -52,7 +54,7 @@ public void configure(IServiceContext serviceCtx, Map configurat //Configure Hadoop S3 input splits JobConf conf = createHdfsConf(serviceCtx, configuration); int numberOfPartitions = getPartitionConstraint().getLocations().length; - ExternalDataUtils.AwsS3.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions); + S3Utils.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions); configureHdfsConf(conf, configuration); } @@ -89,8 +91,7 @@ private static String buildPathURIs(Map configuration, IWarningC throws CompilationException { String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); - List filesOnly = - ExternalDataUtils.AwsS3.listS3Objects(configuration, includeExcludeMatcher, warningCollector); + List filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector); StringBuilder builder = new StringBuilder(); if (!filesOnly.isEmpty()) { @@ -105,7 +106,7 @@ private static String buildPathURIs(Map configuration, IWarningC } private static void appendFileURI(StringBuilder builder, String container, S3Object file) { - builder.append(ExternalDataConstants.AwsS3.HADOOP_S3_PROTOCOL); + builder.append(S3Constants.HADOOP_S3_PROTOCOL); builder.append("://"); builder.append(container); builder.append('/'); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java index cdb3834a6c8..bbfece2dc0d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.external.input.record.reader.azure.blob; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.IOException; @@ -31,7 +32,6 @@ import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import 
org.apache.hyracks.util.LogRedactionUtil; @@ -86,7 +86,7 @@ protected boolean getInputStream() throws IOException { private BlobServiceClient buildAzureClient(IApplicationContext appCtx, Map configuration) throws HyracksDataException { try { - return ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration); + return buildAzureBlobClient(appCtx, configuration); } catch (CompilationException ex) { throw HyracksDataException.create(ex); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java index 064b3196e2c..55c05218fbb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.external.input.record.reader.azure.blob; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listBlobItems; + import java.util.Comparator; import java.util.List; import java.util.Map; @@ -57,9 +60,9 @@ public void configure(IServiceContext ctx, Map configuration, IW // Ensure the validity of include/exclude ExternalDataUtils.validateIncludeExclude(configuration); IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); - BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration); - List filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration, - includeExcludeMatcher, warningCollector); + BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration); + List filesOnly = + listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector); // Distribute work load amongst the partitions distributeWorkLoad(filesOnly, getPartitionsCount()); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java index e34d1885e0d..7a95222bfd5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.external.input.record.reader.azure.datalake; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.IOException; @@ -31,7 +32,6 @@ import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import 
org.apache.hyracks.util.LogRedactionUtil; @@ -86,7 +86,7 @@ protected boolean getInputStream() throws IOException { private DataLakeServiceClient buildAzureClient(IApplicationContext appCtx, Map configuration) throws HyracksDataException { try { - return ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration); + return buildAzureDatalakeClient(appCtx, configuration); } catch (CompilationException ex) { throw HyracksDataException.create(ex); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java index e9f8d4ce14c..929cb6e4954 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.external.input.record.reader.azure.datalake; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listDatalakePathItems; + import java.util.Comparator; import java.util.List; import java.util.Map; @@ -57,9 +60,9 @@ public void configure(IServiceContext ctx, Map configuration, IW // Ensure the validity of include/exclude ExternalDataUtils.validateIncludeExclude(configuration); IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); - DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration); - List filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(client, configuration, - includeExcludeMatcher, warningCollector); + DataLakeServiceClient client = buildAzureDatalakeClient(appCtx, configuration); + List filesOnly = + listDatalakePathItems(client, configuration, includeExcludeMatcher, warningCollector); // Distribute work load amongst the partitions distributeWorkLoad(filesOnly, getPartitionsCount()); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java index c2251df57ea..e08013c33b0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java @@ -18,6 +18,11 @@ */ package org.apache.asterix.external.input.record.reader.azure.parquet; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_BLOB_PROTOCOL; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.configureAzureHdfsJobConf; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listBlobItems; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -47,7 +52,7 @@ public class 
AzureBlobParquetReaderFactory extends HDFSDataSourceFactory { public void configure(IServiceContext serviceCtx, Map configuration, IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException { IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext(); - BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration); + BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration); //Get endpoint String endPoint = extractEndPoint(blobServiceClient.getAccountUrl()); //Get path @@ -57,7 +62,7 @@ public void configure(IServiceContext serviceCtx, Map configurat //Configure Hadoop Azure input splits JobConf conf = createHdfsConf(serviceCtx, configuration); - ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint); + configureAzureHdfsJobConf(conf, configuration, endPoint); configureHdfsConf(conf, configuration); } @@ -94,8 +99,8 @@ private static void putAzureBlobConfToHadoopConf(Map configurati private static String buildPathURIs(Map configuration, IWarningCollector warningCollector, BlobServiceClient blobServiceClient, String endPoint) throws CompilationException { IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); - List filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration, - includeExcludeMatcher, warningCollector); + List filesOnly = + listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector); StringBuilder builder = new StringBuilder(); String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); @@ -118,7 +123,7 @@ private static String extractEndPoint(String uri) { } private static void appendFileURI(StringBuilder builder, String container, String endPoint, BlobItem file) { - builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL); + builder.append(HADOOP_AZURE_BLOB_PROTOCOL); builder.append("://"); builder.append(container); builder.append('@'); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java index db878682550..c98fc8b6956 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java @@ -18,6 +18,11 @@ */ package org.apache.asterix.external.input.record.reader.azure.parquet; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_DATALAKE_PROTOCOL; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.configureAzureHdfsJobConf; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listDatalakePathItems; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -47,8 +52,7 @@ public class AzureDataLakeParquetReaderFactory extends HDFSDataSourceFactory { public void configure(IServiceContext serviceCtx, Map configuration, IWarningCollector warningCollector) throws AlgebricksException, 
HyracksDataException { IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext(); - DataLakeServiceClient dataLakeServiceClient = - ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration); + DataLakeServiceClient dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration); //Get endpoint String endPoint = extractEndPoint(dataLakeServiceClient.getAccountUrl()); @@ -61,7 +65,7 @@ public void configure(IServiceContext serviceCtx, Map configurat //Configure Hadoop Azure input splits JobConf conf = createHdfsConf(serviceCtx, configuration); - ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint); + configureAzureHdfsJobConf(conf, configuration, endPoint); configureHdfsConf(conf, configuration); } @@ -98,8 +102,8 @@ private static void putAzureDataLakeConfToHadoopConf(Map configu private static String buildPathURIs(Map configuration, IWarningCollector warningCollector, DataLakeServiceClient dataLakeServiceClient, String endPoint) throws CompilationException { IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); - List filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(dataLakeServiceClient, configuration, - includeExcludeMatcher, warningCollector); + List filesOnly = + listDatalakePathItems(dataLakeServiceClient, configuration, includeExcludeMatcher, warningCollector); StringBuilder builder = new StringBuilder(); String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); @@ -122,7 +126,7 @@ private static String extractEndPoint(String uri) { } private static void appendFileURI(StringBuilder builder, String container, String endPoint, PathItem file) { - builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_DATALAKE_PROTOCOL); + builder.append(HADOOP_AZURE_DATALAKE_PROTOCOL); builder.append("://"); builder.append(container); builder.append('@'); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java index 652fa3e4b0f..007e8be1b37 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java @@ -32,7 +32,7 @@ import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.google.gcs.GCSUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.CleanupUtils; @@ -134,7 +134,7 @@ public boolean stop() { private Storage buildClient(Map configuration) throws HyracksDataException { try { - return ExternalDataUtils.GCS.buildClient(configuration); + return GCSUtils.buildClient(configuration); } catch (CompilationException ex) { throw HyracksDataException.create(ex); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java index 0e7ea905704..1bc51f27d4b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.external.input.record.reader.gcs; import static org.apache.asterix.external.util.ExternalDataUtils.getIncludeExcludeMatchers; +import static org.apache.asterix.external.util.google.gcs.GCSUtils.buildClient; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.util.ArrayList; @@ -67,7 +68,7 @@ public void configure(IServiceContext ctx, Map configuration, IW // Prepare to retrieve the objects List filesOnly = new ArrayList<>(); String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - Storage gcs = ExternalDataUtils.GCS.buildClient(configuration); + Storage gcs = buildClient(configuration); Storage.BlobListOption options = Storage.BlobListOption.prefix(ExternalDataUtils.getPrefix(configuration)); Page items; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index f0b9c90428c..429706e30e3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -333,107 +333,4 @@ private ParquetOptions() { */ public static final Set VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs()); } - - public static class AwsS3 { - private AwsS3() { - throw new AssertionError("do not instantiate"); - } - - public static final String REGION_FIELD_NAME = "region"; - public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId"; - public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey"; - public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken"; - public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint"; - - // AWS S3 specific error codes - public static final String ERROR_INTERNAL_ERROR = "InternalError"; - public static final String ERROR_SLOW_DOWN = "SlowDown"; - public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented"; - - public static boolean isRetryableError(String errorCode) { - return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN); - } - - /* - * Hadoop-AWS - * AWS connectors for s3 and s3n are deprecated. 
- */ - public static final String HADOOP_ACCESS_KEY_ID = "fs.s3a.access.key"; - public static final String HADOOP_SECRET_ACCESS_KEY = "fs.s3a.secret.key"; - public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token"; - public static final String HADOOP_REGION = "fs.s3a.region"; - public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint"; - - /* - * Internal configurations - */ - //Allows accessing directories as file system path - public static final String HADOOP_PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; - //The number of maximum HTTP connections in connection pool - public static final String HADOOP_S3_CONNECTION_POOL_SIZE = "fs.s3a.connection.maximum"; - //S3 used protocol - public static final String HADOOP_S3_PROTOCOL = "s3a"; - - //Hadoop credentials provider key - public static final String HADOOP_CREDENTIAL_PROVIDER_KEY = "fs.s3a.aws.credentials.provider"; - //Anonymous credential provider - public static final String HADOOP_ANONYMOUS_ACCESS = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"; - //Temporary credential provider - public static final String HADOOP_TEMP_ACCESS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"; - - } - - /* - * Note: Azure Blob and Azure Datalake use identical authentication, so they use the same properties. - * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created. - */ - public static class Azure { - private Azure() { - throw new AssertionError("do not instantiate"); - } - - /* - * Asterix Configuration Keys - */ - public static final String MANAGED_IDENTITY_ID_FIELD_NAME = "managedIdentityId"; - public static final String ACCOUNT_NAME_FIELD_NAME = "accountName"; - public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey"; - public static final String SHARED_ACCESS_SIGNATURE_FIELD_NAME = "sharedAccessSignature"; - public static final String TENANT_ID_FIELD_NAME = "tenantId"; - public static final String CLIENT_ID_FIELD_NAME = "clientId"; - public static final String CLIENT_SECRET_FIELD_NAME = "clientSecret"; - public static final String CLIENT_CERTIFICATE_FIELD_NAME = "clientCertificate"; - public static final String CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME = "clientCertificatePassword"; - public static final String ENDPOINT_FIELD_NAME = "endpoint"; - - // Specific Azure data lake property - /* - The behavior of Data Lake (true file system) is to read the files of the specified prefix only, example: - storage/myData/personal/file1.json - storage/myData/personal/file2.json - storage/myData/file3.json - If the prefix used is "myData", then only the file file3.json is read. However, if the property "recursive" - is set to "true" when creating the external dataset, then it goes recursively over all the paths, and the result - is file1.json, file2.json and file3.json.
- */ - public static final String RECURSIVE_FIELD_NAME = "recursive"; - - /* - * Hadoop-Azure - */ - //Used when accountName and accessKey are provided - public static final String HADOOP_AZURE_FS_ACCOUNT_KEY = "fs.azure.account.key"; - //Used when a connectionString is provided - public static final String HADOOP_AZURE_FS_SAS = "fs.azure.sas"; - public static final String HADOOP_AZURE_BLOB_PROTOCOL = "wasbs"; - public static final String HADOOP_AZURE_DATALAKE_PROTOCOL = "abfss"; - } - - public static class GCS { - private GCS() { - throw new AssertionError("do not instantiate"); - } - - public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials"; - } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 8e38eed2702..702ef42d6ff 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -18,43 +18,9 @@ */ package org.apache.asterix.external.util; -import static com.google.cloud.storage.Storage.BlobListOption; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR; import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL; import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME; import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED; -import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; -import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT; -import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT; -import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_ACCESS_KEY_ID; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_ANONYMOUS_ACCESS; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_CREDENTIAL_PROVIDER_KEY; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_PATH_STYLE_ACCESS; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_CONNECTION_POOL_SIZE; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_PROTOCOL; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SECRET_ACCESS_KEY; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SESSION_TOKEN; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_TEMP_ACCESS; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ACCOUNT_KEY_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ACCOUNT_NAME_FIELD_NAME; -import static 
org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_CERTIFICATE_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_ID_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_SECRET_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ENDPOINT_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_ACCOUNT_KEY; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_SAS; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.MANAGED_IDENTITY_ID_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.RECURSIVE_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.SHARED_ACCESS_SIGNATURE_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.Azure.TENANT_ID_FIELD_NAME; -import static org.apache.asterix.external.util.ExternalDataConstants.GCS.JSON_CREDENTIALS_FIELD_NAME; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ADAPTER_NAME_GCS; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE; @@ -65,18 +31,14 @@ import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties; +import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties; +import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties; import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS; -import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -85,7 +47,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.BiPredicate; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; @@ -106,20 +67,17 @@ import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; import org.apache.asterix.external.library.JavaLibrary; import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions; +import org.apache.asterix.external.util.aws.s3.S3Utils; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.AUnionType; import org.apache.asterix.runtime.evaluators.common.NumberUtils; import 
org.apache.asterix.runtime.projection.DataProjectionInfo; import org.apache.asterix.runtime.projection.FunctionCallInformation; -import org.apache.hadoop.fs.s3a.Constants; -import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.exceptions.SourceLocation; -import org.apache.hyracks.api.exceptions.Warning; -import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory; import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory; import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory; @@ -129,46 +87,6 @@ import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; import org.apache.hyracks.util.StorageUtil; -import com.azure.core.credential.AzureSasCredential; -import com.azure.core.http.rest.PagedIterable; -import com.azure.identity.ClientCertificateCredentialBuilder; -import com.azure.identity.ClientSecretCredentialBuilder; -import com.azure.identity.ManagedIdentityCredentialBuilder; -import com.azure.storage.blob.BlobContainerClient; -import com.azure.storage.blob.BlobServiceClient; -import com.azure.storage.blob.BlobServiceClientBuilder; -import com.azure.storage.blob.models.BlobItem; -import com.azure.storage.blob.models.ListBlobsOptions; -import com.azure.storage.common.StorageSharedKeyCredential; -import com.azure.storage.common.policy.RequestRetryOptions; -import com.azure.storage.file.datalake.DataLakeFileSystemClient; -import com.azure.storage.file.datalake.DataLakeServiceClient; -import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; -import com.azure.storage.file.datalake.models.ListPathsOptions; -import com.azure.storage.file.datalake.models.PathItem; -import com.google.api.gax.paging.Page; -import com.google.auth.oauth2.ServiceAccountCredentials; -import com.google.cloud.storage.Blob; -import com.google.cloud.storage.Storage; -import com.google.cloud.storage.StorageOptions; - -import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.exception.SdkException; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3ClientBuilder; -import software.amazon.awssdk.services.s3.model.ListObjectsRequest; -import software.amazon.awssdk.services.s3.model.ListObjectsResponse; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; -import software.amazon.awssdk.services.s3.model.S3Exception; -import software.amazon.awssdk.services.s3.model.S3Object; -import software.amazon.awssdk.services.s3.model.S3Response; - public class ExternalDataUtils { private static final Map valueParserFactoryMap = new EnumMap<>(ATypeTag.class); private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024; @@ -604,16 +522,16 @@ public static void validateAdapterSpecificProperties(Map configu switch (type) { case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3: - 
AwsS3.validateProperties(configuration, srcLoc, collector); + S3Utils.validateProperties(configuration, srcLoc, collector); break; case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB: - Azure.validateAzureBlobProperties(configuration, srcLoc, collector, appCtx); + validateAzureBlobProperties(configuration, srcLoc, collector, appCtx); break; case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE: - Azure.validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx); + validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx); break; case KEY_ADAPTER_NAME_GCS: - GCS.validateProperties(configuration, srcLoc, collector); + validateProperties(configuration, srcLoc, collector); break; default: // Nothing needs to be done @@ -844,7 +762,7 @@ public static void validateParquetTypeAndConfiguration(Map prope } } - private static boolean isParquetFormat(Map properties) { + public static boolean isParquetFormat(Map properties) { String inputFormat = properties.get(ExternalDataConstants.KEY_INPUT_FORMAT); return ExternalDataConstants.CLASS_NAME_PARQUET_INPUT_FORMAT.equals(inputFormat) || ExternalDataConstants.INPUT_FORMAT_PARQUET.equals(inputFormat) @@ -893,1008 +811,6 @@ static String serializeFunctionCallInfoToString(Map configuration) throws CompilationException { - // TODO(Hussain): Need to ensure that all required parameters are present in a previous step - String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); - String sessionToken = configuration.get(ExternalDataConstants.AwsS3.SESSION_TOKEN_FIELD_NAME); - String regionId = configuration.get(ExternalDataConstants.AwsS3.REGION_FIELD_NAME); - String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME); - - S3ClientBuilder builder = S3Client.builder(); - - // Credentials - AwsCredentialsProvider credentialsProvider; - - // No auth required - if (accessKeyId == null) { - credentialsProvider = AnonymousCredentialsProvider.create(); - } else { - // auth required, check for temporary or permanent credentials - if (sessionToken != null) { - credentialsProvider = StaticCredentialsProvider - .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken)); - } else { - credentialsProvider = - StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey)); - } - } - - builder.credentialsProvider(credentialsProvider); - - // Validate the region - List regions = S3Client.serviceMetadata().regions(); - Optional selectedRegion = - regions.stream().filter(region -> region.id().equals(regionId)).findFirst(); - - if (selectedRegion.isEmpty()) { - throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId); - } - builder.region(selectedRegion.get()); - - // Validate the service endpoint if present - if (serviceEndpoint != null) { - try { - URI uri = new URI(serviceEndpoint); - try { - builder.endpointOverride(uri); - } catch (NullPointerException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - } catch (URISyntaxException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, - String.format("Invalid service endpoint %s", serviceEndpoint)); - } - } - - return builder.build(); - } - - /** - * Configures the Hadoop job for AWS S3 access using the provided configuration - * - * @param configuration properties - * @param numberOfPartitions number of partitions in the cluster - */ - public static void
configureAwsS3HdfsJobConf(JobConf conf, Map configuration, - int numberOfPartitions) { - String accessKeyId = configuration.get(ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = configuration.get(ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME); - String sessionToken = configuration.get(ExternalDataConstants.AwsS3.SESSION_TOKEN_FIELD_NAME); - String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME); - - //Disable caching of S3 FileSystem - HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL); - - /* - * Authentication Methods: - * 1- Anonymous: no accessKeyId and no secretAccessKey - * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken - * 3- Private: has to provide accessKeyId and secretAccessKey - */ - if (accessKeyId == null) { - //Tells hadoop-aws that this is anonymous access - conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS); - } else { - conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId); - conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey); - if (sessionToken != null) { - conf.set(HADOOP_SESSION_TOKEN, sessionToken); - //Tells hadoop-aws that this is temporary access - conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS); - } - } - - /* - * This allows the S3 definition to have path-style form. It should always be true to match the current - * way we access files in S3 - */ - conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE); - - /* - * Set the size of S3 connection pool to be the number of partitions - */ - conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions)); - - if (serviceEndpoint != null) { - // Validation of the URL should be done at hadoop-aws level - conf.set(ExternalDataConstants.AwsS3.HADOOP_SERVICE_END_POINT, serviceEndpoint); - } else { - //Region is ignored and buckets could be found by the central endpoint - conf.set(ExternalDataConstants.AwsS3.HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT); - } - } - - /** - * Validate external dataset properties - * - * @param configuration properties - * @throws CompilationException Compilation exception - */ - public static void validateProperties(Map configuration, SourceLocation srcLoc, - IWarningCollector collector) throws CompilationException { - - // check if the format property is present - if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { - throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); - } - - // Both parameters should be passed, or neither should be passed (for anonymous/no auth) - String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); - if (accessKeyId == null || secretAccessKey == null) { - // If one is passed, the other is required - if (accessKeyId != null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME, - ACCESS_KEY_ID_FIELD_NAME); - } else if (secretAccessKey != null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME, - SECRET_ACCESS_KEY_FIELD_NAME); - } - } - - validateIncludeExclude(configuration); - - // Check if the bucket is present - S3Client s3Client = buildAwsS3Client(configuration); - S3Response response; - boolean useOldApi = false; - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - String prefix = getPrefix(configuration); - - try { - response
= isBucketEmpty(s3Client, container, prefix, false); - } catch (S3Exception ex) { - // Method not implemented, try falling back to old API - try { - // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html - if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) { - useOldApi = true; - response = isBucketEmpty(s3Client, container, prefix, true); - } else { - throw ex; - } - } catch (SdkException ex2) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - } catch (SdkException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } finally { - if (s3Client != null) { - CleanupUtils.close(s3Client, null); - } - } - - boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty() - : ((ListObjectsV2Response) response).contents().isEmpty(); - if (isEmpty && collector.shouldWarn()) { - Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - collector.warn(warning); - } - - // Returns 200 only if the bucket exists; otherwise, throws an exception. However, to - // ensure coverage, check that the response is successful as well and not only catch exceptions - if (!response.sdkHttpResponse().isSuccessful()) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); - } - } - - /** - * Checks for a single object in the specified bucket to determine if the bucket is empty or not. - * - * @param s3Client s3 client - * @param container the container name - * @param prefix Prefix to be used - * @param useOldApi flag whether to use the old API or not - * @return the S3 response - */ - private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) { - S3Response response; - if (useOldApi) { - ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder(); - listObjectsBuilder.prefix(prefix); - response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build()); - } else { - ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder(); - listObjectsBuilder.prefix(prefix); - response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build()); - } - return response; - } - - /** - * Returns the list of S3 objects.
- * - * @param configuration properties - * @param includeExcludeMatcher include/exclude matchers to apply - */ - public static List listS3Objects(Map configuration, - IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector warningCollector) - throws CompilationException { - // Prepare to retrieve the objects - List filesOnly; - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - S3Client s3Client = buildAwsS3Client(configuration); - String prefix = getPrefix(configuration); - - try { - filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher); - } catch (S3Exception ex) { - // New API is not implemented, try falling back to old API - try { - // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html - if (ex.awsErrorDetails().errorCode() - .equals(ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) { - filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher); - } else { - throw ex; - } - } catch (SdkException ex2) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - } catch (SdkException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } finally { - if (s3Client != null) { - CleanupUtils.close(s3Client, null); - } - } - - // Warn if no files are returned - if (filesOnly.isEmpty() && warningCollector.shouldWarn()) { - Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - warningCollector.warn(warning); - } - - return filesOnly; - } - - /** - * Uses the latest API to retrieve the objects from the storage. - * - * @param s3Client S3 client - * @param container container name - * @param prefix definition prefix - * @param includeExcludeMatcher include/exclude matchers to apply - */ - private static List listS3Objects(S3Client s3Client, String container, String prefix, - IncludeExcludeMatcher includeExcludeMatcher) { - String newMarker = null; - List filesOnly = new ArrayList<>(); - - ListObjectsV2Response listObjectsResponse; - ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container); - listObjectsBuilder.prefix(prefix); - - while (true) { - // List the objects from the start, or from the last marker in case of truncated result - if (newMarker == null) { - listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build()); - } else { - listObjectsResponse = - s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build()); - } - - // Collect the paths to files only - collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), - includeExcludeMatcher.getMatchersList(), filesOnly); - - // Stop if the result is complete; otherwise, keep the continuation token of this response for the next request - if (!listObjectsResponse.isTruncated()) { - break; - } else { - newMarker = listObjectsResponse.nextContinuationToken(); - } - } - - return filesOnly; - } - - /** - * Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage - * - * @param s3Client S3 client - * @param container container name - * @param prefix definition prefix - * @param includeExcludeMatcher include/exclude matchers to apply - */ - private static List oldApiListS3Objects(S3Client s3Client, String container, String prefix, - IncludeExcludeMatcher includeExcludeMatcher) { - String newMarker = null; - List filesOnly = new ArrayList<>();
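// Editorial sketch, not part of the original file: unlike listS3Objects above, which pages with the
// V2 continuationToken()/nextContinuationToken() pair, this old-API fallback pages with the V1
// marker()/nextMarker() pair, as the loop below shows.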
- ListObjectsResponse listObjectsResponse; - ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container); - listObjectsBuilder.prefix(prefix); - - while (true) { - // List the objects from the start, or from the last marker in case of truncated result - if (newMarker == null) { - listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build()); - } else { - listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build()); - } - - // Collect the paths to files only - collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), - includeExcludeMatcher.getMatchersList(), filesOnly); - - // Stop if the result is complete; otherwise, keep the marker of this response for the next request - if (!listObjectsResponse.isTruncated()) { - break; - } else { - newMarker = listObjectsResponse.nextMarker(); - } - } - - return filesOnly; - } - - /** - * AWS S3 returns all the objects as paths, not differentiating between folders and files. The path is considered - * a file if it does not end with a "/", which is the separator in a folder structure. - * - * @param s3Objects List of returned objects - */ - private static void collectAndFilterFiles(List s3Objects, - BiPredicate, String> predicate, List matchers, List filesOnly) { - for (S3Object object : s3Objects) { - // skip folders - if (object.key().endsWith("/")) { - continue; - } - - // No filter, add file - if (predicate.test(matchers, object.key())) { - filesOnly.add(object); - } - } - } - } - - /* - * Note: Azure Blob and Azure Datalake use identical authentication, so they use the same properties. - * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created. - */ - public static class Azure { - private Azure() { - throw new AssertionError("do not instantiate"); - } - - /** - * Builds the Azure Blob storage client using the provided configuration - * - * @param configuration properties - * @return client - */ - public static BlobServiceClient buildAzureBlobClient(IApplicationContext appCtx, - Map configuration) throws CompilationException { - String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME); - String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME); - String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME); - String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME); - String tenantId = configuration.get(TENANT_ID_FIELD_NAME); - String clientId = configuration.get(CLIENT_ID_FIELD_NAME); - String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME); - String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME); - String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME); - String endpoint = configuration.get(ENDPOINT_FIELD_NAME); - - // Client builder - BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); - int timeout = appCtx.getExternalProperties().getAzureRequestTimeout(); - RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null); - builder.retryOptions(requestRetryOptions); - - // Endpoint is required - if (endpoint == null) { - throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME); - } - builder.endpoint(endpoint); - - // Shared Key - if (accountName != null || accountKey != null) { - if (accountName == null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
- ACCOUNT_KEY_FIELD_NAME); - } - - if (accountKey == null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME, - ACCOUNT_NAME_FIELD_NAME); - } - - Optional provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME, - MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, - CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME); - if (provided.isPresent()) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), - ACCOUNT_KEY_FIELD_NAME); - } - StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey); - builder.credential(credential); - } - - // Shared access signature - if (sharedAccessSignature != null) { - Optional provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME, - CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, - CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME); - if (provided.isPresent()) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), - SHARED_ACCESS_SIGNATURE_FIELD_NAME); - } - AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature); - builder.credential(credential); - } - - // Managed Identity auth - if (managedIdentityId != null) { - Optional provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME, - CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, - TENANT_ID_FIELD_NAME); - if (provided.isPresent()) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), - MANAGED_IDENTITY_ID_FIELD_NAME); - } - builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build()); - } - - // Client secret & certificate auth - if (clientId != null) { - // Both (or neither) client secret and client secret were provided, only one is allowed - if ((clientSecret == null) == (clientCertificate == null)) { - if (clientSecret != null) { - throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME, - CLIENT_CERTIFICATE_FIELD_NAME); - } else { - throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT, - CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME); - } - } - - // Tenant ID is required - if (tenantId == null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME, - CLIENT_ID_FIELD_NAME); - } - - // Client certificate password is not allowed if client secret is used - if (clientCertificatePassword != null && clientSecret != null) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, - CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME); - } - - // Use AD authentication - if (clientSecret != null) { - ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder(); - secret.clientId(clientId); - secret.tenantId(tenantId); - secret.clientSecret(clientSecret); - builder.credential(secret.build()); - } else { - // Certificate - ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder(); - certificate.clientId(clientId); - certificate.tenantId(tenantId); - try { - InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8)); - if (clientCertificatePassword == null) { - Method pemCertificate = ClientCertificateCredentialBuilder.class - 
.getDeclaredMethod("pemCertificate", InputStream.class); - pemCertificate.setAccessible(true); - pemCertificate.invoke(certificate, certificateContent); - } else { - Method pemCertificate = ClientCertificateCredentialBuilder.class - .getDeclaredMethod("pfxCertificate", InputStream.class, String.class); - pemCertificate.setAccessible(true); - pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword); - } - } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) { - throw new CompilationException(EXTERNAL_SOURCE_ERROR, ex.getMessage()); - } - builder.credential(certificate.build()); - } - } - - // If client id is not present, ensure client secret, certificate, tenant id and client certificate - // password are not present - if (clientId == null) { - Optional provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME, - CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME); - if (provided.isPresent()) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), - SHARED_ACCESS_SIGNATURE_FIELD_NAME); - } - } - - try { - return builder.buildClient(); - } catch (Exception ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - } - - /** - * Builds the Azure data lake storage account using the provided configuration - * - * @param configuration properties - * @return client - */ - public static DataLakeServiceClient buildAzureDatalakeClient(IApplicationContext appCtx, - Map configuration) throws CompilationException { - String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME); - String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME); - String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME); - String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME); - String tenantId = configuration.get(TENANT_ID_FIELD_NAME); - String clientId = configuration.get(CLIENT_ID_FIELD_NAME); - String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME); - String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME); - String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME); - String endpoint = configuration.get(ENDPOINT_FIELD_NAME); - - // Client builder - DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder(); - int timeout = appCtx.getExternalProperties().getAzureRequestTimeout(); - RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null); - builder.retryOptions(requestRetryOptions); - - // Endpoint is required - if (endpoint == null) { - throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME); - } - builder.endpoint(endpoint); - - // Shared Key - if (accountName != null || accountKey != null) { - if (accountName == null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME, - ACCOUNT_KEY_FIELD_NAME); - } - - if (accountKey == null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME, - ACCOUNT_NAME_FIELD_NAME); - } - - Optional provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME, - MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, - CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME); - if (provided.isPresent()) { - throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), - ACCOUNT_KEY_FIELD_NAME); - } - StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey); - builder.credential(credential); - } - - // Shared access signature - if (sharedAccessSignature != null) { - Optional provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME, - CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, - CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME); - if (provided.isPresent()) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), - SHARED_ACCESS_SIGNATURE_FIELD_NAME); - } - AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature); - builder.credential(credential); - } - - // Managed Identity auth - if (managedIdentityId != null) { - Optional provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME, - CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, - TENANT_ID_FIELD_NAME); - if (provided.isPresent()) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), - MANAGED_IDENTITY_ID_FIELD_NAME); - } - builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build()); - } - - // Client secret & certificate auth - if (clientId != null) { - // Both (or neither) client secret and client secret were provided, only one is allowed - if ((clientSecret == null) == (clientCertificate == null)) { - if (clientSecret != null) { - throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME, - CLIENT_CERTIFICATE_FIELD_NAME); - } else { - throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT, - CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME); - } - } - - // Tenant ID is required - if (tenantId == null) { - throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME, - CLIENT_ID_FIELD_NAME); - } - - // Client certificate password is not allowed if client secret is used - if (clientCertificatePassword != null && clientSecret != null) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, - CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME); - } - - // Use AD authentication - if (clientSecret != null) { - ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder(); - secret.clientId(clientId); - secret.tenantId(tenantId); - secret.clientSecret(clientSecret); - builder.credential(secret.build()); - } else { - // Certificate - ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder(); - certificate.clientId(clientId); - certificate.tenantId(tenantId); - try { - InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8)); - if (clientCertificatePassword == null) { - Method pemCertificate = ClientCertificateCredentialBuilder.class - .getDeclaredMethod("pemCertificate", InputStream.class); - pemCertificate.setAccessible(true); - pemCertificate.invoke(certificate, certificateContent); - } else { - Method pemCertificate = ClientCertificateCredentialBuilder.class - .getDeclaredMethod("pfxCertificate", InputStream.class, String.class); - pemCertificate.setAccessible(true); - pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword); - } - } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException ex) { - throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - builder.credential(certificate.build()); - } - } - - // If client id is not present, ensure client secret, certificate, tenant id and client certificate - // password are not present - if (clientId == null) { - Optional provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME, - CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME); - if (provided.isPresent()) { - throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), - SHARED_ACCESS_SIGNATURE_FIELD_NAME); - } - } - - try { - return builder.buildClient(); - } catch (Exception ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - } - - public static List listBlobItems(BlobServiceClient blobServiceClient, - Map configuration, IncludeExcludeMatcher includeExcludeMatcher, - IWarningCollector warningCollector) throws CompilationException { - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - - List filesOnly = new ArrayList<>(); - - // Ensure the validity of include/exclude - ExternalDataUtils.validateIncludeExclude(configuration); - - BlobContainerClient blobContainer; - try { - blobContainer = blobServiceClient.getBlobContainerClient(container); - - // Get all objects in a container and extract the paths to files - ListBlobsOptions listBlobsOptions = new ListBlobsOptions(); - listBlobsOptions.setPrefix(ExternalDataUtils.getPrefix(configuration)); - Iterable blobItems = blobContainer.listBlobs(listBlobsOptions, null); - - // Collect the paths to files only - collectAndFilterBlobFiles(blobItems, includeExcludeMatcher.getPredicate(), - includeExcludeMatcher.getMatchersList(), filesOnly); - - // Warn if no files are returned - if (filesOnly.isEmpty() && warningCollector.shouldWarn()) { - Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - warningCollector.warn(warning); - } - } catch (Exception ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - - return filesOnly; - } - - /** - * Collects and filters the files only, and excludes any folders - * - * @param items storage items - * @param predicate predicate to test with for file filtration - * @param matchers include/exclude matchers to test against - * @param filesOnly List containing the files only (excluding folders) - */ - private static void collectAndFilterBlobFiles(Iterable items, - BiPredicate, String> predicate, List matchers, List filesOnly) { - for (BlobItem item : items) { - String uri = item.getName(); - - // skip folders - if (uri.endsWith("/")) { - continue; - } - - // No filter, add file - if (predicate.test(matchers, uri)) { - filesOnly.add(item); - } - } - } - - public static List listDatalakePathItems(DataLakeServiceClient client, - Map configuration, IncludeExcludeMatcher includeExcludeMatcher, - IWarningCollector warningCollector) throws CompilationException { - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - - List filesOnly = new ArrayList<>(); - - // Ensure the validity of include/exclude - ExternalDataUtils.validateIncludeExclude(configuration); - - DataLakeFileSystemClient fileSystemClient; - try { - fileSystemClient = client.getFileSystemClient(container); - - // Get all objects in a container and extract the paths to files - ListPathsOptions 
listOptions = new ListPathsOptions(); - boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME)); - listOptions.setRecursive(recursive); - listOptions.setPath(ExternalDataUtils.getPrefix(configuration, false)); - PagedIterable pathItems = fileSystemClient.listPaths(listOptions, null); - - // Collect the paths to files only - collectAndFilterDatalakeFiles(pathItems, includeExcludeMatcher.getPredicate(), - includeExcludeMatcher.getMatchersList(), filesOnly); - - // Warn if no files are returned - if (filesOnly.isEmpty() && warningCollector.shouldWarn()) { - Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - warningCollector.warn(warning); - } - } catch (Exception ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - - return filesOnly; - } - - /** - * Collects and filters the files only, and excludes any folders - * - * @param items storage items - * @param predicate predicate to test with for file filtration - * @param matchers include/exclude matchers to test against - * @param filesOnly List containing the files only (excluding folders) - */ - private static void collectAndFilterDatalakeFiles(Iterable items, - BiPredicate, String> predicate, List matchers, List filesOnly) { - for (PathItem item : items) { - String uri = item.getName(); - - // skip folders - if (uri.endsWith("/")) { - continue; - } - - // No filter, add file - if (predicate.test(matchers, uri)) { - filesOnly.add(item); - } - } - } - - /** - * Validate external dataset properties - * - * @param configuration properties - * @throws CompilationException Compilation exception - */ - public static void validateAzureBlobProperties(Map configuration, SourceLocation srcLoc, - IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { - - // check if the format property is present - if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { - throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); - } - - validateIncludeExclude(configuration); - - // Check if the bucket is present - BlobServiceClient blobServiceClient; - try { - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - blobServiceClient = buildAzureBlobClient(appCtx, configuration); - BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container); - - // Get all objects in a container and extract the paths to files - ListBlobsOptions listBlobsOptions = new ListBlobsOptions(); - listBlobsOptions.setPrefix(getPrefix(configuration)); - Iterable blobItems = blobContainer.listBlobs(listBlobsOptions, null); - - if (!blobItems.iterator().hasNext() && collector.shouldWarn()) { - Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - collector.warn(warning); - } - } catch (CompilationException ex) { - throw ex; - } catch (Exception ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - } - - /** - * Validate external dataset properties - * - * @param configuration properties - * @throws CompilationException Compilation exception - */ - public static void validateAzureDataLakeProperties(Map configuration, SourceLocation srcLoc, - IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { - - // check if the format property is present - if (configuration.get(ExternalDataConstants.KEY_FORMAT) == 
null) { - throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); - } - - validateIncludeExclude(configuration); - - // Check if the bucket is present - DataLakeServiceClient dataLakeServiceClient; - try { - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration); - DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container); - - // Get all objects in a container and extract the paths to files - ListPathsOptions listPathsOptions = new ListPathsOptions(); - listPathsOptions.setPath(getPrefix(configuration)); - Iterable blobItems = fileSystemClient.listPaths(listPathsOptions, null); - - if (!blobItems.iterator().hasNext() && collector.shouldWarn()) { - Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - collector.warn(warning); - } - } catch (CompilationException ex) { - throw ex; - } catch (Exception ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - } - - /** - * Configures the Hadoop job configuration to read from Azure Blob storage using the provided configuration - * - * @param configuration properties - * @see Azure - * Blob storage - */ - public static void configureAzureHdfsJobConf(JobConf conf, Map configuration, String endPoint) { - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME); - String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME); - - //Disable caching the Azure Blob FileSystem - HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_AZURE_BLOB_PROTOCOL); - - //Key for Hadoop configuration - StringBuilder hadoopKey = new StringBuilder(); - //Value for Hadoop configuration - String hadoopValue; - if (accountKey != null || sharedAccessSignature != null) { - if (accountKey != null) { - hadoopKey.append(HADOOP_AZURE_FS_ACCOUNT_KEY).append('.'); - //Set only the AccountKey - hadoopValue = accountKey; - } else { - //Use SAS for Hadoop FS as connectionString is provided - hadoopKey.append(HADOOP_AZURE_FS_SAS).append('.'); - //Setting the container is required for SAS - hadoopKey.append(container).append('.'); - //Set the connection string for SAS - hadoopValue = sharedAccessSignature; - } - //Set the endPoint, which includes the AccountName - hadoopKey.append(endPoint); - //Tells Hadoop we are reading from Blob Storage - conf.set(hadoopKey.toString(), hadoopValue); - } - } - } - - public static class GCS { - private GCS() { - throw new AssertionError("do not instantiate"); - - } - - //TODO(htowaileb): Add validation step similar to other externals, which also checks if empty bucket - //upon creating the external dataset - - /** - * Builds the client using the provided configuration - * - * @param configuration properties - * @return client - * @throws CompilationException CompilationException - */ - public static Storage buildClient(Map configuration) throws CompilationException { - String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME); - - StorageOptions.Builder builder = StorageOptions.newBuilder(); - - // Use credentials if available - if (jsonCredentials != null) { - try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) { -
builder.setCredentials(ServiceAccountCredentials.fromStream(credentialsStream)); - } catch (IOException ex) { - throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - } - - return builder.build().getService(); - } - - /** - * Validate external dataset properties - * - * @param configuration properties - * @throws CompilationException Compilation exception - */ - public static void validateProperties(Map configuration, SourceLocation srcLoc, - IWarningCollector collector) throws CompilationException { - - // check if the format property is present - if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { - throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); - } - - // parquet is not supported for google cloud storage - if (isParquetFormat(configuration)) { - throw new CompilationException(INVALID_REQ_PARAM_VAL, srcLoc, KEY_FORMAT, - configuration.get(KEY_FORMAT)); - } - - validateIncludeExclude(configuration); - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - - try { - BlobListOption limitOption = BlobListOption.pageSize(1); - BlobListOption prefixOption = BlobListOption.prefix(getPrefix(configuration)); - Storage storage = buildClient(configuration); - Page items = storage.list(container, limitOption, prefixOption); - - if (!items.iterateAll().iterator().hasNext() && collector.shouldWarn()) { - Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - collector.warn(warning); - } - } catch (CompilationException ex) { - throw ex; - } catch (Exception ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); - } - } - } - public static int roundUpToNearestFrameSize(int size, int framesize) { return ((size / framesize) + 1) * framesize; } @@ -1911,7 +827,7 @@ public static int getArgBufferSize() { return maxArgSz; } - private static Optional getFirstNotNull(Map configuration, String... parameters) { + public static Optional getFirstNotNull(Map configuration, String... parameters) { return Arrays.stream(parameters).filter(field -> configuration.get(field) != null).findFirst(); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java new file mode 100644 index 00000000000..e1c10ad6ef2 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.util.aws.s3; + +public class S3Constants { + private S3Constants() { + throw new AssertionError("do not instantiate"); + } + + public static final String REGION_FIELD_NAME = "region"; + public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId"; + public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey"; + public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken"; + public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint"; + + // AWS S3 specific error codes + public static final String ERROR_INTERNAL_ERROR = "InternalError"; + public static final String ERROR_SLOW_DOWN = "SlowDown"; + public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented"; + + /* + * Hadoop-AWS + * AWS connectors for s3 and s3n are deprecated. + */ + public static final String HADOOP_ACCESS_KEY_ID = "fs.s3a.access.key"; + public static final String HADOOP_SECRET_ACCESS_KEY = "fs.s3a.secret.key"; + public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token"; + public static final String HADOOP_REGION = "fs.s3a.region"; + public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint"; + + /* + * Internal configurations + */ + //Allows accessing directories as file system path + public static final String HADOOP_PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; + //The number of maximum HTTP connections in connection pool + public static final String HADOOP_S3_CONNECTION_POOL_SIZE = "fs.s3a.connection.maximum"; + //S3 used protocol + public static final String HADOOP_S3_PROTOCOL = "s3a"; + + //Hadoop credentials provider key + public static final String HADOOP_CREDENTIAL_PROVIDER_KEY = "fs.s3a.aws.credentials.provider"; + //Anonymous credential provider + public static final String HADOOP_ANONYMOUS_ACCESS = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"; + //Temporary credential provider + public static final String HADOOP_TEMP_ACCESS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"; +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java new file mode 100644 index 00000000000..a88d59be789 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.util.aws.s3; + +import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT; +import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED; +import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; +import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; +import static org.apache.asterix.external.util.aws.s3.S3Constants.*; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiPredicate; +import java.util.regex.Matcher; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.HDFSUtils; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.exceptions.Warning; +import org.apache.hyracks.api.util.CleanupUtils; + +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.model.S3Response; + +public class S3Utils { + private S3Utils() { + throw new AssertionError("do not instantiate"); + } + + public static boolean isRetryableError(String errorCode) { + return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN); + } + + /** + * Builds the S3 client using the provided configuration + * + * @param configuration properties + * @return S3 client + * @throws CompilationException CompilationException + */ + public static S3Client buildAwsS3Client(Map configuration) throws CompilationException { + // TODO(Hussain): Need to ensure that all required parameters are present in a previous step + String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); + String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); + String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME); + String regionId = configuration.get(REGION_FIELD_NAME); + String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); + + S3ClientBuilder builder = S3Client.builder(); + + // Credentials + 
AwsCredentialsProvider credentialsProvider; + + // No auth required + if (accessKeyId == null) { + credentialsProvider = AnonymousCredentialsProvider.create(); + } else { + // auth required, check for temporary or permanent credentials + if (sessionToken != null) { + credentialsProvider = StaticCredentialsProvider + .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken)); + } else { + credentialsProvider = + StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey)); + } + } + + builder.credentialsProvider(credentialsProvider); + + // Validate the region + List regions = S3Client.serviceMetadata().regions(); + Optional selectedRegion = regions.stream().filter(region -> region.id().equals(regionId)).findFirst(); + + if (selectedRegion.isEmpty()) { + throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId); + } + builder.region(selectedRegion.get()); + + // Validate the service endpoint if present + if (serviceEndpoint != null) { + try { + URI uri = new URI(serviceEndpoint); + try { + builder.endpointOverride(uri); + } catch (NullPointerException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); + } + } catch (URISyntaxException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, + String.format("Invalid service endpoint %s", serviceEndpoint)); + } + } + + return builder.build(); + } + + /** + * Builds the S3 client using the provided configuration + * + * @param configuration properties + * @param numberOfPartitions number of partitions in the cluster + */ + public static void configureAwsS3HdfsJobConf(JobConf conf, Map configuration, + int numberOfPartitions) { + String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); + String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); + String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME); + String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); + + //Disable caching S3 FileSystem + HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL); + + /* + * Authentication Methods: + * 1- Anonymous: no accessKeyId and no secretAccessKey + * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken + * 3- Private: has to provide accessKeyId and secretAccessKey + */ + if (accessKeyId == null) { + //Tells hadoop-aws it is an anonymous access + conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS); + } else { + conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId); + conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey); + if (sessionToken != null) { + conf.set(HADOOP_SESSION_TOKEN, sessionToken); + //Tells hadoop-aws it is a temporary access + conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS); + } + } + + /* + * This is to allow S3 definition to have path-style form. 
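For reference, the credential selection implemented in buildAwsS3Client above reduces to the following standalone sketch (AWS SDK for Java v2; the class and method names here are illustrative, not part of the patch):

    import java.util.Map;

    import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;

    public final class S3CredentialsSketch {
        // No accessKeyId -> anonymous access; a sessionToken -> temporary credentials;
        // otherwise -> permanent credentials.
        static AwsCredentialsProvider resolve(Map<String, String> configuration) {
            String accessKeyId = configuration.get("accessKeyId");
            String secretAccessKey = configuration.get("secretAccessKey");
            String sessionToken = configuration.get("sessionToken");
            if (accessKeyId == null) {
                return AnonymousCredentialsProvider.create();
            }
            if (sessionToken != null) {
                return StaticCredentialsProvider
                        .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
            }
            return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
        }
    }

S3Client.builder().credentialsProvider(resolve(configuration)) then mirrors what the utility builds.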
Should always be true to match the current + * way we access files in S3 + */ + conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE); + + /* + * Set the size of S3 connection pool to be the number of partitions + */ + conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions)); + + if (serviceEndpoint != null) { + // Validation of the URL should be done at hadoop-aws level + conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint); + } else { + //Region is ignored and buckets could be found by the central endpoint + conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT); + } + } + + /** + * Validate external dataset properties + * + * @param configuration properties + * @throws CompilationException Compilation exception + */ + public static void validateProperties(Map configuration, SourceLocation srcLoc, + IWarningCollector collector) throws CompilationException { + + // check if the format property is present + if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); + } + + // Both parameters should be passed, or neither should be passed (for anonymous/no auth) + String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); + String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); + if (accessKeyId == null || secretAccessKey == null) { + // If one is passed, the other is required + if (accessKeyId != null) { + throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME, + ACCESS_KEY_ID_FIELD_NAME); + } else if (secretAccessKey != null) { + throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME, + SECRET_ACCESS_KEY_FIELD_NAME); + } + } + + validateIncludeExclude(configuration); + + // Check if the bucket is present + S3Client s3Client = buildAwsS3Client(configuration); + S3Response response; + boolean useOldApi = false; + String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); + String prefix = getPrefix(configuration); + + try { + response = isBucketEmpty(s3Client, container, prefix, false); + } catch (S3Exception ex) { + // Method not implemented, try falling back to old API + try { + // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html + if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) { + useOldApi = true; + response = isBucketEmpty(s3Client, container, prefix, true); + } else { + throw ex; + } + } catch (SdkException ex2) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); + } + } catch (SdkException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); + } finally { + if (s3Client != null) { + CleanupUtils.close(s3Client, null); + } + } + + boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty() + : ((ListObjectsV2Response) response).contents().isEmpty(); + if (isEmpty && collector.shouldWarn()) { + Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); + collector.warn(warning); + } + + // Returns 200 only in case the bucket exists, otherwise, throws an exception. 
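The hadoop-aws settings written by configureAwsS3HdfsJobConf amount to the following sketch (the keys are the S3Constants introduced by this patch; the values are placeholders):

    import org.apache.hadoop.mapred.JobConf;

    public final class S3JobConfSketch {
        static JobConf example(int numberOfPartitions) {
            JobConf conf = new JobConf();
            conf.set("fs.s3a.path.style.access", "true"); // always path-style access
            conf.set("fs.s3a.connection.maximum", String.valueOf(numberOfPartitions));
            conf.set("fs.s3a.access.key", "<access-key-id>"); // permanent credentials
            conf.set("fs.s3a.secret.key", "<secret-access-key>");
            // Temporary credentials additionally require:
            conf.set("fs.s3a.session.token", "<session-token>");
            conf.set("fs.s3a.aws.credentials.provider",
                    "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
            return conf;
        }
    }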
However, to + // ensure coverage, check if the result is successful as well and not only catch exceptions + if (!response.sdkHttpResponse().isSuccessful()) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); + } + } + + /** + * Checks for a single object in the specified bucket to determine if the bucket is empty or not. + * + * @param s3Client s3 client + * @param container the container name + * @param prefix Prefix to be used + * @param useOldApi flag whether to use the old API or not + * @return returns the S3 response + */ + private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) { + S3Response response; + if (useOldApi) { + ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder(); + listObjectsBuilder.prefix(prefix); + response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build()); + } else { + ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder(); + listObjectsBuilder.prefix(prefix); + response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build()); + } + return response; + } + + /** + * Returns the lists of S3 objects. + * + * @param configuration properties + * @param includeExcludeMatcher include/exclude matchers to apply + */ + public static List listS3Objects(Map configuration, + AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher, + IWarningCollector warningCollector) throws CompilationException { + // Prepare to retrieve the objects + List filesOnly; + String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); + S3Client s3Client = buildAwsS3Client(configuration); + String prefix = getPrefix(configuration); + + try { + filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher); + } catch (S3Exception ex) { + // New API is not implemented, try falling back to old API + try { + // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html + if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) { + filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher); + } else { + throw ex; + } + } catch (SdkException ex2) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); + } + } catch (SdkException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex)); + } finally { + if (s3Client != null) { + CleanupUtils.close(s3Client, null); + } + } + + // Warn if no files are returned + if (filesOnly.isEmpty() && warningCollector.shouldWarn()) { + Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); + warningCollector.warn(warning); + } + + return filesOnly; + } + + /** + * Uses the latest API to retrieve the objects from the storage. 
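The isBucketEmpty helper above caps the listing at a single key, which keeps the emptiness probe cheap; a minimal standalone version of the V2 variant (names are illustrative):

    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;

    public final class BucketProbeSketch {
        static boolean hasAnyObject(S3Client s3Client, String container, String prefix) {
            ListObjectsV2Request probe =
                    ListObjectsV2Request.builder().bucket(container).prefix(prefix).maxKeys(1).build();
            // A single key proves non-emptiness; a missing bucket surfaces as an exception instead.
            ListObjectsV2Response response = s3Client.listObjectsV2(probe);
            return !response.contents().isEmpty();
        }
    }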
+ * + * @param s3Client S3 client + * @param container container name + * @param prefix definition prefix + * @param includeExcludeMatcher include/exclude matchers to apply + */ + private static List listS3Objects(S3Client s3Client, String container, String prefix, + AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) { + String newMarker = null; + List filesOnly = new ArrayList<>(); + + ListObjectsV2Response listObjectsResponse; + ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container); + listObjectsBuilder.prefix(prefix); + + while (true) { + // List the objects from the start, or from the last marker in case of truncated result + if (newMarker == null) { + listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build()); + } else { + listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build()); + } + + // Collect the paths to files only + collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), + includeExcludeMatcher.getMatchersList(), filesOnly); + + // Stop when the response is no longer truncated; otherwise, save the marker for the next request + if (!listObjectsResponse.isTruncated()) { + break; + } else { + newMarker = listObjectsResponse.nextContinuationToken(); + } + } + + return filesOnly; + } + + /** + * Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage + * + * @param s3Client S3 client + * @param container container name + * @param prefix definition prefix + * @param includeExcludeMatcher include/exclude matchers to apply + */ + private static List oldApiListS3Objects(S3Client s3Client, String container, String prefix, + AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) { + String newMarker = null; + List filesOnly = new ArrayList<>(); + + ListObjectsResponse listObjectsResponse; + ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container); + listObjectsBuilder.prefix(prefix); + + while (true) { + // List the objects from the start, or from the last marker in case of truncated result + if (newMarker == null) { + listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build()); + } else { + listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build()); + } + + // Collect the paths to files only + collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), + includeExcludeMatcher.getMatchersList(), filesOnly); + + // Stop when the response is no longer truncated; otherwise, save the marker for the next request + if (!listObjectsResponse.isTruncated()) { + break; + } else { + newMarker = listObjectsResponse.nextMarker(); + } + } + + return filesOnly; + } + + /** + * AWS S3 returns all the objects as paths, not differentiating between folders and files. The path is considered + * a file if it does not end with a "/", which is the separator in a folder structure.
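The manual continuation-token loop above could, with the same SDK, also be written against the built-in paginator; a sketch (not what the patch uses, shown only to clarify the pagination semantics):

    import java.util.ArrayList;
    import java.util.List;

    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
    import software.amazon.awssdk.services.s3.model.S3Object;

    public final class PaginationSketch {
        static List<S3Object> listFiles(S3Client s3Client, String container, String prefix) {
            List<S3Object> filesOnly = new ArrayList<>();
            ListObjectsV2Request request =
                    ListObjectsV2Request.builder().bucket(container).prefix(prefix).build();
            // The paginator re-issues requests with the continuation token internally.
            for (S3Object object : s3Client.listObjectsV2Paginator(request).contents()) {
                if (!object.key().endsWith("/")) { // skip folder markers, as in collectAndFilterFiles
                    filesOnly.add(object);
                }
            }
            return filesOnly;
        }
    }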
+ * + * @param s3Objects List of returned objects + */ + private static void collectAndFilterFiles(List s3Objects, BiPredicate, String> predicate, + List matchers, List filesOnly) { + for (S3Object object : s3Objects) { + // skip folders + if (object.key().endsWith("/")) { + continue; + } + + // No filter, add file + if (predicate.test(matchers, object.key())) { + filesOnly.add(object); + } + } + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureConstants.java new file mode 100644 index 00000000000..9ade27baebd --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureConstants.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.util.azure.blob_storage; + +/* + * Note: Azure Blob and Azure Datalake use identical authentication, so they are using the same properties. + * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created. + */ +public class AzureConstants { + private AzureConstants() { + throw new AssertionError("do not instantiate"); + } + + /* + * Asterix Configuration Keys + */ + public static final String MANAGED_IDENTITY_ID_FIELD_NAME = "managedIdentityId"; + public static final String ACCOUNT_NAME_FIELD_NAME = "accountName"; + public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey"; + public static final String SHARED_ACCESS_SIGNATURE_FIELD_NAME = "sharedAccessSignature"; + public static final String TENANT_ID_FIELD_NAME = "tenantId"; + public static final String CLIENT_ID_FIELD_NAME = "clientId"; + public static final String CLIENT_SECRET_FIELD_NAME = "clientSecret"; + public static final String CLIENT_CERTIFICATE_FIELD_NAME = "clientCertificate"; + public static final String CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME = "clientCertificatePassword"; + public static final String ENDPOINT_FIELD_NAME = "endpoint"; + + // Specific Azure data lake property + /* + The behavior of Data Lake (true file system) is to read the files of the specified prefix only, example: + storage/myData/personal/file1.json + storage/myData/personal/file2.json + storage/myData/file3.json + If the prefix used is "myData", then only the file file3.json is read. However, if the property "recursive" + is set to "true" when creating the external dataset, then it goes recursively over all the paths, and the result + is file1.json, file2.json and file3.json.
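To make the recursive behavior described above concrete, a hedged usage sketch against the Azure Data Lake SDK, using the layout from the comment (the fileSystemClient is assumed to be already built):

    import com.azure.storage.file.datalake.DataLakeFileSystemClient;
    import com.azure.storage.file.datalake.models.ListPathsOptions;
    import com.azure.storage.file.datalake.models.PathItem;

    public final class RecursiveListingSketch {
        static void printPaths(DataLakeFileSystemClient fileSystemClient) {
            ListPathsOptions options = new ListPathsOptions();
            options.setPath("myData");
            options.setRecursive(true); // with false, only file3.json would be returned
            for (PathItem item : fileSystemClient.listPaths(options, null)) {
                System.out.println(item.getName()); // file1.json, file2.json and file3.json
            }
        }
    }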
+ */ + public static final String RECURSIVE_FIELD_NAME = "recursive"; + + /* + * Hadoop-Azure + */ + //Used when accountName and accessKey are provided + public static final String HADOOP_AZURE_FS_ACCOUNT_KEY = "fs.azure.account.key"; + //Used when a connectionString is provided + public static final String HADOOP_AZURE_FS_SAS = "fs.azure.sas"; + public static final String HADOOP_AZURE_BLOB_PROTOCOL = "wasbs"; + public static final String HADOOP_AZURE_DATALAKE_PROTOCOL = "abfss"; +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java new file mode 100644 index 00000000000..0dc9ad28070 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java @@ -0,0 +1,636 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.util.azure.blob_storage; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR; +import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME; +import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED; +import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; +import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT; +import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT; +import static org.apache.asterix.external.util.ExternalDataUtils.getFirstNotNull; +import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; +import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.ACCOUNT_KEY_FIELD_NAME; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.ACCOUNT_NAME_FIELD_NAME; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_CERTIFICATE_FIELD_NAME; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_ID_FIELD_NAME; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_SECRET_FIELD_NAME; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.ENDPOINT_FIELD_NAME; +import static 
org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_BLOB_PROTOCOL; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_FS_ACCOUNT_KEY; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_FS_SAS; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.MANAGED_IDENTITY_ID_FIELD_NAME; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.RECURSIVE_FIELD_NAME; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.SHARED_ACCESS_SIGNATURE_FIELD_NAME; +import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.TENANT_ID_FIELD_NAME; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiPredicate; +import java.util.regex.Matcher; + +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.HDFSUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.exceptions.Warning; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.http.rest.PagedIterable; +import com.azure.identity.ClientCertificateCredentialBuilder; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.azure.identity.ManagedIdentityCredentialBuilder; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.ListBlobsOptions; +import com.azure.storage.common.StorageSharedKeyCredential; +import com.azure.storage.common.policy.RequestRetryOptions; +import com.azure.storage.file.datalake.DataLakeFileSystemClient; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; +import com.azure.storage.file.datalake.models.ListPathsOptions; +import com.azure.storage.file.datalake.models.PathItem; + +public class AzureUtils { + private AzureUtils() { + throw new AssertionError("do not instantiate"); + } + + /** + * Builds the Azure storage account using the provided configuration + * + * @param configuration properties + * @return client + */ + public static BlobServiceClient buildAzureBlobClient(IApplicationContext appCtx, Map configuration) + throws CompilationException { + String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME); + String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME); + String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME); + String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME); + String tenantId = configuration.get(TENANT_ID_FIELD_NAME); + String clientId = 
configuration.get(CLIENT_ID_FIELD_NAME); + String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME); + String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME); + String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME); + String endpoint = configuration.get(ENDPOINT_FIELD_NAME); + + // Client builder + BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + int timeout = appCtx.getExternalProperties().getAzureRequestTimeout(); + RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null); + builder.retryOptions(requestRetryOptions); + + // Endpoint is required + if (endpoint == null) { + throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME); + } + builder.endpoint(endpoint); + + // Shared Key + if (accountName != null || accountKey != null) { + if (accountName == null) { + throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME, + ACCOUNT_KEY_FIELD_NAME); + } + + if (accountKey == null) { + throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME, + ACCOUNT_NAME_FIELD_NAME); + } + + Optional provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME, + MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, + CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME); + if (provided.isPresent()) { + throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), + ACCOUNT_KEY_FIELD_NAME); + } + StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey); + builder.credential(credential); + } + + // Shared access signature + if (sharedAccessSignature != null) { + Optional provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME, + CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, + CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME); + if (provided.isPresent()) { + throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), + SHARED_ACCESS_SIGNATURE_FIELD_NAME); + } + AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature); + builder.credential(credential); + } + + // Managed Identity auth + if (managedIdentityId != null) { + Optional provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, + CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME); + if (provided.isPresent()) { + throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(), + MANAGED_IDENTITY_ID_FIELD_NAME); + } + builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build()); + } + + // Client secret & certificate auth + if (clientId != null) { + // Both (or neither) client secret and client certificate were provided; only one is allowed + if ((clientSecret == null) == (clientCertificate == null)) { + if (clientSecret != null) { + throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME, + CLIENT_CERTIFICATE_FIELD_NAME); + } else { + throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT, + CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME); + } + } + + // Tenant ID is required + if (tenantId == null) { + throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT,
+
+            // Client certificate password is not allowed if client secret is used
+            if (clientCertificatePassword != null && clientSecret != null) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+                        CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
+            }
+
+            // Use AD authentication
+            if (clientSecret != null) {
+                ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
+                secret.clientId(clientId);
+                secret.tenantId(tenantId);
+                secret.clientSecret(clientSecret);
+                builder.credential(secret.build());
+            } else {
+                // Certificate
+                ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
+                certificate.clientId(clientId);
+                certificate.tenantId(tenantId);
+                try {
+                    InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
+                    if (clientCertificatePassword == null) {
+                        Method pemCertificate = ClientCertificateCredentialBuilder.class
+                                .getDeclaredMethod("pemCertificate", InputStream.class);
+                        pemCertificate.setAccessible(true);
+                        pemCertificate.invoke(certificate, certificateContent);
+                    } else {
+                        Method pfxCertificate = ClientCertificateCredentialBuilder.class
+                                .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
+                        pfxCertificate.setAccessible(true);
+                        pfxCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
+                    }
+                } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
+                    throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+                }
+                builder.credential(certificate.build());
+            }
+        }
+
+        // If client id is not present, ensure client secret, certificate, tenant id and client certificate
+        // password are not present
+        if (clientId == null) {
+            Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
+                    CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, CLIENT_ID_FIELD_NAME,
+                        provided.get());
+            }
+        }
+
+        try {
+            return builder.buildClient();
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
+
+    /**
+     * Builds the Azure Data Lake storage client using the provided configuration
+     *
+     * @param configuration properties
+     * @return client
+     */
+    public static DataLakeServiceClient buildAzureDatalakeClient(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
+        String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
+        String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
+        String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+        String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
+        String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
+        String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
+        String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
+        String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
+        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+
+        // Client builder
+        DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder();
+        int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
+        RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
+        builder.retryOptions(requestRetryOptions);
+
+        // Endpoint is required
+        if (endpoint == null) {
+            throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
+        }
+        builder.endpoint(endpoint);
+
+        // Shared Key
+        if (accountName != null || accountKey != null) {
+            if (accountName == null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
+                        ACCOUNT_KEY_FIELD_NAME);
+            }
+
+            if (accountKey == null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
+                        ACCOUNT_NAME_FIELD_NAME);
+            }
+
+            Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
+                    MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+                    CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                        ACCOUNT_KEY_FIELD_NAME);
+            }
+            StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+            builder.credential(credential);
+        }
+
+        // Shared access signature
+        if (sharedAccessSignature != null) {
+            Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
+                    CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
+                    CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                        SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+            }
+            AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
+            builder.credential(credential);
+        }
+
+        // Managed Identity auth
+        if (managedIdentityId != null) {
+            Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME,
+                    CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME,
+                    TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                        MANAGED_IDENTITY_ID_FIELD_NAME);
+            }
+            builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
+        }
+
+        // Client secret & certificate auth
+        if (clientId != null) {
+            // Both (or neither) client secret and client certificate were provided; exactly one is allowed
+            if ((clientSecret == null) == (clientCertificate == null)) {
+                if (clientSecret != null) {
+                    throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
+                            CLIENT_CERTIFICATE_FIELD_NAME);
+                } else {
+                    throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
+                            CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
+                }
+            }
+
+            // Tenant ID is required
+            if (tenantId == null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
+                        CLIENT_ID_FIELD_NAME);
+            }
+
+            // Client certificate password is not allowed if client secret is used
+            if (clientCertificatePassword != null && clientSecret != null) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+                        CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
+            }
+
+            // Use AD authentication
+            if (clientSecret != null) {
+                ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
+                secret.clientId(clientId);
+                secret.tenantId(tenantId);
+                secret.clientSecret(clientSecret);
+                builder.credential(secret.build());
+            } else {
+                // Certificate
+                ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
+                certificate.clientId(clientId);
+                certificate.tenantId(tenantId);
+                try {
+                    InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
+                    if (clientCertificatePassword == null) {
+                        Method pemCertificate = ClientCertificateCredentialBuilder.class
+                                .getDeclaredMethod("pemCertificate", InputStream.class);
+                        pemCertificate.setAccessible(true);
+                        pemCertificate.invoke(certificate, certificateContent);
+                    } else {
+                        Method pfxCertificate = ClientCertificateCredentialBuilder.class
+                                .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
+                        pfxCertificate.setAccessible(true);
+                        pfxCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
+                    }
+                } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
+                    throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+                }
+                builder.credential(certificate.build());
+            }
+        }
+
+        // If client id is not present, ensure client secret, certificate, tenant id and client certificate
+        // password are not present
+        if (clientId == null) {
+            Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
+                    CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, CLIENT_ID_FIELD_NAME,
+                        provided.get());
+            }
+        }
+
+        try {
+            return builder.buildClient();
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
+
+    public static List<BlobItem> listBlobItems(BlobServiceClient blobServiceClient,
+            Map<String, String> configuration,
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+            IWarningCollector warningCollector) throws CompilationException {
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+        List<BlobItem> filesOnly = new ArrayList<>();
+
+        // Ensure the validity of include/exclude
+        validateIncludeExclude(configuration);
+
+        BlobContainerClient blobContainer;
+        try {
+            blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+            // Get all objects in a container and extract the paths to files
+            ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+            listBlobsOptions.setPrefix(getPrefix(configuration));
+            Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+            // Collect the paths to files only
+            collectAndFilterBlobFiles(blobItems, includeExcludeMatcher.getPredicate(),
+                    includeExcludeMatcher.getMatchersList(), filesOnly);
+
+            // Warn if no files are returned
+            if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+                Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                warningCollector.warn(warning);
+            }
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+
+        return filesOnly;
+    }
+
+    /**
+     * Collects and filters the files only, and excludes any folders
+     *
+     * @param items storage items
+     * @param predicate predicate to test with for file filtration
+     * @param matchers include/exclude matchers to test against
+     * @param filesOnly List containing the files only (excluding folders)
+     */
+    private static void collectAndFilterBlobFiles(Iterable<BlobItem> items,
+            BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<BlobItem> filesOnly) {
+        for (BlobItem item : items) {
+            String uri = item.getName();
+
+            // skip folders
+            if (uri.endsWith("/")) {
+                continue;
+            }
+
+            // Add the file if it passes the include/exclude filters (or none were provided)
+            if (predicate.test(matchers, uri)) {
+                filesOnly.add(item);
+            }
+        }
+    }
+
+    public static List<PathItem> listDatalakePathItems(DataLakeServiceClient client,
+            Map<String, String> configuration,
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+            IWarningCollector warningCollector) throws CompilationException {
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+        List<PathItem> filesOnly = new ArrayList<>();
+
+        // Ensure the validity of include/exclude
+        validateIncludeExclude(configuration);
+
+        DataLakeFileSystemClient fileSystemClient;
+        try {
+            fileSystemClient = client.getFileSystemClient(container);
+
+            // Get all objects in a container and extract the paths to files
+            ListPathsOptions listOptions = new ListPathsOptions();
+            boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
+            listOptions.setRecursive(recursive);
+            listOptions.setPath(getPrefix(configuration, false));
+            PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
+
+            // Collect the paths to files only
+            collectAndFilterDatalakeFiles(pathItems, includeExcludeMatcher.getPredicate(),
+                    includeExcludeMatcher.getMatchersList(), filesOnly);
+
+            // Warn if no files are returned
+            if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+                Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                warningCollector.warn(warning);
+            }
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+
+        return filesOnly;
+    }
+
+    /**
+     * Collects and filters the files only, and excludes any folders
+     *
+     * @param items storage items
+     * @param predicate predicate to test with for file filtration
+     * @param matchers include/exclude matchers to test against
+     * @param filesOnly List containing the files only (excluding folders)
+     */
+    private static void collectAndFilterDatalakeFiles(Iterable<PathItem> items,
+            BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<PathItem> filesOnly) {
+        for (PathItem item : items) {
+            String uri = item.getName();
+
+            // skip folders
+            if (uri.endsWith("/")) {
+                continue;
+            }
+
+            // Add the file if it passes the include/exclude filters (or none were provided)
+            if (predicate.test(matchers, uri)) {
+                filesOnly.add(item);
+            }
+        }
+    }
+
+    /**
+     * Validate external dataset properties
+     *
+     * @param configuration properties
+     * @throws CompilationException Compilation exception
+     */
+    public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc,
+            IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
+
+        // check if the format property is present
+        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+        }
+
+        validateIncludeExclude(configuration);
+
+        // Check if the container is present and accessible
+        BlobServiceClient blobServiceClient;
+        try {
+            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+            blobServiceClient = buildAzureBlobClient(appCtx, configuration);
+            BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+            // Get all objects in a container and extract the paths to files
+            ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+            listBlobsOptions.setPrefix(getPrefix(configuration));
+            Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+            if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
+                Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                collector.warn(warning);
+            }
+        } catch (CompilationException ex) {
+            throw ex;
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
+
+    /**
+     * Validate external dataset properties
+     *
+     * @param configuration properties
+     * @throws CompilationException Compilation exception
+     */
+    public static void validateAzureDataLakeProperties(Map<String, String> configuration, SourceLocation srcLoc,
+            IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
+
+        // check if the format property is present
+        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+        }
+
+        validateIncludeExclude(configuration);
+
+        // Check if the container is present and accessible
+        DataLakeServiceClient dataLakeServiceClient;
+        try {
+            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+            dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration);
+            DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container);
+
+            // Get all objects in a container and extract the paths to files
+            ListPathsOptions listPathsOptions = new ListPathsOptions();
+            listPathsOptions.setPath(getPrefix(configuration));
+            Iterable<PathItem> pathItems = fileSystemClient.listPaths(listPathsOptions, null);
+
+            if (!pathItems.iterator().hasNext() && collector.shouldWarn()) {
+                Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                collector.warn(warning);
+            }
+        } catch (CompilationException ex) {
+            throw ex;
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
+
+    /**
+     * Configures the Hadoop job to read from Azure Blob storage using the provided configuration
+     *
+     * @param conf          Hadoop job configuration
+     * @param configuration properties
+     * @param endPoint      storage endpoint, which includes the account name
+     *
+     * @see Azure Blob storage
+     */
+    public static void configureAzureHdfsJobConf(JobConf conf, Map<String, String> configuration, String endPoint) {
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
+        String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+
+        //Disable caching the Azure Blob FileSystem
+        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_AZURE_BLOB_PROTOCOL);
+
+        //Key for Hadoop configuration
+        StringBuilder hadoopKey = new StringBuilder();
+        //Value for Hadoop configuration
+        String hadoopValue;
+        if (accountKey != null || sharedAccessSignature != null) {
+            if (accountKey != null) {
+                hadoopKey.append(HADOOP_AZURE_FS_ACCOUNT_KEY).append('.');
+                //Set only the AccountKey
+                hadoopValue = accountKey;
+            } else {
+                //Use SAS for Hadoop FS as connectionString is provided
+                hadoopKey.append(HADOOP_AZURE_FS_SAS).append('.');
+                //Setting the container is required for SAS
+                hadoopKey.append(container).append('.');
+                //Set the connection string for SAS
+                hadoopValue = sharedAccessSignature;
+            }
+            //Set the endPoint, which includes the AccountName
+            hadoopKey.append(endPoint);
+            //Tells Hadoop we are reading from Blob Storage
+            conf.set(hadoopKey.toString(), hadoopValue);
+        }
+    }
+}
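For reviewers tracing the new call sites, a minimal sketch of how the relocated AzureUtils helpers are driven end to end. Everything below is illustrative and not part of the patch: the configuration keys ("endpoint", "accountName", "accountKey", "container") are assumed to match the AzureConstants field-name values, and the caller is assumed to already hold an IApplicationContext, an IncludeExcludeMatcher, and an IWarningCollector (plus a java.util.HashMap import).

// Illustrative sketch only -- not part of this patch.
public static List<BlobItem> sketchListContainer(IApplicationContext appCtx, IWarningCollector warnings,
        AbstractExternalInputStreamFactory.IncludeExcludeMatcher matcher) throws CompilationException {
    Map<String, String> conf = new HashMap<>();
    conf.put("endpoint", "https://myaccount.blob.core.windows.net"); // required; the builder fails fast without it
    conf.put("accountName", "myaccount"); // shared-key auth; SAS or AAD settings would replace these two entries
    conf.put("accountKey", "<redacted>");
    conf.put("container", "mycontainer");

    // Build the client, then list only the files (folders are filtered out)
    BlobServiceClient client = AzureUtils.buildAzureBlobClient(appCtx, conf);
    return AzureUtils.listBlobItems(client, conf, matcher, warnings);
}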
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
new file mode 100644
index 00000000000..8a0be99faf2
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.gcs;
+
+public class GCSConstants {
+    private GCSConstants() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
+}
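GCSConstants currently carries a single key, which names the service-account credentials consumed by the GCSUtils class introduced below. A hedged sketch of the configuration shape (the JSON body is a placeholder, not a working credential, and the HashMap scaffolding is assumed):

// Illustrative sketch only -- not part of this patch.
Map<String, String> conf = new HashMap<>();
conf.put(GCSConstants.JSON_CREDENTIALS_FIELD_NAME,
        "{\"type\": \"service_account\", \"project_id\": \"my-project\"}"); // placeholder JSON
Storage storage = GCSUtils.buildClient(conf); // without the key, the SDK's default credential resolution applies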
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
new file mode 100644
index 00000000000..553733fe429
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.gcs;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT;
+import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isParquetFormat;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import com.google.api.gax.paging.Page;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+
+public class GCSUtils {
+    private GCSUtils() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    //TODO(htowaileb): Add validation step similar to other externals, which also checks if empty bucket
+    //upon creating the external dataset
+
+    /**
+     * Builds the client using the provided configuration
+     *
+     * @param configuration properties
+     * @return client
+     * @throws CompilationException CompilationException
+     */
+    public static Storage buildClient(Map<String, String> configuration) throws CompilationException {
+        String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+
+        StorageOptions.Builder builder = StorageOptions.newBuilder();
+
+        // Use credentials if available
+        if (jsonCredentials != null) {
+            try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) {
+                builder.setCredentials(ServiceAccountCredentials.fromStream(credentialsStream));
+            } catch (IOException ex) {
+                throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+            }
+        }
+
+        return builder.build().getService();
+    }
+
+    /**
+     * Validate external dataset properties
+     *
+     * @param configuration properties
+     * @throws CompilationException Compilation exception
+     */
+    public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
+            IWarningCollector collector) throws CompilationException {
+
+        // check if the format property is present
+        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+        }
+
+        // parquet is not supported for google cloud storage
+        if (isParquetFormat(configuration)) {
+            throw new CompilationException(INVALID_REQ_PARAM_VAL, srcLoc, KEY_FORMAT, configuration.get(KEY_FORMAT));
+        }
+
+        validateIncludeExclude(configuration);
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+        try {
+            Storage.BlobListOption limitOption = Storage.BlobListOption.pageSize(1);
+            Storage.BlobListOption prefixOption = Storage.BlobListOption.prefix(getPrefix(configuration));
+            Storage storage = buildClient(configuration);
+            Page<Blob> items = storage.list(container, limitOption, prefixOption);
+
+            if (!items.iterateAll().iterator().hasNext() && collector.shouldWarn()) {
+                Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                collector.warn(warning);
+            }
+        } catch (CompilationException ex) {
+            throw ex;
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
index 90ea04bc2f4..91afbd8279f 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.input.record.reader.awss3;
 
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_INTERNAL_ERROR;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_SLOW_DOWN;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
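Finally, a hedged sketch of the relocated S3 pieces from a consumer's side: the error-code constants now come from S3Constants and retry classification from S3Utils, mirroring the import changes in the test hunk above. The class and main-method scaffolding are illustrative only.

// Illustrative sketch only -- not part of this patch.
import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;

import org.apache.asterix.external.util.aws.s3.S3Utils;

public class S3RetrySketch {
    public static void main(String[] args) {
        // Both codes name transient, service-side failures, so a reader's
        // retry check is expected to classify them as retryable.
        System.out.println(S3Utils.isRetryableError(ERROR_SLOW_DOWN)); // expected: true
        System.out.println(S3Utils.isRetryableError(ERROR_INTERNAL_ERROR)); // expected: true
    }
}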