Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sched/plugin: track flux-core module API changes #414

Merged
merged 1 commit into from Dec 3, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
191 changes: 110 additions & 81 deletions sched/plugin.c
Expand Up @@ -28,6 +28,7 @@
#include <dlfcn.h>
#include <flux/core.h>
#include <czmq.h>
#include <argz.h>

#if HAVE_VALGRIND
# if HAVE_VALGRIND_H
Expand Down Expand Up @@ -196,6 +197,12 @@ void priority_plugin_unload (struct sched_plugin_loader *sploader)
}
}

static void module_dlerror (const char *errmsg, void *arg)
{
flux_t *h = arg;
flux_log (h, LOG_DEBUG, "flux_modname: %s", errmsg);
}

int sched_plugin_load (struct sched_plugin_loader *sploader, const char *s)
{
char *path = NULL;
Expand All @@ -208,7 +215,7 @@ int sched_plugin_load (struct sched_plugin_loader *sploader, const char *s)
goto error;
}
if (strchr (s, '/')) {
if (!(name = flux_modname (s))) {
if (!(name = flux_modname (s, module_dlerror, sploader->h))) {
flux_log (sploader->h, LOG_ERR, "%s: %s", s, dlerror ());
errno = ENOENT;
goto error;
Expand All @@ -218,12 +225,13 @@ int sched_plugin_load (struct sched_plugin_loader *sploader, const char *s)
goto error;
}
} else {
if (!(path = flux_modfind (searchpath, s))) {
if (!(path = flux_modfind (searchpath, s,
module_dlerror, sploader->h))) {
flux_log (sploader->h, LOG_ERR,
"%s: not found in module search path %s", s, searchpath);
goto error;
}
if (!(name = flux_modname (path)))
if (!(name = flux_modname (path, module_dlerror, sploader->h)))
goto error;
}
if (!(dso = dlopen (path, RTLD_NOW | RTLD_LOCAL | RTLD_DEEPBIND))) {
Expand Down Expand Up @@ -287,72 +295,109 @@ static void rmmod_cb (flux_t *h, flux_msg_handler_t *w,
struct sched_plugin_loader *sploader = arg;
struct behavior_plugin *behavior_plugin = behavior_plugin_get (sploader);
struct priority_plugin *priority_plugin = priority_plugin_get (sploader);
const char *json_str;
char *name = NULL;
int rc = -1;

if (flux_request_decode (msg, NULL, &json_str) < 0)
goto done;
if (!json_str) {
errno = EPROTO;
goto done;
}
if (flux_rmmod_json_decode (json_str, &name) < 0)
goto done;
const char *name;
bool found = false;

if (flux_request_unpack (msg, NULL, "{s:s}", "name", &name) < 0)
goto error;
if ((behavior_plugin) && (strcmp (name, behavior_plugin->name) == 0)) {
behavior_plugin_unload (sploader);
flux_log (h, LOG_INFO, "%s unloaded", name);
rc = 0;
found = true;
}
if ((priority_plugin) && (strcmp (name, priority_plugin->name) == 0)) {
priority_plugin_unload (sploader);
flux_log (h, LOG_INFO, "%s unloaded", name);
rc = 0;
found = true;
}
if (rc)
if (!found) {
errno = ENOENT;
done:
if (flux_respond (h, msg, rc < 0 ? errno : 0, NULL) < 0)
goto error;
}
if (flux_respond (h, msg, 0, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
if (name)
free (name);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

static void insmod_cb (flux_t *h, flux_msg_handler_t *w,
const flux_msg_t *msg, void *arg)
{
struct sched_plugin_loader *sploader = arg;
const sched_params_t *sp = sched_params_get (h);
const char *json_str;
char *path = NULL;
json_t *args;
size_t index;
json_t *value;
const char *path = NULL;
char *argz = NULL;
size_t argz_len = 0;
int rc = -1;
error_t e;

if (flux_request_decode (msg, NULL, &json_str) < 0)
goto done;
if (!json_str) {
errno = EPROTO;
goto done;
if (flux_request_unpack (msg, NULL, "{s:s s:o}", "path", &path,
"args", &args) < 0)
goto error;
if (!json_is_array (args))
goto proto;
json_array_foreach (args, index, value) {
if (!json_is_string (value))
goto proto;
if ((e = argz_add (&argz, &argz_len, json_string_value (value)))) {
errno = e;
goto error;
}
}
if (flux_insmod_json_decode (json_str, &path, &argz, &argz_len) < 0)
goto done;
if (sched_plugin_load (sploader, path) < 0)
goto done;
goto error;
if (argz && sploader->behavior_plugin->process_args (sploader->h, argz,
argz_len, sp) < 0) {
goto done;
argz_len, sp) < 0) {
goto error;
}
rc = 0;

done:
if (flux_respond (h, msg, rc < 0 ? errno : 0, NULL) < 0)
if (flux_respond (h, msg, 0, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
if (path)
free (path);
if (argz)
free (argz);
free (argz);
return;
proto:
errno = EPROTO;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
free (argz);
}

/* For plugin 'name' at 'path', append an RFC 5 lsmod module record to
* the JSON array 'mods'. Return 0 on success, -1 on failure with errno set.
*/
static int lsmod_plugin_append (const char *name, const char *path,
json_t *mods)
{
zfile_t *zf;
struct stat sb;
json_t *entry;

if (stat (path, &sb) < 0)
return -1;
if (!(zf = zfile_new (NULL, path)))
return -1;
entry = json_pack ("{s:s s:i s:s s:i s:i s:[]}",
"name", name,
"size", sb.st_size,
"digest", zfile_digest (zf),
"idle", 0,
"status", FLUX_MODSTATE_RUNNING,
"services");
zfile_destroy (&zf);
if (!entry)
goto nomem;
if (json_array_append_new (mods, entry) < 0) {
json_decref (entry);
goto nomem;
}
return 0;
nomem:
errno = ENOMEM;
return -1;
}

static void lsmod_cb (flux_t *h, flux_msg_handler_t *w,
Expand All @@ -361,48 +406,32 @@ static void lsmod_cb (flux_t *h, flux_msg_handler_t *w,
struct sched_plugin_loader *sploader = arg;
struct behavior_plugin *behavior_plugin = behavior_plugin_get (sploader);
struct priority_plugin *priority_plugin = priority_plugin_get (sploader);
flux_modlist_t *mods = NULL;
zfile_t *zf = NULL;
char *json_str = NULL;
struct stat sb;
int rc = -1;
json_t *mods = NULL;

if (flux_request_decode (msg, NULL, NULL) < 0)
goto done;
if (!(mods = flux_modlist_create ()))
goto done;
if (behavior_plugin) {
if (stat (behavior_plugin->path, &sb) < 0)
goto done;
if (!(zf = zfile_new (NULL, behavior_plugin->path)))
goto done;
if (flux_modlist_append (mods, behavior_plugin->name, sb.st_size,
zfile_digest (zf),
0, FLUX_MODSTATE_RUNNING) < 0)
goto done;
}
goto error;
if (!(mods = json_array ()))
goto nomem;
if (priority_plugin) {
if (stat (priority_plugin->path, &sb) < 0)
goto done;
if (!(zf = zfile_new (NULL, priority_plugin->path)))
goto done;
if (flux_modlist_append (mods, priority_plugin->name, sb.st_size,
zfile_digest (zf),
0, FLUX_MODSTATE_RUNNING) < 0)
goto done;
if (lsmod_plugin_append (priority_plugin->name,
priority_plugin->path, mods) < 0)
goto error;
}
if (!(json_str = flux_lsmod_json_encode (mods)))
goto done;
rc = 0;
done:
if (flux_respond (h, msg, rc < 0 ? errno : 0,
rc < 0 ? NULL : json_str) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
if (mods)
flux_modlist_destroy (mods);
zfile_destroy (&zf);
if (json_str)
free (json_str);
if (behavior_plugin) {
if (lsmod_plugin_append (behavior_plugin->name,
behavior_plugin->path, mods) < 0)
goto error;
}
if (flux_respond_pack (h, msg, "{s:O}", "mods", mods) < 0)
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
json_decref (mods);
return;
nomem:
errno = ENOMEM;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
json_decref (mods);
}


Expand Down