Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions script/e2e-kube.sh
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,10 @@ if [[ ("${ESP_ROLLOUT_STRATEGY}" == "managed") && ("${BACKEND}" == "bookstore")
create_service "${ESP_SERVICE}" "${SERVICE_IDL}"

# Need to wait for ServiceControl to detect new rollout
# Need to run some traffic in order for ServiceControl to send the new rollout.
# Here wait for 200 seconds.
for l in {1..20}
do
echo "Wait for the new config to propagate: ${l}"
check_http_service "${HOST}/shelves" 200
sleep 10
done

Expand Down
10 changes: 9 additions & 1 deletion src/api_manager/api_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ utils::Status ApiManagerImpl::Init() {
if (status.ok()) {
AddAndDeployConfigs(std::move(configs), true);
}
}));
},
[this]() { DetectRolloutIDChange(); }));

if (global_context_->server_config()->has_service_config_rollout()) {
config_manager_->set_current_rollout_id(global_context_->server_config()
Expand All @@ -223,6 +224,13 @@ utils::Status ApiManagerImpl::Init() {
return utils::Status::OK;
}

void ApiManagerImpl::DetectRolloutIDChange() {
if (!service_context_map_.empty()) {
const auto &it = service_context_map_.begin();
it->second->service_control()->SendEmptyReport();
}
}

utils::Status ApiManagerImpl::Close() {
if (global_context_->cloud_trace_aggregator()) {
global_context_->cloud_trace_aggregator()->SendAndClearTraces();
Expand Down
3 changes: 3 additions & 0 deletions src/api_manager/api_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ class ApiManagerImpl : public ApiManager {
utils::Status AddAndDeployConfigs(
std::vector<std::pair<std::string, int>> &&configs, bool initialize);

// Send empty report to detect rollout ID change
void DetectRolloutIDChange();

// The check work flow.
std::shared_ptr<CheckWorkflow> check_workflow_;

Expand Down
28 changes: 21 additions & 7 deletions src/api_manager/config_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,29 @@ const int kFetchThrottleWindowInS = 300;

const char kRolloutStrategyManaged[] = "managed";

// The default periodical interval to detect rollout changes. Unit: seconds.
const int kDetectRolloutChangeIntervalInS = 60;

} // namespace

ConfigManager::ConfigManager(
std::shared_ptr<context::GlobalContext> global_context,
RolloutApplyFunction rollout_apply_function)
RolloutApplyFunction rollout_apply_function,
std::function<void()> detect_rollout_func)
: global_context_(global_context),
rollout_apply_function_(rollout_apply_function),
fetch_throttle_window_in_s_(kFetchThrottleWindowInS) {
int detect_rollout_interval_s = kDetectRolloutChangeIntervalInS;
if (global_context_->server_config() &&
global_context_->server_config()->has_service_management_config()) {
const auto& cfg =
global_context_->server_config()->service_management_config();
// update fetch_throttle_window
if (global_context_->server_config()
->service_management_config()
.fetch_throttle_window_s() > 0) {
fetch_throttle_window_in_s_ = global_context_->server_config()
->service_management_config()
.fetch_throttle_window_s();
if (cfg.fetch_throttle_window_s() > 0) {
fetch_throttle_window_in_s_ = cfg.fetch_throttle_window_s();
}
if (cfg.detect_rollout_interval_s() > 0) {
detect_rollout_interval_s = cfg.detect_rollout_interval_s();
}
}
static std::random_device random_device;
Expand All @@ -52,12 +58,20 @@ ConfigManager::ConfigManager(
0, fetch_throttle_window_in_s_ * 1000));

service_management_fetch_.reset(new ServiceManagementFetch(global_context));

if (detect_rollout_func) {
detect_rollout_change_timer_ = global_context_->env()->StartPeriodicTimer(
std::chrono::seconds(detect_rollout_interval_s), detect_rollout_func);
}
}

ConfigManager::~ConfigManager() {
if (fetch_timer_) {
fetch_timer_->Stop();
}
if (detect_rollout_change_timer_) {
detect_rollout_change_timer_->Stop();
}
};

void ConfigManager::SetLatestRolloutId(
Expand Down
6 changes: 5 additions & 1 deletion src/api_manager/config_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class ConfigManager {
// rollout_apply_function when it successfully downloads the latest successful
// rollout
ConfigManager(std::shared_ptr<context::GlobalContext> global_context,
RolloutApplyFunction rollout_apply_function);
RolloutApplyFunction rollout_apply_function,
std::function<void()> detect_rollout_func);
virtual ~ConfigManager();

public:
Expand Down Expand Up @@ -122,6 +123,9 @@ class ConfigManager {
// The random objects to throttle the timer
std::default_random_engine random_generator_;
std::unique_ptr<std::uniform_int_distribution<int>> random_dist_;

// Periodic timer to send empty report to detect latest rollout change.
std::unique_ptr<PeriodicTimer> detect_rollout_change_timer_;
};

} // namespace api_manager
Expand Down
24 changes: 16 additions & 8 deletions src/api_manager/config_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ TEST_F(ConfigManagerServiceNameConfigIdTest, VerifyTimerIntervalDistribution) {
[this, &sequence](const utils::Status&,
const std::vector<std::pair<std::string, int>>&) {
sequence++;
}));
},
nullptr));
config_manager->set_current_rollout_id("2017-05-01r0");

