Skip to content

Commit

Permalink
Graceful LocalTrajectoryUploader::AddTrajectory (#1409)
Browse files Browse the repository at this point in the history
FIXES=#1407
  • Loading branch information
gaschler authored and wally-the-cartographer committed Sep 5, 2018
1 parent 3b511aa commit e318751
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 37 deletions.
35 changes: 34 additions & 1 deletion cartographer/cloud/internal/client_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ class ClientServerTestBase : public T {

void WaitForLocalSlamResultUploads(size_t size) {
while (stub_->pose_graph()->GetTrajectoryNodePoses().size() < size) {
LOG(INFO) << stub_->pose_graph()->GetTrajectoryNodePoses().size();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
Expand Down Expand Up @@ -595,6 +594,40 @@ TEST_P(ClientServerTestByGridType, LocalSlam2DUplinkServerRestarting) {
server_->WaitForShutdown();
}

TEST_F(ClientServerTest, DelayedConnectionToUplinkServer) {
InitializeRealUploadingServer();
uploading_server_->Start();
InitializeStubForUploadingServer();
int trajectory_id = stub_for_uploading_server_->AddTrajectoryBuilder(
{kRangeSensorId}, trajectory_builder_options_,
local_slam_result_callback_);
TrajectoryBuilderInterface* trajectory_stub =
stub_for_uploading_server_->GetTrajectoryBuilder(trajectory_id);
const auto measurements = mapping::testing::GenerateFakeRangeMeasurements(
kTravelDistance, kDuration, kTimeStep);

// Insert the first measurement.
trajectory_stub->AddSensorData(kRangeSensorId.id, measurements.at(0));
WaitForLocalSlamResults(1);

LOG(INFO) << "Delayed start of uplink server.";
InitializeRealServer();
server_->Start();
InitializeStub();

// Insert all other measurements.
for (unsigned int i = 1; i < measurements.size(); ++i) {
trajectory_stub->AddSensorData(kRangeSensorId.id, measurements.at(i));
}
WaitForLocalSlamResults(measurements.size());
WaitForLocalSlamResultUploads(2);
stub_for_uploading_server_->FinishTrajectory(trajectory_id);
uploading_server_->Shutdown();
uploading_server_->WaitForShutdown();
server_->Shutdown();
server_->WaitForShutdown();
}

TEST_P(ClientServerTestByGridType, LoadStateAndDelete) {
if (GetParam() == ::cartographer::mapping::GridType::TSDF) {
SetOptionsToTSDF2D();
Expand Down
74 changes: 38 additions & 36 deletions cartographer/cloud/internal/local_trajectory_uploader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,8 @@ bool IsNewSubmap(const mapping::proto::Submap& submap) {
class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
public:
struct TrajectoryInfo {
TrajectoryInfo() = default;
TrajectoryInfo(
const int uplink_trajectory_id,
const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions& trajectory_options,
const std::string& client_id)
: uplink_trajectory_id(uplink_trajectory_id),
expected_sensor_ids(expected_sensor_ids),
trajectory_options(trajectory_options),
client_id(client_id) {}
const int uplink_trajectory_id;
// nullopt if uplink has not yet responded to AddTrajectoryRequest.
absl::optional<int> uplink_trajectory_id;
const std::set<SensorId> expected_sensor_ids;
const mapping::proto::TrajectoryBuilderOptions trajectory_options;
const std::string client_id;
Expand Down Expand Up @@ -105,6 +96,7 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface {
void ProcessSendQueue();
// Returns 'false' for failure.
bool TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata);
grpc::Status RegisterTrajectory(int local_trajectory_id);

std::shared_ptr<::grpc::Channel> client_channel_;
int batch_size_;
Expand Down Expand Up @@ -192,19 +184,11 @@ void LocalTrajectoryUploader::TryRecovery() {
}

// Attempt to recreate the trajectories.
const auto local_trajectory_id_to_trajectory_info =
local_trajectory_id_to_trajectory_info_;
local_trajectory_id_to_trajectory_info_.clear();
for (const auto& entry : local_trajectory_id_to_trajectory_info) {
grpc::Status status = AddTrajectory(entry.second.client_id, entry.first,
entry.second.expected_sensor_ids,
entry.second.trajectory_options);
for (const auto& entry : local_trajectory_id_to_trajectory_info_) {
grpc::Status status = RegisterTrajectory(entry.first);
if (!status.ok()) {
LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. "
<< status.error_message();
// Restore the previous state for the next recovery attempt.
local_trajectory_id_to_trajectory_info_ =
local_trajectory_id_to_trajectory_info;
return;
}
}
Expand Down Expand Up @@ -262,7 +246,11 @@ bool LocalTrajectoryUploader::TranslateTrajectoryId(
if (it == local_trajectory_id_to_trajectory_info_.end()) {
return false;
}
int cloud_trajectory_id = it->second.uplink_trajectory_id;
if (!it->second.uplink_trajectory_id.has_value()) {
// Could not yet register trajectory with uplink server.
return false;
}
int cloud_trajectory_id = it->second.uplink_trajectory_id.value();
sensor_metadata->set_trajectory_id(cloud_trajectory_id);
return true;
}
Expand All @@ -271,10 +259,23 @@ grpc::Status LocalTrajectoryUploader::AddTrajectory(
const std::string& client_id, int local_trajectory_id,
const std::set<SensorId>& expected_sensor_ids,
const mapping::proto::TrajectoryBuilderOptions& trajectory_options) {
CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id),
0);
local_trajectory_id_to_trajectory_info_.emplace(
local_trajectory_id,
TrajectoryInfo{{}, expected_sensor_ids, trajectory_options, client_id});
return RegisterTrajectory(local_trajectory_id);
}

grpc::Status LocalTrajectoryUploader::RegisterTrajectory(
int local_trajectory_id) {
TrajectoryInfo& trajectory_info =
local_trajectory_id_to_trajectory_info_.at(local_trajectory_id);
proto::AddTrajectoryRequest request;
request.set_client_id(client_id);
*request.mutable_trajectory_builder_options() = trajectory_options;
for (const SensorId& sensor_id : expected_sensor_ids) {
request.set_client_id(trajectory_info.client_id);
*request.mutable_trajectory_builder_options() =
trajectory_info.trajectory_options;
for (const SensorId& sensor_id : trajectory_info.expected_sensor_ids) {
// Range sensors are not forwarded, but combined into a LocalSlamResult.
if (sensor_id.type != SensorId::SensorType::RANGE) {
*request.add_expected_sensor_ids() = cloud::ToProto(sensor_id);
Expand All @@ -286,19 +287,15 @@ grpc::Status LocalTrajectoryUploader::AddTrajectory(
client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
::grpc::Status status;
if (!client.Write(request, &status)) {
LOG(ERROR) << status.error_message();
LOG(ERROR) << "Failed to register local_trajectory_id "
<< local_trajectory_id << " with uplink server. "
<< status.error_message();
return status;
}
LOG(INFO) << "Created trajectory for client_id: " << client_id
LOG(INFO) << "Created trajectory for client_id: " << trajectory_info.client_id
<< " local trajectory_id: " << local_trajectory_id
<< " uplink trajectory_id: " << client.response().trajectory_id();
CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id),
0);
local_trajectory_id_to_trajectory_info_.emplace(
std::piecewise_construct, std::forward_as_tuple(local_trajectory_id),
std::forward_as_tuple(client.response().trajectory_id(),
expected_sensor_ids, trajectory_options,
client_id));
trajectory_info.uplink_trajectory_id = client.response().trajectory_id();
return status;
}

Expand All @@ -310,10 +307,15 @@ grpc::Status LocalTrajectoryUploader::FinishTrajectory(
"local_trajectory_id has not been"
" registered with AddTrajectory.");
}
int cloud_trajectory_id = it->second.uplink_trajectory_id;
auto cloud_trajectory_id = it->second.uplink_trajectory_id;
if (!cloud_trajectory_id.has_value()) {
return grpc::Status(
grpc::StatusCode::UNAVAILABLE,
"trajectory_id has not been created in uplink, ignoring.");
}
proto::FinishTrajectoryRequest request;
request.set_client_id(client_id);
request.set_trajectory_id(cloud_trajectory_id);
request.set_trajectory_id(cloud_trajectory_id.value());
async_grpc::Client<handlers::FinishTrajectorySignature> client(
client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds));
grpc::Status status;
Expand Down

0 comments on commit e318751

Please sign in to comment.