NIFI-7886 Added read by range to Azure, GCS, S3 fetch processors
This closes #4576

Co-authored-by: Joey Frazee <jfrazee@apache.org>
Signed-off-by: Joey Frazee <jfrazee@apache.org>
pkelly-nifi and jfrazee committed Apr 27, 2021
1 parent 16dc61e commit 0ed3534
Showing 9 changed files with 378 additions and 46 deletions.
FetchS3Object.java
@@ -43,6 +43,7 @@
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.FlowFileAccessException;
@@ -82,6 +83,7 @@ public class FetchS3Object extends AbstractS3Processor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();

public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder()
.name("requester-pays")
.displayName("Requester Pays")
@@ -95,10 +97,30 @@
.defaultValue("false")
.build();

public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
.name("range-start")
.displayName("Range Start")
.description("The byte position at which to start reading from the object. An empty value or a value of " +
"zero will start reading at the beginning of the object.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();

public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
.name("range-length")
.displayName("Range Length")
.description("The number of bytes to download from the object, starting from the Range Start. An empty " +
"value or a value that extends beyond the end of the object will read to the end of the object.")
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();

public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, ENCRYPTION_SERVICE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST,
-PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, REQUESTER_PAYS));
+PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, REQUESTER_PAYS, RANGE_START, RANGE_LENGTH));

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -138,6 +160,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);

final AmazonS3 client = getClient();
final GetObjectRequest request;
@@ -147,6 +171,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
request = new GetObjectRequest(bucket, key, versionId);
}
request.setRequesterPays(requesterPays);
if (rangeLength != null) {
request.setRange(rangeStart, rangeStart + rangeLength - 1);
} else {
request.setRange(rangeStart);
}

final Map<String, String> attributes = new HashMap<>();

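Two details in the hunk above are easy to miss: the new properties are data-size values, so strings such as "2 KB" are converted to a raw byte count before the request is built, and the S3 SDK's GetObjectRequest.setRange(start, end) takes an inclusive end offset, which is why the code subtracts one. A minimal standalone sketch of both conversions (an editorial illustration, not part of the commit, assuming NiFi's DataUnit.parseDataSize helper):

import org.apache.nifi.processor.DataUnit;

public class RangeSketch {
    // Mirrors the branch in onTrigger: a bounded range becomes an inclusive
    // start-end pair; an unset length becomes an open-ended range.
    static String httpRange(long rangeStart, Long rangeLength) {
        if (rangeLength != null) {
            return rangeStart + "-" + (rangeStart + rangeLength - 1);
        }
        return rangeStart + "-"; // read from rangeStart to the end of the object
    }

    public static void main(String[] args) {
        // NiFi data sizes are binary-based: "2 KB" parses to 2048 bytes
        long start = (long) DataUnit.parseDataSize("2 KB", DataUnit.B);
        System.out.println(httpRange(start, 100L)); // 2048-2147
        System.out.println(httpRange(start, null)); // 2048-
    }
}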
(FetchS3Object unit tests)
@@ -268,7 +268,7 @@ public void testFlowFileAccessExceptionGoesToFailure() throws IOException {
public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 19, pd.size());
assertEquals("size should be eq", 21, pd.size());
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(FetchS3Object.BUCKET));
FetchAzureBlobStorage.java
@@ -18,7 +18,9 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -32,10 +34,14 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;

@@ -53,6 +59,34 @@
})
public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {

public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
.name("range-start")
.displayName("Range Start")
.description("The byte position at which to start reading from the blob. An empty value or a value of " +
"zero will start reading at the beginning of the blob.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();

public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
.name("range-length")
.displayName("Range Length")
.description("The number of bytes to download from the blob, starting from the Range Start. An empty " +
"value or a value that extends beyond the end of the blob will read to the end of the blob.")
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
properties.add(RANGE_START);
properties.add(RANGE_LENGTH);
return properties;
}

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@@ -62,8 +96,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro

final long startNanos = System.nanoTime();

-String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
-String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+final String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
+final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);

AtomicReference<Exception> storedException = new AtomicReference<>();
try {
@@ -80,7 +116,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
// distribution of download over threads, investigate
flowFile = session.write(flowFile, os -> {
try {
-blob.download(os, null, null, operationContext);
+blob.downloadRange(rangeStart, rangeLength, os, null, null, operationContext);
} catch (StorageException e) {
storedException.set(e);
throw new IOException(e);
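CloudBlob.downloadRange takes the length as a nullable Long, so rangeLength can be passed through unchanged: a null length reads to the end of the blob, and per the property description a length that overshoots the blob is clamped to the bytes that exist. A local model of that contract (an editorial sketch of the documented behavior, not Azure SDK code):

import java.util.Arrays;

public class DownloadRangeModel {
    // null length means "to the end"; an overshooting length is clamped.
    static byte[] slice(byte[] blob, long offset, Long length) {
        int from = (int) offset;
        int to = (length == null) ? blob.length
                : (int) Math.min(blob.length, offset + length);
        return Arrays.copyOfRange(blob, from, to);
    }

    public static void main(String[] args) {
        byte[] blob = "ABCDEFGHIJ".getBytes();
        System.out.println(new String(slice(blob, 2, 3L)));   // CDE
        System.out.println(new String(slice(blob, 2, null))); // CDEFGHIJ
        System.out.println(new String(slice(blob, 2, 99L)));  // CDEFGHIJ (clamped)
    }
}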
FetchAzureDataLakeStorage.java
@@ -16,21 +16,30 @@
*/
package org.apache.nifi.processors.azure.storage;

import com.azure.core.util.Context;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
import com.azure.storage.file.datalake.models.FileRange;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@@ -39,6 +48,45 @@
@InputRequirement(Requirement.INPUT_REQUIRED)
public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {

public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
.name("range-start")
.displayName("Range Start")
.description("The byte position at which to start reading from the object. An empty value or a value of " +
"zero will start reading at the beginning of the object.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();

public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
.name("range-length")
.displayName("Range Length")
.description("The number of bytes to download from the object, starting from the Range Start. An empty " +
"value or a value that extends beyond the end of the object will read to the end of the object.")
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();

public static final PropertyDescriptor NUM_RETRIES = new PropertyDescriptor.Builder()
.name("number-of-retries")
.displayName("Number of Retries")
.description("The number of automatic retries to perform if the download fails.")
.addValidator(StandardValidators.createLongValidator(0L, Integer.MAX_VALUE, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.defaultValue("0")
.build();

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
properties.add(RANGE_START);
properties.add(RANGE_LENGTH);
properties.add(NUM_RETRIES);
return properties;
}

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@@ -48,10 +96,16 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro

final long startNanos = System.nanoTime();
try {
final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
final int numRetries = (context.getProperty(NUM_RETRIES).isSet() ? context.getProperty(NUM_RETRIES).evaluateAttributeExpressions(flowFile).asInteger() : 0);
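// A null rangeLength produces an open-ended FileRange (read from the offset to
// the end of the file); DownloadRetryOptions caps how many times the ranged
// download is retried.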
final FileRange fileRange = new FileRange(rangeStart, rangeLength);
final DownloadRetryOptions retryOptions = new DownloadRetryOptions();
retryOptions.setMaxRetryRequests(numRetries);

final String fileSystem = evaluateFileSystemProperty(context, flowFile);
final String directory = evaluateDirectoryProperty(context, flowFile);
final String fileName = evaluateFileNameProperty(context, flowFile);

final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
@@ -61,7 +115,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
throw new ProcessException(FILE.getDisplayName() + " (" + fileName + ") points to a directory. Full path: " + fileClient.getFilePath());
}

-flowFile = session.write(flowFile, os -> fileClient.read(os));
+flowFile = session.write(flowFile, os -> fileClient.readWithResponse(os, fileRange, retryOptions, null, false, null, Context.NONE));
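// (Editorial note) The nulls and 'false' above appear to be pass-through
// defaults: no request conditions, no per-range MD5 validation, no timeout.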
session.getProvenanceReporter().modifyContent(flowFile);
session.transfer(flowFile, REL_SUCCESS);

@@ -73,4 +127,4 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
session.transfer(flowFile, REL_FAILURE);
}
}
}
AbstractAzureBlobStorageIT.java
@@ -32,6 +32,7 @@ public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT
protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
protected static final String TEST_BLOB_NAME = "nifi-test-blob";
protected static final String TEST_FILE_NAME = "nifi-test-file";
protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
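// 36 bytes, giving the ranged-fetch tests room to address offsets past the old 10-byte payload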

protected CloudBlobContainer container;

@@ -52,8 +53,8 @@ public void tearDownAzureBlobStorageIT() throws Exception {

protected void uploadTestBlob() throws Exception {
CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
byte[] buf = "0123456789".getBytes();
byte[] buf = TEST_FILE_CONTENT.getBytes();
InputStream in = new ByteArrayInputStream(buf);
-blob.upload(in, 10);
+blob.upload(in, buf.length);
}
}
AbstractAzureDataLakeStorageIT.java
@@ -36,7 +36,8 @@
public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorageIT {

private static final String FILESYSTEM_NAME_PREFIX = "nifi-test-filesystem";
-private static final String TEST_FILE_CONTENT = "test";
+protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";

protected String fileSystemName;
protected DataLakeFileSystemClient fileSystemClient;
(FetchAzureBlobStorage integration tests)
@@ -47,6 +47,71 @@ public void testFetchBlob() throws Exception {
assertResult();
}

@Test
public void testFetchBlobWithRangeZeroOne() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();

assertResult(TEST_FILE_CONTENT.substring(0, 1));
}

@Test
public void testFetchBlobWithRangeOneOne() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "1B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();

assertResult(TEST_FILE_CONTENT.substring(1, 2));
}

@Test
public void testFetchBlobWithRangeTwentyThreeTwentySix() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "23B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "3B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();

assertResult(TEST_FILE_CONTENT.substring(23, 26));
}

@Test
public void testFetchBlobWithRangeLengthGreater() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1KB");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();

assertResult(TEST_FILE_CONTENT);
}

@Test
public void testFetchBlobWithRangeLengthUnset() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();

assertResult(TEST_FILE_CONTENT);
}

@Test
public void testFetchBlobWithRangeStartOutOfRange() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, String.format("%sB", TEST_FILE_CONTENT.length() + 1));
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();

runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_FAILURE, 1);
}

@Test
public void testFetchBlobUsingCredentialService() throws Exception {
configureCredentialsService();
@@ -59,11 +124,15 @@ public void testFetchBlobUsingCredentialService() throws Exception {
}

private void assertResult() throws Exception {
assertResult(TEST_FILE_CONTENT);
}

private void assertResult(final String expectedContent) throws Exception {
runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals("0123456789".getBytes());
flowFile.assertAttributeEquals("azure.length", "10");
flowFile.assertContentEquals(expectedContent);
flowFile.assertAttributeEquals("azure.length", String.valueOf(TEST_FILE_CONTENT.length()));
}
}
}
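
Since both new properties support expression language against flowfile attributes, a flow can compute ranges per flowfile. A hypothetical TestRunner sketch of that wiring (attribute names are illustrative; a real run would also need bucket, key, and credentials configured):

import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.processors.aws.s3.FetchS3Object;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class RangePropertiesUsageSketch {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class);
        // Evaluated per flowfile (ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        runner.setProperty(FetchS3Object.RANGE_START, "${s3.range.start}");
        runner.setProperty(FetchS3Object.RANGE_LENGTH, "${s3.range.length}");

        Map<String, String> attributes = new HashMap<>();
        attributes.put("s3.range.start", "0 B");    // data-size syntax
        attributes.put("s3.range.length", "64 KB");
        runner.enqueue(new byte[0], attributes);
    }
}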
