Skip to content

Commit

Permalink
cassandra: Add wrapper functions in preparation for following commits
Browse files Browse the repository at this point in the history
No functional changes. Shrinks the following commits.
  • Loading branch information
sirainen authored and cmouse committed Jul 17, 2017
1 parent 55bfcb6 commit a5f2707
Showing 1 changed file with 54 additions and 28 deletions.
82 changes: 54 additions & 28 deletions src/lib-sql/driver-cassandra.c
Expand Up @@ -699,12 +699,37 @@ static void driver_cassandra_result_unlink(struct cassandra_db *db,
i_unreached();
}

static void driver_cassandra_log_result(struct cassandra_result *result,
long long reply_usecs)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
struct timeval now;
const char *str;

if (db->log_level < CASS_LOG_DEBUG && !db->debug_queries &&
reply_usecs/1000 < db->warn_timeout_msecs)
return;

if (gettimeofday(&now, NULL) < 0)
i_fatal("gettimeofday() failed: %m");
str = t_strdup_printf(
"cassandra: Finished query '%s' (%u rows, %lld+%lld us): %s",
result->query, result->row_count, reply_usecs,
timeval_diff_usecs(&now, &result->finish_time),
result->error != NULL ? result->error : "success");

if (reply_usecs/1000 >= db->warn_timeout_msecs) {
db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++;
i_warning("%s", str);
} else {
i_debug("%s", str);
}
}

static void driver_cassandra_result_free(struct sql_result *_result)
{
struct cassandra_db *db = (struct cassandra_db *)_result->db;
struct cassandra_result *result = (struct cassandra_result *)_result;
struct timeval now;
const char *str;
long long reply_usecs;

i_assert(!result->api.callback);
Expand All @@ -714,22 +739,7 @@ static void driver_cassandra_result_free(struct sql_result *_result)
db->sync_result = NULL;

reply_usecs = timeval_diff_usecs(&result->finish_time, &result->start_time);
if (db->log_level >= CASS_LOG_DEBUG || db->debug_queries ||
reply_usecs/1000 >= db->warn_timeout_msecs) {
if (gettimeofday(&now, NULL) < 0)
i_fatal("gettimeofday() failed: %m");
str = t_strdup_printf(
"cassandra: Finished query '%s' (%u rows, %lld+%lld us): %s",
result->query, result->row_count, reply_usecs,
timeval_diff_usecs(&now, &result->finish_time),
result->error != NULL ? result->error : "success");

if (reply_usecs/1000 >= db->warn_timeout_msecs) {
db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++;
i_warning("%s", str);
} else
i_debug("%s", str);
}
driver_cassandra_log_result(result, reply_usecs);

if (result->result != NULL)
cass_result_free(result->result);
Expand Down Expand Up @@ -879,19 +889,24 @@ static void query_callback(CassFuture *future, void *context)
result_finish(result);
}

static void driver_cassandra_result_send_query(struct cassandra_result *result)
static void driver_cassandra_init_statement(struct cassandra_result *result)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
CassFuture *future;

db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;

result->statement = cass_statement_new(result->query, 0);
cass_statement_set_consistency(result->statement, result->consistency);

#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
cass_statement_set_is_idempotent(result->statement, cass_true);
#endif
}

static void driver_cassandra_result_send_query(struct cassandra_result *result)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
CassFuture *future;

db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
driver_cassandra_init_statement(result);

future = cass_session_execute(db->session, result->statement);
driver_cassandra_set_callback(future, db, query_callback, result);
}
Expand Down Expand Up @@ -990,24 +1005,35 @@ static void exec_callback(struct sql_result *_result ATTR_UNUSED,
{
}

static void
driver_cassandra_query_full(struct sql_db *_db, const char *query,
static struct cassandra_result *
driver_cassandra_query_init(struct cassandra_db *db, const char *query,
enum cassandra_query_type query_type,
sql_query_callback_t *callback, void *context)
{
struct cassandra_db *db = (struct cassandra_db *)_db;
struct cassandra_result *result;

result = i_new(struct cassandra_result, 1);
result->api = driver_cassandra_result;
result->api.db = _db;
result->api.db = &db->api;
result->api.refcount = 1;
result->callback = callback;
result->context = context;
result->query_type = query_type;
result->query = i_strdup(query);
array_append(&db->results, &result, 1);
return result;
}

static void
driver_cassandra_query_full(struct sql_db *_db, const char *query,
enum cassandra_query_type query_type,
sql_query_callback_t *callback, void *context)
{
struct cassandra_db *db = (struct cassandra_db *)_db;
struct cassandra_result *result;

result = driver_cassandra_query_init(db, query, query_type,
callback, context);
(void)driver_cassandra_send_query(result);
}

Expand Down

0 comments on commit a5f2707

Please sign in to comment.