Skip to content

Commit

Permalink
driver-cassandra: Add support for speculative execution
Browse files Browse the repository at this point in the history
  • Loading branch information
cmouse authored and sirainen committed Feb 17, 2017
1 parent 583320b commit cc42a09
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
3 changes: 3 additions & 0 deletions configure.ac
Expand Up @@ -2382,6 +2382,9 @@ if test $want_cassandra != no; then
AC_DEFINE(HAVE_CASSANDRA,, [Build with Cassandra support])
found_sql_drivers="$found_sql_drivers cassandra"
AC_CHECK_LIB(cassandra, cass_cluster_set_constant_speculative_execution_policy, [
AC_DEFINE(HAVE_CASSANDRA_SPECULATIVE_POLICY, 1, [Cassandra supports speculative execution policy])
])
], [
if test $want_cassandra = yes; then
AC_ERROR([Can't build with Cassandra support: cassandra.h not found])
Expand Down
20 changes: 20 additions & 0 deletions src/lib-sql/driver-cassandra.c
Expand Up @@ -85,6 +85,7 @@ struct cassandra_db {
unsigned int connect_timeout_secs, request_timeout_secs;
unsigned int warn_timeout_secs;
unsigned int heartbeat_interval_secs, idle_timeout_secs;
unsigned int execution_retry_interval_msecs, execution_retry_times;
in_port_t port;

CassCluster *cluster;
Expand Down Expand Up @@ -494,6 +495,18 @@ static void driver_cassandra_parse_connect_string(struct cassandra_db *db,
} else if (strcmp(key, "metrics") == 0) {
i_free(db->metrics_path);
db->metrics_path = i_strdup(value);
} else if (strcmp(key, "execution_retry_interval") == 0) {
if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0)
i_fatal("cassandra: Invalid execution_retry_interval '%s': %s", value, error);
#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
i_fatal("cassandra: This cassandra version does not support execution_retry_interval");
#endif
} else if (strcmp(key, "execution_retry_times") == 0) {
if (str_to_uint(value, &db->execution_retry_times) < 0)
i_fatal("cassandra: Invalid execution_retry_times %s", value);
#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
i_fatal("cassandra: This cassandra version does not support execution_retry_times");
#endif
} else {
i_fatal("cassandra: Unknown connect string: %s", key);
}
Expand Down Expand Up @@ -618,6 +631,10 @@ static struct sql_db *driver_cassandra_init_v(const char *connect_string)
cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
if (db->idle_timeout_secs != 0)
cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
#endif
db->session = cass_session_new();
if (db->metrics_path != NULL)
db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
Expand Down Expand Up @@ -848,6 +865,9 @@ static void driver_cassandra_result_send_query(struct cassandra_result *result)
result->statement = cass_statement_new(result->query, 0);
cass_statement_set_consistency(result->statement, result->consistency);

#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
cass_statement_set_is_idempotent(result->statement, cass_true);
#endif
future = cass_session_execute(db->session, result->statement);
driver_cassandra_set_callback(future, db, query_callback, result);
}
Expand Down

0 comments on commit cc42a09

Please sign in to comment.