Skip to content

Commit

Permalink
Merge branch 'main' into MINIFICPP-2225
Browse files Browse the repository at this point in the history
  • Loading branch information
szaszm committed Oct 18, 2023
2 parents 538ef62 + b8acdd4 commit be6d6b9
Show file tree
Hide file tree
Showing 278 changed files with 2,399 additions and 2,375 deletions.
2 changes: 2 additions & 0 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,8 @@ In the list below, the names of required properties appear in bold. Any other pr
| Always Output Response | false | true<br/>false | Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is |
| Penalize on "No Retry" | false | true<br/>false | Enabling this property will penalize FlowFiles that are routed to the "No Retry" relationship. |
| **Invalid HTTP Header Field Handling Strategy** | transform | fail<br/>transform<br/>drop | Indicates what should happen when an attribute's name is not a valid HTTP header field name. Options: transform - invalid characters are replaced, fail - flow file is transferred to failure, drop - drops invalid attributes from HTTP message |
| Upload Speed Limit | | | Maximum upload speed, e.g. '500 KB/s'. Leave this empty if you want no limit. |
| Download Speed Limit | | | Maximum download speed, e.g. '500 KB/s'. Leave this empty if you want no limit. |

### Relationships

Expand Down
61 changes: 0 additions & 61 deletions cmake/BundledLibUvc.cmake

This file was deleted.

