Skip to content
This repository has been archived by the owner on May 28, 2019. It is now read-only.

add close_on_exec and agent_work_dir #1

Merged
merged 5 commits into from
Mar 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions COMAKE
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ CopyUsingHardLink(True)
#ENABLE_MULTI_LIBS(True)

#C预处理器参数.
#CPPFLAGS('-D_GNU_SOURCE -D__STDC_LIMIT_MACROS -DVERSION=\\\"1.9.8.7\\\" -DAGENT_WORK_DIR=\\\"test\/\\\"')
CPPFLAGS('-D_GNU_SOURCE -D__STDC_LIMIT_MACROS -DVERSION=\\\"1.9.8.7\\\"')
#为32位目标编译指定额外的预处理参数
#CPPFLAGS_32('-D_XOPEN_SOURCE=500')
Expand Down
24 changes: 0 additions & 24 deletions Makefile

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Galaxy
## Todolist
###主线:
1. Agent启动子进程时能关闭自己所有的文件
1. Agent启动子进程时能关闭自己所有的文件 -- **Done**(FD_CLOEXEC解决)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没看懂这个怎么解决的

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mesos的做法是用libprocess,libprocess本身在处理socket的时候就考虑了这点,所以直接设置了cloexec;暂时的解决办法只能扫/proc下的fd,clone的方法暂未发现能够解决。

2. Agent启动任务时,放在本任务专用一个文件夹里,而不是现在的/home/下


Expand Down
64 changes: 64 additions & 0 deletions common/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@
#ifndef COMMON_UTIL_H_
#define COMMON_UTIL_H_

#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#include <map>
#include <sstream>
#include <string>
#include <vector>

// use only in linux
extern char** environ;

namespace common {
namespace util {

Expand All @@ -20,6 +33,57 @@ std::string GetLocalHostName() {
return hostname;
}

void CloseOnExec(int fd) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个函数在哪调用了?

int flags = fcntl(fd, F_GETFD);
flags |= FD_CLOEXEC;
fcntl(fd, F_SETFD, flags);
}

// Copies the current process environment into `env` as NAME -> VALUE pairs.
//
// `env` is cleared first, so on return it holds only what was read from
// `environ`. Each entry is split at its first '='; the value may be empty and
// may itself contain '='. Entries without any '=' are skipped. If a name
// occurs more than once, the last occurrence wins.
//
// Declared inline because this header defines the function body; without it,
// including the header from two translation units is a multiple-definition
// error at link time.
inline void GetEnviron(std::map<std::string, std::string>& env) {
    env.clear();

    // `environ` itself can be NULL (glibc's clearenv() leaves it that way),
    // so guard the table pointer before walking it.
    if (environ == NULL) {
        return;
    }

    for (char** item = environ; *item != NULL; ++item) {
        const std::string entry(*item);
        const std::string::size_type eq_pos = entry.find('=');
        if (eq_pos == std::string::npos) {
            // Not in NAME=VALUE form.
            continue;
        }
        env[entry.substr(0, eq_pos)] = entry.substr(eq_pos + 1);
    }
}

// Appends the file descriptors listed under /proc/<pid>/fd/ to `fds`.
//
// Linux only (relies on procfs). If the directory cannot be opened, `fds` is
// left untouched. Otherwise:
//   - 0 is appended first, unconditionally, whether or not descriptor 0 is
//     actually open in the target process;
//   - every other directory entry whose name is a positive decimal integer
//     that fits in an int is appended in readdir() order. Entries such as
//     "." and ".." are ignored because they do not parse as numbers.
//
// When `pid` is the calling process, the descriptor used internally to scan
// the directory is left out: it is closed before this function returns, so
// reporting it would hand the caller a descriptor that no longer exists.
//
// `fds` is not cleared; results are appended to whatever it already holds.
//
// Declared inline because this header defines the function body; without it,
// including the header from two translation units is a multiple-definition
// error at link time.
inline void GetProcessFdList(int pid, std::vector<int>& fds) {
    std::ostringstream path_builder;
    path_builder << "/proc/" << pid << "/fd/";
    const std::string proc_path = path_builder.str();

    DIR* dir = opendir(proc_path.c_str());
    if (dir == NULL) {
        return;
    }

    // Only meaningful when we are listing our own descriptor table; for any
    // other pid the number would refer to an unrelated descriptor.
    const bool scanning_self = (pid == static_cast<int>(getpid()));
    const int scan_fd = dirfd(dir);

    fds.push_back(0);

    struct dirent* entry;
    while ((entry = readdir(dir)) != NULL) {
        const char* name = entry->d_name;
        char* end = NULL;
        errno = 0;
        const long value = strtol(name, &end, 10);
        if (end == name || *end != '\0' || errno != 0) {
            // Not a pure decimal number.
            continue;
        }
        if (value <= 0 || value > INT_MAX) {
            // 0 was already added above; anything else is out of range.
            continue;
        }
        const int fd = static_cast<int>(value);
        if (scanning_self && fd == scan_fd) {
            continue;
        }
        fds.push_back(fd);
    }
    closedir(dir);
}

}
}

