Skip to content
This repository has been archived by the owner on May 28, 2019. It is now read-only.

Commit

Permalink
Merge pull request #1 from leoYY/master
Browse files Browse the repository at this point in the history
add close_on_exec and agent_work_dir
  • Loading branch information
imotai committed Mar 23, 2015
2 parents e66e471 + 23e0e05 commit cb86c24
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 33 deletions.
1 change: 1 addition & 0 deletions COMAKE
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ CopyUsingHardLink(True)
#ENABLE_MULTI_LIBS(True)

#C预处理器参数.
#CPPFLAGS('-D_GNU_SOURCE -D__STDC_LIMIT_MACROS -DVERSION=\\\"1.9.8.7\\\" -DAGENT_WORK_DIR=\\\"test\/\\\"')
CPPFLAGS('-D_GNU_SOURCE -D__STDC_LIMIT_MACROS -DVERSION=\\\"1.9.8.7\\\"')
#为32位目标编译指定额外的预处理参数
#CPPFLAGS_32('-D_XOPEN_SOURE=500')
Expand Down
24 changes: 0 additions & 24 deletions Makefile

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Galaxy
## Todolist
###主线:
1. Agent启动子进程时能关闭自己所有的文件
1. Agent启动子进程时能关闭自己所有的文件 -- **Done**(FD_CLOEXEC解决)
2. Agent启动任务时,放在本任务专用一个文件夹里,而不是现在的/home/下


Expand Down
64 changes: 64 additions & 0 deletions common/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@
#ifndef COMMON_UTIL_H_
#define COMMON_UTIL_H_

#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/types.h>
#include <dirent.h>

#include <map>
#include <sstream>
#include <string>

// use only in linux
extern char** environ;

namespace common {
namespace util {

Expand All @@ -20,6 +33,57 @@ std::string GetLocalHostName() {
return hostname;
}

void CloseOnExec(int fd) {
int flags = fcntl(fd, F_GETFD);
flags |= FD_CLOEXEC;
fcntl(fd, F_SETFD, flags);
}

void GetEnviron(std::map<std::string, std::string>& env) {
char** cur_environ = environ;

env.clear();
for (int index = 0; cur_environ[index] != NULL; index++) {
// split =
std::string env_item(cur_environ[index]);
size_t pos = env_item.find_first_of('=');
if (pos == std::string::npos) {
// invalid format
continue;
}
env[env_item.substr(0, pos)] = env_item.substr(pos + 1);
}
return;
}

void GetProcessFdList(int pid, std::vector<int>& fds) {
std::ostringstream stream;
stream << pid;
std::string proc_path = "/proc/";
proc_path.append(stream.str());
proc_path.append("/fd/");

DIR* dir = opendir(proc_path.c_str());
if (dir == NULL) {
return;
}

fds.push_back(0);

struct dirent* entry;
while ((entry = readdir(dir)) != NULL) {
if (strncmp(entry->d_name, ".", 1) == 0) {
continue;
}
int fd = atoi(entry->d_name);
if (fd == 0) {
continue;
}
fds.push_back(fd);
}
closedir(dir);
}

}
}

Expand Down
67 changes: 60 additions & 7 deletions src/agent/agent_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
#include <fcntl.h>

#include <boost/bind.hpp>
#include <errno.h>
#include <string.h>

#include "common/util.h"
#include "rpc/rpc_client.h"

extern std::string FLAGS_master_addr;
extern std::string FLAGS_agent_port;
extern std::string FLAGS_agent_work_dir;

