Skip to content

Commit

Permalink
- Add analytics package
Browse files Browse the repository at this point in the history
 - Move CacheableDataProvider#buildKey tp superclass
 - Add load method by range of token to dataprovider
 - Remove consistency level from Table and add read and write consistency level fields
  • Loading branch information
alimovalisher committed Nov 13, 2015
1 parent 8827fbf commit 0b415cf
Show file tree
Hide file tree
Showing 34 changed files with 1,433 additions and 661 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -6,7 +6,7 @@ apply plugin: 'signing'
if (hasProperty("teamcity")) {
project.ext.buildVersion = teamcity["build.number"]
} else {
project.ext.buildVersion = '0.1.8'
project.ext.buildVersion = '0.2.0'
}

version = project.ext.buildVersion
Expand Down
173 changes: 99 additions & 74 deletions src/main/java/com/fnklabs/draenei/CassandraClient.java
Expand Up @@ -5,30 +5,29 @@
import com.datastax.driver.core.policies.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

public class CassandraClient {
public static final int RECONNECTION_DELAY_TIME = 5000;
private static final int RECONNECTION_DELAY_TIME = 5000;
/**
* Read timeout in ms
*/
private static final int READ_TIMEOUT = 10000;
private static final int READ_TIMEOUT = 15000;
/**
* Read timeout in ms
* Connection timeout in ms
*/
private static final int CONNECT_TIMEOUT_MILLIS = 30000;

private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClient.class);
/**
* Cassandra cluster instance
*/
private final Cluster cluster;

/**
* Prepared statements map that allow solve problem with several prepared statements execution is same query
Expand Down Expand Up @@ -90,28 +89,34 @@ public CassandraClient(@NotNull String username,
}

try {
cluster = builder.build();
} catch (IllegalArgumentException e) {
LOGGER.warn("Cant build cluster", e);
throw e;
}
Cluster cluster = builder.build();

Metadata metadata = cluster.getMetadata();
Metadata metadata = cluster.getMetadata();

LOGGER.info(String.format("Connect to cluster: %s", metadata.getClusterName()));
LOGGER.info(String.format("Connecting to cluster: %s", metadata.getClusterName()));

for (Host host : metadata.getAllHosts()) {
LOGGER.info(String.format("DataCenter: %s; Host: %s; Rack: %s", host.getDatacenter(), host.getAddress(), host.getRack()));
}
for (Host host : metadata.getAllHosts()) {
LOGGER.info(String.format("DataCenter: %s; Host: %s; Rack: %s", host.getDatacenter(), host.getAddress(), host.getRack()));
}

cluster.init();

cluster.init();
session = createSession(cluster, keyspace);
} catch (IllegalArgumentException e) {
LOGGER.warn("Cant build cluster", e);
throw e;
}
}

session = createSession(cluster, keyspace);
public CassandraClient(@NotNull Session session, @NotNull ListeningExecutorService executorService, @NotNull MetricsFactory metricsFactory) {
this.session = session;
this.executorService = executorService;
this.metricsFactory = metricsFactory;
}

@NotNull
public KeyspaceMetadata getKeyspaceMetadata(@NotNull String keyspace) {
return cluster.getMetadata().getKeyspace(keyspace);
return getSession().getCluster().getMetadata().getKeyspace(keyspace);
}

@NotNull
Expand All @@ -124,9 +129,8 @@ public TableMetadata getTableMetadata(@NotNull String keyspace, @NotNull String
return getKeyspaceMetadata(keyspace).getTable(tablename);
}


public void dumpMetrics() {
Metrics metrics = cluster.getMetrics();
Metrics metrics = getSession().getCluster().getMetrics();

Metrics.Errors errorMetrics = metrics.getErrorMetrics();

Expand Down Expand Up @@ -166,58 +170,57 @@ public PreparedStatement prepare(@NotNull String query) {
return preparedStatementsMap.compute(query, new ComputePreparedStatement());
}

public ResultSet execute(String query) {
public ResultSet execute(@NotNull String query) {
getMetricsFactory().getCounter(MetricsType.CASSANDRA_QUERIES_COUNT).inc();
Timer.Context time = getMetricsFactory().getTimer(MetricsType.CASSANDRA_EXECUTE).time();

ResultSet resultSet = getSession().execute(query);
time.stop();

return getSession().execute(query);
return resultSet;
}

public ResultSetFuture executeAsync(String query) throws IllegalStateException {
public ResultSet execute(@NotNull Statement statement) {
Timer.Context time = getMetricsFactory().getTimer(MetricsType.CASSANDRA_EXECUTE).time();

getMetricsFactory().getCounter(MetricsType.CASSANDRA_QUERIES_COUNT).inc();

ResultSet resultSetFuture = getSession().execute(statement);

time.stop();

return resultSetFuture;
}

public ResultSetFuture executeAsync(@NotNull String query) throws IllegalStateException {
Timer.Context time = getMetricsFactory().getTimer(MetricsType.CASSANDRA_EXECUTE).time();

getMetricsFactory().getCounter(MetricsType.CASSANDRA_QUERIES_COUNT).inc();
getMetricsFactory().getCounter(MetricsType.CASSANDRA_PROCESSING_QUERIES).inc();

ResultSetFuture resultSetFuture = getSession().executeAsync(query);
Futures.addCallback(resultSetFuture, new StatementExecutionCallback(query), executorService);

Futures.addCallback(resultSetFuture, new StatementExecutionCallback(query));

monitorFuture(time, resultSetFuture);

return resultSetFuture;
}

public ResultSetFuture executeAsync(BoundStatement boundStatement) throws IllegalStateException {
public ResultSetFuture executeAsync(@NotNull BoundStatement boundStatement) throws IllegalStateException {
Timer.Context time = getMetricsFactory().getTimer(MetricsType.CASSANDRA_EXECUTE).time();

getMetricsFactory().getCounter(MetricsType.CASSANDRA_PROCESSING_QUERIES).inc();
getMetricsFactory().getCounter(MetricsType.CASSANDRA_QUERIES_COUNT).inc();

ResultSetFuture resultSetFuture = getSession().executeAsync(boundStatement);

Futures.addCallback(resultSetFuture, new StatementExecutionCallback(boundStatement.preparedStatement().getQueryString()), executorService);
Futures.addCallback(resultSetFuture, new StatementExecutionCallback(boundStatement.preparedStatement().getQueryString()));
monitorFuture(time, resultSetFuture);

return resultSetFuture;
}

/**
* Initiate close cluster and session operations
*/
public void close() {
session.close();
cluster.close();
}

