Refactor ExternalDataUtils/Constants
Change-Id: Ie1f1499f13968d421bee43ec7352aea0c2749423
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15644
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
htowaileb committed Mar 14, 2022
1 parent 74bc0b3 commit 6ce0d67f8275d5f3030622778563050a6fabaeab
Showing 21 changed files with 1,409 additions and 1,234 deletions.
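Every hunk below follows the same pattern: helpers that used to sit in nested classes of ExternalDataUtils and ExternalDataConstants move into dedicated per-provider classes. Read from the new import statements in the diff, the mapping is roughly:

    ExternalDataUtils.AwsS3 / ExternalDataConstants.AwsS3  ->  org.apache.asterix.external.util.aws.s3.S3Utils / S3Constants
    ExternalDataUtils.Azure / ExternalDataConstants.Azure  ->  org.apache.asterix.external.util.azure.blob_storage.AzureUtils / AzureConstants
    ExternalDataUtils.GCS                                  ->  org.apache.asterix.external.util.google.gcs.GCSUtils

In the hunks shown here the method names and argument lists stay the same; only the owning class changes, and several call sites switch to static imports of the new helpers.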
@@ -20,7 +20,7 @@

import static java.util.regex.Pattern.CASE_INSENSITIVE;
import static java.util.regex.Pattern.DOTALL;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;

import java.util.regex.Pattern;

@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.input.record.reader.aws;

-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

import java.io.IOException;
@@ -32,7 +31,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
@@ -109,7 +108,7 @@ private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataExc
}

private boolean shouldRetry(String errorCode, int currentRetry) {
-return currentRetry < MAX_RETRIES && AwsS3.isRetryableError(errorCode);
+return currentRetry < MAX_RETRIES && S3Utils.isRetryableError(errorCode);
}

@Override
@@ -134,7 +133,7 @@ public boolean stop() {

private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
try {
-return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
+return S3Utils.buildAwsS3Client(configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
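Condensed into one self-contained sketch, the S3 input-stream hunks above boil down to delegating both the retry decision and the client construction to S3Utils. This is illustrative only: the MAX_RETRIES value is a placeholder and the class is a stand-in for the real stream implementation.

    import java.util.Map;

    import org.apache.asterix.common.exceptions.CompilationException;
    import org.apache.asterix.external.util.aws.s3.S3Utils;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import software.amazon.awssdk.services.s3.S3Client;

    class S3StreamSketch {
        private static final int MAX_RETRIES = 5; // placeholder; the real constant lives in the stream class

        // Retryability is now decided by S3Utils instead of ExternalDataConstants.AwsS3.
        static boolean shouldRetry(String errorCode, int currentRetry) {
            return currentRetry < MAX_RETRIES && S3Utils.isRetryableError(errorCode);
        }

        // The SDK client is built by S3Utils; compilation errors are rewrapped as in the hunk above.
        static S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
            try {
                return S3Utils.buildAwsS3Client(configuration);
            } catch (CompilationException ex) {
                throw HyracksDataException.create(ex);
            }
        }
    }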
@@ -26,6 +26,7 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -53,8 +54,7 @@ public void configure(IServiceContext ctx, Map<String, String> configuration, IW
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);

//Get a list of S3 objects
-List<S3Object> filesOnly =
-ExternalDataUtils.AwsS3.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
+List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
// Distribute work load amongst the partitions
distributeWorkLoad(filesOnly, getPartitionsCount());
}
@@ -28,6 +28,8 @@
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -52,7 +54,7 @@ public void configure(IServiceContext serviceCtx, Map<String, String> configurat
//Configure Hadoop S3 input splits
JobConf conf = createHdfsConf(serviceCtx, configuration);
int numberOfPartitions = getPartitionConstraint().getLocations().length;
-ExternalDataUtils.AwsS3.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
+S3Utils.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
}

@@ -89,8 +91,7 @@ private static String buildPathURIs(Map<String, String> configuration, IWarningC
throws CompilationException {
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-List<S3Object> filesOnly =
-ExternalDataUtils.AwsS3.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
+List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
StringBuilder builder = new StringBuilder();

if (!filesOnly.isEmpty()) {
@@ -105,7 +106,7 @@ private static String buildPathURIs(Map<String, String> configuration, IWarningC
}

private static void appendFileURI(StringBuilder builder, String container, S3Object file) {
-builder.append(ExternalDataConstants.AwsS3.HADOOP_S3_PROTOCOL);
+builder.append(S3Constants.HADOOP_S3_PROTOCOL);
builder.append("://");
builder.append(container);
builder.append('/');
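For the S3 Parquet factory, object listing and URI construction now go through S3Utils and S3Constants. A rough sketch of the buildPathURIs flow shown above; the method signature, the comma separator between URIs, and the use of S3Object.key() are simplifications for illustration, not the factory's actual code.

    import java.util.List;
    import java.util.Map;

    import org.apache.asterix.common.exceptions.CompilationException;
    import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
    import org.apache.asterix.external.util.ExternalDataUtils;
    import org.apache.asterix.external.util.aws.s3.S3Constants;
    import org.apache.asterix.external.util.aws.s3.S3Utils;
    import org.apache.hyracks.api.exceptions.IWarningCollector;
    import software.amazon.awssdk.services.s3.model.S3Object;

    class S3ParquetPathSketch {
        // Builds "protocol://container/key" entries for every matching S3 object, as buildPathURIs does above.
        static String buildPathURIs(Map<String, String> configuration, String container,
                IWarningCollector warningCollector) throws CompilationException {
            IncludeExcludeMatcher matcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
            List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, matcher, warningCollector);
            StringBuilder builder = new StringBuilder();
            for (S3Object file : filesOnly) {
                if (builder.length() > 0) {
                    builder.append(','); // separator is illustrative; the real factory decides this
                }
                builder.append(S3Constants.HADOOP_S3_PROTOCOL).append("://").append(container).append('/')
                        .append(file.key());
            }
            return builder.toString();
        }
    }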
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.input.record.reader.azure.blob;

+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

import java.io.IOException;
@@ -31,7 +32,6 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.LogRedactionUtil;

@@ -86,7 +86,7 @@ protected boolean getInputStream() throws IOException {
private BlobServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration)
throws HyracksDataException {
try {
-return ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
+return buildAzureBlobClient(appCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.external.input.record.reader.azure.blob;

+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listBlobItems;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -57,9 +60,9 @@ public void configure(IServiceContext ctx, Map<String, String> configuration, IW
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
-List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
-includeExcludeMatcher, warningCollector);
+BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration);
+List<BlobItem> filesOnly =
+listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector);

// Distribute work load amongst the partitions
distributeWorkLoad(filesOnly, getPartitionsCount());
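The Azure readers make the same move, but through static imports of AzureUtils. A compact sketch of the listing path used by the stream factory above; the package of IApplicationContext and the CompilationException in the signature are assumptions based on the surrounding hunks.

    import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
    import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listBlobItems;

    import java.util.List;
    import java.util.Map;

    import org.apache.asterix.common.api.IApplicationContext;
    import org.apache.asterix.common.exceptions.CompilationException;
    import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
    import org.apache.asterix.external.util.ExternalDataUtils;
    import org.apache.hyracks.api.exceptions.IWarningCollector;

    import com.azure.storage.blob.BlobServiceClient;
    import com.azure.storage.blob.models.BlobItem;

    class AzureBlobListingSketch {
        static List<BlobItem> listMatchingBlobs(IApplicationContext appCtx, Map<String, String> configuration,
                IWarningCollector warningCollector) throws CompilationException {
            // Validate include/exclude filters, then list blobs through the new AzureUtils helpers.
            ExternalDataUtils.validateIncludeExclude(configuration);
            IncludeExcludeMatcher matcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
            BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration);
            return listBlobItems(blobServiceClient, configuration, matcher, warningCollector);
        }
    }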
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.input.record.reader.azure.datalake;

+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

import java.io.IOException;
@@ -31,7 +32,6 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.LogRedactionUtil;

@@ -86,7 +86,7 @@ protected boolean getInputStream() throws IOException {
private DataLakeServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration)
throws HyracksDataException {
try {
-return ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
+return buildAzureDatalakeClient(appCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.external.input.record.reader.azure.datalake;

+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listDatalakePathItems;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -57,9 +60,9 @@ public void configure(IServiceContext ctx, Map<String, String> configuration, IW
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
-List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(client, configuration,
-includeExcludeMatcher, warningCollector);
+DataLakeServiceClient client = buildAzureDatalakeClient(appCtx, configuration);
+List<PathItem> filesOnly =
+listDatalakePathItems(client, configuration, includeExcludeMatcher, warningCollector);

// Distribute work load amongst the partitions
distributeWorkLoad(filesOnly, getPartitionsCount());
@@ -18,6 +18,11 @@
*/
package org.apache.asterix.external.input.record.reader.azure.parquet;

+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_BLOB_PROTOCOL;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.configureAzureHdfsJobConf;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listBlobItems;

import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -47,7 +52,7 @@ public class AzureBlobParquetReaderFactory extends HDFSDataSourceFactory {
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
-BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
+BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration);
//Get endpoint
String endPoint = extractEndPoint(blobServiceClient.getAccountUrl());
//Get path
@@ -57,7 +62,7 @@ public void configure(IServiceContext serviceCtx, Map<String, String> configurat

//Configure Hadoop Azure input splits
JobConf conf = createHdfsConf(serviceCtx, configuration);
-ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
+configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
}

@@ -94,8 +99,8 @@ private static void putAzureBlobConfToHadoopConf(Map<String, String> configurati
private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
BlobServiceClient blobServiceClient, String endPoint) throws CompilationException {
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
-includeExcludeMatcher, warningCollector);
+List<BlobItem> filesOnly =
+listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector);

StringBuilder builder = new StringBuilder();
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -118,7 +123,7 @@ private static String extractEndPoint(String uri) {
}

private static void appendFileURI(StringBuilder builder, String container, String endPoint, BlobItem file) {
-builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL);
+builder.append(HADOOP_AZURE_BLOB_PROTOCOL);
builder.append("://");
builder.append(container);
builder.append('@');
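On the Parquet side the URI scheme constants move as well. A sketch of the wasb-style URI assembly performed by appendFileURI above; the appends after the '@' (endpoint and blob name via BlobItem.getName()) are not shown in the hunk and are assumed here.

    import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;

    import com.azure.storage.blob.models.BlobItem;

    class AzureBlobUriSketch {
        // Produces scheme://container@endpoint/blobName, mirroring the appendFileURI hunk above.
        static void appendFileURI(StringBuilder builder, String container, String endPoint, BlobItem file) {
            builder.append(AzureConstants.HADOOP_AZURE_BLOB_PROTOCOL);
            builder.append("://");
            builder.append(container);
            builder.append('@');
            builder.append(endPoint);   // assumed: remainder of the URI is not shown in the diff
            builder.append('/');
            builder.append(file.getName());
        }
    }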
@@ -18,6 +18,11 @@
*/
package org.apache.asterix.external.input.record.reader.azure.parquet;

+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_DATALAKE_PROTOCOL;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.configureAzureHdfsJobConf;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listDatalakePathItems;

import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -47,8 +52,7 @@ public class AzureDataLakeParquetReaderFactory extends HDFSDataSourceFactory {
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
-DataLakeServiceClient dataLakeServiceClient =
-ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
+DataLakeServiceClient dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration);

//Get endpoint
String endPoint = extractEndPoint(dataLakeServiceClient.getAccountUrl());
@@ -61,7 +65,7 @@ public void configure(IServiceContext serviceCtx, Map<String, String> configurat

//Configure Hadoop Azure input splits
JobConf conf = createHdfsConf(serviceCtx, configuration);
-ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
+configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
}

@@ -98,8 +102,8 @@ private static void putAzureDataLakeConfToHadoopConf(Map<String, String> configu
private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
DataLakeServiceClient dataLakeServiceClient, String endPoint) throws CompilationException {
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(dataLakeServiceClient, configuration,
-includeExcludeMatcher, warningCollector);
+List<PathItem> filesOnly =
+listDatalakePathItems(dataLakeServiceClient, configuration, includeExcludeMatcher, warningCollector);

StringBuilder builder = new StringBuilder();
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -122,7 +126,7 @@ private static String extractEndPoint(String uri) {
}

private static void appendFileURI(StringBuilder builder, String container, String endPoint, PathItem file) {
-builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_DATALAKE_PROTOCOL);
+builder.append(HADOOP_AZURE_DATALAKE_PROTOCOL);
builder.append("://");
builder.append(container);
builder.append('@');
@@ -32,7 +32,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
@@ -134,7 +134,7 @@ public boolean stop() {

private Storage buildClient(Map<String, String> configuration) throws HyracksDataException {
try {
-return ExternalDataUtils.GCS.buildClient(configuration);
+return GCSUtils.buildClient(configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.record.reader.gcs;

import static org.apache.asterix.external.util.ExternalDataUtils.getIncludeExcludeMatchers;
+import static org.apache.asterix.external.util.google.gcs.GCSUtils.buildClient;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

import java.util.ArrayList;
@@ -67,7 +68,7 @@ public void configure(IServiceContext ctx, Map<String, String> configuration, IW
// Prepare to retrieve the objects
List<Blob> filesOnly = new ArrayList<>();
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-Storage gcs = ExternalDataUtils.GCS.buildClient(configuration);
+Storage gcs = buildClient(configuration);
Storage.BlobListOption options = Storage.BlobListOption.prefix(ExternalDataUtils.getPrefix(configuration));
Page<Blob> items;
