diff --git a/src/lib-dict/dict-sql.c b/src/lib-dict/dict-sql.c index 9dcf293687..4cca6b9538 100644 --- a/src/lib-dict/dict-sql.c +++ b/src/lib-dict/dict-sql.c @@ -31,6 +31,7 @@ struct sql_dict { const struct dict_sql_settings *set; bool has_on_duplicate_key:1; + bool has_using_timestamp:1; }; struct sql_dict_iterate_context { @@ -97,6 +98,8 @@ sql_dict_init(struct dict *driver, const char *uri, /* currently pgsql and sqlite don't support "ON DUPLICATE KEY" */ dict->has_on_duplicate_key = strcmp(driver->name, "mysql") == 0; + /* only Cassandra CQL supports "USING TIMESTAMP" */ + dict->has_using_timestamp = strcmp(driver->name, "cassandra") == 0; dict->db = sql_db_cache_new(dict_sql_db_cache, driver->name, dict->set->connect); @@ -877,6 +880,21 @@ static void sql_dict_transaction_rollback(struct dict_transaction_context *_ctx) sql_dict_transaction_free(ctx); } +static void +sql_dict_transaction_add_timestamp(struct sql_dict_transaction_context *ctx, + string_t *query) +{ + struct sql_dict *dict = (struct sql_dict *)ctx->ctx.dict; + unsigned long long timestamp_usecs; + + if (ctx->ctx.timestamp.tv_sec == 0 || !dict->has_using_timestamp) + return; + + timestamp_usecs = ctx->ctx.timestamp.tv_sec * 1000000ULL + + ctx->ctx.timestamp.tv_nsec / 1000; + str_printfa(query, " USING TIMESTAMP %llu", timestamp_usecs); +} + struct dict_sql_build_query_field { const struct dict_sql_map *map; const char *value; @@ -891,7 +909,8 @@ struct dict_sql_build_query { bool inc; }; -static int sql_dict_set_query(const struct dict_sql_build_query *build, +static int sql_dict_set_query(struct sql_dict_transaction_context *ctx, + const struct dict_sql_build_query *build, const char **query_r, const char **error_r) { struct sql_dict *dict = build->dict; @@ -906,7 +925,9 @@ static int sql_dict_set_query(const struct dict_sql_build_query *build, prefix = t_str_new(64); suffix = t_str_new(256); - str_printfa(prefix, "INSERT INTO %s (", fields[0].map->table); + str_printfa(prefix, "INSERT INTO %s", fields[0].map->table); + sql_dict_transaction_add_timestamp(ctx, prefix); + str_append(prefix, " ("); str_append(suffix, ") VALUES ("); for (i = 0; i < field_count; i++) { if (i > 0) { @@ -974,7 +995,8 @@ static int sql_dict_set_query(const struct dict_sql_build_query *build, } static int -sql_dict_update_query(const struct dict_sql_build_query *build, +sql_dict_update_query(struct sql_dict_transaction_context *ctx, + const struct dict_sql_build_query *build, const char **query_r, const char **error_r) { struct sql_dict *dict = build->dict; @@ -988,7 +1010,9 @@ sql_dict_update_query(const struct dict_sql_build_query *build, i_assert(field_count > 0); query = t_str_new(64); - str_printfa(query, "UPDATE %s SET ", fields[0].map->table); + str_printfa(query, "UPDATE %s", fields[0].map->table); + sql_dict_transaction_add_timestamp(ctx, query); + str_append(query, " SET "); for (i = 0; i < field_count; i++) { if (i > 0) str_append_c(query, ','); @@ -1041,7 +1065,7 @@ static void sql_dict_set(struct dict_transaction_context *_ctx, build.extra_values = &values; build.key1 = key[0]; - if (sql_dict_set_query(&build, &query, &error) < 0) { + if (sql_dict_set_query(ctx, &build, &query, &error) < 0) { ctx->error = i_strdup_printf("dict-sql: Failed to set %s=%s: %s", key, value, error); } else { @@ -1073,6 +1097,7 @@ static void sql_dict_unset(struct dict_transaction_context *_ctx, } str_printfa(query, "DELETE FROM %s", map->table); + sql_dict_transaction_add_timestamp(ctx, query); if (sql_dict_where_build(dict, map, &values, key[0], SQL_DICT_RECURSE_NONE, query, &error) < 0) { ctx->error = i_strdup_printf( @@ -1125,7 +1150,7 @@ static void sql_dict_atomic_inc_real(struct sql_dict_transaction_context *ctx, build.key1 = key[0]; build.inc = TRUE; - if (sql_dict_update_query(&build, &query, &error) < 0) { + if (sql_dict_update_query(ctx, &build, &query, &error) < 0) { ctx->error = i_strdup_printf( "dict-sql: Failed to increase %s: %s", key, error); } else { @@ -1228,7 +1253,7 @@ static void sql_dict_atomic_inc(struct dict_transaction_context *_ctx, field->map = map; field->value = t_strdup_printf("%lld", diff); - if (sql_dict_update_query(&build, &query, &error) < 0) { + if (sql_dict_update_query(ctx, &build, &query, &error) < 0) { ctx->error = i_strdup_printf( "dict-sql: Failed to increase %s: %s", key, error); } else {