Skip to content

Commit

Permalink
Use dynamic bind address for internal + external mini clusters
Browse files Browse the repository at this point in the history
This technique helps avoid port conflicts. It assigns a unique loopback
IP, based on pid and per-cluster numbering, to every InternalMiniCluster
or ExternalMiniCluster component (including masters and tablet servers).

Changes:

1. Apply the "dynamic bind address" technique to internal mini clusters
   (previously it was only used for external mini clusters).
2. Apply it to masters (previously it was only used for tablet servers).

I will post a follow-up patch to remove some of the CMakeLists.txt
limitations related to port conflicts, although we may need to put some
back due to lack of macOS support for unique loopback addresses.

I had to refactor a few host-related APIs to do this in a relatively
non-hacky manner. All tests still pass (and should more often).

Additional changes:

* Move BindMode to MiniCluster base class.
* Fix a few brittle tests that started failing because of this change.

Change-Id: I35eff9fbf5ccf8822cfe061673bc36598dff56f0
Reviewed-on: http://gerrit.cloudera.org:8080/7274
Tested-by: Mike Percy <mpercy@apache.org>
Reviewed-by: Todd Lipcon <todd@apache.org>
  • Loading branch information
mpercy authored and toddlipcon committed Jun 28, 2017
1 parent 42a0749 commit e675873
Show file tree
Hide file tree
Showing 20 changed files with 316 additions and 192 deletions.
7 changes: 3 additions & 4 deletions src/kudu/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2131,10 +2131,9 @@ TEST_F(ClientTest, TestWriteTimeout) {
unique_ptr<KuduError> error = GetSingleErrorFromSession(session.get());
const Status& status = error->status();
ASSERT_TRUE(status.IsTimedOut()) << status.ToString();
ASSERT_STR_CONTAINS(status.ToString(),
"Failed to write batch of 1 ops to tablet");
ASSERT_STR_CONTAINS(status.ToString(), "Write RPC to 127.0.0.1:");
ASSERT_STR_CONTAINS(status.ToString(), "after 1 attempt");
ASSERT_STR_MATCHES(status.ToString(),
R"(Failed to write batch of 1 ops to tablet.*after 1 attempt.*)"
R"(Write RPC to 127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}:.*timed out)");
}
}

Expand Down
1 change: 1 addition & 0 deletions src/kudu/integration-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ set(INTEGRATION_TESTS_SRCS
external_mini_cluster_fs_inspector.cc
internal_mini_cluster.cc
log_verifier.cc
mini_cluster.cc
test_workload.cc
)

Expand Down
12 changes: 9 additions & 3 deletions src/kudu/integration-tests/external_mini_cluster-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,14 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
SCOPED_TRACE(i);
ExternalMaster* master = CHECK_NOTNULL(cluster.master(i));
HostPort master_rpc = master->bound_rpc_hostport();
EXPECT_TRUE(HasPrefixString(master_rpc.ToString(), "127.0.0.1:")) << master_rpc.ToString();
string expected_prefix = Substitute("$0:", cluster.GetBindIpForMaster(i));
if (cluster.bind_mode() == MiniCluster::UNIQUE_LOOPBACK) {
EXPECT_NE(expected_prefix, "127.0.0.1:") << "Should bind to unique per-server hosts";
}
EXPECT_TRUE(HasPrefixString(master_rpc.ToString(), expected_prefix)) << master_rpc.ToString();

HostPort master_http = master->bound_http_hostport();
EXPECT_TRUE(HasPrefixString(master_http.ToString(), "127.0.0.1:")) << master_http.ToString();
EXPECT_TRUE(HasPrefixString(master_http.ToString(), expected_prefix)) << master_http.ToString();

// Retrieve a thread metric, which should always be present on any master.
int64_t value;
Expand All @@ -141,7 +145,9 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
ExternalTabletServer* ts = CHECK_NOTNULL(cluster.tablet_server(i));
HostPort ts_rpc = ts->bound_rpc_hostport();
string expected_prefix = Substitute("$0:", cluster.GetBindIpForTabletServer(i));
EXPECT_NE(expected_prefix, "127.0.0.1") << "Should bind to unique per-server hosts";
if (cluster.bind_mode() == MiniCluster::UNIQUE_LOOPBACK) {
EXPECT_NE(expected_prefix, "127.0.0.1:") << "Should bind to unique per-server hosts";
}
EXPECT_TRUE(HasPrefixString(ts_rpc.ToString(), expected_prefix)) << ts_rpc.ToString();

