Skip to content

CPP-499 allow local bind #411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions gtests/src/integration/objects/cluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ class Cluster : public Object<CassCluster, cass_cluster_free> {
return *this;
}

/**
* Assign the local address to bind; passing an empty string will clear
* the local address.
*
* @param name An IP address or hostname
* @return Cluster object
*/
Cluster& with_local_address(const std::string& name) {
EXPECT_EQ(CASS_OK, cass_cluster_set_local_address(get(),
name.c_str()));
return *this;
}

/**
* Assign the number of connections made to each node/server for each
* connections thread
Expand Down
93 changes: 93 additions & 0 deletions gtests/src/integration/tests/test_control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@
* Control connection integration tests; single node cluster
*/
class ControlConnectionTests : public Integration {

public:

void SetUp() {
// Call the parent setup function (don't automatically start session,
// because we don't want any connections established until we have
// set up the cluster).
is_session_requested_ = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is ok for the existing tests? @mikefero

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a nice improvement as almost all of the tests create a cluster and session for themselves, meaning their are currently multiple cluster and sessions for each test. The only test that I see failing would be FullOutage; before the cluster is stopped a call to connect() should be made to establish the session_ instance in the base class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I've added that in.

Integration::SetUp();
}

protected:
/**
* Execute multiple requests and ensure the expected nodes are used during
Expand Down Expand Up @@ -186,6 +197,87 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, ConnectUsingInvalidPort) {
}
}

/**
* Perform session connection using unresolvable local IP address
*
* This test will attempt to perform a connection using an unresolvable local
* IP address and ensure the control connection is not established against a
* single node cluster.
*
* @test_category control_connection
* @since core:1.0.0
* @expected_result Control connection will not be established
*/
CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests,
ConnectUsingUnresolvableLocalIpAddress) {
CHECK_FAILURE;

// Attempt to connect to the server using an unresolvable local IP address
Cluster cluster = default_cluster();
EXPECT_EQ(CASS_ERROR_LIB_HOST_RESOLUTION,
cass_cluster_set_local_address(cluster.get(), "unknown.invalid"));
}

/**
* Perform session connection using unbindable local IP address
*
* This test will attempt to perform a connection using an unbindable local IP
* address and ensure the control connection is not established against a
* single node cluster.
*
* @test_category control_connection
* @since core:1.0.0
* @expected_result Control connection will not be established
*/
CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests,
ConnectUsingUnbindableLocalIpAddress) {
CHECK_FAILURE;

// Attempt to connect to the server using an unbindable local IP address
logger_.add_critera("Unable to bind local address: address not available");
Cluster cluster = default_cluster().with_local_address("1.1.1.1");
try {
cluster.connect();
FAIL() << "Connection was established using unbindable local IP address";
} catch (Session::Exception& se) {
ASSERT_EQ(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, se.error_code());
ASSERT_GE(logger_.count(), 1u);
}
}

/**
* Perform session connection using valid local IP address but invalid
* remote address
*
* This test will attempt to perform a connection using a valid local IP
* address and invalid remote address and ensure the control connection is
* not established against a single node cluster.
*
* @test_category control_connection
* @since core:1.0.0
* @expected_result Control connection will not be established
*/
CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests,
ConnectUsingValidLocalIpAddressButInvalidRemote) {
CHECK_FAILURE;

// Attempt to connect to the server using an valid local IP address
// but invalid remote address. The specified remote is not routable
// from the specified local.
logger_.add_critera("Unable to establish a control connection to host " \
"1.1.1.1 because of the following error: " \
"Connect error 'operation not permitted'");
Cluster cluster = Cluster::build().with_contact_points("1.1.1.1")
.with_local_address("127.0.0.1");
try {
cluster.connect();
FAIL() << "Connection was established using invalid IP address";
} catch (Session::Exception& se) {
ASSERT_EQ(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, se.error_code());
ASSERT_GE(logger_.count(), 1u);
}
}

