Skip to content

Commit af585c2

Browse files
adamdebreceni authored and szaszm committed
MINIFICPP-2705 - Configurable timeout, download assets directly to disk
Closes #2088

Signed-off-by: Marton Szasz <szaszm@apache.org>
1 parent 3c83d84 commit af585c2

File tree

17 files changed

+200
-73
lines changed

17 files changed

+200
-73
lines changed

C2.md

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -115,6 +115,12 @@ be requested via C2 DESCRIBE manifest command.
115115
# specify the maximum number of bulletins to send in a heartbeat
116116
# nifi.c2.flow.info.processor.bulletin.limit=1000
117117

118+
# Specify timeout for asset download operations. The entire download must
119+
# finish in the specified amount of time. There is a separate fixed 30 second
120+
# timeout from the last received data packet.
121+
# Setting this to 0 disables the timeout (default).
122+
nifi.c2.asset.download.timeout=0s
123+
118124
#### Flow Id and URL
119125

120126
Flow id and URL are usually retrieved from the C2 server. These identify the last updated flow version and where the flow was downloaded from. These properties are persisted in the minifi.properties file.

conf/minifi.properties.in

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -127,6 +127,12 @@ nifi.c2.full.heartbeat=false
127127
# specify the maximum number of bulletins to send in a heartbeat
128128
#nifi.c2.flow.info.processor.bulletin.limit=1000
129129

130+
# Specify timeout for asset download operations. The entire download must
131+
# finish in the specified amount of time. There is a separate fixed 30 second
132+
# timeout from the last received data packet.
133+
# Setting this to 0 disables the timeout (default).
134+
# nifi.c2.asset.download.timeout=0s
135+
130136
## enable the controller socket provider on port 9998
131137
## off by default.
132138
#controller.socket.enable=true

core-framework/include/http/BaseHTTPClient.h

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -106,11 +106,23 @@ class HTTPUploadStreamContentsCallback : public HTTPUploadCallback {
106106
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<HTTPUploadStreamContentsCallback>::getLogger();
107107
};
108108

