Skip to content

Commit

Permalink
Adding in bridge for creating java driver config from astyanax config
Browse files Browse the repository at this point in the history
  • Loading branch information
opuneet committed Jul 29, 2014
1 parent d6b7b6d commit 1da33c0
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Configuration;
import com.datastax.driver.core.MetricsOptions;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.Policies;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.netflix.astyanax.AstyanaxConfiguration;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.AstyanaxTypeFactory;
Expand Down Expand Up @@ -139,44 +130,14 @@ public static boolean batchColumnUpdates() {

private ConnectionPoolConfiguration getOrCreateJDConfiguration(AstyanaxConfiguration asConfig, ConnectionPoolConfiguration cpConfig) {

if (asConfig.getConnectionPoolType() == ConnectionPoolType.BAG) {
}

Configuration actualConfig = null;

JavaDriverConnectionPoolConfigurationImpl jdConfig = (JavaDriverConnectionPoolConfigurationImpl) cpConfig;
if (jdConfig != null) {
if (cpConfig instanceof JavaDriverConnectionPoolConfigurationImpl) {
JavaDriverConnectionPoolConfigurationImpl jdConfig = (JavaDriverConnectionPoolConfigurationImpl) cpConfig;
if (jdConfig.getJavaDriverConfig() != null) {
actualConfig = jdConfig.getJavaDriverConfig();
if (actualConfig != null) {
return jdConfig;
}
return jdConfig; // Java Driver config has already been setup.
}
}

LoadBalancingPolicy lbPolicy = null;
switch (asConfig.getConnectionPoolType()) {
case BAG:
throw new RuntimeException("Cannot use ConnectionPoolType.BAG with java driver, " +
"use TOKEN_AWARE or ROUND_ROBIN or configure java driver directly");
case ROUND_ROBIN:
lbPolicy = new RoundRobinPolicy();
break;
case TOKEN_AWARE:
lbPolicy = new TokenAwarePolicy(new RoundRobinPolicy());
break;
};

Policies policies = new Policies(lbPolicy, Policies.defaultReconnectionPolicy(), Policies.defaultRetryPolicy());

actualConfig = new Configuration(
policies,
new ProtocolOptions(),
new PoolingOptions(),
new SocketOptions(),
new MetricsOptions(),
new QueryOptions());

return new JavaDriverConnectionPoolConfigurationImpl().withJavaDriverConfig(actualConfig);
// Else create Java Driver Config from AstyanaxConfiguration and ConnectionPoolConfiguration and return that.
return new JavaDriverConnectionPoolConfigurationImpl(new JavaDriverConfigBridge(asConfig, cpConfig).getJDConfig());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package com.netflix.astyanax.cql;

import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.Configuration;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.MetricsOptions;
import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.Policies;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.netflix.astyanax.AstyanaxConfiguration;
import com.netflix.astyanax.AuthenticationCredentials;
import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
import com.netflix.astyanax.cql.util.ConsistencyLevelTransform;

public class JavaDriverConfigBridge {

private final AstyanaxConfiguration asConfig;
private final ConnectionPoolConfiguration cpConfig;

public JavaDriverConfigBridge(AstyanaxConfiguration asConfig, ConnectionPoolConfiguration cpConfig) {
this.asConfig = asConfig;
this.cpConfig = cpConfig;
}

public Configuration getJDConfig() {

return new Configuration(getPolicies(),
getProtocolOptions(),
getPoolingOptions(),
getSocketOptions(),
getMetricsOptions(),
getQueryOptions());
}

private Policies getPolicies() {
return new Policies(getLB(),
Policies.defaultReconnectionPolicy(),
Policies.defaultRetryPolicy(),
Policies.defaultAddressTranslater());
}

private LoadBalancingPolicy getLB() {

switch (asConfig.getConnectionPoolType()) {
case ROUND_ROBIN:
return new RoundRobinPolicy();
case TOKEN_AWARE:
return new TokenAwarePolicy(new RoundRobinPolicy());
case BAG:
throw new RuntimeException("Unsupported connection pool type, use ROUND_ROBIN or TOKEN_AWARE");
default:
return new RoundRobinPolicy();
}
}

private ProtocolOptions getProtocolOptions() {

int port = cpConfig.getPort();
int protocolVersion = -1; // use default

AuthProvider authProvider = AuthProvider.NONE;

AuthenticationCredentials creds = cpConfig.getAuthenticationCredentials();
if (creds != null) {
authProvider = new PlainTextAuthProvider(creds.getUsername(), creds.getPassword());
}

return new ProtocolOptions(port, protocolVersion, null, authProvider);
}

private PoolingOptions getPoolingOptions() {
return new CpConfigBasedPoolingOptions();
}

private SocketOptions getSocketOptions() {
return new CpConfigBasedSocketOptions();
}

private MetricsOptions getMetricsOptions() {
return new MetricsOptions();
}

private QueryOptions getQueryOptions() {
return new ConfigBasedQueryOptions();
}

private class CpConfigBasedPoolingOptions extends PoolingOptions {

private CpConfigBasedPoolingOptions() {

}

@Override
public int getCoreConnectionsPerHost(HostDistance distance) {
return cpConfig.getMaxConnsPerHost() > 4 ? cpConfig.getMaxConnsPerHost()/2 : cpConfig.getMaxConnsPerHost();
}

@Override
public int getMaxConnectionsPerHost(HostDistance distance) {
return cpConfig.getMaxConnsPerHost();
}
}

private class CpConfigBasedSocketOptions extends SocketOptions {

private CpConfigBasedSocketOptions() {

}

@Override
public int getConnectTimeoutMillis() {
return cpConfig.getConnectTimeout();
}

@Override
public int getReadTimeoutMillis() {
return cpConfig.getSocketTimeout();
}
}

private class ConfigBasedQueryOptions extends QueryOptions {

@Override
public ConsistencyLevel getConsistencyLevel() {
return ConsistencyLevelTransform.getConsistencyLevel(asConfig.getDefaultReadConsistencyLevel());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* @author poberai
*
*/
public class JavaDriverConfigBuilder extends JavaDriverConnectionPoolConfigurationImpl {
public class JavaDriverConfigBuilder {

// Config for Policies
private LoadBalancingPolicy loadBalancingPolicy = new RoundRobinPolicy();
Expand All @@ -50,7 +50,7 @@ public JavaDriverConfigBuilder() {
super();
}

public JavaDriverConfigBuilder build() {
public JavaDriverConnectionPoolConfigurationImpl build() {

Policies policies = new Policies(loadBalancingPolicy, reconnectionPolicy, retryPolicy);
ProtocolOptions protocolOptions = (nativeProtocolPort == -1) ? new ProtocolOptions() : new ProtocolOptions(nativeProtocolPort);
Expand All @@ -59,13 +59,12 @@ public JavaDriverConfigBuilder build() {
MetricsOptions metricsOptions = new MetricsOptions(jmxReportingEnabled);
QueryOptions qOptions = queryOptions;

super.withJavaDriverConfig(new Configuration(policies,
return new JavaDriverConnectionPoolConfigurationImpl(new Configuration(policies,
protocolOptions,
poolOptions,
sockOptions,
metricsOptions,
qOptions));
return this;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@
*/
public class JavaDriverConnectionPoolConfigurationImpl implements ConnectionPoolConfiguration {

private Configuration jdConfig = new Configuration();
private final Configuration jdConfig;

public JavaDriverConnectionPoolConfigurationImpl withJavaDriverConfig(Configuration jdCfg) {
jdConfig = jdCfg;
return this;
public JavaDriverConnectionPoolConfigurationImpl(Configuration configuration) {
this.jdConfig = configuration;
}

public Configuration getJavaDriverConfig() {
return jdConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ public List<Host> get() {
.forKeyspace(keyspaceName)
.withHostSupplier(HostSupplier)
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl())
.withConnectionPoolConfiguration(new JavaDriverConnectionPoolConfigurationImpl()
.withJavaDriverConfig(jdConfig)
)
.withConnectionPoolConfiguration(new JavaDriverConnectionPoolConfigurationImpl(jdConfig))
.buildKeyspace(CqlFamilyFactory.getInstance());

return context;
Expand Down

0 comments on commit 1da33c0

Please sign in to comment.