// Default is 5 minute interval. Use 5 slot for each minute.
Expand Down Expand Up @@ -283,7 +284,8 @@ TEST_F(ConfigManagerServiceNameConfigIdTest, RolloutSingleServiceConfig) {
EXPECT_EQ(kServiceConfig1, list[0].first);
EXPECT_EQ(100, list[0].second);
sequence++;
}));
},
nullptr));

config_manager->SetLatestRolloutId("2017-05-01r0",
std::chrono::system_clock::now());
Expand All @@ -306,7 +308,8 @@ TEST_F(ConfigManagerServiceNameConfigIdTest, RolloutIDNotChanged) {
[this, &sequence](const utils::Status& status,
const std::vector<std::pair<std::string, int>>& list) {
sequence++;
}));
},
nullptr));

// set rollout_id to 2017-05-01r0 which is same as kRolloutsResponse1
config_manager->set_current_rollout_id("2017-05-01r0");
Expand All @@ -327,7 +330,8 @@ TEST_F(ConfigManagerServiceNameConfigIdTest, RepeatedTrigger) {
EXPECT_EQ(kServiceConfig1, list[0].first);
EXPECT_EQ(100, list[0].second);
sequence++;
}));
},
nullptr));
config_manager->set_current_rollout_id("2017-05-01r0");

auto now = std::chrono::system_clock::now();
Expand Down Expand Up @@ -416,7 +420,8 @@ TEST_F(ConfigManagerServiceNameConfigIdTest, RolloutMultipleServiceConfig) {
EXPECT_EQ(kServiceConfig2, list[1].first);
EXPECT_EQ(20, list[1].second);
sequence++;
}));
},
nullptr));

config_manager->SetLatestRolloutId("2017-05-01r0",
std::chrono::system_clock::now());
Expand Down Expand Up @@ -487,7 +492,8 @@ TEST_F(ConfigManagerServiceNameConfigIdTest,
[this, &sequence](const utils::Status& status,
const std::vector<std::pair<std::string, int>>& list) {
sequence++;
}));
},
nullptr));

config_manager->SetLatestRolloutId("2017-05-01r0",
std::chrono::system_clock::now());
Expand Down Expand Up @@ -553,7 +559,8 @@ TEST_F(ConfigManagerServiceNameConfigIdTest, RolloutSingleServiceConfigUpdate) {
EXPECT_EQ(100, list[0].second);

sequence++;
}));
},
nullptr));

config_manager->SetLatestRolloutId("2017-05-01r0",
std::chrono::system_clock::now());
Expand Down Expand Up @@ -600,7 +607,8 @@ TEST_F(ConfigManagerServiceNameConfigIdTest,
EXPECT_EQ(100, list[0].second);

sequence++;
}));
},
nullptr));

config_manager->SetLatestRolloutId("2017-05-01r0",
std::chrono::system_clock::now());
Expand Down
4 changes: 4 additions & 0 deletions src/api_manager/proto/server_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ message ServiceManagementConfig {
// at the same time to exceed its quota.
// If not specified, or 0, default is 300 (5 minutes).
int32 fetch_throttle_window_s = 2;

// The periodical timer interval in seconds to detect rollout changes.
// If not specified, or 0, default is 60s (1 minute).
Comment thread
nareddyt marked this conversation as resolved.
int32 detect_rollout_interval_s = 3;
}

// Maps service configuration files to their corresponding traffic percentage.
Expand Down
10 changes: 10 additions & 0 deletions src/api_manager/service_control/aggregated.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,16 @@ Status Aggregated::Close() {
return Status::OK;
}

void Aggregated::SendEmptyReport() {
ReportRequest request;
ReportResponse* response = new ReportResponse;
Call(request, response,
[this, response](const ::google::protobuf::util::Status&) {
delete response;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused: Where does ESP read the latest rollout ID from the ReportResponse? Shouldn't it be using the response to check for it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just using the existing code to process the Check/Report response. It is in the HandleRespones() function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change only adds timer to trigger a dummy Report calls in every minutes. The existing code will handle the rest: on every check/response code it will check rollout id, then it set to a callback function which is intercepted by config_manager.

},
nullptr);
}

Status Aggregated::Report(const ReportRequestInfo& info) {
if (!client_) {
return Status(Code::INTERNAL, "Missing service control client");
Expand Down
2 changes: 2 additions & 0 deletions src/api_manager/service_control/aggregated.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class Aggregated : public Interface {

virtual ~Aggregated();

void SendEmptyReport() override;

virtual utils::Status Report(const ReportRequestInfo& info);

virtual void Check(
Expand Down
3 changes: 3 additions & 0 deletions src/api_manager/service_control/interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class Interface {
// is enabled. HTTP request errors carry Nginx error code.
virtual utils::Status Report(const ReportRequestInfo& info) = 0;

// Send an empty report to get the latest rollout_id.
virtual void SendEmptyReport() = 0;

// Sends ServiceControl Check asynchronously.
// on_done() function will be called once it is completed.
// utils::Status in the on_done callback:
Expand Down
3 changes: 2 additions & 1 deletion src/nginx/t/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
################################################################################
#
load("@io_bazel_rules_perl//perl:perl.bzl", "perl_library")
load("//:nginx.bzl", "nginx_test", "nginx_suite")
load("//:nginx.bzl", "nginx_suite", "nginx_test")

perl_library(
name = "perl_library",
Expand Down Expand Up @@ -211,6 +211,7 @@ nginx_suite(
"check_with_token.t",
"config_extra_field.t",
"config_missing.t",
"config_rollouts_by_timer.t",
"config_rollouts_managed.t",
"cors.t",
"cors_disabled.t",
Expand Down
Loading