Skip to content

Commit

Permalink
Redmine#7435: jq integration for mapdata()
Browse files Browse the repository at this point in the history
  • Loading branch information
tzz committed Dec 21, 2015
1 parent ccaf5d1 commit bb227d8
Show file tree
Hide file tree
Showing 4 changed files with 350 additions and 228 deletions.
260 changes: 34 additions & 226 deletions cf-agent/package_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,219 +201,6 @@ static void ParseAndLogErrorMessage(const Rlist *data)
}
}

static int IsReadWriteReady(const IOData *io, int timeout_sec)
{
fd_set rset;
FD_ZERO(&rset);
FD_SET(io->read_fd, &rset);

struct timeval tv = {
.tv_sec = timeout_sec,
.tv_usec = 0,
};

//TODO: For Windows we will need different method and select might not
// work with file descriptors.
int ret = select(io->read_fd + 1, &rset, NULL, NULL, &tv);

if (ret < 0)
{
Log(LOG_LEVEL_VERBOSE, "Failed checking for data. (select: %s)",
GetErrorStr());
return -1;
}
else if (FD_ISSET(io->read_fd, &rset))
{
return io->read_fd;
}

/* We have reached timeout */
if (ret == 0)
{
Log(LOG_LEVEL_DEBUG, "Timeout reading from package module.");
return 0;
}

Log(LOG_LEVEL_VERBOSE,
"Unknown outcome (ret > 0 but our only fd is not set).");

return -1;
}

static Rlist *ReadDataFromPackageScript(const IOData *io)
{
char buff[CF_BUFSIZE] = {0};

Buffer *data = BufferNew();
if (!data)
{
Log(LOG_LEVEL_VERBOSE,
"Unable to allocate buffer for handling package module responses.");
return NULL;
}

int timeout_seconds_left = PACKAGE_PROMISE_SCRIPT_TIMEOUT_SEC;

while (!IsPendingTermination() && timeout_seconds_left > 0)
{
int fd = IsReadWriteReady(io, PACKAGE_PROMISE_TERMINATION_CHECK_SEC);

if (fd < 0)
{
Log(LOG_LEVEL_VERBOSE,
"Error reading data from package module: %s",
GetErrorStr());
return NULL;
}
else if (fd == io->read_fd)
{
ssize_t res = read(fd, buff, sizeof(buff) - 1);
if (res == -1)
{
if (errno == EINTR)
{
continue;
}
else
{
Log(LOG_LEVEL_ERR,
"Unable to read output from package module: %s",
GetErrorStr());
BufferDestroy(data);
return NULL;
}
}
else if (res == 0) /* reached EOF */
{
break;
}
Log(LOG_LEVEL_DEBUG, "Data read from package module: %zu [%s]",
res, buff);

BufferAppendString(data, buff);
memset(buff, 0, sizeof(buff));
}
else if (fd == 0) /* timeout */
{
timeout_seconds_left -= PACKAGE_PROMISE_TERMINATION_CHECK_SEC;
continue;
}
}

char *read_string = BufferClose(data);
Rlist *response_lines = RlistFromSplitString(read_string, '\n');
free(read_string);

return response_lines;
}

static int WriteScriptData(const char *data, IOData *io)
{
/* If there is nothing to write close writing end of pipe. */
if (data == NULL || strlen(data) == 0)
{
if (io->write_fd >= 0)
{
cf_pclose_full_duplex_side(io->write_fd);
io->write_fd = -1;
}
return 0;
}

ssize_t wrt = write(io->write_fd, data, strlen(data));

/* Make sure to close write_fd after sending all data. */
if (io->write_fd >= 0)
{
cf_pclose_full_duplex_side(io->write_fd);
io->write_fd = -1;
}
return wrt;
}

int WriteDataToPackageModule(const char *args, const char *data,
const PackageModuleWrapper *wrapper)
{
char *command = StringFormat("%s %s", wrapper->path, args);
IOData io = cf_popen_full_duplex(command, false);
free(command);

if (io.write_fd == -1 || io.read_fd == -1)
{
Log(LOG_LEVEL_VERBOSE, "Error occurred while opening pipes for "
"communication with package module.");
return -1;
}

Log(LOG_LEVEL_DEBUG, "Opened fds %d and %d for command '%s'.",
io.read_fd, io.write_fd, args);

int res = 0;
if (WriteScriptData(data, &io) != strlen(data))
{
Log(LOG_LEVEL_VERBOSE,
"Was not able to send whole data to package module.");
res = -1;
}

/* If script returns non 0 status */
int close = cf_pclose_full_duplex(&io);
if (close != EXIT_SUCCESS)
{
Log(LOG_LEVEL_VERBOSE,
"Package module returned with non zero return code: %d",
close);
res = -1;
}
return res;
}

/* In some cases the response is expected to be not filled out. Some requests
will have response filled only in case of errors. */
static int ReadWriteDataToPackageScript(const char *args, const char *request,
Rlist **response,
const PackageModuleWrapper *wrapper)
{
assert(args && wrapper);

char *command = StringFormat("%s %s", wrapper->path, args);
IOData io = cf_popen_full_duplex(command, false);
free(command);

if (io.write_fd == -1 || io.read_fd == -1)
{
Log(LOG_LEVEL_INFO, "Some error occurred while communicating "
"package module.");
return -1;
}

Log(LOG_LEVEL_DEBUG, "Opened fds %d and %d for command '%s'.",
io.read_fd, io.write_fd, args);

if (WriteScriptData(request, &io) != strlen(request))
{
Log(LOG_LEVEL_VERBOSE, "Couldn't send whole data to package module.");
return -1;
}

/* We can have some error message here. */
Rlist *res = ReadDataFromPackageScript(&io);

/* If script returns non 0 status */
int close = cf_pclose_full_duplex(&io);
if (close != EXIT_SUCCESS)
{
Log(LOG_LEVEL_VERBOSE,
"Package module returned with non zero return code: %d",
close);
RlistDestroy(res);
return -1;
}

*response = res;
return 0;
}

