Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ protected File getPersistenceFile() {
return new File(this.tempDirMultipart + File.separator + getIdentifier());
}

protected String buildCacheKey(final String bucket, final String key, final String uuid) {
return getIdentifier() + "/" + bucket + "/" + key + "/" + uuid;
}

protected boolean localUploadExistsInS3(final S3Client client, final String bucket, final MultipartState localState) {
final ListMultipartUploadsRequest listRequest = ListMultipartUploadsRequest.builder()
.bucket(bucket)
Expand Down Expand Up @@ -546,7 +550,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final String cacheKey = getIdentifier() + "/" + bucket + "/" + key;
final String cacheKey = buildCacheKey(bucket, key, flowFile.getAttribute(CoreAttributes.UUID.key()));

final Map<String, String> attributes = new HashMap<>();
final String ffFilename = flowFile.getAttributes().get(CoreAttributes.FILENAME.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -291,4 +293,22 @@ void testMigration() {

expectedRenamed.forEach((key, value) -> assertEquals(value, propertyMigrationResult.getPropertiesRenamed().get(key)));
}

@Test
void testBuildCacheKeyIncludesFlowFileUUID() {
final String bucket = "test-bucket";
final String key = "data/file.csv";
final String uuid1 = UUID.randomUUID().toString();
final String uuid2 = UUID.randomUUID().toString();

final String cacheKey1 = putS3Object.buildCacheKey(bucket, key, uuid1);
final String cacheKey2 = putS3Object.buildCacheKey(bucket, key, uuid2);
final String cacheKey1Retry = putS3Object.buildCacheKey(bucket, key, uuid1);

assertNotEquals(cacheKey1, cacheKey2,
"FlowFiles with different UUIDs sharing the same bucket and key must use distinct cache keys to prevent multipart state collision");
assertEquals(cacheKey1, cacheKey1Retry,
"Same FlowFile UUID must produce the same cache key so interrupted uploads can be resumed");
}

}
Loading