Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions external/storm-cassandra/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ The following properties may be passed to storm configuration.
| **cassandra.retryPolicy** | - | DefaultRetryPolicy |
| **cassandra.reconnectionPolicy.baseDelayMs** | - | 100 (ms) |
| **cassandra.reconnectionPolicy.maxDelayMs** | - | 60000 (ms) |
| **cassandra.pool.max.size** | - | 256 |
| **cassandra.loadBalancingPolicy** | - | TokenAwarePolicy |
| **cassandra.datacenter.name** | - | - |
| **cassandra.max.requests.per.con.local** | - | 1024 |
| **cassandra.max.requests.per.con.remote** | - | 256 |
| **cassandra.heartbeat.interval.sec** | - | 30 |
| **cassandra.idle.timeout.sec** | - | 60 |
| **cassandra.socket.read.timeout.millis** | - | 12000 |
| **cassandra.socket.connect.timeout.millis** | - | 5000 |

## CassandraWriterBolt

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void prepare(Map stormConfig, TopologyContext topologyContext, OutputColl
session = client.connect();
} catch (NoHostAvailableException e) {
outputCollector.reportError(e);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@
*/
package org.apache.storm.cassandra.client;

import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy.Builder;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.LoggingRetryPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ObjectReader;
import com.datastax.driver.core.ConsistencyLevel;
Expand Down Expand Up @@ -46,6 +54,15 @@ public class CassandraConf implements Serializable {
public static final String CASSANDRA_RETRY_POLICY = "cassandra.retryPolicy";
public static final String CASSANDRA_RECONNECT_POLICY_BASE_MS = "cassandra.reconnectionPolicy.baseDelayMs";
public static final String CASSANDRA_RECONNECT_POLICY_MAX_MS = "cassandra.reconnectionPolicy.maxDelayMs";
public static final String CASSANDRA_POOL_MAX_SIZE = "cassandra.pool.max.size";
public static final String CASSANDRA_LOAD_BALANCING_POLICY = "cassandra.loadBalancingPolicy";
public static final String CASSANDRA_DATACENTER_NAME = "cassandra.datacenter.name";
public static final String CASSANDRA_MAX_REQUESTS_PER_CON_LOCAL = "cassandra.max.requests.per.con.local";
public static final String CASSANDRA_MAX_REQUESTS_PER_CON_REMOTE = "cassandra.max.requests.per.con.remote";
public static final String CASSANDRA_HEARTBEAT_INTERVAL_SEC = "cassandra.heartbeat.interval.sec";
public static final String CASSANDRA_IDLE_TIMEOUT_SEC = "cassandra.idle.timeout.sec";
public static final String CASSANDRA_SOCKET_READ_TIMEOUT_MS = "cassandra.socket.read.timeout.millis";
public static final String CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS = "cassandra.socket.connect.timeout.millis";

/**
* The authorized cassandra username.
Expand Down Expand Up @@ -92,7 +109,34 @@ public class CassandraConf implements Serializable {
* The maximum delay to wait between two attempts.
*/
private long reconnectionPolicyMaxMs;


/**
* The maximum queue for connection pool.
*/
private int poolMaxQueueSize;

private String loadBalancingPolicyName;

private String datacenterName;

private int maxRequestPerConnectionLocal;

private int maxRequestPerConnectionRemote;

private int heartbeatIntervalSeconds;

private int idleTimeoutSeconds;

/**
* The timeout for read for socket options.
*/
private long socketReadTimeoutMillis;

/**
* The timeout for connect for socket options.
*/
private long socketConnectTimeoutMillis;

/**
* Creates a new {@link CassandraConf} instance.
*/
Expand All @@ -117,6 +161,15 @@ public CassandraConf(Map<String, Object> conf) {
this.retryPolicyName = (String) Utils.get(conf, CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName());
this.reconnectionPolicyBaseMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_BASE_MS), 100L);
this.reconnectionPolicyMaxMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), TimeUnit.MINUTES.toMillis(1));
this.poolMaxQueueSize = getInt(conf.get(CASSANDRA_POOL_MAX_SIZE), 256);
this.loadBalancingPolicyName = (String) Utils.get(conf, CASSANDRA_LOAD_BALANCING_POLICY, TokenAwarePolicy.class.getSimpleName());
this.datacenterName = (String)Utils.get(conf, CASSANDRA_DATACENTER_NAME, null);
this.maxRequestPerConnectionLocal = getInt(conf.get(CASSANDRA_MAX_REQUESTS_PER_CON_LOCAL), 1024);
this.maxRequestPerConnectionRemote = getInt(conf.get(CASSANDRA_MAX_REQUESTS_PER_CON_REMOTE), 256);
this.heartbeatIntervalSeconds = getInt(conf.get(CASSANDRA_HEARTBEAT_INTERVAL_SEC), 30);
this.idleTimeoutSeconds = getInt(conf.get(CASSANDRA_IDLE_TIMEOUT_SEC), 60);
this.socketReadTimeoutMillis = getLong(conf.get(CASSANDRA_SOCKET_READ_TIMEOUT_MS), (long)SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS);
this.socketConnectTimeoutMillis = getLong(conf.get(CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS), (long)SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS);
}

