Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,20 @@
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobRange;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
Expand All @@ -40,15 +49,6 @@
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.ClientSideEncryptionSupport;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
Expand Down Expand Up @@ -81,6 +81,32 @@
@WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
@WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
@MultiProcessorUseCase(
description = "Retrieve all files in an Azure Blob Storage container",
keywords = {"azure", "blob", "storage", "state", "retrieve", "fetch", "all", "stream"},
configurations = {
@ProcessorConfiguration(
processorClass = ListAzureBlobStorage_v12.class,
configuration = """
The "Container Name" property should be set to the name of the Blob Storage Container that files reside in. \
If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_CONTAINER}`.

The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container.

The 'success' Relationship of this Processor is then connected to FetchAzureBlobStorage_v12.
"""
),
@ProcessorConfiguration(
processorClass = FetchAzureBlobStorage_v12.class,
configuration = """
"Container Name" = "${azure.container}"
"Blob Name" = "${azure.blobname}"

The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container.
"""
)
}
)
public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport {

public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,17 @@
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
import com.azure.storage.file.datalake.models.FileRange;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
Expand All @@ -42,20 +48,44 @@
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage")
@CapabilityDescription("Fetch the specified file from Azure Data Lake Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = "azure.datalake.storage.statusCode", description = "The HTTP error code (if available) from the failed operation"),
@WritesAttribute(attribute = "azure.datalake.storage.errorCode", description = "The Azure Data Lake Storage moniker of the failed operation"),
@WritesAttribute(attribute = "azure.datalake.storage.errorMessage", description = "The Azure Data Lake Storage error message from the failed operation")
})
@MultiProcessorUseCase(
description = "Retrieve all files in an Azure Data Lake Storage directory",
keywords = {"azure", "datalake", "adls", "state", "retrieve", "fetch", "all", "stream"},
configurations = {
@ProcessorConfiguration(
processorClass = ListAzureDataLakeStorage.class,
configuration = """
The "Filesystem Name" property should be set to the name of the Azure Filesystem (also known as a Container) that files reside in. \
If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_FILESYSTEM}`.
Configure the "Directory Name" property to specify the name of the directory in the file system. \
If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_DIRECTORY}`.

The "ADLS Credentials" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem.

The 'success' Relationship of this Processor is then connected to FetchAzureDataLakeStorage.
"""
),
@ProcessorConfiguration(
processorClass = FetchAzureDataLakeStorage.class,
configuration = """
"Filesystem Name" = "${azure.filesystem}"
"Directory Name" = "${azure.directory}"
"File Name" = "${azure.filename}"

The "ADLS Credentials" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem.
"""
)
}
)
public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {

public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,20 @@
import com.google.api.services.drive.Drive;
import com.google.api.services.drive.DriveScopes;
import com.google.api.services.drive.model.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
Expand All @@ -43,17 +52,6 @@
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import org.apache.nifi.proxy.ProxyConfiguration;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
Expand Down Expand Up @@ -82,6 +80,33 @@
@WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC),
@WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC)
})
@MultiProcessorUseCase(
description = "Retrieve all files in a Google Drive folder",
keywords = {"google", "drive", "google cloud", "state", "retrieve", "fetch", "all", "stream"},
configurations = {
@ProcessorConfiguration(
processorClass = ListGoogleDrive.class,
configuration = """
The "Folder ID" property should be set to the ID of the Google Drive folder that files reside in. \
See processor documentation / additional details for more information on how to determine a Google Drive folder's ID.
If the flow being built is to be reused elsewhere, it's a good idea to parameterize \
this property by setting it to something like `#{GOOGLE_DRIVE_FOLDER_ID}`.

The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the folder.

The 'success' Relationship of this Processor is then connected to FetchGoogleDrive.
"""
),
@ProcessorConfiguration(
processorClass = FetchGoogleDrive.class,
configuration = """
"File ID" = "${drive.id}"

The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the file.
"""
)
}
)
public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTrait {

// Google Docs Export Types
Expand Down Expand Up @@ -195,8 +220,6 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr





public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile will be routed here for each successfully fetched File.")
Expand All @@ -207,20 +230,17 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
.description("A FlowFile will be routed here for each File for which fetch was attempted but failed.")
.build();

private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
FILE_ID,
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS),
GOOGLE_DOC_EXPORT_TYPE,
GOOGLE_SPREADSHEET_EXPORT_TYPE,
GOOGLE_PRESENTATION_EXPORT_TYPE,
GOOGLE_DRAWING_EXPORT_TYPE
));
);