Expand Down
67 changes: 60 additions & 7 deletions src/agent/agent_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
#include <fcntl.h>

#include <boost/bind.hpp>
#include <errno.h>
#include <string.h>

#include "common/util.h"
#include "rpc/rpc_client.h"

extern std::string FLAGS_master_addr;
extern std::string FLAGS_agent_port;
extern std::string FLAGS_agent_work_dir;

namespace galaxy {

Expand Down Expand Up @@ -46,7 +49,7 @@ void AgentImpl::Report() {
void AgentImpl::OpenProcess(const std::string& task_name,
const std::string& task_raw,
const std::string& cmd_line) {
std::string task_path = "/home/" + task_name;
std::string task_path = FLAGS_agent_work_dir + task_name;
int fd = open(task_path.c_str(), O_CREAT | O_TRUNC | O_WRONLY, S_IRWXU);
if (fd < 0) {
LOG(WARNING, "Open %s sor write fail", task_path.c_str());
Expand All @@ -58,20 +61,70 @@ void AgentImpl::OpenProcess(const std::string& task_name,
}
LOG(INFO, "Write %d bytes to %s", len, task_path.c_str());
close(fd);
/* FILE* fp = fopen(task_name.c_str(), "w");
fwrite(task_raw.data(), task_raw.size(), 1, fp);
fclose(fp);
*/
LOG(INFO,"Fork to run %s", task_name.c_str());

LOG(INFO, "Fork to Run %s", task_name.c_str());
pid_t pid = fork();
if (pid != 0) {
return;
}
execl("/bin/sh", "sh", "-c", task_path.c_str(), NULL);
std::string root_path = FLAGS_agent_work_dir;
RunInnerChildProcess(root_path, cmd_line);
return;
}

// Prepares the forked child and replaces it with `sh -c <cmd_line>`.
//
// Meant to be called in the child right after fork() (OpenProcess does so).
// This function never returns: either execl() succeeds and the process image
// is replaced, or the child terminates with exit status 127.
//
// root_path: directory that receives the task's "stdout" and "stderr" files
//            and becomes the working directory of the command. A trailing
//            '/' is optional; an empty string means "stay in the current
//            directory".
// cmd_line:  command string handed to /bin/sh -c.
//
// Every preparation step is best-effort: a failure is logged and the
// remaining steps still run.
void AgentImpl::RunInnerChildProcess(const std::string& root_path,
                                     const std::string& cmd_line) {
    // Normalise root_path into a prefix that can be glued in front of a file
    // name, so "work" and "work/" both yield "work/stdout".
    std::string dir_prefix = root_path;
    if (!dir_prefix.empty() && dir_prefix[dir_prefix.size() - 1] != '/') {
        dir_prefix += '/';
    }

    // 1. Redirect stdout/stderr into per-task files.
    const std::string log_paths[2] = { dir_prefix + "stdout", dir_prefix + "stderr" };
    const int std_fds[2] = { STDOUT_FILENO, STDERR_FILENO };
    for (int i = 0; i < 2; ++i) {
        const int file_fd = open(log_paths[i].c_str(),
                                 O_CREAT | O_TRUNC | O_WRONLY, S_IRWXU);
        if (file_fd < 0) {
            LOG(INFO, "open %s failed %d %s",
                log_paths[i].c_str(), errno, strerror(errno));
            continue;
        }
        if (file_fd == std_fds[i]) {
            // The standard descriptor was closed and open() landed on it
            // directly; it must stay open across exec, so leave it alone.
            continue;
        }
        if (dup2(file_fd, std_fds[i]) < 0) {
            LOG(INFO, "dup2 %s failed %d %s",
                log_paths[i].c_str(), errno, strerror(errno));
        }
        // The duplicate is what the command will use; the original
        // descriptor is no longer needed.
        close(file_fd);
    }

    // 2. Mark every inherited descriptor other than stdin/stdout/stderr
    //    close-on-exec so the command does not inherit the agent's files and
    //    sockets. Filter by descriptor value, not by position in the list:
    //    the listing order is whatever readdir() produced.
    int pid = getpid();
    std::vector<int> fds;
    common::util::GetProcessFdList(pid, fds);
    for (size_t i = 0; i < fds.size(); ++i) {
        if (fds[i] > STDERR_FILENO) {
            common::util::CloseOnExec(fds[i]);
        }
    }

    // 3. Run the command from the task directory. Only the working directory
    //    is changed; the PWD environment variable is left as inherited.
    if (!root_path.empty() && chdir(root_path.c_str()) != 0) {
        LOG(INFO, "chdir %s failed %d %s",
            root_path.c_str(), errno, strerror(errno));
    }

    LOG(INFO, "RunInnerChildProcess task %s", cmd_line.c_str());

    // The argument list of a variadic exec call must end with a null
    // *pointer*; a bare NULL may be an integer constant in C++.
    execl("/bin/sh", "sh", "-c", cmd_line.c_str(), static_cast<char*>(NULL));

    // execl() only returns on failure.
    LOG(INFO, "exec failed %d %s", errno, strerror(errno));

    // Leave through _exit() so the child does not run the parent's atexit
    // handlers, and so the exit status is 127 in every build type.
    _exit(127);
}

void AgentImpl::RunTask(::google::protobuf::RpcController* controller,
const ::galaxy::RunTaskRequest* request,
::galaxy::RunTaskResponse* response,
Expand Down
2 changes: 2 additions & 0 deletions src/agent/agent_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class AgentImpl : public Agent {
void OpenProcess(const std::string& task_name,
const std::string& task_raw,
const std::string& cmd_line);
void RunInnerChildProcess(const std::string& root_path,
const std::string& cmd_line);
private:
common::ThreadPool thread_pool_;
RpcClient* rpc_client_;
Expand Down
6 changes: 5 additions & 1 deletion src/agent/agent_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

extern std::string FLAGS_agent_port;
extern std::string FLAGS_master_addr;
extern std::string FLAGS_agent_work_dir;

static volatile bool s_quit = false;
static void SignalIntHandler(int /*sig*/)
Expand All @@ -28,7 +29,10 @@ int main(int argc, char* argv[])
FLAGS_agent_port = s;
} else if (sscanf(argv[i], "--master=%s", s) == 1) {
FLAGS_master_addr = s;
} else {
} else if (sscanf(argv[i], "--work_dir=%s", s) == 1) {
FLAGS_agent_work_dir = s;
}
else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);
}
Expand Down
8 changes: 8 additions & 0 deletions src/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,12 @@ std::string FLAGS_master_port = "8101";
std::string FLAGS_agent_port = "8102";
std::string FLAGS_master_addr = "localhost:" + FLAGS_master_port;

// Directory prefix the agent uses for task files.
// Starts out empty unless the build defines the AGENT_WORK_DIR macro as a
// string literal, in which case that literal is the initial value.
#ifndef AGENT_WORK_DIR
std::string FLAGS_agent_work_dir;
#else
const char* work_dir = AGENT_WORK_DIR;
std::string FLAGS_agent_work_dir = work_dir;
#endif


/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */