-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode) #6443
NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode) #6443
Conversation
43adf0d
to
fef7caf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @malthe!
Thank you for your contribution! The code looks nice, but there are a few things to consider. This feature is already supported by the ADLS processors, which can be used as an example. If it is possible, please use the same namings (property, strategy) that are used there. Also, it is worth considering that ADLS hasn't got OVERWRITE_IF_SOURCE_NEWER
equivalent. It doesn't mean it shouldn't have. Still, considering that the point of the current implementation is arguable (see my inline comment), I'd remove this option and add it in a separate pr for both azure processors.
...s-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictStrategy.java
Outdated
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
...s-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictStrategy.java
Outdated
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Show resolved
Hide resolved
49ebdfa
to
5d149c9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for contributing this improvement @malthe. The general approach looks good, I noted a couple minor recommendations.
...main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictResolutionStrategy.java
Outdated
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
08789d4
to
15bd2b2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for making the adjustments @malthe, this looks close to completion. I noted a couple minor recommendations and questions related to dependencies.
<dependency> | ||
<groupId>org.apache.nifi</groupId> | ||
<artifactId>nifi-standard-record-utils</artifactId> | ||
<version>1.18.0-SNAPSHOT</version> | ||
<scope>compile</scope> | ||
</dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a particular reason for the addition of this dependency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so – I have removed it now; must be an artifact of the coding process.
attributes.put(ATTR_NAME_BLOBNAME, blobName); | ||
attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl()); | ||
attributes.put(ATTR_NAME_LANG, null); | ||
attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any response property that could indicate the content type as opposed to hard-coding this value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell there is not – but this is the value you will get if you don't set it during upload and we currently do not support setting it.
(And if we did and the user had specified a value, we would know what it was.)
I think it's a reasonable bet that the default mime type will not be changed. It basically assumes it's binary if you don't specify something.
attributes.put(ATTR_NAME_CONTAINER, containerName); | ||
attributes.put(ATTR_NAME_BLOBNAME, blobName); | ||
attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl()); | ||
attributes.put(ATTR_NAME_LANG, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a null
here seems unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does but if this line is omitted we get:
org.opentest4j.AssertionFailedError: Attribute lang does not exist ==>
Expected :true
Actual :false
...-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java
Outdated
Show resolved
Hide resolved
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml
Outdated
Show resolved
Hide resolved
...main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictResolutionStrategy.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the adjustments @malthe! I've added a few more comments, but I also think we are close to completion.
|
||
BlobClient blobClient = containerClient.getBlobClient(blobName); | ||
final BlobRequestConditions blobRequestConditions = new BlobRequestConditions(); | ||
Map<String, String> attributes = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These attributes were applied after a successful upload. Do I understand correctly that the intention now is to apply attributes to the failed FlowFile that already have value without triggering upload? Is this a requirement for a specific use case? I'm asking because I worry whether it could break an existing flow. For example, what happens if a downstream processor relies on the existence of one of these properties? What do you think, @exceptionfactory?
I'd still change this part slightly if the above is not an issue.
- I wouldn't assume the
MIME_TYPE
. I'd append it after a successful upload and investigate why the presence ofATTR_NAME_LANG
was needed in the failing test you linked in one of your comments. - Currently, the code is a bit verbose. Those attribute changes distract attention from the primary function, the upload itself. Therefore I'd extract this part to something like
applyCommonAttributes()
and lines 170-173 to something likeapplyUploadResultAttributes
. This is just an idea. If you know a better naming, go for it.
Sorry if I didn't notice it on the first review. Can it happen that this change came with the force push?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nandorsoma the attributes are not applied in case of an error; the code will not reach the session.putAllAttributes(flowFile, attributes)
in this case.
According to the documentation, the default mimetype is "application/octet-stream".
It would seem that ATTR_NAME_LANG
should be there exactly because of backwards compatibility. It is expected that if the upload succeeds, then that attribute is there and by default, equal to null.
In case of an error, the attribute will not be set (similar to above).
I'll take a look about cleaning up the code to make it less verbose around the important bits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I wasn't clear. It's my bad. I meant when execution jumps to the catch block and recovers in case of ignore
resolution. But meanwhile, I realized since ignore
is not the default behavior, it won't break anything.
Nevertheless, I still have issues with the two properties I mentioned previously. Applying mime.type
and lang
before the upload is not ok because it can fail, and then these values don't make sense. I think it's better to apply them after upload, and then we can get the values from the client, which is more precise.
The test you mentioned was written when ignore wasn't in the game; that's why it is probably failing. But I think we can modify it accordingly. Which test is that?
Also, applying the other three attributes before uploading could have a purpose. Still, if we don't have a use case for that, then I think it would be clearer to apply all attributes after a successful upload, like in the original version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what you mean now – you're right, in the case where we're not replacing the blob, it makes no sense to set those attributes. Fixed in 438c451.
...s/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
Outdated
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) { | ||
getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile); | ||
attributes.putAll(createBlobAttributesMap(blobClient)); | ||
attributes.put(ATTR_NAME_ERROR_CODE, e.getErrorCode().toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether it is misleading to apply ATTR_NAME_ERROR_CODE
in an expected situation. That we rely on an error code internally is irrelevant from the user's point of view. Probably I'd instead use the "ignored" property.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we can always set ATTR_NAME_ERROR_CODE
in case of an error, but in addition, provide "ignored" (to "true" I suppose) in case of "BLOB_ALREADY_EXISTS".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think applying error code only makes sense if downstream processors can utilize it. Otherwise, we can get it from the log. I'd remove it, but I don't feel strongly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do think it's rather nice to have it available in an attribute because in the log it is not isolated particularly well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @malthe! Thank you for your recent fixes. I've commented on our existing conversation. I've also found a few minor issues in the new code. Please see them below.
public static final String ATTR_NAME_ERROR_CODE = "azure.error.code"; | ||
public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code reported during blob operation"; | ||
|
||
public static final String ATTR_NAME_IGNORED = "ignored"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefix this attribute with azure.
to prevent attribute name collision with other processors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed in ddb9b0c.
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
// If the blob already exists, we always add an attribute "ignored"; depending on the value of | ||
// conflict resolution, this will be true or false. | ||
if (alreadyExists) { | ||
session.putAttribute(flowFile, ATTR_NAME_IGNORED, String.valueOf(ignore)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is not valid. When the resolution is replace
, this attribute is not applied, though the blob can already exist. I'd only apply this attribute in case of ignore
resolution and set the value depending on the presence of the file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I have changed this in 4a822e1 now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @malthe!
Thank you for your recent changes! I forgot to mention previously that you need to start every commit message with the ticket number.
It was a bit hard to follow because there are already a lot of commits and comments, so if it's ok with you, I'd like to treat this review as a fresh start. The situation is better than it looks like from my suggestions. Half of it is import remove. But if you accept all of them, the result will be a complete commit, and the build won't fail on contrib-check. A little exception is the below code part which should be added manually to the AbstractAzureBlobProcessor_v12
.
protected Map<String, String> createBlobAttributesMap(BlobClient blobClient) {
final Map<String, String> attributes = new HashMap<>();
applyStandardBlobAttributes(attributes, blobClient);
applyBlobMetadata(attributes, blobClient);
return attributes;
}
protected void applyStandardBlobAttributes(Map<String, String> attributes, BlobClient blobClient) {
String primaryUri = String.format("%s/%s", blobClient.getContainerClient().getBlobContainerUrl(), blobClient.getBlobName());
attributes.put(ATTR_NAME_CONTAINER, blobClient.getContainerName());
attributes.put(ATTR_NAME_BLOBNAME, blobClient.getBlobName());
attributes.put(ATTR_NAME_PRIMARY_URI, primaryUri);
}
protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) {
BlobProperties properties = blobClient.getProperties();
attributes.put(ATTR_NAME_ETAG, properties.getETag());
attributes.put(ATTR_NAME_BLOBTYPE, properties.getBlobType().toString());
attributes.put(ATTR_NAME_MIME_TYPE, properties.getContentType());
attributes.put(ATTR_NAME_LANG, properties.getContentLanguage());
attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(properties.getLastModified()));
attributes.put(ATTR_NAME_LENGTH, String.valueOf(properties.getBlobSize()));
}
The reason why these changes were needed:
- There are failing tests in
ITListAzureBlobStorage_v12
. - Attribute application logic is duplicated. If we move the "split apply" logic to the abstract class, we can deduplicate it.
- The hard-coded attribute values are not backward compatible, and fortunately, we can get them from the client.
- Error code is not needed as an attribute when the Conflict Resolution is
ignore
, since it is the expected behavior.
I hope this approach suits you and we can wrap up the pr soon. :)
...-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Show resolved
Hide resolved
...ocessors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, | ||
String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode( | ||
blobName, | ||
StandardCharsets.US_ASCII.name() | ||
).replace("+", "%20")) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, | |
String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode( | |
blobName, | |
StandardCharsets.US_ASCII.name() | |
).replace("+", "%20")) | |
); | |
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This breaks real live code.
...s/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
Outdated
Show resolved
Hide resolved
@@ -18,10 +18,12 @@ | |||
|
|||
import com.azure.storage.blob.BlobClient; | |||
import com.azure.storage.blob.BlobContainerClient; | |||
import com.azure.storage.blob.models.BlobErrorCode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't be needed when other suggestions are accepted.
import com.azure.storage.blob.models.BlobErrorCode; |
@@ -32,11 +34,12 @@ | |||
import java.util.Set; | |||
import java.util.stream.Collectors; | |||
|
|||
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't be needed when other suggestions are accepted.
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE; |
...essors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
Outdated
Show resolved
Hide resolved
a620816
to
5163a3f
Compare
With this change, it's now possible to overwrite (replace) a blob or ignore the conflict (existing blob).
5163a3f
to
d896ead
Compare
Bump |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @malthe!
Thank you for your work. I have tested the current implementation, works as expected. I've found a minor thing, but I don't want to block the PR with that, so I'm approving!
try (InputStream rawIn = session.read(flowFile)) { | ||
final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn)); | ||
blobParallelUploadOptions.setRequestConditions(blobRequestConditions); | ||
Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't have to declare the response here because it is not used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 4ed94f8.
38b7ff1
to
4ed94f8
Compare
@exceptionfactory this should be all good now. |
@malthe, if it is not mandatory, please try to avoid using force push. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working through the feedback @malthe, and thanks for the review @nandorsoma! The current version looks good, +1 merging
This closes apache#6443 Signed-off-by: David Handermann <exceptionfactory@apache.org>
Summary
NIFI-10491
This adds a three new conflict resolution strategy modes (fail, ignore and replace) – analogous to the
PutAzureDataLakeStorage
processor.Note that the default conflict resolution strategy "fail" corresponds to the current behavior.
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000
NIFI-00000
Pull Request Formatting
main
branchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
mvn clean install -P contrib-check
Licensing
LICENSE
andNOTICE
filesDocumentation