HostPort ts_http = ts->bound_http_hostport();
Expand Down
114 changes: 57 additions & 57 deletions src/kudu/integration-tests/external_mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,13 @@ namespace kudu {

static const char* const kMasterBinaryName = "kudu-master";
static const char* const kTabletServerBinaryName = "kudu-tserver";
static const char* const kWildcardIpAddr = "0.0.0.0";
static const char* const kLoopbackIpAddr = "127.0.0.1";
static double kTabletServerRegistrationTimeoutSeconds = 15.0;
static double kMasterCatalogManagerTimeoutSeconds = 60.0;

#if defined(__APPLE__)
static ExternalMiniClusterOptions::BindMode kBindMode =
ExternalMiniClusterOptions::BindMode::LOOPBACK;
#else
static ExternalMiniClusterOptions::BindMode kBindMode =
ExternalMiniClusterOptions::BindMode::UNIQUE_LOOPBACK;
#endif

ExternalMiniClusterOptions::ExternalMiniClusterOptions()
: num_masters(1),
num_tablet_servers(1),
bind_mode(kBindMode),
bind_mode(MiniCluster::kDefaultBindMode),
num_data_dirs(1),
enable_kerberos(false),
logtostderr(true),
Expand Down Expand Up @@ -303,9 +293,14 @@ Status ExternalMiniCluster::StartSingleMaster() {
}
opts.extra_flags = SubstituteInFlags(opts_.extra_master_flags, 0);
opts.start_process_timeout = opts_.start_process_timeout;

opts.rpc_bind_address = HostPort(GetBindIpForMaster(0), 0);
scoped_refptr<ExternalMaster> master = new ExternalMaster(opts);
if (opts_.enable_kerberos) {
RETURN_NOT_OK_PREPEND(master->EnableKerberos(kdc_.get(), Substitute("$0", kLoopbackIpAddr)),
// The bind host here is the hostname that will be used to generate the
// Kerberos principal, so it has to match the bind address for the master
// rpc endpoint.
RETURN_NOT_OK_PREPEND(master->EnableKerberos(kdc_.get(), opts.rpc_bind_address.host()),
"could not enable Kerberos");
}

Expand All @@ -322,13 +317,10 @@ Status ExternalMiniCluster::StartDistributedMasters() {
opts_.master_rpc_ports.size() << " ports specified in 'master_rpc_ports'";
}

vector<string> peer_addrs;
for (int i = 0; i < num_masters; i++) {
string addr = Substitute("$0:$1", kLoopbackIpAddr, opts_.master_rpc_ports[i]);
peer_addrs.push_back(addr);
}
vector<HostPort> peer_hostports = master_rpc_addrs();
vector<string> flags = opts_.extra_master_flags;
flags.push_back("--master_addresses=" + JoinStrings(peer_addrs, ","));
flags.push_back(Substitute("--master_addresses=$0",
HostPort::ToCommaSeparatedString(peer_hostports)));
string exe = GetBinaryPath(kMasterBinaryName);

// Start the masters.
Expand All @@ -347,10 +339,11 @@ Status ExternalMiniCluster::StartDistributedMasters() {
}
opts.extra_flags = SubstituteInFlags(flags, i);
opts.start_process_timeout = opts_.start_process_timeout;
opts.rpc_bind_address = peer_hostports[i];

scoped_refptr<ExternalMaster> peer = new ExternalMaster(opts, peer_addrs[i]);
scoped_refptr<ExternalMaster> peer = new ExternalMaster(opts);
if (opts_.enable_kerberos) {
RETURN_NOT_OK_PREPEND(peer->EnableKerberos(kdc_.get(), Substitute("$0", kLoopbackIpAddr)),
RETURN_NOT_OK_PREPEND(peer->EnableKerberos(kdc_.get(), peer_hostports[i].host()),
"could not enable Kerberos");
}
RETURN_NOT_OK_PREPEND(peer->Start(),
Expand All @@ -362,17 +355,11 @@ Status ExternalMiniCluster::StartDistributedMasters() {
}

string ExternalMiniCluster::GetBindIpForTabletServer(int index) const {
string bind_ip;
if (opts_.bind_mode == ExternalMiniClusterOptions::UNIQUE_LOOPBACK) {
pid_t p = getpid();
CHECK_LE(p, MathLimits<uint16_t>::kMax) << "Cannot run on systems with >16-bit pid";
bind_ip = Substitute("127.$0.$1.$2", p >> 8, p & 0xff, index);
} else if (opts_.bind_mode == ExternalMiniClusterOptions::WILDCARD) {
bind_ip = Substitute("$0", kWildcardIpAddr);
} else {
bind_ip = Substitute("$0", kLoopbackIpAddr);
}
return bind_ip;
return MiniCluster::GetBindIpForDaemon(MiniCluster::TSERVER, index, opts_.bind_mode);
}

string ExternalMiniCluster::GetBindIpForMaster(int index) const {
return MiniCluster::GetBindIpForDaemon(MiniCluster::MASTER, index, opts_.bind_mode);
}

Status ExternalMiniCluster::AddTabletServer() {
Expand Down Expand Up @@ -400,9 +387,10 @@ Status ExternalMiniCluster::AddTabletServer() {
}
opts.extra_flags = SubstituteInFlags(opts_.extra_tserver_flags, idx);
opts.start_process_timeout = opts_.start_process_timeout;
opts.rpc_bind_address = HostPort(bind_host, 0);

scoped_refptr<ExternalTabletServer> ts =
new ExternalTabletServer(opts, bind_host, master_hostports);
new ExternalTabletServer(opts, master_hostports);
if (opts_.enable_kerberos) {
RETURN_NOT_OK_PREPEND(ts->EnableKerberos(kdc_.get(), bind_host),
"could not enable Kerberos");
Expand Down Expand Up @@ -587,6 +575,16 @@ vector<ExternalDaemon*> ExternalMiniCluster::daemons() const {
return results;
}

vector<HostPort> ExternalMiniCluster::master_rpc_addrs() const {
vector<HostPort> master_rpc_addrs;
for (int i = 0; i < opts_.master_rpc_ports.size(); i++) {
master_rpc_addrs.emplace_back(
GetBindIpForDaemon(MiniCluster::MASTER, i, opts_.bind_mode),
opts_.master_rpc_ports[i]);
}
return master_rpc_addrs;
}

std::shared_ptr<rpc::Messenger> ExternalMiniCluster::messenger() const {
return messenger_;
}
Expand Down Expand Up @@ -650,8 +648,11 @@ ExternalDaemon::ExternalDaemon(ExternalDaemonOptions opts)
perf_record_filename_(std::move(opts.perf_record_filename)),
start_process_timeout_(opts.start_process_timeout),
logtostderr_(opts.logtostderr),
rpc_bind_address_(std::move(opts.rpc_bind_address)),
exe_(std::move(opts.exe)),
extra_flags_(std::move(opts.extra_flags)) {}
extra_flags_(std::move(opts.extra_flags)) {
CHECK(rpc_bind_address_.Initialized());
}

ExternalDaemon::~ExternalDaemon() {
}
Expand Down Expand Up @@ -917,14 +918,13 @@ void ExternalDaemon::Shutdown() {
// Before we kill the process, store the addresses. If we're told to
// start again we'll reuse these. Store only the port if the
// daemons were using wildcard address for binding.
const string& wildcard_ip = Substitute("$0", kWildcardIpAddr);
if (get_rpc_bind_address() != wildcard_ip) {
if (rpc_bind_address().host() != MiniCluster::kWildcardIpAddr) {
bound_rpc_ = bound_rpc_hostport();
bound_http_ = bound_http_hostport();
} else {
bound_rpc_.set_host(wildcard_ip);
bound_rpc_.set_host(MiniCluster::kWildcardIpAddr);
bound_rpc_.set_port(bound_rpc_hostport().port());
bound_http_.set_host(wildcard_ip);
bound_http_.set_host(MiniCluster::kWildcardIpAddr);
bound_http_.set_port(bound_http_hostport().port());
}

Expand Down Expand Up @@ -1112,22 +1112,16 @@ ScopedResumeExternalDaemon::~ScopedResumeExternalDaemon() {

ExternalMaster::ExternalMaster(ExternalDaemonOptions opts)
: ExternalDaemon(std::move(opts)) {
set_rpc_bind_address(Substitute("$0:0", kLoopbackIpAddr));
}

ExternalMaster::ExternalMaster(ExternalDaemonOptions opts,
string rpc_bind_address)
: ExternalDaemon(std::move(opts)) {
set_rpc_bind_address(std::move(rpc_bind_address));
}

ExternalMaster::~ExternalMaster() {
}

Status ExternalMaster::Start() {
vector<string> flags(GetCommonFlags());
flags.push_back(Substitute("--rpc_bind_addresses=$0", rpc_bind_address().ToString()));
flags.push_back(Substitute("--webserver_interface=$0", rpc_bind_address().host()));
flags.emplace_back("--webserver_port=0");
flags.push_back("--rpc_bind_addresses=" + get_rpc_bind_address());
return StartProcess(flags);
}

Expand All @@ -1138,10 +1132,15 @@ Status ExternalMaster::Restart() {
}

vector<string> flags(GetCommonFlags());
flags.push_back(Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()));

if (bound_http_.Initialized()) {
flags.push_back(Substitute("--webserver_interface=$0", bound_http_.host()));
flags.push_back(Substitute("--webserver_port=$0", bound_http_.port()));
} else {
flags.push_back(Substitute("--webserver_interface=$0", bound_rpc_.host()));
flags.emplace_back("--webserver_port=0");
}
flags.push_back("--rpc_bind_addresses=" + bound_rpc_.ToString());

return StartProcess(flags);
}
Expand Down Expand Up @@ -1210,11 +1209,10 @@ vector<string> ExternalMaster::GetCommonFlags() const {
//------------------------------------------------------------

ExternalTabletServer::ExternalTabletServer(ExternalDaemonOptions opts,
string bind_host,
vector<HostPort> master_addrs)
: ExternalDaemon(std::move(opts)),
master_addrs_(HostPort::ToCommaSeparatedString(master_addrs)) {
set_rpc_bind_address(std::move(bind_host));
master_addrs_(std::move(master_addrs)) {
DCHECK(!master_addrs_.empty());
}

ExternalTabletServer::~ExternalTabletServer() {
Expand All @@ -1224,14 +1222,15 @@ Status ExternalTabletServer::Start() {
vector<string> flags;
flags.push_back("--fs_wal_dir=" + wal_dir_);
flags.push_back("--fs_data_dirs=" + JoinStrings(data_dirs_, ","));
flags.push_back(Substitute("--rpc_bind_addresses=$0:0",
get_rpc_bind_address()));
flags.push_back(Substitute("--rpc_bind_addresses=$0",
rpc_bind_address().ToString()));
flags.push_back(Substitute("--local_ip_for_outbound_sockets=$0",
get_rpc_bind_address()));
rpc_bind_address().host()));
flags.push_back(Substitute("--webserver_interface=$0",
get_rpc_bind_address()));
rpc_bind_address().host()));
flags.emplace_back("--webserver_port=0");
flags.push_back("--tserver_master_addrs=" + master_addrs_);
flags.push_back(Substitute("--tserver_master_addrs=$0",
HostPort::ToCommaSeparatedString(master_addrs_)));
RETURN_NOT_OK(StartProcess(flags));
return Status::OK();
}
Expand All @@ -1244,15 +1243,16 @@ Status ExternalTabletServer::Restart() {
vector<string> flags;
flags.push_back("--fs_wal_dir=" + wal_dir_);
flags.push_back("--fs_data_dirs=" + JoinStrings(data_dirs_, ","));
flags.push_back("--rpc_bind_addresses=" + bound_rpc_.ToString());
flags.push_back(Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()));
flags.push_back(Substitute("--local_ip_for_outbound_sockets=$0",
get_rpc_bind_address()));
rpc_bind_address().host()));
if (bound_http_.Initialized()) {
flags.push_back(Substitute("--webserver_port=$0", bound_http_.port()));
flags.push_back(Substitute("--webserver_interface=$0",
bound_http_.host()));
}
flags.push_back("--tserver_master_addrs=" + master_addrs_);
flags.push_back(Substitute("--tserver_master_addrs=$0",
HostPort::ToCommaSeparatedString(master_addrs_)));
return StartProcess(flags);
}

Expand Down

0 comments on commit e675873

Please sign in to comment.