static void FreePackageInfo(PackageInfo *package_info)
{
if (package_info)
Expand Down Expand Up @@ -537,19 +324,23 @@ static PackageInfo *ParseAndCheckPackageDataReply(const Rlist *data)

static int NegotiateSupportedAPIVersion(PackageModuleWrapper *wrapper)
{
assert(wrapper);

Log(LOG_LEVEL_DEBUG, "Getting supported API version.");

int api_version = -1;

Rlist *response = NULL;
if (ReadWriteDataToPackageScript("supports-api-version", "",
&response, wrapper) != 0)
if (PipeReadWriteData(wrapper->path, "supports-api-version", "",
&response,
PACKAGE_PROMISE_SCRIPT_TIMEOUT_SEC,
PACKAGE_PROMISE_TERMINATION_CHECK_SEC) != 0)
{
Log(LOG_LEVEL_INFO,
"Error occurred while getting supported API version.");
return -1;
}

if (response)
{
if (RlistLen(response) == 1)
Expand All @@ -568,7 +359,9 @@ static
PackageInfo *GetPackageData(const char *name, const char *version,
const char *architecture, Rlist *options,
const PackageModuleWrapper *wrapper)
{
{
assert(wrapper);

Log(LOG_LEVEL_DEBUG, "Getting package '%s' data.", name);

char *options_str = ParseOptions(options);
Expand All @@ -584,8 +377,10 @@ PackageInfo *GetPackageData(const char *name, const char *version,
free(arch);

Rlist *response = NULL;
if (ReadWriteDataToPackageScript("get-package-data", request, &response,
wrapper) != 0)
if (PipeReadWriteData(wrapper->path, "get-package-data", request,
&response,
PACKAGE_PROMISE_SCRIPT_TIMEOUT_SEC,
PACKAGE_PROMISE_TERMINATION_CHECK_SEC) != 0)
{
Log(LOG_LEVEL_INFO, "Some error occurred while communicating with "
"package module while collecting package data.");
Expand Down Expand Up @@ -873,6 +668,8 @@ int UpdatePackagesDB(Rlist *data, const char *pm_name, UpdateType type)
bool UpdateCache(Rlist* options, const PackageModuleWrapper *wrapper,
UpdateType type)
{
assert(wrapper);

Log(LOG_LEVEL_DEBUG, "Updating cache: %d", type);

char *options_str = ParseOptions(options);
Expand All @@ -891,8 +688,11 @@ bool UpdateCache(Rlist* options, const PackageModuleWrapper *wrapper,
{
req_type = "list-updates-local";
}
if (ReadWriteDataToPackageScript(req_type, options_str, &response,
wrapper) != 0)

if (PipeReadWriteData(wrapper->path, req_type, options_str,
&response,
PACKAGE_PROMISE_SCRIPT_TIMEOUT_SEC,
PACKAGE_PROMISE_TERMINATION_CHECK_SEC) != 0)
{
Log(LOG_LEVEL_VERBOSE, "Some error occurred while communicating with "
"package module while updating cache.");
Expand Down Expand Up @@ -971,6 +771,8 @@ PromiseResult RemovePackage(const char *name, Rlist* options,
const char *version, const char *architecture,
const PackageModuleWrapper *wrapper)
{
assert(wrapper);

Log(LOG_LEVEL_DEBUG, "Removing package '%s'", name);

char *options_str = ParseOptions(options);
Expand All @@ -984,8 +786,10 @@ PromiseResult RemovePackage(const char *name, Rlist* options,
PromiseResult res = PROMISE_RESULT_CHANGE;

Rlist *error_message = NULL;
if (ReadWriteDataToPackageScript("remove", request,
&error_message, wrapper) != 0)
if (PipeReadWriteData(wrapper->path, "remove", request,
&error_message,
PACKAGE_PROMISE_SCRIPT_TIMEOUT_SEC,
PACKAGE_PROMISE_TERMINATION_CHECK_SEC) != 0)
{
Log(LOG_LEVEL_INFO,
"Error communicating package module while removing package.");
Expand All @@ -1012,6 +816,8 @@ static PromiseResult InstallPackageGeneric(Rlist *options,
PackageType type, const char *packages_list_formatted,
const PackageModuleWrapper *wrapper)
{
assert(wrapper);

Log(LOG_LEVEL_DEBUG,
"Installing %s type package: '%s'",
type == PACKAGE_TYPE_FILE ? "file" : "repo",
Expand Down Expand Up @@ -1042,8 +848,10 @@ static PromiseResult InstallPackageGeneric(Rlist *options,
request);

Rlist *error_message = NULL;
if (ReadWriteDataToPackageScript(package_install_command, request,
&error_message, wrapper) != 0)
if (PipeReadWriteData(wrapper->path, package_install_command, request,
&error_message,
PACKAGE_PROMISE_SCRIPT_TIMEOUT_SEC,
PACKAGE_PROMISE_TERMINATION_CHECK_SEC) != 0)
{
Log(LOG_LEVEL_INFO, "Some error occurred while communicating with "
"package module while installing package.");
Expand Down
Loading

0 comments on commit bb227d8

Please sign in to comment.