Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aggregated slowlog should maintain original timestamps #1199

Merged
merged 1 commit into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/commands/cmd_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ void Graph_Query(void *args) {

// Log query to slowlog.
SlowLog *slowlog = GraphContext_GetSlowLog(gc);
SlowLog_Add(slowlog, command_ctx->command_name, command_ctx->query, QueryCtx_GetExecutionTime());
SlowLog_Add(slowlog, command_ctx->command_name, command_ctx->query,
QueryCtx_GetExecutionTime(), NULL);
ExecutionPlan_Free(plan);
ResultSet_Free(result_set);
AST_Free(ast);
Expand Down
28 changes: 21 additions & 7 deletions src/slow_log/slow_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,18 @@ static inline void _ReplyWithRoundedDouble(RedisModuleCtx *ctx, double d) {
RedisModule_ReplyWithStringBuffer(ctx, str, len);
}

static SlowLogItem *_SlowLogItem_New(const char *cmd, const char *query, double latency) {
static SlowLogItem *_SlowLogItem_New
(
const char *cmd,
const char *query,
double latency,
time_t t
) {
SlowLogItem *item = rm_malloc(sizeof(SlowLogItem));
item->time = t;
item->latency = latency;
item->cmd = rm_strdup(cmd);
item->query = rm_strdup(query);
item->latency = latency;
time(&(item->time));
return item;
}

Expand Down Expand Up @@ -109,16 +115,20 @@ SlowLog *SlowLog_New() {
}

void SlowLog_Add(SlowLog *slowlog, const char *cmd, const char *query,
double latency) {
double latency, time_t *t) {
assert(slowlog && cmd && query && latency >= 0);

char *key;
time_t _time;
SlowLogItem *existing_item;
int t_id = get_thread_id();
rax *lookup = slowlog->lookup[t_id];
heap_t *heap = slowlog->min_heap[t_id];
pthread_mutex_t *lock = slowlog->locks + t_id;

// initialise time
(t) ? _time = *t: time(&_time);

if(pthread_mutex_lock(lock) != 0) {
// Failed to lock, skip logging.
return;
Expand All @@ -131,7 +141,10 @@ void SlowLog_Add(SlowLog *slowlog, const char *cmd, const char *query,

if(exists) {
// A similar item already exists, see if we need to update its latency.
if(existing_item->latency < latency) existing_item->latency = latency;
if(existing_item->latency < latency) {
existing_item->time = _time;
existing_item->latency = latency;
}
goto cleanup;
}

Expand All @@ -152,7 +165,7 @@ void SlowLog_Add(SlowLog *slowlog, const char *cmd, const char *query,
}

if(introduce_item) {
SlowLogItem *item = _SlowLogItem_New(cmd, query, latency);
SlowLogItem *item = _SlowLogItem_New(cmd, query, latency, _time);
heap_offer(slowlog->min_heap + t_id, item);
raxInsert(lookup, (unsigned char *)key, key_len, item, NULL);
}
Expand All @@ -177,7 +190,8 @@ void SlowLog_Replay(const SlowLog *slowlog, RedisModuleCtx *ctx) {
raxSeek(&iter, "^", NULL, 0);
while(raxNext(&iter)) {
SlowLogItem *item = iter.data;
SlowLog_Add(aggregated_slowlog, item->cmd, item->query, item->latency);
SlowLog_Add(aggregated_slowlog, item->cmd, item->query,
item->latency, &item->time);
}
raxStop(&iter);
// End of critical section.
Expand Down
21 changes: 18 additions & 3 deletions src/slow_log/slow_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,25 @@ typedef struct {
SlowLog *SlowLog_New();

// Introduce item to slow log.
void SlowLog_Add(SlowLog *slowlog, const char *cmd, const char *query, double latency);
void SlowLog_Add
(
SlowLog *slowlog, // slowlog to add entry to
const char *cmd, // command being logged
const char *query, // query being logged
double latency, // command latency
time_t *time // optional time command was issued
);

// Replies with slow log content.
void SlowLog_Replay(const SlowLog *slowlog, RedisModuleCtx *ctx);
void SlowLog_Replay
(
const SlowLog *slowlog,
RedisModuleCtx *ctx
);

// Free slowlog.
void SlowLog_Free(SlowLog *slowlog);
void SlowLog_Free
(
SlowLog *slowlog
);