Skip to content

Commit

Permalink
dict-sql: Support transaction timestamps with Cassandra driver
Browse files Browse the repository at this point in the history
  • Loading branch information
sirainen authored and GitLab committed Jan 9, 2017
1 parent 345fcea commit 1fbe1b1
Showing 1 changed file with 32 additions and 7 deletions.
39 changes: 32 additions & 7 deletions src/lib-dict/dict-sql.c
Expand Up @@ -31,6 +31,7 @@ struct sql_dict {
const struct dict_sql_settings *set;

bool has_on_duplicate_key:1;
bool has_using_timestamp:1;
};

struct sql_dict_iterate_context {
Expand Down Expand Up @@ -97,6 +98,8 @@ sql_dict_init(struct dict *driver, const char *uri,

/* currently pgsql and sqlite don't support "ON DUPLICATE KEY" */
dict->has_on_duplicate_key = strcmp(driver->name, "mysql") == 0;
/* only Cassandra CQL supports "USING TIMESTAMP" */
dict->has_using_timestamp = strcmp(driver->name, "cassandra") == 0;

dict->db = sql_db_cache_new(dict_sql_db_cache, driver->name,
dict->set->connect);
Expand Down Expand Up @@ -877,6 +880,21 @@ static void sql_dict_transaction_rollback(struct dict_transaction_context *_ctx)
sql_dict_transaction_free(ctx);
}

static void
sql_dict_transaction_add_timestamp(struct sql_dict_transaction_context *ctx,
string_t *query)
{
struct sql_dict *dict = (struct sql_dict *)ctx->ctx.dict;
unsigned long long timestamp_usecs;

if (ctx->ctx.timestamp.tv_sec == 0 || !dict->has_using_timestamp)
return;

timestamp_usecs = ctx->ctx.timestamp.tv_sec * 1000000ULL +
ctx->ctx.timestamp.tv_nsec / 1000;
str_printfa(query, " USING TIMESTAMP %llu", timestamp_usecs);
}

struct dict_sql_build_query_field {
const struct dict_sql_map *map;
const char *value;
Expand All @@ -891,7 +909,8 @@ struct dict_sql_build_query {
bool inc;
};

static int sql_dict_set_query(const struct dict_sql_build_query *build,
static int sql_dict_set_query(struct sql_dict_transaction_context *ctx,
const struct dict_sql_build_query *build,
const char **query_r, const char **error_r)
{
struct sql_dict *dict = build->dict;
Expand All @@ -906,7 +925,9 @@ static int sql_dict_set_query(const struct dict_sql_build_query *build,

prefix = t_str_new(64);
suffix = t_str_new(256);
str_printfa(prefix, "INSERT INTO %s (", fields[0].map->table);
str_printfa(prefix, "INSERT INTO %s", fields[0].map->table);
sql_dict_transaction_add_timestamp(ctx, prefix);
str_append(prefix, " (");
str_append(suffix, ") VALUES (");
for (i = 0; i < field_count; i++) {
if (i > 0) {
Expand Down Expand Up @@ -974,7 +995,8 @@ static int sql_dict_set_query(const struct dict_sql_build_query *build,
}

static int
sql_dict_update_query(const struct dict_sql_build_query *build,
sql_dict_update_query(struct sql_dict_transaction_context *ctx,
const struct dict_sql_build_query *build,
const char **query_r, const char **error_r)
{
struct sql_dict *dict = build->dict;
Expand All @@ -988,7 +1010,9 @@ sql_dict_update_query(const struct dict_sql_build_query *build,
i_assert(field_count > 0);

query = t_str_new(64);
str_printfa(query, "UPDATE %s SET ", fields[0].map->table);
str_printfa(query, "UPDATE %s", fields[0].map->table);
sql_dict_transaction_add_timestamp(ctx, query);
str_append(query, " SET ");
for (i = 0; i < field_count; i++) {
if (i > 0)
str_append_c(query, ',');
Expand Down Expand Up @@ -1041,7 +1065,7 @@ static void sql_dict_set(struct dict_transaction_context *_ctx,
build.extra_values = &values;
build.key1 = key[0];

if (sql_dict_set_query(&build, &query, &error) < 0) {
if (sql_dict_set_query(ctx, &build, &query, &error) < 0) {
ctx->error = i_strdup_printf("dict-sql: Failed to set %s=%s: %s",
key, value, error);
} else {
Expand Down Expand Up @@ -1073,6 +1097,7 @@ static void sql_dict_unset(struct dict_transaction_context *_ctx,
}

str_printfa(query, "DELETE FROM %s", map->table);
sql_dict_transaction_add_timestamp(ctx, query);
if (sql_dict_where_build(dict, map, &values, key[0],
SQL_DICT_RECURSE_NONE, query, &error) < 0) {
ctx->error = i_strdup_printf(
Expand Down Expand Up @@ -1125,7 +1150,7 @@ static void sql_dict_atomic_inc_real(struct sql_dict_transaction_context *ctx,
build.key1 = key[0];
build.inc = TRUE;

if (sql_dict_update_query(&build, &query, &error) < 0) {
if (sql_dict_update_query(ctx, &build, &query, &error) < 0) {
ctx->error = i_strdup_printf(
"dict-sql: Failed to increase %s: %s", key, error);
} else {
Expand Down Expand Up @@ -1228,7 +1253,7 @@ static void sql_dict_atomic_inc(struct dict_transaction_context *_ctx,
field->map = map;
field->value = t_strdup_printf("%lld", diff);

if (sql_dict_update_query(&build, &query, &error) < 0) {
if (sql_dict_update_query(ctx, &build, &query, &error) < 0) {
ctx->error = i_strdup_printf(
"dict-sql: Failed to increase %s: %s", key, error);
} else {
Expand Down

0 comments on commit 1fbe1b1

Please sign in to comment.