-
Notifications
You must be signed in to change notification settings - Fork 90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MINIFICPP-1396 Create FetchS3Object processor #970
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good, some minor comments.
extensions/aws/s3/S3WrapperBase.cpp
Outdated
@@ -68,13 +75,13 @@ void S3WrapperBase::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, cons | |||
request.SetACL(CANNED_ACL_MAP.at(canned_acl)); | |||
} | |||
|
|||
std::string S3WrapperBase::getExpiryDate(const std::string& expiration) { | |||
std::pair<std::string, std::string> S3WrapperBase::getExpirationPair(const std::string& expiration) { | |||
static const std::regex expr = std::regex("expiry-date=\"(.*)\", rule-id=\"(.*)\""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we use the Regex
class defined in RegexUtils.h
? (that contains some "magic" for <GCC 4.9
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good shout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 8ddbc42
extensions/aws/s3/S3WrapperBase.cpp
Outdated
@@ -68,13 +75,13 @@ void S3WrapperBase::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, cons | |||
request.SetACL(CANNED_ACL_MAP.at(canned_acl)); | |||
} | |||
|
|||
std::string S3WrapperBase::getExpiryDate(const std::string& expiration) { | |||
std::pair<std::string, std::string> S3WrapperBase::getExpirationPair(const std::string& expiration) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
even though it is only used twice, we could create a struct
(given how both members of the pair are of the same type)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in ad6ee95
fd2e4fa
to
ad6ee95
Compare
return false; | ||
} | ||
get_object_params_.bucket = std::move(bucket_); | ||
get_object_params_.object_key = std::move(object_key_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems like object_key_
and bucket_
are a processor members, and we read/write them from possibly multiple concurrent onTrigger
s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same goes for, I think, aws_credentials_provider_
and s3_wrapper_
where we call S3WrapperBase::setProxy
and S3WrapperBase::setCredentials
from the onTrigger
s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please elaborate on this a bit more? I am not that familiar with the scheduling of the onTrigger calls. Are you saying that it is possible that the same processor's onTrigger call can be scheduled on separate threads in the threadpool and depending on the timing they might overlap causing the concurrency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this can happen depending on configurations.
In most of the processors we just prevent this by having a mutex for the whole ontrigger call, in this case we might just protect some common resources.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, as far as I know, onSchedule
is guaranteed to be called once for a processor, but multiple onTrigger
s can run concurrently on multiple threads, the number of such onTrigger
s is controlled by the max concurrent tasks
property of the processor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
of course we could always just ignore this property and override getMaxConcurrentTasks
to return 1
(or the mutex @arpadboda mentioned), but I don't (yet) think there is an inherent reason to do so, for this processor (family)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the info and great catch! I removed the members that were modified in the onTrigger calls and added a mutex for the S3Wrapper calls to guarantee that the S3 configuration is not changed between AWS calls. Change is done in e59580a
|Secret Key|||AWS account secret key<br/>**Supports Expression Language: true**| | ||
|Credentials File|||Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey| | ||
|AWS Credentials Provider service|||The name of the AWS Credentials Provider controller service that is used to obtain AWS credentials.| | ||
|**Region**|us-west-2|af-south-1<br/>ap-east-1<br/>ap-northeast-1<br/>ap-northeast-2<br/>ap-northeast-3<br/>ap-south-1<br/>ap-southeast-1<br/>ap-southeast-2<br/>ca-central-1<br/>cn-north-1<br/>cn-northwest-1<br/>eu-central-1<br/>eu-north-1<br/>eu-south-1<br/>eu-west-1<br/>eu-west-2<br/>eu-west-3<br/>me-south-1<br/>sa-east-1<br/>us-east-1<br/>us-east-2<br/>us-gov-east-1<br/>us-gov-west-1<br/>us-west-1<br/>us-west-2|AWS Region| |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I adjusted the cell width with a fix but scalable div in 9417f98. The region values should not wrap anymore.
core::Relationship("success", "d"), | ||
true); | ||
char output_dir_mask[] = "/tmp/gt.XXXXXX"; | ||
output_dir = test_controller.createTempDirectory(output_dir_mask); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can get rid of the mask by using:
output_dir = utils::createTempDir(&testController);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added in 7b57ab6
TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test default properties", "[awsS3Config]") { | ||
setRequiredProperties(); | ||
test_controller.runSession(plan, true); | ||
REQUIRE(LogTestController::getInstance().contains("key:s3.bucket value:" + S3_BUCKET)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor but contains
is slow. utils::verifyLogLinePresenceInPollTime
should be a better choice here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced it in 840a645.
REQUIRE(mock_s3_wrapper_ptr->getCredentials().GetAWSSecretKey() == "secret"); | ||
} | ||
|
||
TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test required property not set", "[awsS3Config]") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 for proper negative tests
std::ifstream s3_file(output_dir + "/" + INPUT_FILENAME); | ||
std::string output((std::istreambuf_iterator<char>(s3_file)), std::istreambuf_iterator<char>()); | ||
REQUIRE(output.empty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably a good candidate to extract to FileUtils.h
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extraced in 6097258
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, only minor suggestions added.
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically main)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.