Skip to content

Commit

Permalink
Wrap PrefetchTiles call into CancellationContext. (#1336)
Browse files Browse the repository at this point in the history
This helps speed-up cancelling of the operation as chain of contexts
doesn't break. Previously to this, the `execution_context` was not used
internally in the `PrefetchTilesHelper::Prefetch` method which
resulted in a break of the cancellation context chain and thus
it was not able to cancel all subsequently triggered prefetch
jobs from the one master context, which was handed over to the caller
of the VersionedLayerClient::PrefetchTiles() API. Due to this then
when user actually cancelled the request it would still have to wait for
the download to finish until the callback came. Now this is fixed.

Relates-To: OLPSUP-17825
Signed-off-by: Andrey Kashcheev <ext-andrey.kashcheev@here.com>
Signed-off-by: Andrei Popescu <andrei.popescu@here.com>
  • Loading branch information
andrey-kashcheev committed Jun 9, 2022
1 parent c0c6445 commit e0ba039
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 157 deletions.
36 changes: 27 additions & 9 deletions olp-cpp-sdk-core/include/olp/core/client/ApiError.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,50 @@ class CORE_API ApiError {
/**
* @brief Creates the `ApiError` instance with the cancelled error code and
* description.
* @param description The optional description.
*
* @param message The optional description.
*
* @return The `ApiError` instance.
*/
static ApiError Cancelled(const char* description = "Cancelled") {
return ApiError(ErrorCode::Cancelled, description);
static ApiError Cancelled(const char* message = "Cancelled") {
return {ErrorCode::Cancelled, message};
}

/**
* @brief Creates the `ApiError` instance with the network connection error
* code and description.
* @param description The optional description.
*
* @param message The optional description.
*
* @return The `ApiError` instance.
*/
static ApiError NetworkConnection(const char* description = "Offline") {
return ApiError(ErrorCode::NetworkConnection, description);
static ApiError NetworkConnection(const char* message = "Offline") {
return {ErrorCode::NetworkConnection, message};
}

/**
* @brief Creates the `ApiError` instance with the precondition failed error
* code and description.
* @param description The optional description.
*
* @param message The optional description.
*
* @return The `ApiError` instance.
*/
static ApiError PreconditionFailed(
const char* description = "Precondition failed") {
return ApiError(ErrorCode::PreconditionFailed, description);
const char* message = "Precondition failed") {
return {ErrorCode::PreconditionFailed, message};
}

/**
* @brief Creates the `ApiError` instance with the invalid argument error code
* and description.
*
* @param message The optional description.
*
* @return The `ApiError` instance.
*/
static ApiError InvalidArgument(const char* message = "Invalid argument") {
return {ErrorCode::InvalidArgument, message};
}

ApiError() = default;
Expand Down
306 changes: 158 additions & 148 deletions olp-cpp-sdk-dataservice-read/src/VersionedLayerClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,167 +367,177 @@ client::CancellationToken VersionedLayerClientImpl::PrefetchTiles(

client::CancellationContext execution_context;

return task_sink_.AddTask(
[=](client::CancellationContext context) mutable -> void {
if (context.IsCancelled()) {
callback(ApiError::Cancelled());
return;
}

const auto key = request.CreateKey(layer_id_);

if (!settings_.cache) {
OLP_SDK_LOG_ERROR_F(
kLogTag,
"PrefetchPartitions: cache is missing, aborting, hrn=%s, key=%s",
catalog_.ToCatalogHRNString().c_str(), key.c_str());
callback(ApiError::PreconditionFailed(
"Unable to prefetch without a cache"));
return;
}

if (request.GetTileKeys().empty()) {
OLP_SDK_LOG_WARNING_F(
kLogTag, "PrefetchTiles: invalid request, catalog=%s, key=%s",
catalog_.ToCatalogHRNString().c_str(), key.c_str());
callback(ApiError(ErrorCode::InvalidArgument, "Empty tile key list"));
return;
}

auto response =
GetVersion(request.GetBillingTag(), OnlineIfNotFound, context);
execution_context.ExecuteOrCancelled([&]() -> client::CancellationToken {
return task_sink_.AddTask(
[=](client::CancellationContext context) mutable -> void {
if (context.IsCancelled()) {
callback(ApiError::Cancelled());
return;
}

if (!response.IsSuccessful()) {
OLP_SDK_LOG_WARNING_F(
kLogTag, "PrefetchTiles: getting catalog version failed, key=%s",
key.c_str());
callback(response.GetError());
return;
}
const auto key = request.CreateKey(layer_id_);

auto version = response.GetResult().GetVersion();

OLP_SDK_LOG_DEBUG_F(kLogTag, "PrefetchTiles: using key=%s",
key.c_str());

// Calculate the minimal set of Tile keys and depth to
// cover tree.
bool request_only_input_tiles =
!(request.GetMinLevel() <= request.GetMaxLevel() &&
request.GetMaxLevel() < geo::TileKey::LevelCount &&
request.GetMinLevel() < geo::TileKey::LevelCount);
unsigned int min_level =
(request_only_input_tiles
? static_cast<unsigned int>(geo::TileKey::LevelCount)
: request.GetMinLevel());
unsigned int max_level =
(request_only_input_tiles
? static_cast<unsigned int>(geo::TileKey::LevelCount)
: request.GetMaxLevel());

repository::PrefetchTilesRepository repository(
catalog_, layer_id_, settings_, lookup_client_,
request.GetBillingTag(), mutex_storage_);

auto sliced_tiles = repository.GetSlicedTiles(request.GetTileKeys(),
min_level, max_level);

if (sliced_tiles.empty()) {
OLP_SDK_LOG_WARNING_F(kLogTag,
"PrefetchTiles: tile/level mismatch, key=%s",
key.c_str());
callback(
ApiError(ErrorCode::InvalidArgument, "TileKeys/levels mismatch"));
return;
}
if (!settings_.cache) {
OLP_SDK_LOG_ERROR_F(kLogTag,
"PrefetchPartitions: cache is missing, "
"aborting, hrn=%s, key=%s",
catalog_.ToString().c_str(), key.c_str());
callback(ApiError::PreconditionFailed(
"Unable to prefetch without a cache"));
return;
}

OLP_SDK_LOG_DEBUG_F(kLogTag, "PrefetchTiles, subquads=%zu, key=%s",
sliced_tiles.size(), key.c_str());
if (request.GetTileKeys().empty()) {
OLP_SDK_LOG_WARNING_F(
kLogTag, "PrefetchTiles: invalid request, catalog=%s, key=%s",
catalog_.ToString().c_str(), key.c_str());
callback(ApiError::InvalidArgument("Empty tile key list"));
return;
}

const bool aggregation_enabled = request.GetDataAggregationEnabled();
auto response =
GetVersion(request.GetBillingTag(), OnlineIfNotFound, context);

auto filter = [=](repository::SubQuadsResult tiles) mutable
-> repository::SubQuadsResult {
if (request_only_input_tiles) {
return repository.FilterTilesByList(request, std::move(tiles));
} else {
return repository.FilterTilesByLevel(request, std::move(tiles));
if (!response.IsSuccessful()) {
OLP_SDK_LOG_WARNING_F(kLogTag,
"PrefetchTiles: getting catalog version "
"failed, catalog=%s, key=%s",
catalog_.ToString().c_str(), key.c_str());
callback(response.GetError());
return;
}
};

auto query = [=](geo::TileKey root,
client::CancellationContext inner_context) mutable {
auto response = repository.GetVersionedSubQuads(
root, kQuadTreeDepth, version, inner_context);
auto version = response.GetResult().GetVersion();

if (response.IsSuccessful() && aggregation_enabled) {
auto subquads = filter(response.GetResult());
auto network_stats = repository.LoadAggregatedSubQuads(
root, std::move(subquads), version, inner_context);
OLP_SDK_LOG_DEBUG_F(kLogTag, "PrefetchTiles: using key=%s",
key.c_str());

// append network statistics
network_stats += GetNetworkStatistics(response);
response = {response.GetResult(), network_stats};
// Calculate the minimal set of Tile keys and depth to cover tree
bool request_only_input_tiles =
!(request.GetMinLevel() <= request.GetMaxLevel() &&
request.GetMaxLevel() < geo::TileKey::LevelCount &&
request.GetMinLevel() < geo::TileKey::LevelCount);

unsigned int min_level =
(request_only_input_tiles
? static_cast<unsigned int>(geo::TileKey::LevelCount)
: request.GetMinLevel());
unsigned int max_level =
(request_only_input_tiles
? static_cast<unsigned int>(geo::TileKey::LevelCount)
: request.GetMaxLevel());

repository::PrefetchTilesRepository repository(
catalog_, layer_id_, settings_, lookup_client_,
request.GetBillingTag(), mutex_storage_);

auto sliced_tiles = repository.GetSlicedTiles(request.GetTileKeys(),
min_level, max_level);

if (sliced_tiles.empty()) {
OLP_SDK_LOG_WARNING_F(
kLogTag,
"PrefetchTiles: tile/level mismatch, catalog=%s, key=%s",
catalog_.ToString().c_str(), key.c_str());
callback(ApiError::InvalidArgument("TileKeys/levels mismatch"));
return;
}

return response;
};
OLP_SDK_LOG_DEBUG_F(kLogTag, "PrefetchTiles: subquads=%zu, key=%s",
sliced_tiles.size(), key.c_str());

auto& billing_tag = request.GetBillingTag();
auto download = [=](std::string data_handle,
client::CancellationContext inner_context) mutable {
if (data_handle.empty()) {
return BlobApi::DataResponse(
ApiError(ErrorCode::NotFound, "Not found"));
}
repository::DataCacheRepository data_cache_repository(
catalog_, settings_.cache);
if (data_cache_repository.IsCached(layer_id_, data_handle)) {
return BlobApi::DataResponse(nullptr);
}
const bool aggregation_enabled = request.GetDataAggregationEnabled();

repository::DataRepository repository(catalog_, settings_,
lookup_client_, mutex_storage_);
// Fetch from online
return repository.GetVersionedData(
layer_id_,
DataRequest()
.WithDataHandle(std::move(data_handle))
.WithBillingTag(billing_tag),
version, std::move(inner_context), true);
};

std::vector<geo::TileKey> roots;
roots.reserve(sliced_tiles.size());

std::transform(
sliced_tiles.begin(), sliced_tiles.end(), std::back_inserter(roots),
[](const repository::RootTilesForRequest::value_type& root) {
return root.first;
});

auto append_result = [](ExtendedDataResponse response,
geo::TileKey item,
PrefetchTilesResult& prefetch_result) {
if (response.IsSuccessful()) {
prefetch_result.push_back(std::make_shared<PrefetchTileResult>(
item, PrefetchTileNoError()));
} else {
prefetch_result.push_back(std::make_shared<PrefetchTileResult>(
item, response.GetError()));
}
};

auto download_job = std::make_shared<PrefetchTilesHelper::DownloadJob>(
std::move(download), std::move(append_result), std::move(callback),
std::move(status_callback));

return PrefetchTilesHelper::Prefetch(
std::move(download_job), std::move(roots), std::move(query),
std::move(filter), task_sink_, request.GetPriority(),
std::move(context));
},
request.GetPriority(), execution_context);
auto filter = [=](repository::SubQuadsResult tiles) mutable {
if (request_only_input_tiles) {
return repository.FilterTilesByList(request, std::move(tiles));
}
return repository.FilterTilesByLevel(request, std::move(tiles));
};

auto query = [=](geo::TileKey root,
client::CancellationContext inner_context) mutable {
auto response = repository.GetVersionedSubQuads(
root, kQuadTreeDepth, version, inner_context);

if (response.IsSuccessful() && aggregation_enabled) {
auto network_stats = repository.LoadAggregatedSubQuads(
root, filter(response.GetResult()), version, inner_context);

// append network statistics
network_stats += GetNetworkStatistics(response);
response = {response.MoveResult(), network_stats};
}

return response;
};

auto& billing_tag = request.GetBillingTag();

auto download =
[=](std::string data_handle,
client::CancellationContext inner_context) mutable {
if (data_handle.empty()) {
return BlobApi::DataResponse(
ApiError(ErrorCode::NotFound, "Not found"));
}

repository::DataCacheRepository cache(catalog_,
settings_.cache);

if (cache.IsCached(layer_id_, data_handle)) {
return BlobApi::DataResponse(nullptr);
}

repository::DataRepository repository(
catalog_, settings_, lookup_client_, mutex_storage_);

// Fetch from online
return repository.GetVersionedData(
layer_id_,
DataRequest()
.WithDataHandle(std::move(data_handle))
.WithBillingTag(billing_tag),
version, std::move(inner_context), true);
};

std::vector<geo::TileKey> roots;
roots.reserve(sliced_tiles.size());

std::transform(
sliced_tiles.begin(), sliced_tiles.end(),
std::back_inserter(roots),
[](const repository::RootTilesForRequest::value_type& root) {
return root.first;
});

auto append_result = [](ExtendedDataResponse response,
geo::TileKey item,
PrefetchTilesResult& prefetch_result) {
if (response.IsSuccessful()) {
prefetch_result.emplace_back(std::make_shared<PrefetchTileResult>(
item, PrefetchTileNoError()));
} else {
prefetch_result.emplace_back(std::make_shared<PrefetchTileResult>(
item, response.GetError()));
}
};

auto download_job =
std::make_shared<PrefetchTilesHelper::DownloadJob>(
std::move(download), std::move(append_result),
std::move(callback), std::move(status_callback));

return PrefetchTilesHelper::Prefetch(
std::move(download_job), std::move(roots), std::move(query),
std::move(filter), task_sink_, request.GetPriority(),
execution_context);
},
request.GetPriority(), client::CancellationContext{});
});

return client::CancellationToken(
[execution_context]() mutable { execution_context.CancelOperation(); });
}

client::CancellableFuture<PrefetchTilesResponse>
Expand Down

0 comments on commit e0ba039

Please sign in to comment.