namespace galaxy {

Expand Down Expand Up @@ -46,7 +49,7 @@ void AgentImpl::Report() {
void AgentImpl::OpenProcess(const std::string& task_name,
const std::string& task_raw,
const std::string& cmd_line) {
std::string task_path = "/home/" + task_name;
std::string task_path = FLAGS_agent_work_dir + task_name;
int fd = open(task_path.c_str(), O_CREAT | O_TRUNC | O_WRONLY, S_IRWXU);
if (fd < 0) {
LOG(WARNING, "Open %s sor write fail", task_path.c_str());
Expand All @@ -58,20 +61,70 @@ void AgentImpl::OpenProcess(const std::string& task_name,
}
LOG(INFO, "Write %d bytes to %s", len, task_path.c_str());
close(fd);
/* FILE* fp = fopen(task_name.c_str(), "w");
fwrite(task_raw.data(), task_raw.size(), 1, fp);
fclose(fp);
*/
LOG(INFO,"Fork to run %s", task_name.c_str());

LOG(INFO, "Fork to Run %s", task_name.c_str());
pid_t pid = fork();
if (pid != 0) {
return;
}
execl("/bin/sh", "sh", "-c", task_path.c_str(), NULL);
std::string root_path = FLAGS_agent_work_dir;
RunInnerChildProcess(root_path, cmd_line);
return;
}

void AgentImpl::RunInnerChildProcess(const std::string& root_path,
const std::string& cmd_line) {
// do some prepare
// 1. change stdout/stderr
std::string task_stdout = root_path + "./stdout";
std::string task_stderr = root_path + "./stderr";
int stdout_fd = open(task_stdout.c_str(), O_CREAT | O_TRUNC | O_WRONLY, S_IRWXU);
int stderr_fd = open(task_stderr.c_str(), O_CREAT | O_TRUNC | O_WRONLY, S_IRWXU);
dup2(stdout_fd, STDOUT_FILENO);
dup2(stderr_fd, STDERR_FILENO);
common::util::CloseOnExec(stdout_fd);
common::util::CloseOnExec(stderr_fd);

// 2. change PWD
//std::map<std::string, std::string> env;
//common::util::GetEnviron(env);
//std::map<std::string, std::string>::iterator it = env.find("PWD");
//if (it != env.end()) {
// it->second = root_path;
//}
//
//it = env.begin();
//const char* envp[env.size() + 1];
//for (int i = 0;it != env.end(); ++i, ++it) {
// envp[i] = it->second.c_str();
//}
//envp[env.size()] = NULL;

//if (setenv("PWD", root_path.c_str(), 1) != 0) {
// LOG(INFO, "set pwd failed %d %s", errno, strerror(errno));
//}

// 3. close on exec
int pid = getpid();
std::vector<int> fds;
common::util::GetProcessFdList(pid, fds);
for (size_t i = 3; i < fds.size(); i++) {
common::util::CloseOnExec(fds[i]);
}
chdir(root_path.c_str());

LOG(INFO, "RunInnerChildProcess task %s", cmd_line.c_str());

int ret = execl("/bin/sh", "sh", "-c", cmd_line.c_str(), NULL);
if (ret != 0) {
LOG(INFO, "exec failed %d %s", errno, strerror(errno));
}

/* Exit the child process if execl fails */
assert(0);
_exit(127);
}

void AgentImpl::RunTask(::google::protobuf::RpcController* controller,
const ::galaxy::RunTaskRequest* request,
::galaxy::RunTaskResponse* response,
Expand Down
2 changes: 2 additions & 0 deletions src/agent/agent_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class AgentImpl : public Agent {
void OpenProcess(const std::string& task_name,
const std::string& task_raw,
const std::string& cmd_line);
void RunInnerChildProcess(const std::string& root_path,
const std::string& cmd_line);
private:
common::ThreadPool thread_pool_;
RpcClient* rpc_client_;
Expand Down
6 changes: 5 additions & 1 deletion src/agent/agent_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

extern std::string FLAGS_agent_port;
extern std::string FLAGS_master_addr;
extern std::string FLAGS_agent_work_dir;

static volatile bool s_quit = false;
static void SignalIntHandler(int /*sig*/)
Expand All @@ -28,7 +29,10 @@ int main(int argc, char* argv[])
FLAGS_agent_port = s;
} else if (sscanf(argv[i], "--master=%s", s) == 1) {
FLAGS_master_addr = s;
} else {
} else if (sscanf(argv[i], "--work_dir=%s", s) == 1) {
FLAGS_agent_work_dir = s;
}
else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);
}
Expand Down
8 changes: 8 additions & 0 deletions src/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,12 @@ std::string FLAGS_master_port = "8101";
std::string FLAGS_agent_port = "8102";
std::string FLAGS_master_addr = "localhost:" + FLAGS_master_port;

#ifdef AGENT_WORK_DIR
const char* work_dir = AGENT_WORK_DIR;
std::string FLAGS_agent_work_dir(work_dir);
#else
std::string FLAGS_agent_work_dir;
#endif


/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */

0 comments on commit cb86c24

Please sign in to comment.