Skip to content

Commit

Permalink
Graceful LocalTrajectoryUploader (#1381)
Browse files Browse the repository at this point in the history
* Graceful LocalTrajectoryUploader

FIXES=#1370
  • Loading branch information
gaschler authored and Christoph Schütte committed Aug 12, 2018
1 parent fe59278 commit 811f2e8
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 49 deletions.
16 changes: 8 additions & 8 deletions cartographer/cloud/internal/handlers/add_trajectory_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ void AddTrajectoryHandler::OnRequest(
// Ignore initial poses in trajectory_builder_options.
trajectory_builder_options.clear_initial_trajectory_pose();

if (!GetContext<MapBuilderContextInterface>()
->local_trajectory_uploader()
->AddTrajectory(request.client_id(), trajectory_id,
expected_sensor_ids, trajectory_builder_options)) {
LOG(ERROR) << "Failed to create trajectory in uplink: " << trajectory_id;
Finish(::grpc::Status(::grpc::INTERNAL,
"Failed to create trajectory in uplink"));
return;
auto status =
GetContext<MapBuilderContextInterface>()
->local_trajectory_uploader()
->AddTrajectory(request.client_id(), trajectory_id,
expected_sensor_ids, trajectory_builder_options);
if (!status.ok()) {
LOG(ERROR) << "Failed to create trajectory_id " << trajectory_id
<< " in uplink. Error: " << status.error_message();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ TEST_F(AddTrajectoryHandlerTest, WithLocalSlamUploader) {
AddTrajectory(Eq("CLIENT_ID"), Eq(13), ParseSensorIds(request),
Truly(testing::BuildProtoPredicateEquals(
&upstream_trajectory_builder_options))))
.WillOnce(Return(13));
.WillOnce(Return(grpc::Status::OK));
test_server_->SendWrite(request);
EXPECT_EQ(test_server_->response().trajectory_id(), 13);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ void FinishTrajectoryHandler::OnRequest(
->NotifyFinishTrajectory(request.trajectory_id());
if (GetUnsynchronizedContext<MapBuilderContextInterface>()
->local_trajectory_uploader()) {
GetContext<MapBuilderContextInterface>()
->local_trajectory_uploader()
->FinishTrajectory(request.client_id(), request.trajectory_id());
auto status =
GetContext<MapBuilderContextInterface>()
->local_trajectory_uploader()
->FinishTrajectory(request.client_id(), request.trajectory_id());
if (!status.ok()) {
LOG(ERROR) << "Failed to finish trajectory in uplink: "
<< status.error_message();
}
}
Send(absl::make_unique<google::protobuf::Empty>());
}
Expand Down
81 changes: 51 additions & 30 deletions cartographer/cloud/internal/local_trajectory_uploader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
// complete.
void Shutdown() final;

bool AddTrajectory(
grpc::Status AddTrajectory(
const std::string& client_id, int local_trajectory_id,
const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final;
void FinishTrajectory(const std::string& client_id,
int local_trajectory_id) final;
grpc::Status FinishTrajectory(const std::string& client_id,
int local_trajectory_id) final;
void EnqueueSensorData(std::unique_ptr<proto::SensorData> sensor_data) final;
void TryRecovery();

Expand All @@ -98,7 +98,8 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {

private:
void ProcessSendQueue();
void TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata);
// Returns 'false' for failure.
bool TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata);

std::shared_ptr<::grpc::Channel> client_channel_;
int batch_size_;
Expand All @@ -124,12 +125,12 @@ LocalTrajectoryUploader::LocalTrajectoryUploader(
grpc::CompositeChannelCredentials(channel_creds, call_creds);
}
client_channel_ = ::grpc::CreateChannel(uplink_server_address, channel_creds);
std::chrono::system_clock::time_point deadline(
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() +
std::chrono::seconds(kConnectionTimeoutInSeconds));
std::chrono::seconds(kConnectionTimeoutInSeconds);
LOG(INFO) << "Connecting to uplink " << uplink_server_address;
if (!client_channel_->WaitForConnected(deadline)) {
LOG(FATAL) << "Failed to connect to " << uplink_server_address;
LOG(ERROR) << "Failed to connect to " << uplink_server_address;
}
}

Expand All @@ -151,7 +152,11 @@ void LocalTrajectoryUploader::Shutdown() {

void LocalTrajectoryUploader::TryRecovery() {
// Wind the sensor_data_queue forward to the next new submap.
LOG(INFO) << "LocalTrajectoryUploader tries to recover with next submap.";
while (true) {
if (shutting_down_) {
return;
}
proto::SensorData* sensor_data =
send_queue_.PeekWithTimeout<proto::SensorData>(kPopTimeout);
if (sensor_data) {
Expand All @@ -173,13 +178,16 @@ void LocalTrajectoryUploader::TryRecovery() {
local_trajectory_id_to_trajectory_info_;
local_trajectory_id_to_trajectory_info_.clear();
for (const auto& entry : local_trajectory_id_to_trajectory_info) {
if (!AddTrajectory(entry.second.client_id, entry.first,
entry.second.expected_sensor_ids,
entry.second.trajectory_options)) {
LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt.";
grpc::Status status = AddTrajectory(entry.second.client_id, entry.first,
entry.second.expected_sensor_ids,
entry.second.trajectory_options);
if (!status.ok()) {
LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. "
<< status.error_message();
return;
}
}
LOG(INFO) << "LocalTrajectoryUploader recovered.";
}

void LocalTrajectoryUploader::ProcessSendQueue() {
Expand All @@ -188,9 +196,13 @@ void LocalTrajectoryUploader::ProcessSendQueue() {
while (!shutting_down_) {
auto sensor_data = send_queue_.PopWithTimeout(kPopTimeout);
if (sensor_data) {
if (!TranslateTrajectoryId(sensor_data->mutable_sensor_metadata())) {
batch_request.clear_sensor_data();
TryRecovery();
continue;
}
proto::SensorData* added_sensor_data = batch_request.add_sensor_data();
*added_sensor_data = *sensor_data;
TranslateTrajectoryId(added_sensor_data->mutable_sensor_metadata());

// A submap also holds a trajectory id that must be translated to uplink's
// trajectory id.
Expand All @@ -205,7 +217,7 @@ void LocalTrajectoryUploader::ProcessSendQueue() {

if (batch_request.sensor_data_size() == batch_size_) {
async_grpc::Client<handlers::AddSensorDataBatchSignature> client(
client_channel_, common::FromSeconds(10),
client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds),
async_grpc::CreateUnlimitedConstantDelayStrategy(
common::FromSeconds(1), kUnrecoverableStatusCodes));
if (client.Write(batch_request)) {
Expand All @@ -222,15 +234,19 @@ void LocalTrajectoryUploader::ProcessSendQueue() {
}
}

void LocalTrajectoryUploader::TranslateTrajectoryId(
bool LocalTrajectoryUploader::TranslateTrajectoryId(
proto::SensorMetadata* sensor_metadata) {
int cloud_trajectory_id = local_trajectory_id_to_trajectory_info_
.at(sensor_metadata->trajectory_id())
.uplink_trajectory_id;
auto it = local_trajectory_id_to_trajectory_info_.find(
sensor_metadata->trajectory_id());
if (it == local_trajectory_id_to_trajectory_info_.end()) {
return false;
}
int cloud_trajectory_id = it->second.uplink_trajectory_id;
sensor_metadata->set_trajectory_id(cloud_trajectory_id);
return true;
}

bool LocalTrajectoryUploader::AddTrajectory(
grpc::Status LocalTrajectoryUploader::AddTrajectory(
const std::string& client_id, int local_trajectory_id,
const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) {
Expand All @@ -245,11 +261,12 @@ bool LocalTrajectoryUploader::AddTrajectory(
}
*request.add_expected_sensor_ids() =
cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id));
async_grpc::Client<handlers::AddTrajectorySignature> client(client_channel_);
async_grpc::Client<handlers::AddTrajectorySignature> client(
client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
::grpc::Status status;
if (!client.Write(request, &status)) {
LOG(ERROR) << status.error_message();
return false;
return status;
}
LOG(INFO) << "Created trajectory for client_id: " << client_id
<< " local trajectory_id: " << local_trajectory_id
Expand All @@ -261,22 +278,26 @@ bool LocalTrajectoryUploader::AddTrajectory(
std::forward_as_tuple(client.response().trajectory_id(),
expected_sensor_ids, trajectory_options,
client_id));
return true;
return status;
}

void LocalTrajectoryUploader::FinishTrajectory(const std::string& client_id,
int local_trajectory_id) {
CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id),
1);
int cloud_trajectory_id =
local_trajectory_id_to_trajectory_info_.at(local_trajectory_id)
.uplink_trajectory_id;
grpc::Status LocalTrajectoryUploader::FinishTrajectory(
const std::string& client_id, int local_trajectory_id) {
auto it = local_trajectory_id_to_trajectory_info_.find(local_trajectory_id);
if (it == local_trajectory_id_to_trajectory_info_.end()) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"local_trajectory_id has not been"
" registered with AddTrajectory.");
}
int cloud_trajectory_id = it->second.uplink_trajectory_id;
proto::FinishTrajectoryRequest request;
request.set_client_id(client_id);
request.set_trajectory_id(cloud_trajectory_id);
async_grpc::Client<handlers::FinishTrajectorySignature> client(
client_channel_);
CHECK(client.Write(request));
client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
grpc::Status status;
client.Write(request, &status);
return status;
}

void LocalTrajectoryUploader::EnqueueSensorData(
Expand Down
11 changes: 7 additions & 4 deletions cartographer/cloud/internal/local_trajectory_uploader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
#include "cartographer/cloud/proto/map_builder_service.pb.h"
#include "cartographer/mapping/proto/trajectory_builder_options.pb.h"
#include "cartographer/mapping/trajectory_builder_interface.h"
#include "grpc++/support/status.h"

namespace cartographer {
namespace cloud {

// Uploads sensor data batches to uplink server.
// Gracefully handles interruptions of the connection.
class LocalTrajectoryUploaderInterface {
public:
using SensorId = mapping::TrajectoryBuilderInterface::SensorId;
Expand All @@ -46,14 +49,14 @@ class LocalTrajectoryUploaderInterface {
std::unique_ptr<proto::SensorData> sensor_data) = 0;

// Creates a new trajectory with the specified settings in the uplink. A
// return value of 'false' indicates that the creation failed.
virtual bool AddTrajectory(
// return 'value' with '!value.ok()' indicates that the creation failed.
virtual grpc::Status AddTrajectory(
const std::string& client_id, int local_trajectory_id,
const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) = 0;

virtual void FinishTrajectory(const std::string& client_id,
int local_trajectory_id) = 0;
virtual grpc::Status FinishTrajectory(const std::string& client_id,
int local_trajectory_id) = 0;
virtual SensorId GetLocalSlamResultSensorId(
int local_trajectory_id) const = 0;
};
Expand Down
57 changes: 57 additions & 0 deletions cartographer/cloud/internal/local_trajectory_uploader_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2018 The Cartographer Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "cartographer/cloud/internal/local_trajectory_uploader.h"

#include "glog/logging.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"

using SensorId = ::cartographer::mapping::TrajectoryBuilderInterface::SensorId;

namespace cartographer {
namespace cloud {
namespace {

constexpr char kClientId[] = "CLIENT_ID";
const SensorId kImuSensorId{SensorId::SensorType::IMU, "imu"};
const SensorId kRangeSensorId{SensorId::SensorType::RANGE, "range"};
const int kLocalTrajectoryId = 3;

TEST(LocalTrajectoryUploaderTest, HandlesInvalidUplink) {
auto uploader = CreateLocalTrajectoryUploader("invalid-uplink-address:50051",
/*batch_size=*/1, false, "");
uploader->Start();
mapping::proto::TrajectoryBuilderOptions options;
auto status = uploader->AddTrajectory(
kClientId, kLocalTrajectoryId, {kRangeSensorId, kImuSensorId}, options);
EXPECT_EQ(status.error_code(), grpc::StatusCode::DEADLINE_EXCEEDED);
auto sensor_data = absl::make_unique<proto::SensorData>();
sensor_data->mutable_sensor_metadata()->set_client_id(kClientId);
sensor_data->mutable_sensor_metadata()->set_sensor_id(kImuSensorId.id);
sensor_data->mutable_sensor_metadata()->set_trajectory_id(kLocalTrajectoryId);
sensor_data->mutable_imu_data()->set_timestamp(1);
uploader->EnqueueSensorData(std::move(sensor_data));
auto sensor_id = uploader->GetLocalSlamResultSensorId(kLocalTrajectoryId);
EXPECT_THAT(sensor_id.id, ::testing::Not(::testing::IsEmpty()));
status = uploader->FinishTrajectory(kClientId, kLocalTrajectoryId);
EXPECT_FALSE(status.ok());
uploader->Shutdown();
}

} // namespace
} // namespace cloud
} // namespace cartographer
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ class MockLocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
MOCK_METHOD0(Start, void());
MOCK_METHOD0(Shutdown, void());
MOCK_METHOD4(AddTrajectory,
bool(const std::string &, int, const std::set<SensorId> &,
const mapping::proto::TrajectoryBuilderOptions &));
MOCK_METHOD2(FinishTrajectory, void(const std::string &, int));
grpc::Status(const std::string &, int,
const std::set<SensorId> &,
const mapping::proto::TrajectoryBuilderOptions &));
MOCK_METHOD2(FinishTrajectory, grpc::Status(const std::string &, int));
MOCK_CONST_METHOD1(GetLocalSlamResultSensorId, SensorId(int));
};

Expand Down

0 comments on commit 811f2e8

Please sign in to comment.