Skip to content

Commit

Permalink
Add check if an operation was cancelled (#1332)
Browse files Browse the repository at this point in the history
Add checking if the download operation was cancelled
after getting a result and not wait for all operations
to be done.

Relates-To: OLPEDGE-2746

Signed-off-by: Yevhenii Dudnyk <ext-yevhenii.dudnyk@here.com>
  • Loading branch information
dudnyk committed Jun 8, 2022
1 parent 849fbd4 commit c0c6445
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 6 deletions.
10 changes: 10 additions & 0 deletions olp-cpp-sdk-dataservice-read/src/DownloadItemsJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ class DownloadItemsJob {
if (response.IsSuccessful()) {
requests_succeeded_++;
} else {
if (response.GetError().GetErrorCode() ==
olp::client::ErrorCode::Cancelled &&
user_callback_) {
auto user_callback = std::move(user_callback_);
if (user_callback) {
user_callback({{client::ErrorCode::Cancelled, "Cancelled"}});
}
return;
}

requests_failed_++;
}

Expand Down
12 changes: 6 additions & 6 deletions olp-cpp-sdk-dataservice-read/src/QueryMetadataJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,18 @@ class QueryMetadataJob {
}
}

if (canceled_) {
download_job_->OnPrefetchCompleted(
{{client::ErrorCode::Cancelled, "Cancelled"}});
return;
}

if (!--query_count_) {
if (CheckIfFail()) {
download_job_->OnPrefetchCompleted(query_errors_.front());
return;
}

if (canceled_) {
download_job_->OnPrefetchCompleted(
{{client::ErrorCode::Cancelled, "Cancelled"}});
return;
}

if (filter_) {
query_result_ = filter_(std::move(query_result_));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,62 @@ TEST_F(VersionedLayerClientPrefetchPartitionsTest, PrefetchPartitionsFails) {
}
}

TEST_F(VersionedLayerClientPrefetchPartitionsTest, PrefetchPartitionCancelled) {
auto partitions_count = 1;
auto client =
read::VersionedLayerClient(kCatalogHrn, kLayer, boost::none, settings_);
auto partitions = GeneratePartitionIds(partitions_count);
auto partitions_response =
ReadDefaultResponses::GeneratePartitionsResponse(partitions_count);
const auto request =
read::PrefetchPartitionsRequest().WithPartitionIds(partitions);
{
SCOPED_TRACE("Get version fails");

ExpectVersionRequest(
http::NetworkResponse().WithStatus(HttpStatusCode::BAD_REQUEST));

auto future = client.PrefetchPartitions(request).GetFuture();
ASSERT_NE(future.wait_for(std::chrono::seconds(kTimeout)),
std::future_status::timeout);

auto response = future.get();
ASSERT_FALSE(response.IsSuccessful());
}
{
SCOPED_TRACE("Query partition fails");
ExpectVersionRequest();
ExpectQueryPartitionsRequest(
partitions, partitions_response,
http::NetworkResponse().WithStatus(HttpStatusCode::BAD_REQUEST));

auto future = client.PrefetchPartitions(request).GetFuture();
ASSERT_NE(future.wait_for(std::chrono::seconds(kTimeout)),
std::future_status::timeout);

auto response = future.get();
ASSERT_FALSE(response.IsSuccessful());
}
{
SCOPED_TRACE("Get data cancelled");
ExpectQueryPartitionsRequest(partitions, partitions_response);

ExpectBlobRequest(
partitions_response.GetPartitions().begin()->GetDataHandle(), "data",
http::NetworkResponse().WithStatus(
static_cast<int>(http::ErrorCode::CANCELLED_ERROR)));

auto test_request = client.PrefetchPartitions(request);
auto future = test_request.GetFuture();
ASSERT_NE(future.wait_for(std::chrono::seconds(kTimeout)),
std::future_status::timeout);

auto response = future.get();
ASSERT_FALSE(response.IsSuccessful());
ASSERT_EQ(response.GetError().GetErrorCode(), client::ErrorCode::Cancelled);
}
}

TEST_F(VersionedLayerClientPrefetchPartitionsTest, PrefetchBatchFails) {
// Should result in two metadata query
constexpr auto partitions_count = 200u;
Expand Down

0 comments on commit c0c6445

Please sign in to comment.