/**
* Perform session connection while forcing a control connection reconnect
*
Expand Down Expand Up @@ -495,6 +587,7 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests,
CHECK_FAILURE;

// Stop the cluster and attempt to perform a request
connect();
ccm_->stop_cluster();
Result result = session_.execute(SELECT_ALL_SYSTEM_LOCAL_CQL,
CASS_CONSISTENCY_ONE, false, false);
Expand Down
33 changes: 33 additions & 0 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,39 @@ CASS_EXPORT CassError
cass_cluster_set_port(CassCluster* cluster,
int port);

/**
* Sets the local address to bind when connecting to the cluster,
* if desired.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] name IP address to bind, or empty string for no binding.
* Only numeric addresses are supported; no resolution is done.
* @return CASS_OK if successful, otherwise an error occurred.
*/
CASS_EXPORT CassError
cass_cluster_set_local_address(CassCluster* cluster,
const char* name);

/**
* Same as cass_cluster_set_local_address(), but with lengths for string
* parameters.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] name
* @param[in] name_length
* @return same as cass_cluster_set_local_address()
*
* @see cass_cluster_set_local_address()
*/
CASS_EXPORT CassError
cass_cluster_set_local_address_n(CassCluster* cluster,
const char* name,
size_t name_length);

/**
* Sets the SSL context and enables SSL.
*
Expand Down
19 changes: 19 additions & 0 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,25 @@ CassError cass_cluster_set_prepare_on_up_or_add_host(CassCluster* cluster,
return CASS_OK;
}

CassError cass_cluster_set_local_address(CassCluster* cluster,
const char* name) {
return cass_cluster_set_local_address_n(cluster, name, SAFE_STRLEN(name));
}

CassError cass_cluster_set_local_address_n(CassCluster* cluster,
const char* name,
size_t name_length) {
cass::Address address; // default to AF_UNSPEC
if (name_length == 0 ||
name == NULL ||
cass::Address::from_string(std::string(name, name_length), 0, &address)) {
cluster->config().set_local_address(address);
} else {
return CASS_ERROR_LIB_HOST_RESOLUTION;
}
return CASS_OK;
}

CassError cass_cluster_set_no_compact(CassCluster* cluster,
cass_bool_t enabled) {
cluster->config().set_no_compact(enabled == cass_true);
Expand Down
8 changes: 8 additions & 0 deletions src/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,13 @@ class Config {
prepare_on_up_or_add_host_ = enabled;
}

const Address* local_address() const {
return local_address_.is_valid() ? &local_address_ : NULL; }

void set_local_address(const Address& address) {
local_address_ = address;
}

bool no_compact() const { return no_compact_; }

void set_no_compact(bool enabled) {
Expand Down Expand Up @@ -448,6 +455,7 @@ class Config {
bool use_randomized_contact_points_;
bool prepare_on_all_hosts_;
bool prepare_on_up_or_add_host_;
Address local_address_;
bool no_compact_;
};

Expand Down
12 changes: 11 additions & 1 deletion src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,15 @@ Connection::Connection(uv_loop_t* loop,
LOG_WARN("Unable to set tcp keepalive");
}

const Address* local_address = config_.local_address();
if (local_address) {
int rc = uv_tcp_bind(&socket_, local_address->addr(), 0);
if (rc) {
notify_error("Unable to bind local address: " + std::string(UV_ERRSTR(rc, loop_)));
return;
}
}

SslContext* ssl_context = config_.ssl_context();
if (ssl_context != NULL) {
ssl_session_.reset(ssl_context->create_session(host));
Expand Down Expand Up @@ -378,7 +387,8 @@ void Connection::set_state(ConnectionState new_state) {

switch (state_) {
case CONNECTION_STATE_NEW:
assert(new_state == CONNECTION_STATE_CONNECTING &&
assert((new_state == CONNECTION_STATE_CONNECTING ||
new_state == CONNECTION_STATE_CLOSE_DEFUNCT) &&
"Invalid connection state after new");
state_ = new_state;
break;
Expand Down