Skip to content

Commit

Permalink
Let the user configure C* driver maxConnectionsPerHost and maxRequest…
Browse files Browse the repository at this point in the history
…sPerConnection

Also, log at DEBUG level current connection usage
  • Loading branch information
tsegismont committed Jun 21, 2016
1 parent 961c2c0 commit 5f16f05
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 0 deletions.
Expand Up @@ -23,6 +23,8 @@

import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_CQL_PORT;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_KEYSPACE;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_MAX_CONN_HOST;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_MAX_REQUEST_CONN;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_NODES;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_RESETDB;
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_USESSL;
Expand All @@ -40,6 +42,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand Down Expand Up @@ -68,9 +71,14 @@
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Configuration;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
Expand Down Expand Up @@ -122,6 +130,16 @@ public enum State {
@ConfigurationProperty(CASSANDRA_RESETDB)
private String resetDb;

@Inject
@Configurable
@ConfigurationProperty(CASSANDRA_MAX_CONN_HOST)
private String maxConnectionsPerHost;

@Inject
@Configurable
@ConfigurationProperty(CASSANDRA_MAX_REQUEST_CONN)
private String maxRequestsPerConnection;

@Inject
@Configurable
@ConfigurationProperty(WAIT_FOR_SERVICE)
Expand Down Expand Up @@ -249,6 +267,22 @@ private void startMetricsService() {

metricsServiceReady.fire(new ServiceReadyEvent(metricsService.insertedDataEvents()));

Configuration configuration = session.getCluster().getConfiguration();
LoadBalancingPolicy loadBalancingPolicy = configuration.getPolicies().getLoadBalancingPolicy();
PoolingOptions poolingOptions = configuration.getPoolingOptions();
lifecycleExecutor.scheduleAtFixedRate(() -> {
if (log.isDebugEnabled()) {
Session.State state = session.getState();
for (Host host : state.getConnectedHosts()) {
HostDistance distance = loadBalancingPolicy.distance(host);
int connections = state.getOpenConnections(host);
int inFlightQueries = state.getInFlightQueries(host);
log.debugf("%s connections=%d, current load=%d, max load=%d%n", host, connections,
inFlightQueries, connections * poolingOptions.getMaxRequestsPerConnection(distance));
}
}
}, 5, 5, TimeUnit.SECONDS);

state = State.STARTED;
log.infoServiceStarted();

Expand Down Expand Up @@ -295,6 +329,29 @@ private Session createSession() {
clusterBuilder.withoutJMXReporting();
}

int newMaxConnections;
try {
newMaxConnections = Integer.parseInt(maxConnectionsPerHost);
} catch (NumberFormatException nfe) {
String defaultMaxConnections = CASSANDRA_MAX_CONN_HOST.defaultValue();
log.warnInvalidMaxConnections(maxConnectionsPerHost, defaultMaxConnections);
newMaxConnections = Integer.parseInt(defaultMaxConnections);
}
int newMaxRequests;
try {
newMaxRequests = Integer.parseInt(maxRequestsPerConnection);
} catch (NumberFormatException nfe) {
String defaultMaxRequests = CASSANDRA_MAX_REQUEST_CONN.defaultValue();
log.warnInvalidMaxRequests(maxRequestsPerConnection, defaultMaxRequests);
newMaxRequests = Integer.parseInt(defaultMaxRequests);
}
clusterBuilder.withPoolingOptions(new PoolingOptions()
.setMaxConnectionsPerHost(HostDistance.LOCAL, newMaxConnections)
.setMaxConnectionsPerHost(HostDistance.REMOTE, newMaxConnections)
.setMaxRequestsPerConnection(HostDistance.LOCAL, newMaxRequests)
.setMaxRequestsPerConnection(HostDistance.REMOTE, newMaxRequests)
);

Cluster cluster = clusterBuilder.build();
cluster.init();
Session createdSession = null;
Expand Down
Expand Up @@ -34,6 +34,10 @@ public enum ConfigurationKey {
CASSANDRA_KEYSPACE("cassandra.keyspace", "hawkular_metrics", null, false),
CASSANDRA_RESETDB("cassandra.resetdb", null, null, true),
CASSANDRA_USESSL("hawkular-metrics.cassandra-use-ssl", "false", "CASSANDRA_USESSL", false),
CASSANDRA_MAX_CONN_HOST("hawkular-metrics.cassandra-max-connections-per-host", "10", "CASSANDRA_MAX_CONN_HOST",
false),
CASSANDRA_MAX_REQUEST_CONN("hawkular-metrics.cassandra-max-requests-per-connection", "5000",
"CASSANDRA_MAX_REQUEST_CONN", false),
WAIT_FOR_SERVICE("hawkular.metrics.waitForService", null, null, true),
USE_VIRTUAL_CLOCK("hawkular.metrics.use-virtual-clock", "false", "USE_VIRTUAL_CLOCK", false),
DEFAULT_TTL("hawkular.metrics.default-ttl", "7", "DEFAULT_TTL", false),
Expand Down
Expand Up @@ -81,4 +81,12 @@ public interface RestLogger extends BasicLogger {
@LogMessage(level = WARN)
@Message(id = 200011, value = "Invalid value [%s] for default TTL. Will use a default of %s days")
void warnInvalidDefaultTTL(String ttl, String defaultTTL);

@LogMessage(level = WARN)
@Message(id = 200012, value = "Invalid value [%s] for max connections per host. Will use a default of %s")
void warnInvalidMaxConnections(String maxConnectionsPerHost, String defaultMaxConnections);

@LogMessage(level = WARN)
@Message(id = 200013, value = "Invalid value [%s] for max requests per connection. Will use a default of %s")
void warnInvalidMaxRequests(String maxRequestsPerConnection, String defaultMaxRequests);
}

0 comments on commit 5f16f05

Please sign in to comment.