109-
class HTTPReadCallback : public utils::ByteOutputCallback {
109+
class HTTPReadCallback {
110110
public:
111-
using ByteOutputCallback::ByteOutputCallback;
111+
virtual bool process(std::span<const char> data) = 0;
112+
virtual ~HTTPReadCallback() = default;
112113

113114
std::atomic<bool> stop = false;
115+
};
116+
117+
class HTTPReadByteOutputCallback : public HTTPReadCallback, public utils::ByteOutputCallback {
118+
public:
119+
using ByteOutputCallback::ByteOutputCallback;
120+
121+
bool process(std::span<const char> data) override {
122+
ByteOutputCallback::write(data.data(), data.size());
123+
return true;
124+
}
125+
114126
std::atomic<size_t> pos = 0;
115127
};
116128

core-framework/include/http/HTTPClient.h

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,10 @@ class HTTPClient : public BaseHTTPClient, public core::ConnectableImpl {
9999

100100
void setReadTimeout(std::chrono::milliseconds timeout) override;
101101

102+
void setAbsoluteTimeout(std::optional<std::chrono::milliseconds> timeout) {
103+
absolute_timeout_ = timeout;
104+
}
105+
102106
void setUploadCallback(std::unique_ptr<HTTPUploadCallback> callback) override;
103107

104108
void setReadCallback(std::unique_ptr<HTTPReadCallback> callback);
@@ -226,16 +230,15 @@ class HTTPClient : public BaseHTTPClient, public core::ConnectableImpl {
226230

227231
void configure_secure_connection();
228232

229-
std::chrono::milliseconds getAbsoluteTimeout() const { return 3*read_timeout_; }
230-
231-
HTTPReadCallback content_{std::numeric_limits<size_t>::max()};
233+
HTTPReadByteOutputCallback content_{std::numeric_limits<size_t>::max()};
232234

233235
std::shared_ptr<minifi::controllers::SSLContextServiceInterface> ssl_context_service_;
234236
std::string url_;
235237
std::optional<http::HttpRequestMethod> method_;
236238

237239
std::chrono::milliseconds connect_timeout_{std::chrono::seconds(30)};
238240
std::chrono::milliseconds read_timeout_{std::chrono::seconds(30)};
241+
std::optional<std::chrono::milliseconds> absolute_timeout_;
239242

240243
HTTPResponseData response_data_;
241244

core-framework/include/http/HTTPStream.h

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -115,9 +115,9 @@ class HttpStream : public io::BaseStreamImpl {
115115

116116
inline bool isFinished(int seconds = 0) {
117117
return http_client_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready
118-
&& http_client_->getReadCallback()
119-
&& http_client_->getReadCallback()->getSize() == 0
120-
&& http_client_->getReadCallback()->waitingOps();
118+
&& getByteOutputReadCallback()
119+
&& getByteOutputReadCallback()->getSize() == 0
120+
&& getByteOutputReadCallback()->waitingOps();
121121
}
122122

123123
/**
@@ -127,11 +127,11 @@ class HttpStream : public io::BaseStreamImpl {
127127
do {
128128
logger_->log_trace("Waiting for more data");
129129
} while (http_client_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready
130-
&& http_client_->getReadCallback()
131-
&& http_client_->getReadCallback()->getSize() == 0);
130+
&& getByteOutputReadCallback()
131+
&& getByteOutputReadCallback()->getSize() == 0);
132132

133-
return http_client_->getReadCallback()
134-
&& http_client_->getReadCallback()->getSize() > 0;
133+
return getByteOutputReadCallback()
134+
&& getByteOutputReadCallback()->getSize() > 0;
135135
}
136136

137137
protected:
@@ -147,6 +147,10 @@ class HttpStream : public io::BaseStreamImpl {
147147
std::atomic<bool> started_{false};
148148

149149
private:
150+
utils::ByteOutputCallback* getByteOutputReadCallback() {
151+
return dynamic_cast<utils::ByteOutputCallback*>(http_client_->getReadCallback());
152+
}
153+
150154
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<HttpStream>::getLogger();
151155
};
152156
} // namespace org::apache::nifi::minifi::http

core-framework/include/utils/ByteArrayCallback.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -111,12 +111,12 @@ class ByteOutputCallback {
111111

112112
bool waitingOps();
113113

114-
virtual void write(char *data, size_t size);
114+
virtual void write(const char *data, size_t size);
115115

116116
size_t readFully(char *buffer, size_t size);
117117

118118
protected:
119-
inline void write_and_notify(char *data, size_t size);
119+
inline void write_and_notify(const char *data, size_t size);
120120

121121
inline size_t read_current_str(char *buffer, size_t size);
122122

core-framework/src/http/BaseHTTPClient.cpp

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -173,8 +173,10 @@ size_t HTTPRequestResponse::receiveWrite(char *data, size_t size, size_t nmemb,
173173
if (callback->stop) {
174174
return CALLBACK_ABORT;
175175
}
176-
callback->write(data, (size * nmemb));
177-
return (size * nmemb);
176+
if (!callback->process(std::span(data, size * nmemb))) {
177+
return CALLBACK_ABORT;
178+
}
179+
return size * nmemb;
178180
} catch (...) {
179181
return CALLBACK_ABORT;
180182
}

core-framework/src/http/HTTPClient.cpp

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -334,9 +334,11 @@ bool HTTPClient::submit() {
334334

335335
response_data_.clear();
336336

337+
const auto absolute_timeout = absolute_timeout_.value_or(3 * read_timeout_);
338+
337339
curl_easy_setopt(http_session_.get(), CURLOPT_NOSIGNAL, 1);
338340
curl_easy_setopt(http_session_.get(), CURLOPT_CONNECTTIMEOUT_MS, connect_timeout_.count());
339-
curl_easy_setopt(http_session_.get(), CURLOPT_TIMEOUT_MS, getAbsoluteTimeout().count());
341+
curl_easy_setopt(http_session_.get(), CURLOPT_TIMEOUT_MS, absolute_timeout.count());
340342

341343
if (read_timeout_ > 0ms) {
342344
progress_.reset();
@@ -376,7 +378,7 @@ bool HTTPClient::submit() {
376378
response_data_.response_code = http_code;
377379
curl_easy_getinfo(http_session_.get(), CURLINFO_CONTENT_TYPE, &response_data_.response_content_type);
378380
if (res_ == CURLE_OPERATION_TIMEDOUT) {
379-
logger_->log_error("HTTP operation timed out, with absolute timeout {}\n", getAbsoluteTimeout());
381+
logger_->log_error("HTTP operation timed out, with absolute timeout {}\n", absolute_timeout);
380382
}
381383
if (res_ != CURLE_OK) {
382384
logger_->log_info("{}", request_headers_.size());
@@ -398,8 +400,8 @@ const char *HTTPClient::getContentType() {
398400

399401
const std::vector<char> &HTTPClient::getResponseBody() {
400402
if (response_data_.response_body.empty()) {
401-
if (read_callback_) {
402-
response_data_.response_body = read_callback_->to_string();
403+
if (auto byte_output_callback = dynamic_cast<utils::ByteOutputCallback*>(read_callback_.get())) {
404+
response_data_.response_body = byte_output_callback->to_string();
403405
} else {
404406
response_data_.response_body = content_.to_string();
405407
}

core-framework/src/http/HTTPStream.cpp

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ HttpStream::HttpStream(std::shared_ptr<HTTPClient> client)
3333
}
3434

3535
void HttpStream::close() {
36-
if (auto read_callback = http_client_->getReadCallback())
36+
if (auto read_callback = getByteOutputReadCallback())
3737
read_callback->close();
3838
if (auto upload_callback = http_client_->getUploadCallback())
3939
upload_callback->close();
@@ -78,13 +78,13 @@ size_t HttpStream::read(std::span<std::byte> buf) {
7878
if (!started_) {
7979
std::lock_guard<std::mutex> lock(mutex_);
8080
if (!started_) {
81-
auto read_callback = std::make_unique<HTTPReadCallback>(66560, true);
81+
auto read_callback = std::make_unique<HTTPReadByteOutputCallback>(66560, true);
8282
http_client_future_ = std::async(std::launch::async, submit_read_client, http_client_, read_callback.get());
8383
http_client_->setReadCallback(std::move(read_callback));
8484
started_ = true;
8585
}
8686
}
87-
return http_client_->getReadCallback()->readFully(reinterpret_cast<char*>(buf.data()), buf.size());
87+
return gsl::not_null(getByteOutputReadCallback())->readFully(reinterpret_cast<char*>(buf.data()), buf.size());
8888
} else {
8989
return io::STREAM_ERROR;
9090
}

core-framework/src/utils/ByteArrayCallback.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,7 @@ bool ByteOutputCallback::waitingOps() {
6262
return true;
6363
}
6464

65-
void ByteOutputCallback::write(char *data, size_t size) {
65+
void ByteOutputCallback::write(const char *data, size_t size) {
6666
if (!read_started_) {
6767
std::unique_lock<std::recursive_mutex> lock(vector_lock_);
6868
spinner_.wait(lock, [&] {
@@ -73,7 +73,7 @@ void ByteOutputCallback::write(char *data, size_t size) {
7373
write_and_notify(data, size);
7474
}
7575

76-
void ByteOutputCallback::write_and_notify(char *data, size_t size) {
76+
void ByteOutputCallback::write_and_notify(const char *data, size_t size) {
7777
queue_.enqueue(std::string(data, size));
7878
size_ += size;
7979
total_written_ += size;

0 commit comments

Comments
 (0)