[NO ISSUE][EXT] Set Azure request timeout
- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- add azure_request_timeout configurable property
- default timeout to 120 seconds
- catch exceptions from the external input stream and
  wrap them in a RuntimeDataException to avoid halting
  due to non-serializable exceptions from external sources

Change-Id: Iebf988384b0bc5d6ae7688c65747227dbde062b1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15483
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
AliSolaiman committed Mar 1, 2022
1 parent 55ef99b commit 6ddbbfa3c567c3af2132f0f4dcee5253011c6934
Showing 13 changed files with 98 additions and 52 deletions.
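The new property is registered under Section.COMMON (see the ExternalProperties hunk below), so it should be settable cluster-wide through the node configuration file. A minimal sketch of an override, assuming the usual [common] section of an AsterixDB cc.conf; the file layout and comment style are assumptions, not part of this change:

    [common]
    ; Azure client request timeout in seconds (this commit's default: 120)
    azure.request.timeout = 300
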
@@ -65,6 +65,7 @@
import org.apache.asterix.app.result.fields.ResultHandlePrinter;
import org.apache.asterix.app.result.fields.ResultsPrinter;
import org.apache.asterix.app.result.fields.StatusPrinter;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.IRequestTracker;
@@ -829,7 +830,8 @@ protected Optional<? extends Dataset> doCreateDatasetStatement(MetadataProvider
metadataProvider, mdTxnCtx);
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
-validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx);
+validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx,
+        appCtx);
datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
TransactionState.COMMIT);
break;
@@ -4773,23 +4775,23 @@ private static void ensureNotCancelled(ClientRequest clientRequest) throws Runti
}

protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails,
-        Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx)
-        throws AlgebricksException, HyracksDataException {
+        Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx,
+        IApplicationContext appCtx) throws AlgebricksException, HyracksDataException {
// Validate adapter specific properties
String adapter = externalDetails.getAdapter();
Map<String, String> details = new HashMap<>(properties);
details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter);
-validateAdapterSpecificProperties(details, srcLoc);
+validateAdapterSpecificProperties(details, srcLoc, appCtx);
}

/**
* Ensures that the external source container is present
*
* @param configuration external source properties
*/
-protected void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc)
-        throws CompilationException {
-    ExternalDataUtils.validateAdapterSpecificProperties(configuration, srcLoc, warningCollector);
+protected void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc,
+        IApplicationContext appCtx) throws CompilationException {
+    ExternalDataUtils.validateAdapterSpecificProperties(configuration, srcLoc, warningCollector, appCtx);
}

protected enum CreateResult {
@@ -8,6 +8,7 @@
"active\.memory\.global\.budget" : 67108864,
"active\.stop\.timeout" : 3600,
"active\.suspend\.timeout" : 3600,
"azure.request.timeout" : 120,
"compiler\.arrayindex" : true,
"compiler\.external\.field\.pushdown" : true,
"compiler\.framesize" : 32768,
@@ -8,6 +8,7 @@
"active\.memory\.global\.budget" : 67108864,
"active\.stop\.timeout" : 3600,
"active\.suspend\.timeout" : 3600,
"azure.request.timeout" : 120,
"compiler\.arrayindex" : true,
"compiler\.external\.field\.pushdown" : true,
"compiler\.framesize" : 32768,
@@ -8,6 +8,7 @@
"active\.memory\.global\.budget" : 67108864,
"active\.stop\.timeout" : 3600,
"active\.suspend\.timeout" : 3600,
"azure.request.timeout" : 120,
"compiler\.arrayindex" : true,
"compiler\.external\.field\.pushdown" : true,
"compiler\.framesize" : 32768,
@@ -51,7 +51,8 @@ public enum Option implements IOption {
StorageUtil.getIntSizeInBytes(200, StorageUtil.StorageUnit.MEGABYTE),
"The maximum accepted web request size in bytes"),
REQUESTS_ARCHIVE_SIZE(NONNEGATIVE_INTEGER, 50, "The maximum number of archived requests to maintain"),
-LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a UDF in seconds");
+LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a UDF in seconds"),
+AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client requests in seconds");

private final IOptionType type;
private final Object defaultValue;
@@ -78,6 +79,7 @@ public Section section() {
case MAX_WAIT_ACTIVE_CLUSTER:
case MAX_WEB_REQUEST_SIZE:
case LIBRARY_DEPLOY_TIMEOUT:
+case AZURE_REQUEST_TIMEOUT:
return Section.COMMON;
case CC_JAVA_OPTS:
case NC_JAVA_OPTS:
@@ -155,4 +157,7 @@ public int getLibraryDeployTimeout() {
return accessor.getInt(Option.LIBRARY_DEPLOY_TIMEOUT);
}

+public int getAzureRequestTimeout() {
+    return accessor.getInt(Option.AZURE_REQUEST_TIMEOUT);
+}
}
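The API-result hunks above show the new option surfacing as "azure.request.timeout" : 120 in the cluster-config output. With the getter added here, server-side code holding an IApplicationContext can read the effective value; a minimal sketch, assuming the conventional getExternalProperties() accessor on IApplicationContext (the accessor itself is not shown in this excerpt):

    // e.g. inside a client builder; returns 120 seconds unless
    // azure.request.timeout is overridden in the configuration
    int timeoutSecs = appCtx.getExternalProperties().getAzureRequestTimeout();
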
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.zip.GZIPInputStream;

+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -45,9 +46,10 @@ public class AzureBlobInputStream extends AbstractExternalInputStream {
private final BlobServiceClient client;
private final String container;

-public AzureBlobInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
+public AzureBlobInputStream(IApplicationContext appCtx, Map<String, String> configuration, List<String> filePaths)
+        throws HyracksDataException {
super(configuration, filePaths);
-this.client = buildAzureClient(configuration);
+this.client = buildAzureClient(appCtx, configuration);
this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}

@@ -81,9 +83,10 @@ protected boolean getInputStream() throws IOException {
return true;
}

-private BlobServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
+private BlobServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration)
+        throws HyracksDataException {
try {
-return ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
+return ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.PriorityQueue;

+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -41,18 +42,22 @@ public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFact

@Override
public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
-return new AzureBlobInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+IApplicationContext appCtx =
+        (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+return new AzureBlobInputStream(appCtx, configuration,
+        partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
}

@Override
public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
throws AlgebricksException {
super.configure(ctx, configuration, warningCollector);

+IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
+BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
includeExcludeMatcher, warningCollector);

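The body of ExternalDataUtils.Azure.buildAzureBlobClient is not part of this excerpt, so the diff does not show where the timeout is actually applied. One plausible wiring, assuming the Azure Storage SDK for Java's RequestRetryOptions; apart from getAzureRequestTimeout(), everything here (including the class and method names) is an assumption rather than code from this commit:

    import com.azure.storage.blob.BlobServiceClient;
    import com.azure.storage.blob.BlobServiceClientBuilder;
    import com.azure.storage.common.policy.RequestRetryOptions;
    import com.azure.storage.common.policy.RetryPolicyType;
    import org.apache.asterix.common.api.IApplicationContext;

    // Hypothetical sketch: cap each request attempt at the configured timeout.
    final class AzureClientSketch {
        static BlobServiceClient buildClient(IApplicationContext appCtx, String connectionString) {
            int timeoutSecs = appCtx.getExternalProperties().getAzureRequestTimeout();
            // tryTimeout bounds a single attempt, in seconds; nulls keep the SDK defaults
            RequestRetryOptions retry =
                    new RequestRetryOptions(RetryPolicyType.EXPONENTIAL, null, timeoutSecs, null, null, null);
            return new BlobServiceClientBuilder().connectionString(connectionString).retryOptions(retry)
                    .buildClient();
        }
    }

The Data Lake changes below would presumably thread appCtx into the DataLakeServiceClientBuilder the same way.
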
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.zip.GZIPInputStream;

+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -45,10 +46,10 @@ public class AzureDataLakeInputStream extends AbstractExternalInputStream {
private final DataLakeServiceClient client;
private final String container;

-public AzureDataLakeInputStream(Map<String, String> configuration, List<String> filePaths)
-        throws HyracksDataException {
+public AzureDataLakeInputStream(IApplicationContext appCtx, Map<String, String> configuration,
+        List<String> filePaths) throws HyracksDataException {
super(configuration, filePaths);
-this.client = buildAzureClient(configuration);
+this.client = buildAzureClient(appCtx, configuration);
this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}

@@ -82,9 +83,10 @@ protected boolean getInputStream() throws IOException {
return true;
}

-private DataLakeServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
+private DataLakeServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration)
+        throws HyracksDataException {
try {
-return ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+return ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.PriorityQueue;

+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -41,18 +42,22 @@ public class AzureDataLakeInputStreamFactory extends AbstractExternalInputStream

@Override
public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
-return new AzureDataLakeInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+IApplicationContext appCtx =
+        (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+return new AzureDataLakeInputStream(appCtx, configuration,
+        partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
}

@Override
public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
throws AlgebricksException {
super.configure(ctx, configuration, warningCollector);

+IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(client, configuration,
includeExcludeMatcher, warningCollector);

@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;

+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
@@ -45,7 +46,8 @@ public class AzureBlobParquetReaderFactory extends HDFSDataSourceFactory {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
-BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
+IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
//Get endpoint
String endPoint = extractEndPoint(blobServiceClient.getAccountUrl());
//Get path
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;

+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
@@ -45,7 +46,9 @@ public class AzureDataLakeParquetReaderFactory extends HDFSDataSourceFactory {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
-DataLakeServiceClient dataLakeServiceClient = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+DataLakeServiceClient dataLakeServiceClient =
+        ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);

//Get endpoint
String endPoint = extractEndPoint(dataLakeServiceClient.getAccountUrl());
@@ -21,10 +21,13 @@
import java.io.IOException;
import java.io.InputStream;

+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IStreamNotificationHandler;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;

/**
* Base class for a source stream that is composed of multiple separate input streams. Reading proceeds one stream at
@@ -54,25 +57,30 @@ public int read() throws IOException {

@Override
public final int read(byte[] b, int off, int len) throws IOException {
-        if (in == null) {
-            if (!advance()) {
-                return -1;
-            }
-        }
-        int result = in.read(b, off, len);
-        if (result < 0 && (lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_CR)) {
-            // return a new line at the end of every file <--Might create problems for some cases
-            // depending on the parser implementation-->
-            lastByte = ExternalDataConstants.BYTE_LF;
-            b[off] = ExternalDataConstants.BYTE_LF;
-            return 1;
-        }
-        while ((result < 0) && advance()) {
-            result = in.read(b, off, len);
-        }
-        if (result > 0) {
-            lastByte = b[(off + result) - 1];
-        }
-        return result;
+        try {
+            if (in == null) {
+                if (!advance()) {
+                    return -1;
+                }
+            }
+            int result = in.read(b, off, len);
+            if (result < 0 && (lastByte != ExternalDataConstants.BYTE_LF)
+                    && (lastByte != ExternalDataConstants.BYTE_CR)) {
+                // return a new line at the end of every file <--Might create problems for some cases
+                // depending on the parser implementation-->
+                lastByte = ExternalDataConstants.BYTE_LF;
+                b[off] = ExternalDataConstants.BYTE_LF;
+                return 1;
+            }
+            while ((result < 0) && advance()) {
+                result = in.read(b, off, len);
+            }
+            if (result > 0) {
+                lastByte = b[(off + result) - 1];
+            }
+            return result;
+        } catch (Exception e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ExceptionUtils.getMessageOrToString(e));
+        }
}
}
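This try/catch implements the last Details item of the commit message: exceptions raised by SDK-backed streams (for example, Azure client errors) may be non-serializable or missing from other nodes' classpaths, so letting them propagate can abort the job while the original error is lost in transit. Re-throwing a serializable RuntimeDataException that carries only the message keeps the failure reportable. For reference, a sketch of what the helper presumably does; this is an assumption, since the real org.apache.hyracks.api.util.ExceptionUtils is not shown in this diff:

    public static String getMessageOrToString(Throwable e) {
        // prefer the message; fall back to toString() when there is none
        return e.getMessage() != null ? e.getMessage() : e.toString();
    }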
