Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions src/DAG/dag.c
Original file line number Diff line number Diff line change
Expand Up @@ -884,20 +884,3 @@ void DAG_ReplyAndUnblock(RedisAI_OnFinishCtx *ctx, void *private_data) {
if (rinfo->client)
RedisModule_UnblockClient(rinfo->client, rinfo);
}

void Dag_PopulateOp(RAI_DagOp *currentOp, void *rctx, RedisModuleString **inkeys,
RedisModuleString **outkeys, RedisModuleString *runkey) {

if (currentOp->commandType == REDISAI_DAG_CMD_MODELRUN) {
currentOp->mctx = (RAI_ModelRunCtx *)rctx;
currentOp->devicestr = currentOp->mctx->model->devicestr;
} else {
assert(currentOp->commandType == REDISAI_DAG_CMD_SCRIPTRUN);
currentOp->sctx = (RAI_ScriptRunCtx *)rctx;
currentOp->devicestr = currentOp->sctx->script->devicestr;
}

currentOp->inkeys = inkeys;
currentOp->outkeys = outkeys;
currentOp->runkey = runkey;
}
13 changes: 0 additions & 13 deletions src/DAG/dag.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,4 @@ void RunInfo_FreeData(RedisModuleCtx *ctx, void *rinfo);
*/
void RedisAI_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);

/**
* @brief Populate a DAG modelrun/scriptrun op with its params .
* @param rinfo An existing DAG to populate.
* @param rctx ModelRunCtx or ScriptRunCtx that represents the single MODELRUN op.
* @param inkeys The DAG operation inkeys (the input tensors).
* @param outkeys The DAG operation outkeys (the output tensors).
* @param runkey The model key.
* @param cmd The DAG command (modelrun/scriptrun).
*/

void Dag_PopulateOp(RAI_DagOp *currentOp, void *rctx, RedisModuleString **inkeys,
RedisModuleString **outkeys, RedisModuleString *runkey);