/**
* Get Cluster session
*
* @return Session instance
*/
@NotNull
public Session getSession() {
return session;
}

/**
* Execute statement
*
Expand All @@ -226,39 +229,42 @@ public Session getSession() {
* @return ResultSetFuture
*/
@NotNull
public ResultSetFuture executeAsync(Statement statement) {
public ResultSetFuture executeAsync(@NotNull Statement statement) {
Timer.Context time = getMetricsFactory().getTimer(MetricsType.CASSANDRA_EXECUTE).time();

getMetricsFactory().getCounter(MetricsType.CASSANDRA_QUERIES_COUNT).inc();

ResultSetFuture resultSetFuture = getSession().executeAsync(statement);

Futures.addCallback(resultSetFuture, new StatementExecutionCallback(statement.getKeyspace()), executorService);
Futures.addCallback(resultSetFuture, new StatementExecutionCallback(statement.getKeyspace()));
monitorFuture(time, resultSetFuture);

return resultSetFuture;
}

/**
* Create session
*
* @param cluster Cluster instance
* @param keyspace Default keyspace
* Initiate close cluster and session operations
*/
public void close() {
session.close();
session.getCluster().close();
}

/**
* Get Cluster session
*
* @return Session instance
*/
private Session createSession(@NotNull Cluster cluster, @NotNull String keyspace) {
Session session = cluster.connect(keyspace);
session.init();

@NotNull
public Session getSession() {
return session;
}

private MetricsFactory getMetricsFactory() {
return metricsFactory;
public Set<Host> getMembers() {
return getSession().getCluster().getMetadata().getAllHosts();
}

private <T> void monitorFuture(Timer.Context timer, com.google.common.util.concurrent.ListenableFuture<T> future) {
protected <T> void monitorFuture(@NotNull Timer.Context timer, @NotNull ListenableFuture<T> future) {
Futures.addCallback(future, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
Expand All @@ -270,7 +276,28 @@ public void onFailure(Throwable t) {
timer.stop();
LOGGER.warn("Cant complete operation", t);
}
}, executorService);
});
}

@NotNull
protected SocketOptions getSocketOptions() {
SocketOptions socketOptions = new SocketOptions();
socketOptions.setConnectTimeoutMillis(CONNECT_TIMEOUT_MILLIS);
socketOptions.setReadTimeoutMillis(READ_TIMEOUT);
socketOptions.setKeepAlive(true);
socketOptions.setTcpNoDelay(true);
return socketOptions;
}

@NotNull
protected QueryOptions getQueryOptions() {
QueryOptions queryOptions = new QueryOptions();
queryOptions.setConsistencyLevel(ConsistencyLevel.QUORUM);
return queryOptions;
}

private MetricsFactory getMetricsFactory() {
return metricsFactory;
}

private enum MetricsType implements MetricsFactory.Type {
Expand All @@ -281,21 +308,19 @@ private enum MetricsType implements MetricsFactory.Type {
CASSANDRA_PROCESSING_QUERIES,
}

@NotNull
private static QueryOptions getQueryOptions() {
QueryOptions queryOptions = new QueryOptions();
queryOptions.setConsistencyLevel(ConsistencyLevel.QUORUM);
return queryOptions;
}
/**
* Create session
*
* @param cluster Cluster instance
* @param keyspace Default keyspace
*
* @return Session instance
*/
private static Session createSession(@NotNull Cluster cluster, @NotNull String keyspace) {
Session session = cluster.connect(keyspace);
session.init();

@NotNull
private static SocketOptions getSocketOptions() {
SocketOptions socketOptions = new SocketOptions();
socketOptions.setConnectTimeoutMillis(CONNECT_TIMEOUT_MILLIS);
socketOptions.setReadTimeoutMillis(READ_TIMEOUT);
socketOptions.setKeepAlive(true);
socketOptions.setTcpNoDelay(true);
return socketOptions;
return session;
}

@NotNull
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/com/fnklabs/draenei/TimeOutException.java

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/java/com/fnklabs/draenei/orm/Cacheable.java
Expand Up @@ -6,7 +6,7 @@
import java.io.Serializable;

/**
* Entity Cacheable interface
* Cacheable entity interface
*/
public interface Cacheable extends Serializable {
/**
Expand Down

0 comments on commit 0b415cf

Please sign in to comment.