public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);

private volatile Drive driveService;

Expand Down Expand Up @@ -279,20 +299,14 @@ private String getExportType(final String mimeType, final ProcessContext context
return null;
}

switch (mimeType) {
case "application/vnd.google-apps.document":
return context.getProperty(GOOGLE_DOC_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.spreadsheet":
return context.getProperty(GOOGLE_SPREADSHEET_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.presentation":
return context.getProperty(GOOGLE_PRESENTATION_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.drawing":
return context.getProperty(GOOGLE_DRAWING_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.script":
return "application/vnd.google-apps.script+json";
default:
return null;
}
return switch (mimeType) {
case "application/vnd.google-apps.document" -> context.getProperty(GOOGLE_DOC_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.spreadsheet" -> context.getProperty(GOOGLE_SPREADSHEET_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.presentation" -> context.getProperty(GOOGLE_PRESENTATION_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.drawing" -> context.getProperty(GOOGLE_DRAWING_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.script" -> "application/vnd.google-apps.script+json";
default -> null;
};
}

private FlowFile fetchFile(final String fileId, final ProcessSession session, final ProcessContext context, final FlowFile flowFile, final Map<String, String> attributeMap) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.CountingOutputStream;
Expand All @@ -30,6 +38,8 @@
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.ConfigVerificationResult;
Expand All @@ -45,15 +55,6 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR;
Expand Down Expand Up @@ -129,6 +130,33 @@
@WritesAttribute(attribute = OWNER_TYPE_ATTR, description = OWNER_TYPE_DESC),
@WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
})
@MultiProcessorUseCase(
description = "Retrieve all files in a Google Cloud Storage (GCS) bucket",
keywords = {"gcp", "gcs", "google cloud", "google compute storage", "state", "retrieve", "fetch", "all", "stream"},
configurations = {
@ProcessorConfiguration(
processorClass = ListGCSBucket.class,
configuration = """
The "Bucket" property should be set to the name of the GCS bucket that files reside in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \
this property by setting it to something like `#{GCS_SOURCE_BUCKET}`.
Configure the "Project ID" property to reflect the ID of your Google Cloud Project.

The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the bucket.

The 'success' Relationship of this Processor is then connected to FetchGCSObject.
"""
),
@ProcessorConfiguration(
processorClass = FetchGCSObject.class,
configuration = """
"Bucket" = "${gcs.bucket}"
"Name" = "${filename}"

The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the bucket.
"""
)
}
)
public class FetchGCSObject extends AbstractGCSProcessor {
public static final PropertyDescriptor BUCKET = new PropertyDescriptor
.Builder().name("gcs-bucket")
Expand Down Expand Up @@ -219,7 +247,7 @@ public List<ConfigVerificationResult> verify(final ProcessContext context, final
try {
final FetchedBlob blob = fetchBlob(context, storage, attributes);

final CountingOutputStream out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM);
final CountingOutputStream out = new CountingOutputStream(NullOutputStream.INSTANCE);
IOUtils.copy(blob.contents, out);
final long byteCount = out.getByteCount();
results.add(new ConfigVerificationResult.Builder()
Expand Down Expand Up @@ -253,9 +281,6 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

final Storage storage = getCloudService();

final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);

try {
final FetchedBlob blob = fetchBlob(context, storage, flowFile.getAttributes());
flowFile = session.importFrom(blob.contents, flowFile);
Expand Down Expand Up @@ -328,7 +353,7 @@ private List<Storage.BlobSourceOption> getBlobSourceOptions(final ProcessContext
return blobSourceOptions;
}

private class FetchedBlob {
private static class FetchedBlob {
private final InputStream contents;
private final Blob blob;

Expand Down
Loading