#endif /* SRC_DAG_H_ */
4 changes: 3 additions & 1 deletion src/DAG/dag_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ int DAG_CommandParser(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, b
return REDISMODULE_ERR;
}
currentOp->devicestr = mto->devicestr;
RAI_HoldString(NULL, argv[arg_pos + 1]);
currentOp->runkey = argv[arg_pos + 1];
currentOp->mctx = RAI_ModelRunCtxCreate(mto);
}
Expand All @@ -249,6 +250,7 @@ int DAG_CommandParser(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, b
}
currentOp->devicestr = sto->devicestr;
const char *functionName = RedisModule_StringPtrLen(argv[arg_pos + 2], NULL);
RAI_HoldString(NULL, argv[arg_pos + 1]);
currentOp->runkey = argv[arg_pos + 1];
currentOp->sctx = RAI_ScriptRunCtxCreate(sto, functionName);
}
Expand Down Expand Up @@ -395,8 +397,8 @@ int DAG_CommandParser(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, b
sprintf(buf, "%04d", *instance);
RedisModuleString *mangled_key = RedisModule_CreateStringFromString(NULL, key);
RedisModule_StringAppendBuffer(NULL, mangled_key, buf, strlen(buf));

AI_dictAdd(mangled_persisted, (void *)mangled_key, (void *)1);
RedisModule_FreeString(NULL, mangled_key);
entry = AI_dictNext(iter);
}
AI_dictReleaseIterator(iter);
Expand Down
1 change: 1 addition & 0 deletions src/backends/tensorflow.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ RAI_Model *RAI_ModelCreateTF(RAI_Backend backend, const char *devicestr, RAI_Mod
char *msg = RedisModule_Calloc(60 + len, sizeof(*msg));
sprintf(msg, "ERR Input node named \"%s\" not found in TF graph.", inputs[i]);
RAI_SetError(error, RAI_EMODELIMPORT, msg);
RedisModule_Free(msg);
return NULL;
}
}
Expand Down
56 changes: 24 additions & 32 deletions src/background_workers.c
Original file line number Diff line number Diff line change
Expand Up @@ -391,14 +391,11 @@ void *RedisAI_Run_ThreadMain(void *arg) {

// Run is over, now iterate over the run info structs in the batch
// and see if any error was generated
bool first_dag_error = false;
for (long long i = 0; i < array_len(batch_rinfo); i++) {
RedisAI_RunInfo *rinfo = batch_rinfo[i];
// We record that there was an error for later on
run_error = __atomic_load_n(rinfo->dagError, __ATOMIC_RELAXED);
if (i == 0 && run_error == 1) {
first_dag_error = true;
}

// If there was an error and the reference count for the dag
// has gone to zero and the client is still around, we unblock
if (run_error) {
Expand All @@ -413,37 +410,35 @@ void *RedisAI_Run_ThreadMain(void *arg) {
__atomic_add_fetch(rinfo->dagCompleteOpCount, 1, __ATOMIC_RELAXED);
}
}
if (first_dag_error) {
run_queue_len = queueLength(run_queue_info->run_queue);
continue;
}
}

// We initialize variables where we'll store the fact hat, after the current
// run, all ops for the device or all ops in the dag could be complete. This
// way we can avoid placing the op back on the queue if there's nothing left
// to do.
RedisModule_Assert(run_error == 0);
int device_complete_after_run = RedisAI_DagDeviceComplete(batch_rinfo[0]);
int dag_complete_after_run = RedisAI_DagComplete(batch_rinfo[0]);

long long dagRefCount = -1;
RedisAI_RunInfo *orig;
if (device_complete == 1 || device_complete_after_run == 1) {
RedisAI_RunInfo *evicted_rinfo = (RedisAI_RunInfo *)(evicted_items[0]->value);
orig = evicted_rinfo->orig_copy;
// We decrease and get the reference count for the DAG.
dagRefCount = RAI_DagRunInfoFreeShallowCopy(evicted_rinfo);
}
int device_complete_after_run;
if (run_error == 0) {
device_complete_after_run = RedisAI_DagDeviceComplete(batch_rinfo[0]);
int dag_complete_after_run = RedisAI_DagComplete(batch_rinfo[0]);

long long dagRefCount = -1;
RedisAI_RunInfo *orig;
if (device_complete == 1 || device_complete_after_run == 1) {
RedisAI_RunInfo *evicted_rinfo = (RedisAI_RunInfo *)(evicted_items[0]->value);
orig = evicted_rinfo->orig_copy;
// We decrease and get the reference count for the DAG.
dagRefCount = RAI_DagRunInfoFreeShallowCopy(evicted_rinfo);
}

// If the DAG was complete, then it's time to unblock the client
if (do_unblock == 1 || dag_complete_after_run == 1) {
// If the DAG was complete, then it's time to unblock the client
if (do_unblock == 1 || dag_complete_after_run == 1) {

// If the reference count for the DAG is zero and the client is still around,
// then we actually unblock the client
if (dagRefCount == 0) {
RedisAI_OnFinishCtx *finish_ctx = orig;
orig->OnFinish(finish_ctx, orig->private_data);
// If the reference count for the DAG is zero and the client is still around,
// then we actually unblock the client
if (dagRefCount == 0) {
RedisAI_OnFinishCtx *finish_ctx = orig;
orig->OnFinish(finish_ctx, orig->private_data);
}
}
}

Expand Down Expand Up @@ -499,11 +494,8 @@ void *RedisAI_Run_ThreadMain(void *arg) {

// If there's nothing else to do for the DAG in the current worker or if an error
// occurred in any worker, we just move on
if (device_complete == 1 || device_complete_after_run == 1 || do_unblock == 1 ||
run_error == 1) {
for (long long i = 0; i < array_len(evicted_items); i++) {
RedisModule_Free(evicted_items[i]);
}
for (long long i = 0; i < array_len(evicted_items); i++) {
RedisModule_Free(evicted_items[i]);
}
run_queue_len = queueLength(run_queue_info->run_queue);
}
Expand Down
96 changes: 37 additions & 59 deletions src/command_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,52 +125,41 @@ static int _ModelRunCtx_SetParams(RedisModuleCtx *ctx, RedisModuleString **inkey
int ParseModelRunCommand(RedisAI_RunInfo *rinfo, RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {

RAI_DagOp *currentOp;
RAI_InitDagOp(&currentOp);
rinfo->dagOps = array_append(rinfo->dagOps, currentOp);

// Build a ModelRunCtx from command.
RAI_Error error = {0};
RAI_Model *model;
RedisModuleString **inkeys = array_new(RedisModuleString *, 1);
RedisModuleString **outkeys = array_new(RedisModuleString *, 1);
RedisModuleString *runkey = NULL;
RAI_ModelRunCtx *mctx = NULL;
RAI_DagOp *currentOp;

long long timeout = 0;
if (_ModelRunCommand_ParseArgs(ctx, argv, argc, &model, &error, &inkeys, &outkeys, &runkey,
if (_ModelRunCommand_ParseArgs(ctx, argv, argc, &model, currentOp->err, &currentOp->inkeys,
&currentOp->outkeys, &currentOp->runkey,
&timeout) == REDISMODULE_ERR) {
RedisModule_ReplyWithError(ctx, RAI_GetErrorOneLine(&error));
RedisModule_ReplyWithError(ctx, RAI_GetErrorOneLine(currentOp->err));
goto cleanup;
}

if (timeout > 0 && !rinfo->single_op_dag) {
RedisModule_ReplyWithError(ctx, "ERR TIMEOUT not allowed within a DAG command");
goto cleanup;
}
mctx = RAI_ModelRunCtxCreate(model);

RAI_ModelRunCtx *mctx = RAI_ModelRunCtxCreate(model);
if (rinfo->single_op_dag) {
rinfo->timeout = timeout;
// Set params in ModelRunCtx, bring inputs from key space.
if (_ModelRunCtx_SetParams(ctx, inkeys, outkeys, mctx) == REDISMODULE_ERR)
if (_ModelRunCtx_SetParams(ctx, currentOp->inkeys, currentOp->outkeys, mctx) ==
REDISMODULE_ERR)
goto cleanup;
}
if (RAI_InitDagOp(&currentOp) == REDISMODULE_ERR) {
RedisModule_ReplyWithError(
ctx, "ERR Unable to allocate the memory and initialise the RAI_dagOp structure");
goto cleanup;
}

currentOp->commandType = REDISAI_DAG_CMD_MODELRUN;
Dag_PopulateOp(currentOp, mctx, inkeys, outkeys, runkey);
rinfo->dagOps = array_append(rinfo->dagOps, currentOp);
currentOp->mctx = mctx;
currentOp->devicestr = mctx->model->devicestr;
return REDISMODULE_OK;

cleanup:
for (size_t i = 0; i < array_len(inkeys); i++) {
RedisModule_FreeString(NULL, inkeys[i]);
}
array_free(inkeys);
for (size_t i = 0; i < array_len(outkeys); i++) {
RedisModule_FreeString(NULL, outkeys[i]);
}
array_free(outkeys);
if (runkey)
RedisModule_FreeString(NULL, runkey);
if (mctx)
RAI_ModelRunCtxFree(mctx);
RAI_FreeRunInfo(rinfo);
return REDISMODULE_ERR;
}
Expand Down Expand Up @@ -283,55 +272,44 @@ static int _ScriptRunCtx_SetParams(RedisModuleCtx *ctx, RedisModuleString **inke
int ParseScriptRunCommand(RedisAI_RunInfo *rinfo, RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {

RAI_DagOp *currentOp;
RAI_InitDagOp(&currentOp);
rinfo->dagOps = array_append(rinfo->dagOps, currentOp);

// Build a ScriptRunCtx from command.
RAI_Error error = {0};
RAI_Script *script;
RedisModuleString **inkeys = array_new(RedisModuleString *, 1);
RedisModuleString **outkeys = array_new(RedisModuleString *, 1);
RedisModuleString *runkey = NULL;
const char *func_name = NULL;
RAI_ScriptRunCtx *sctx = NULL;
RAI_DagOp *currentOp;

long long timeout = 0;
int variadic = -1;
if (_ScriptRunCommand_ParseArgs(ctx, argv, argc, &script, &error, &inkeys, &outkeys, &runkey,
&func_name, &timeout, &variadic) == REDISMODULE_ERR) {
RedisModule_ReplyWithError(ctx, RAI_GetErrorOneLine(&error));
if (_ScriptRunCommand_ParseArgs(ctx, argv, argc, &script, currentOp->err, &currentOp->inkeys,
&currentOp->outkeys, &currentOp->runkey, &func_name, &timeout,
&variadic) == REDISMODULE_ERR) {
RedisModule_ReplyWithError(ctx, RAI_GetErrorOneLine(currentOp->err));
goto cleanup;
}
sctx = RAI_ScriptRunCtxCreate(script, func_name);
if (timeout > 0 && !rinfo->single_op_dag) {
RedisModule_ReplyWithError(ctx, "ERR TIMEOUT not allowed within a DAG command");
goto cleanup;
}

RAI_ScriptRunCtx *sctx = RAI_ScriptRunCtxCreate(script, func_name);
sctx->variadic = variadic;

if (rinfo->single_op_dag) {
rinfo->timeout = timeout;
// Set params in ScriptRunCtx, bring inputs from key space.
if (_ScriptRunCtx_SetParams(ctx, inkeys, outkeys, sctx) == REDISMODULE_ERR)
if (_ScriptRunCtx_SetParams(ctx, currentOp->inkeys, currentOp->outkeys, sctx) ==
REDISMODULE_ERR)
goto cleanup;
}
if (RAI_InitDagOp(&currentOp) == REDISMODULE_ERR) {
RedisModule_ReplyWithError(
ctx, "ERR Unable to allocate the memory and initialise the RAI_dagOp structure");
goto cleanup;
}
currentOp->sctx = sctx;
currentOp->commandType = REDISAI_DAG_CMD_SCRIPTRUN;
Dag_PopulateOp(currentOp, sctx, inkeys, outkeys, runkey);
rinfo->dagOps = array_append(rinfo->dagOps, currentOp);
currentOp->devicestr = sctx->script->devicestr;

return REDISMODULE_OK;

cleanup:
for (size_t i = 0; i < array_len(inkeys); i++) {
RedisModule_FreeString(NULL, inkeys[i]);
}
array_free(inkeys);
for (size_t i = 0; i < array_len(outkeys); i++) {
RedisModule_FreeString(NULL, outkeys[i]);
}
array_free(outkeys);
if (runkey)
RedisModule_FreeString(NULL, runkey);
if (sctx)
RAI_ScriptRunCtxFree(sctx);
RAI_FreeRunInfo(rinfo);
return REDISMODULE_ERR;
}
Expand Down
3 changes: 0 additions & 3 deletions src/err.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ void RAI_SetError(RAI_Error *err, RAI_ErrorCode code, const char *detail) {
int RAI_InitError(RAI_Error **result) {
RAI_Error *err;
err = (RAI_Error *)RedisModule_Calloc(1, sizeof(RAI_Error));
if (!err) {
return REDISMODULE_ERR;
}
err->code = 0;
err->detail = NULL;
err->detail_oneline = NULL;
Expand Down
26 changes: 14 additions & 12 deletions src/model.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,24 @@ static void *RAI_Model_RdbLoad(struct RedisModuleIO *io, int encver) {
return NULL;
}

for (size_t i = 0; i < ninputs; i++) {
RedisModule_Free(inputs[i]);
}
for (size_t i = 0; i < noutputs; i++) {
RedisModule_Free(outputs[i]);
}
RedisModule_Free(inputs);
RedisModule_Free(outputs);
RedisModule_Free(buffer);

RedisModuleCtx *stats_ctx = RedisModule_GetContextFromIO(io);
RedisModuleString *stats_keystr =
RedisModule_CreateStringFromString(stats_ctx, RedisModule_GetKeyNameFromIO(io));
const char *stats_devicestr = RedisModule_Strdup(devicestr);
RedisModuleString *stats_tag = RAI_HoldString(NULL, tag);

model->infokey =
RAI_AddStatsEntry(stats_ctx, stats_keystr, RAI_MODEL, backend, stats_devicestr, stats_tag);
model->infokey = RAI_AddStatsEntry(stats_ctx, stats_keystr, RAI_MODEL, backend, devicestr, tag);

RedisModule_FreeString(NULL, tag);
RedisModule_Free(devicestr);
RedisModule_FreeString(NULL, stats_keystr);

return model;
Expand Down Expand Up @@ -371,7 +376,6 @@ void RAI_ModelFree(RAI_Model *model, RAI_Error *err) {
}

RedisModule_FreeString(NULL, model->tag);

RAI_RemoveStatsEntry(model->infokey);

RedisModule_Free(model);
Expand Down Expand Up @@ -504,19 +508,17 @@ RedisModuleType *RAI_ModelRedisType(void) { return RedisAI_ModelType; }
int RAI_ModelRunAsync(RAI_ModelRunCtx *mctx, RAI_OnFinishCB ModelAsyncFinish, void *private_data) {

RedisAI_RunInfo *rinfo = NULL;
if (RAI_InitRunInfo(&rinfo) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
RAI_InitRunInfo(&rinfo);

rinfo->single_op_dag = 1;
rinfo->OnFinish = (RedisAI_OnFinishCB)ModelAsyncFinish;
rinfo->private_data = private_data;

RAI_DagOp *op;
if (RAI_InitDagOp(&op) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
RAI_InitDagOp(&op);
op->commandType = REDISAI_DAG_CMD_MODELRUN;
Dag_PopulateOp(op, mctx, NULL, NULL, NULL);
op->devicestr = mctx->model->devicestr;
op->mctx = mctx;

rinfo->dagOps = array_append(rinfo->dagOps, op);
rinfo->dagOpCount = 1;
Expand Down
Loading