25 changes: 25 additions & 0 deletions cmake/FetchUvc.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
include(FetchContent)
# Fetch libuvc v0.0.7 from GitHub; the SHA256 hash pins the exact archive contents.
FetchContent_Declare(Uvc
URL https://github.com/libuvc/libuvc/archive/refs/tags/v0.0.7.tar.gz
URL_HASH SHA256=7c6ba79723ad5d0ccdfbe6cadcfbd03f9f75b701d7ba96631eb1fd929a86ee72
# OVERRIDE_FIND_PACKAGE makes later find_package(Uvc) calls resolve to this fetched copy.
OVERRIDE_FIND_PACKAGE
)
FetchContent_MakeAvailable(Uvc)
2 changes: 1 addition & 1 deletion controller/MiNiFiController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ int main(int argc, char **argv) {
try {
socket_data.ssl_context_service = getSSLContextService(configuration);
} catch(const minifi::Exception& ex) {
logger->log_error(ex.what());
logger->log_error("{}", ex.what());
std::exit(1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from cluster.DockerTestCluster import DockerTestCluster

from minifi.validators.OutputValidator import OutputValidator
from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator
Expand All @@ -35,6 +36,7 @@
from minifi.validators.SingleOrMultiFileOutputRegexValidator import SingleOrMultiFileOutputRegexValidator
from minifi.validators.NoContentCheckFileNumberValidator import NoContentCheckFileNumberValidator
from minifi.validators.NumFileRangeValidator import NumFileRangeValidator
from minifi.validators.NumFileRangeAndFileSizeValidator import NumFileRangeAndFileSizeValidator
from minifi.validators.SingleJSONFileOutputValidator import SingleJSONFileOutputValidator
from utils import decode_escaped_str, get_minifi_pid, get_peak_memory_usage

Expand Down Expand Up @@ -244,11 +246,21 @@ def check_for_num_files_generated(self, num_flowfiles, timeout_seconds):
output_validator.set_output_dir(self.file_system_observer.get_output_dir())
self.__check_output(timeout_seconds, output_validator, max(1, num_flowfiles))

def check_for_num_file_range_generated(self, min_files, max_files, wait_time_in_seconds):
def check_for_num_file_range_generated_after_wait(self, min_files: int, max_files: int, wait_time_in_seconds: int):
    """Sleep out the full wait period, then check once that the number of
    output files falls within [min_files, max_files]."""
    validator = NumFileRangeValidator(min_files, max_files)
    output_dir = self.file_system_observer.get_output_dir()
    validator.set_output_dir(output_dir)
    self.__check_output_after_time_period(wait_time_in_seconds, validator)

def check_for_num_file_range_generated_with_timeout(self, min_files: int, max_files: int, timeout_in_seconds: int):
    """Poll until the number of output files falls within [min_files, max_files],
    failing once timeout_in_seconds has elapsed."""
    validator = NumFileRangeValidator(min_files, max_files)
    output_dir = self.file_system_observer.get_output_dir()
    validator.set_output_dir(output_dir)
    self.__check_output_over_time_period(timeout_in_seconds, validator)

def check_for_num_file_range_and_min_size_generated(self, min_files: int, max_files: int, min_size: int, wait_time_in_seconds: int):
    """Poll until between min_files and max_files output files of at least
    min_size bytes exist, failing once wait_time_in_seconds has elapsed."""
    validator = NumFileRangeAndFileSizeValidator(min_files, max_files, min_size)
    output_dir = self.file_system_observer.get_output_dir()
    validator.set_output_dir(output_dir)
    self.__check_output_over_time_period(wait_time_in_seconds, validator)

def check_for_an_empty_file_generated(self, timeout_seconds):
output_validator = EmptyFilesOutPutValidator()
output_validator.set_output_dir(self.file_system_observer.get_output_dir())
Expand All @@ -258,6 +270,17 @@ def __check_output_after_time_period(self, wait_time_in_seconds, output_validato
time.sleep(wait_time_in_seconds)
self.__validate(output_validator)

def __check_output_over_time_period(self, wait_time_in_seconds: int, output_validator: OutputValidator):
    # Poll the validator roughly once per second until it passes or the time budget is spent.
    start_time = time.perf_counter()
    while True:
        # A segfault aborts the test immediately; log_app_output() is evaluated to dump
        # the application logs first (presumably returning a falsy value so the assert
        # still fails — TODO confirm its return value).
        assert not self.cluster.segfault_happened() or self.cluster.log_app_output()
        if output_validator.validate():
            return
        time.sleep(1)
        # Timeout is checked after the sleep, so the loop may overshoot by up to one second.
        if wait_time_in_seconds < (time.perf_counter() - start_time):
            break
    # One final validation after the deadline; on failure, dump app output for diagnosis.
    assert output_validator.validate() or self.cluster.log_app_output()

def __check_output(self, timeout_seconds, output_validator, max_files=0):
result = self.file_system_observer.validate_output(timeout_seconds, output_validator, max_files)
assert not self.cluster.segfault_happened() or self.cluster.log_app_output()
Expand Down
41 changes: 41 additions & 0 deletions docker/test/integration/features/http.feature
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,44 @@ Feature: Sending data using InvokeHTTP to a receiver using ListenHTTP

When both instances start up
Then at least one flowfile with the content "test" is placed in the monitored directory in less than 120 seconds

Scenario: A MiNiFi instance transfers data to another MiNiFi instance with message body and limited speed
Given a GenerateFlowFile processor with the "File Size" property set to "10 MB"
And the scheduling period of the GenerateFlowFile processor is set to "30 sec"
And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${feature_id}:8080/contentListener"
And the "HTTP Method" property of the InvokeHTTP processor is set to "POST"
And the "Connection Timeout" property of the InvokeHTTP processor is set to "30 s"
And the "Read Timeout" property of the InvokeHTTP processor is set to "30 s"
And the "Upload Speed Limit" property of the InvokeHTTP processor is set to "800 KB/s"
And the "success" relationship of the GenerateFlowFile processor is connected to the InvokeHTTP

And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "secondary" flow
And the "success" relationship of the ListenHTTP processor is connected to the PutFile

When both instances start up
Then at least one flowfile with minimum size of "1 MB" is placed in the monitored directory in less than 120 seconds
And the Minifi logs contain the following message: "[warning] InvokeHTTP::onTrigger has been running for" in less than 10 seconds

Scenario: A MiNiFi instance retrieves data from another MiNiFi instance with message body and limited speed
Given a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${feature_id}:8080/contentListener&testfile"
And the scheduling period of the InvokeHTTP processor is set to "3 sec"
And the "HTTP Method" property of the InvokeHTTP processor is set to "GET"
And the "Connection Timeout" property of the InvokeHTTP processor is set to "30 s"
And the "Read Timeout" property of the InvokeHTTP processor is set to "30 s"
And the "Download Speed Limit" property of the InvokeHTTP processor is set to "800 KB/s"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "response" relationship of the InvokeHTTP processor is connected to the PutFile

And a GenerateFlowFile processor with the "File Size" property set to "10 MB" in the "secondary" flow
And the "Data Format" property of the InvokeHTTP processor is set to "Text"
And a UpdateAttribute processor with the "http.type" property set to "response_body" in the "secondary" flow
And the "filename" property of the UpdateAttribute processor is set to "testfile"
And the scheduling period of the GenerateFlowFile processor is set to "30 sec"
And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow
And the "success" relationship of the GenerateFlowFile processor is connected to the UpdateAttribute
And the "success" relationship of the UpdateAttribute processor is connected to the ListenHTTP

When both instances start up
Then at least one flowfile with minimum size of "10 MB" is placed in the monitored directory in less than 120 seconds
And the Minifi logs contain the following message: "[warning] InvokeHTTP::onTrigger has been running for" in less than 10 seconds
9 changes: 7 additions & 2 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,12 @@ def step_impl(context, num_flowfiles, duration):

@then("at least one flowfile is placed in the monitored directory in less than {duration}")
def step_impl(context, duration):
    # Poll until at least one flowfile appears, failing after the parsed timeout.
    context.test.check_for_num_file_range_generated_with_timeout(1, float('inf'), humanfriendly.parse_timespan(duration))


@then("at least one flowfile with minimum size of \"{size}\" is placed in the monitored directory in less than {duration}")
def step_impl(context, duration: str, size: str):
    # Poll until a flowfile of at least the given size (e.g. "1 MB") appears within the timeout.
    context.test.check_for_num_file_range_and_min_size_generated(1, float('inf'), humanfriendly.parse_size(size), humanfriendly.parse_timespan(duration))


@then("one flowfile with the contents \"{content}\" is placed in the monitored directory in less than {duration}")
Expand All @@ -830,7 +835,7 @@ def step_impl(context, duration, contents):

@then("after a wait of {duration}, at least {lower_bound:d} and at most {upper_bound:d} flowfiles are produced and placed in the monitored directory")
def step_impl(context, lower_bound, upper_bound, duration):
    # Wait out the full duration first, then check the file count exactly once (no early exit).
    context.test.check_for_num_file_range_generated_after_wait(lower_bound, upper_bound, humanfriendly.parse_timespan(duration))


@then("{number_of_files:d} flowfiles are placed in the monitored directory in {duration}")
Expand Down
14 changes: 14 additions & 0 deletions docker/test/integration/minifi/validators/FileOutputValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,19 @@ def get_num_files(dir_path):
files_found += 1
return files_found

@staticmethod
def get_num_files_with_min_size(dir_path: str, min_size: int):
    """Count the non-temporary regular files in dir_path whose size is at
    least min_size bytes."""
    entries = listdir(dir_path)
    logging.info("Num files in %s: %d", dir_path, len(entries))
    if not entries:
        return 0
    count = 0
    for entry in entries:
        path = join(dir_path, entry)
        if not os.path.isfile(path) or is_temporary_output_file(path):
            continue
        if os.path.getsize(path) < min_size:
            continue
        logging.info("Found output file in %s: %s", dir_path, entry)
        count += 1
    return count

def validate(self, dir=''):
    # Base-class hook: concrete validators override this to check the produced
    # output and return a truthy value on success.
    pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os

from .FileOutputValidator import FileOutputValidator


class NumFileRangeAndFileSizeValidator(FileOutputValidator):
    """Passes when the number of output files of at least min_size bytes
    falls within [min_files, max_files]."""

    def __init__(self, min_files: int, max_files: int, min_size: int):
        self.min_files = min_files
        self.max_files = max_files
        self.min_size = min_size

    def validate(self):
        # output_dir is set via set_output_dir() on the base class before validation.
        # The original wrapped it in a no-op single-argument os.path.join.
        full_dir = self.output_dir
        logging.info("Output folder: %s", full_dir)

        if not os.path.isdir(full_dir):
            return False

        num_files = self.get_num_files_with_min_size(full_dir, self.min_size)
        logging.info("Number of files with min size %d generated: %d", self.min_size, num_files)
        # Chained comparison replaces the redundant `a <= b and b <= c` form.
        return self.min_files <= num_files <= self.max_files
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ def validate(self):
return False

num_files = self.get_num_files(full_dir)
logging.info("Number of files generated: %d", num_files)
return self.min_files <= num_files and num_files <= self.max_files
2 changes: 1 addition & 1 deletion examples/http_post_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Processors:
FlowFiles To Log: 0
Log Level: warn
Log Prefix: '=== Failed HTTP request ==='
Maxumim Payload Line Length: 0
Maximum Payload Line Length: 0
Connections:
- name: GetFile/success/InvokeHTTP
source id: 962790e7-ea35-4096-9362-96f527288669
Expand Down
8 changes: 4 additions & 4 deletions extensions/aws/processors/DeleteS3Object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ std::optional<aws::s3::DeleteObjectRequestParameters> DeleteS3Object::buildDelet
logger_->log_error("No Object Key is set and default object key 'filename' attribute could not be found!");
return std::nullopt;
}
logger_->log_debug("DeleteS3Object: Object Key [%s]", params.object_key);
logger_->log_debug("DeleteS3Object: Object Key [{}]", params.object_key);

context->getProperty(Version, params.version, flow_file);
logger_->log_debug("DeleteS3Object: Version [%s]", params.version);
logger_->log_debug("DeleteS3Object: Version [{}]", params.version);

params.bucket = common_properties.bucket;
params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
Expand Down Expand Up @@ -73,10 +73,10 @@ void DeleteS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
}

if (s3_wrapper_.deleteObject(*params)) {
logger_->log_debug("Successfully deleted S3 object '%s' from bucket '%s'", params->object_key, common_properties->bucket);
logger_->log_debug("Successfully deleted S3 object '{}' from bucket '{}'", params->object_key, common_properties->bucket);
session->transfer(flow_file, Success);
} else {
logger_->log_error("Failed to delete S3 object '%s' from bucket '%s'", params->object_key, common_properties->bucket);
logger_->log_error("Failed to delete S3 object '{}' from bucket '{}'", params->object_key, common_properties->bucket);
session->transfer(flow_file, Failure);
}
}
Expand Down
Loading

0 comments on commit be6d6b9

Please sign in to comment.