Skip to content

Commit

Permalink
NIFI-11117 Remove folder creation from PutGoogleDrive
Browse files Browse the repository at this point in the history
This closes #6910.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
  • Loading branch information
krisztina-zsihovszki authored and turcsanyip committed Jan 31, 2023
1 parent 5daf01e commit 92ccb79
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 252 deletions.
5 changes: 5 additions & 0 deletions nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
<version>1.20.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-conflict-resolution</artifactId>
<version>1.20.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private File fetchFileMetadata(String fileId) throws IOException {
}

private void handleErrorResponse(ProcessSession session, String fileId, FlowFile flowFile, GoogleJsonResponseException e) {
getLogger().error("Couldn't fetch file with id '{}'", fileId, e);
getLogger().error("Fetching File [{}] failed", fileId, e);

flowFile = session.putAttribute(flowFile, ERROR_CODE, "" + e.getStatusCode());
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());
Expand All @@ -193,7 +193,7 @@ private void handleErrorResponse(ProcessSession session, String fileId, FlowFile
}

private void handleUnexpectedError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) {
getLogger().error("Unexpected error while fetching and processing file with id '{}'", fileId, e);
getLogger().error("Fetching File [{}] failed", fileId, e);

flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.joining;
import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.createRegexMatchingValidator;
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.FAIL;
import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.IGNORE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
Expand Down Expand Up @@ -66,7 +67,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
Expand All @@ -90,6 +90,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.json.JSONObject;
Expand All @@ -108,34 +109,19 @@
@WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC),
@WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC)})
public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrait {

public static final String IGNORE_RESOLUTION = "ignore";
public static final String REPLACE_RESOLUTION = "replace";
public static final String FAIL_RESOLUTION = "fail";
public static final int MIN_ALLOWED_CHUNK_SIZE_IN_BYTES = MediaHttpUploader.MINIMUM_CHUNK_SIZE;
public static final int MAX_ALLOWED_CHUNK_SIZE_IN_BYTES = 1024 * 1024 * 1024;

public static final PropertyDescriptor FOLDER_ID = new PropertyDescriptor.Builder()
.name("folder-id")
.displayName("Folder ID")
.description("The ID of the shared folder. " +
.description("The ID of the shared folder." +
" Please see Additional Details to set up access to Google Drive and obtain Folder ID.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();

/**
 * Optional subfolder path, relative to the folder identified by 'Folder ID'.
 * The value may use FlowFile attribute expressions and is checked against a
 * pattern that rejects a leading or trailing '/' character.
 */
public static final PropertyDescriptor SUBFOLDER_NAME = new PropertyDescriptor.Builder()
        .name("subfolder-name")
        .displayName("Subfolder Name")
        .required(false)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        .description("The name (path) of the subfolder where files are uploaded. The subfolder name is relative to the shared folder specified by 'Folder ID'."
                + " Example: subFolder, subFolder1/subfolder2")
        .addValidator(createRegexMatchingValidator(
                Pattern.compile("^(?!/).+(?<!/)$"),
                false,
                "Subfolder Name should not contain leading or trailing slash ('/') character."))
        .build();

public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
.name("file-name")
.displayName("Filename")
Expand All @@ -146,26 +132,13 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

/**
 * Boolean switch (default "false") controlling whether a missing subfolder is created.
 * Declared as dependent on {@link #SUBFOLDER_NAME}; the description names that property
 * by its display name, 'Subfolder Name'.
 */
public static final PropertyDescriptor CREATE_SUBFOLDER = new PropertyDescriptor.Builder()
        .name("create-subfolder")
        .displayName("Create Subfolder")
        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
        .required(true)
        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
        .allowableValues("true", "false")
        .defaultValue("false")
        .dependsOn(SUBFOLDER_NAME)
        // The referenced property is displayed as 'Subfolder Name'; there is no 'Folder Name' property.
        .description("Specifies whether to automatically create the subfolder specified by 'Subfolder Name' if it does not exist. " +
                "Permission to list folders is required. ")
        .build();

public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("conflict-resolution-strategy")
.displayName("Conflict Resolution Strategy")
.description("Indicates what should happen when a file with the same name already exists in the specified Google Drive folder.")
.required(true)
.defaultValue(FAIL_RESOLUTION)
.allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, REPLACE_RESOLUTION)
.defaultValue(FAIL.getValue())
.allowableValues(ConflictResolutionStrategy.class)
.build();

public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new PropertyDescriptor.Builder()
Expand All @@ -190,8 +163,6 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(asList(
GCP_CREDENTIALS_PROVIDER_SERVICE,
FOLDER_ID,
SUBFOLDER_NAME,
CREATE_SUBFOLDER,
FILE_NAME,
CONFLICT_RESOLUTION,
CHUNKED_UPLOAD_THRESHOLD,
Expand Down Expand Up @@ -260,15 +231,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
return;
}

String folderId = context.getProperty(FOLDER_ID).evaluateAttributeExpressions(flowFile).getValue();
final String subfolderName = context.getProperty(SUBFOLDER_NAME).evaluateAttributeExpressions(flowFile).getValue();
final boolean createFolder = context.getProperty(CREATE_SUBFOLDER).asBoolean();
final String folderId = context.getProperty(FOLDER_ID).evaluateAttributeExpressions(flowFile).getValue();
final String filename = context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());

try {
folderId = subfolderName != null ? getOrCreateParentSubfolder(subfolderName, folderId, createFolder).getId() : folderId;

final long startNanos = System.nanoTime();
final long size = flowFile.getSize();

Expand All @@ -280,20 +247,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
.asDataSize(DataUnit.B)
.intValue();

final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();

final ConflictResolutionStrategy conflictResolution = ConflictResolutionStrategy.forValue(context.getProperty(CONFLICT_RESOLUTION).getValue());
final Optional<File> alreadyExistingFile = checkFileExistence(filename, folderId);
final File fileMetadata = alreadyExistingFile.isPresent() ? alreadyExistingFile.get() : createMetadata(filename, folderId);
final File fileMetadata = alreadyExistingFile.orElseGet(() -> createMetadata(filename, folderId));

if (alreadyExistingFile.isPresent() && FAIL_RESOLUTION.equals(conflictResolution)) {
getLogger().error("File '{}' already exists in {} folder, conflict resolution is '{}'", filename, getFolderName(subfolderName), FAIL_RESOLUTION);
if (alreadyExistingFile.isPresent() && conflictResolution == FAIL) {
getLogger().error("File [{}] already exists in [{}] Folder, conflict resolution is [{}]", filename, folderId, FAIL.getDisplayName());
flowFile = addAttributes(alreadyExistingFile.get(), flowFile, session);
session.transfer(flowFile, REL_FAILURE);
return;
}

if (alreadyExistingFile.isPresent() && IGNORE_RESOLUTION.equals(conflictResolution)) {
getLogger().info("File '{}' already exists in {} folder, conflict resolution is '{}'", filename, getFolderName(subfolderName), IGNORE_RESOLUTION);
if (alreadyExistingFile.isPresent() && conflictResolution == IGNORE) {
getLogger().info("File [{}] already exists in [{}] Folder, conflict resolution is [{}]", filename, folderId, IGNORE.getDisplayName());
flowFile = addAttributes(alreadyExistingFile.get(), flowFile, session);
session.transfer(flowFile, REL_SUCCESS);
return;
Expand Down Expand Up @@ -323,12 +289,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
session.transfer(flowFile, REL_SUCCESS);
} catch (GoogleJsonResponseException e) {
getLogger().error("Exception occurred while uploading file '{}' to {} Google Drive folder", filename,
getFolderName(subfolderName), e);
getLogger().error("Exception occurred while uploading File [{}] to [{}] Google Drive Folder", filename,
folderId, e);
handleExpectedError(session, flowFile, e);
} catch (Exception e) {
getLogger().error("Exception occurred while uploading file '{}' to {} Google Drive folder", filename,
getFolderName(subfolderName), e);
getLogger().error("Exception occurred while uploading File [{}] to [{}] Google Drive Folder", filename,
folderId, e);

if (e.getCause() != null && e.getCause() instanceof GoogleJsonResponseException) {
handleExpectedError(session, flowFile, (GoogleJsonResponseException) e.getCause());
Expand All @@ -354,10 +320,6 @@ private FlowFile addAttributes(File file, FlowFile flowFile, ProcessSession sess
return session.putAllAttributes(flowFile, attributes);
}

/**
 * Builds the folder label used in log messages.
 *
 * @param subFolderName the configured subfolder name, or {@code null} when none is set
 * @return {@code "shared"} when no subfolder name is given, otherwise the name wrapped in single quotes
 */
private String getFolderName(String subFolderName) {
    if (subFolderName == null) {
        return "shared";
    }
    return format("'%s'", subFolderName);
}

private DriveRequest<File> createDriveRequest(File fileMetadata, final InputStreamContent mediaContent) throws IOException {
if (fileMetadata.getId() == null) {
return driveService.files()
Expand All @@ -384,7 +346,8 @@ private File uploadFileInChunks(DriveRequest<File> driveRequest, File fileMetada
fileMetadata.setSize(mediaContent.getLength());
return fileMetadata;
} else {
throw new ProcessException(format("Upload of file '%s' to folder '%s' failed, HTTP error code: %d", fileMetadata.getName(), fileMetadata.getId(), response.getStatusCode()));
throw new ProcessException(format("Upload of File [%s] to Folder [%s] failed, HTTP error code: [%d]",
fileMetadata.getName(), fileMetadata.getId(), response.getStatusCode()));
}
}

Expand All @@ -395,63 +358,17 @@ private String getUploadedFileId(final InputStream content) {
return new JSONObject(contentAsString).getString("id");
}

/**
 * Resolves a '/'-separated subfolder path below the given parent, one level at a time,
 * and returns the folder for the last path element.
 *
 * @param folderName     subfolder name or multi-level path (for example {@code a/b/c})
 * @param parentFolderId id of the folder the path is relative to
 * @param createFolder   passed through to {@code getOrCreateFolder} for every level
 * @return the folder corresponding to the final path element
 * @throws IOException propagated from {@code getOrCreateFolder}
 */
private File getOrCreateParentSubfolder(String folderName, String parentFolderId, boolean createFolder) throws IOException {
    String remainingPath = folderName;
    String currentParentId = parentFolderId;

    // Peel off the leading path element while the remainder still spans more than one level.
    int separatorIndex = remainingPath.indexOf("/");
    while (isMultiLevelFolder(separatorIndex, remainingPath)) {
        final String leadingFolderName = remainingPath.substring(0, separatorIndex);
        final String trailingPath = remainingPath.substring(separatorIndex + 1);

        currentParentId = getOrCreateFolder(leadingFolderName, currentParentId, createFolder).getId();
        remainingPath = trailingPath;
        separatorIndex = remainingPath.indexOf("/");
    }

    return getOrCreateFolder(remainingPath, currentParentId, createFolder);
}

/**
 * Tells whether a path separator splits the folder name into two non-empty parts.
 *
 * @param indexOfPathSeparator index of the separator within {@code folderName}
 * @param folderName           the folder name or path being inspected
 * @return {@code true} when the separator is neither the first nor the last character
 */
private boolean isMultiLevelFolder(int indexOfPathSeparator, String folderName) {
    if (indexOfPathSeparator <= 0) {
        return false;
    }
    final int lastCharIndex = folderName.length() - 1;
    return indexOfPathSeparator < lastCharIndex;
}

/**
 * Returns the folder with the given name below the given parent, creating it when allowed.
 *
 * @param folderName     name of the folder to look up
 * @param parentFolderId id of the parent folder
 * @param createFolder   whether a missing folder may be created
 * @return the existing folder, or the newly created one (requested with the "id, parents" fields)
 * @throws IOException      propagated from the existence check or the create request
 * @throws ProcessException if the folder does not exist and {@code createFolder} is {@code false}
 */
private File getOrCreateFolder(String folderName, String parentFolderId, boolean createFolder) throws IOException {
    final Optional<File> existingFolder = checkFolderExistence(folderName, parentFolderId);
    if (existingFolder.isPresent()) {
        return existingFolder.get();
    }

    if (!createFolder) {
        throw new ProcessException(format("The specified subfolder '%s' does not exist and '%s' is false.", folderName, CREATE_SUBFOLDER.getDisplayName()));
    }

    // Placeholders instead of concatenation: the message is only assembled when debug logging is enabled.
    getLogger().debug("Create folder {} parent id: {}", folderName, parentFolderId);

    final File folderMetadata = createMetadata(folderName, parentFolderId);
    folderMetadata.setMimeType(DRIVE_FOLDER_MIME_TYPE);

    return driveService.files()
            .create(folderMetadata)
            .setFields("id, parents")
            .execute();
}

/**
 * Creates Drive file metadata carrying a name and a single parent.
 *
 * @param name     name to set on the metadata
 * @param parentId id of the parent, stored as a one-element parents list
 * @return the populated metadata object
 */
private File createMetadata(final String name, final String parentId) {
    final File driveFile = new File();
    driveFile.setParents(singletonList(parentId));
    driveFile.setName(name);
    return driveFile;
}

/**
 * Looks up a folder by name below the given parent.
 *
 * @param folderName name of the folder to search for
 * @param parentId   id of the parent folder
 * @return the result of {@code checkObjectExistence} for the folder query
 * @throws IOException propagated from {@code checkObjectExistence}
 */
private Optional<File> checkFolderExistence(String folderName, String parentId) throws IOException {
    // Query values are single-quoted literals: backslash-escape embedded backslashes and quotes
    // so a name such as "Valentine's Day" cannot break or alter the query.
    // String.valueOf keeps the previous rendering of a null argument as "null".
    final String escapedFolderName = String.valueOf(folderName).replace("\\", "\\\\").replace("'", "\\'");
    final String escapedParentId = String.valueOf(parentId).replace("\\", "\\\\").replace("'", "\\'");

    return checkObjectExistence(format("mimeType='%s' and name='%s' and ('%s' in parents)", DRIVE_FOLDER_MIME_TYPE, escapedFolderName, escapedParentId));
}

/**
 * Looks up an object by name below the given parent.
 *
 * @param fileName name of the file to search for
 * @param parentId id of the parent folder
 * @return the result of {@code checkObjectExistence} for the name/parent query
 * @throws IOException propagated from {@code checkObjectExistence}
 */
private Optional<File> checkFileExistence(String fileName, String parentId) throws IOException {
    // Query values are single-quoted literals: backslash-escape embedded backslashes and quotes
    // so a file name containing an apostrophe cannot break or alter the query.
    // String.valueOf keeps the previous rendering of a null argument as "null".
    final String escapedFileName = String.valueOf(fileName).replace("\\", "\\\\").replace("'", "\\'");
    final String escapedParentId = String.valueOf(parentId).replace("\\", "\\\\").replace("'", "\\'");

    return checkObjectExistence(format("name='%s' and ('%s' in parents)", escapedFileName, escapedParentId));
}

private Optional<File> checkObjectExistence(String query) throws IOException {
final FileList result = driveService.files()
.list()
.setQ(query)
.setQ(format("name='%s' and ('%s' in parents)", fileName, parentId))
.setFields("files(name, id)")
.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public FetchGoogleDrive createTestSubject() {

@Test
void testFetch() throws Exception {
// GIVEN
File file = createFileWithDefaultContent("test_file.txt", mainFolderId);

Map<String, String> inputFlowFileAttributes = new HashMap<>();
Expand All @@ -54,11 +53,9 @@ void testFetch() throws Exception {
HashSet<Map<String, String>> expectedAttributes = new HashSet<>(singletonList(inputFlowFileAttributes));
List<String> expectedContent = singletonList(DEFAULT_FILE_CONTENT);

// WHEN
testRunner.enqueue("unimportant_data", inputFlowFileAttributes);
testRunner.run();

// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_FAILURE, 0);

checkAttributes(FetchGoogleDrive.REL_SUCCESS, expectedAttributes);
Expand All @@ -67,7 +64,6 @@ void testFetch() throws Exception {

@Test
void testInputFlowFileReferencesMissingFile() {
// GIVEN
Map<String, String> inputFlowFileAttributes = new HashMap<>();
inputFlowFileAttributes.put(GoogleDriveAttributes.ID, "missing");
inputFlowFileAttributes.put(GoogleDriveAttributes.FILENAME, "missing_filename");
Expand All @@ -80,24 +76,20 @@ void testInputFlowFileReferencesMissingFile() {
}}
));

// WHEN
testRunner.enqueue("unimportant_data", inputFlowFileAttributes);
testRunner.run();

// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);
checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
}

@Test
void testInputFlowFileThrowsExceptionBeforeFetching() throws Exception {
// GIVEN
File file = createFileWithDefaultContent("test_file.txt", mainFolderId);

Map<String, String> inputFlowFileAttributes = new HashMap<>();
inputFlowFileAttributes.put(GoogleDriveAttributes.ID, file.getId());
inputFlowFileAttributes.put(GoogleDriveAttributes.FILENAME, file.getName());

MockFlowFile input = new MockFlowFile(1) {
final AtomicBoolean throwException = new AtomicBoolean(true);

Expand All @@ -111,7 +103,6 @@ public boolean isPenalized() {
return super.isPenalized();
}
}

@Override
public Map<String, String> getAttributes() {
return inputFlowFileAttributes;
Expand All @@ -122,11 +113,9 @@ public Map<String, String> getAttributes() {
inputFlowFileAttributes
));

// WHEN
testRunner.enqueue(input);
testRunner.run();

// THEN
testRunner.assertTransferCount(FetchGoogleDrive.REL_SUCCESS, 0);

checkAttributes(FetchGoogleDrive.REL_FAILURE, expectedFailureAttributes);
Expand Down

0 comments on commit 92ccb79

Please sign in to comment.