public String getUsername() {
Expand Down Expand Up @@ -157,14 +210,60 @@ public long getReconnectionPolicyMaxMs() {

public RetryPolicy getRetryPolicy() {
if(this.retryPolicyName.equals(DowngradingConsistencyRetryPolicy.class.getSimpleName()))
return DowngradingConsistencyRetryPolicy.INSTANCE;
return new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
if(this.retryPolicyName.equals(FallthroughRetryPolicy.class.getSimpleName()))
return FallthroughRetryPolicy.INSTANCE;
if(this.retryPolicyName.equals(DefaultRetryPolicy.class.getSimpleName()))
return DefaultRetryPolicy.INSTANCE;
throw new IllegalArgumentException("Unknown cassandra retry policy " + this.retryPolicyName);
}

public LoadBalancingPolicy getLoadBalancingPolicy() {
if (this.loadBalancingPolicyName.equals(TokenAwarePolicy.class.getSimpleName())) {
return new TokenAwarePolicy(new RoundRobinPolicy());
}
if (this.loadBalancingPolicyName.equals(DCAwareRoundRobinPolicy.class.getSimpleName())) {
Builder builder = DCAwareRoundRobinPolicy.builder();
if (StringUtils.isNotBlank(datacenterName)) {
builder = builder.withLocalDc(this.datacenterName);
}
return new TokenAwarePolicy(builder.build());
}
throw new IllegalArgumentException("Unknown cassandra load balancing policy " + this.loadBalancingPolicyName);
}

public int getPoolMaxQueueSize() {
return poolMaxQueueSize;
}

public String getDatacenterName() {
return datacenterName;
}

public int getMaxRequestPerConnectionLocal() {
return maxRequestPerConnectionLocal;
}

public int getMaxRequestPerConnectionRemote() {
return maxRequestPerConnectionRemote;
}

public int getHeartbeatIntervalSeconds() {
return heartbeatIntervalSeconds;
}

public int getIdleTimeoutSeconds() {
return idleTimeoutSeconds;
}

public long getSocketReadTimeoutMillis() {
return socketReadTimeoutMillis;
}

public long getSocketConnectTimeoutMillis() {
return socketConnectTimeoutMillis;
}

private <T> T get(Map<String, Object> conf, String key) {
Object o = conf.get(key);
if(o == null) {
Expand All @@ -173,6 +272,18 @@ private <T> T get(Map<String, Object> conf, String key) {
return (T)o;
}

public static Integer getInt(Object o, Integer defaultValue) {
if (null == o) {
return defaultValue;
}
if (o instanceof Number) {
return ((Number) o).intValue();
} else if (o instanceof String) {
return Integer.parseInt((String) o);
}
throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}

public static Long getLong(Object o, Long defaultValue) {
if (null == o) {
return defaultValue;
Expand All @@ -198,6 +309,13 @@ public String toString() {
.add("retryPolicyName", retryPolicyName)
.add("reconnectionPolicyBaseMs", reconnectionPolicyBaseMs)
.add("reconnectionPolicyMaxMs", reconnectionPolicyMaxMs)
.add("poolMaxQueueSize", poolMaxQueueSize)
.add("datacenterName", datacenterName)
.add("maxRequestPerConnectionLocal", maxRequestPerConnectionLocal)
.add("maxRequestPerConnectionRemote", maxRequestPerConnectionRemote)
.add("heartbeatIntervalSeconds", heartbeatIntervalSeconds)
.add("idleTimeoutSeconds", idleTimeoutSeconds)
.add("socketReadTimeoutMillis", socketReadTimeoutMillis)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@
package org.apache.storm.cassandra.client;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.cassandra.context.BaseBeanFactory;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Default interface to build cassandra Cluster from the a Storm Topology configuration.
*/
Expand All @@ -54,7 +51,9 @@ protected Cluster make(Map<String, Object> stormConf) {
.withReconnectionPolicy(new ExponentialReconnectionPolicy(
cassandraConf.getReconnectionPolicyBaseMs(),
cassandraConf.getReconnectionPolicyMaxMs()))
.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
.withLoadBalancingPolicy(cassandraConf.getLoadBalancingPolicy());
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis((int)cassandraConf.getSocketReadTimeoutMillis());
cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis((int)cassandraConf.getSocketConnectTimeoutMillis());

final String username = cassandraConf.getUsername();
final String password = cassandraConf.getPassword();
Expand All @@ -67,6 +66,13 @@ protected Cluster make(Map<String, Object> stormConf) {
.setConsistencyLevel(cassandraConf.getConsistencyLevel());
cluster.withQueryOptions(options);

PoolingOptions poolOps = new PoolingOptions();
poolOps.setMaxQueueSize(cassandraConf.getPoolMaxQueueSize());
poolOps.setHeartbeatIntervalSeconds(cassandraConf.getHeartbeatIntervalSeconds());
poolOps.setIdleTimeoutSeconds(cassandraConf.getIdleTimeoutSeconds());
poolOps.setMaxRequestsPerConnection(HostDistance.LOCAL, cassandraConf.getMaxRequestPerConnectionLocal());
poolOps.setMaxRequestsPerConnection(HostDistance.REMOTE, cassandraConf.getMaxRequestPerConnectionRemote());
cluster.withPoolingOptions(poolOps);

return cluster.build();
}
Expand Down