Skip to content

Commit

Permalink
Big fat refactoring of how redirections work. In fish 1.x and 2.0.0, …
Browse files Browse the repository at this point in the history
…the redirections for a process were flattened into a big list associated with the job, so there was no way to tell which redirections applied to each process. Each process therefore got all the redirections associated with the job. See fish-shell#877 for how this could manifest.

With this change, jobs only track their block-level redirections. Process level redirections are correctly associated with the process, and at exec time we stitch them together (block, pipe, and process redirects).

This fixes the weird issues where redirects bleed across pipelines (like fish-shell#877), and also allows us to play with the order in which redirections are applied, since the final list is constructed right before it's needed.  This lets us put pipes after block level redirections but before process level redirections, so that a 2>&1-type redirection gets picked up after the pipe, i.e. it should fix fish-shell#110

This is a significant change. The tests all pass. Cross your fingers.
  • Loading branch information
ridiculousfish committed Aug 20, 2013
1 parent f4f2847 commit 4899086
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 116 deletions.
166 changes: 97 additions & 69 deletions exec.cpp

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion exec.h
Expand Up @@ -42,7 +42,7 @@
*/
class parser_t;
void exec(parser_t &parser, job_t *j);
void exec_job(parser_t &parser, job_t *j);

/**
Evaluate the expression cmd in a subshell, add the outputs into the
Expand Down
7 changes: 6 additions & 1 deletion io.cpp
Expand Up @@ -128,7 +128,7 @@ io_buffer_t *io_buffer_t::create(bool is_input, int fd)
bool success = true;
if (fd == -1)
{
fd = is_input ? 0 : 1;
fd = is_input ? STDIN_FILENO : STDOUT_FILENO;
}
io_buffer_t *buffer_redirect = new io_buffer_t(fd, is_input);

Expand Down Expand Up @@ -208,6 +208,11 @@ void io_chain_t::push_front(const shared_ptr<io_data_t> &element)
this->insert(this->begin(), element);
}

void io_chain_t::append(const io_chain_t &chain)
{
this->insert(this->end(), chain.begin(), chain.end());
}

void io_remove(io_chain_t &list, const shared_ptr<const io_data_t> &element)
{
list.remove(element);
Expand Down
1 change: 1 addition & 0 deletions io.h
Expand Up @@ -189,6 +189,7 @@ class io_chain_t : public std::vector<shared_ptr<io_data_t> >
void remove(const shared_ptr<const io_data_t> &element);
void push_back(const shared_ptr<io_data_t> &element);
void push_front(const shared_ptr<io_data_t> &element);
void append(const io_chain_t &chain);

shared_ptr<const io_data_t> get_io_for_fd(int fd) const;
shared_ptr<io_data_t> get_io_for_fd(int fd);
Expand Down
21 changes: 13 additions & 8 deletions parser.cpp
Expand Up @@ -1168,9 +1168,9 @@ int parser_t::is_help(const wchar_t *s, int min_match) const
(len >= (size_t)min_match && (wcsncmp(L"--help", s, len) == 0));
}

job_t *parser_t::job_create(void)
job_t *parser_t::job_create()
{
job_t *res = new job_t(acquire_job_id());
job_t *res = new job_t(acquire_job_id(), this->block_io);
this->my_job_list.push_front(res);

job_set_flag(res,
Expand Down Expand Up @@ -1256,6 +1256,9 @@ void parser_t::parse_job_argument_list(process_t *p,
wcstring unmatched;
int unmatched_pos=0;

/* The set of IO redirections that we construct for the process */
io_chain_t process_io_chain;

/*
Test if this is the 'count' command. We need to special case
count in the shell, since it should display a help message on
Expand Down Expand Up @@ -1559,7 +1562,7 @@ void parser_t::parse_job_argument_list(process_t *p,

if (new_io.get() != NULL)
{
j->append_io(new_io);
process_io_chain.push_back(new_io);
}

}
Expand Down Expand Up @@ -1613,7 +1616,9 @@ void parser_t::parse_job_argument_list(process_t *p,
}
}

return;
/* Store our IO chain. The existing chain should be empty. */
assert(p->io_chain().empty());
p->set_io_chain(process_io_chain);
}

/*
Expand Down Expand Up @@ -2256,7 +2261,7 @@ void parser_t::skipped_exec(job_t * j)
{
if (!current_block->outer->skip)
{
exec(*this, j);
exec_job(*this, j);
return;
}
parser_t::pop_block();
Expand All @@ -2269,7 +2274,7 @@ void parser_t::skipped_exec(job_t * j)
const if_block_t *ib = static_cast<const if_block_t*>(current_block);
if (ib->if_expr_evaluated && ! ib->any_branch_taken)
{
exec(*this, j);
exec_job(*this, j);
return;
}
}
Expand All @@ -2278,7 +2283,7 @@ void parser_t::skipped_exec(job_t * j)
{
if (current_block->type() == SWITCH)
{
exec(*this, j);
exec_job(*this, j);
return;
}
}
Expand Down Expand Up @@ -2415,7 +2420,7 @@ void parser_t::eval_job(tokenizer_t *tok)
if (j->first_process->type==INTERNAL_BUILTIN && !j->first_process->next)
was_builtin = 1;
scoped_push<int> tokenizer_pos_push(&current_tokenizer_pos, job_begin_pos);
exec(*this, j);
exec_job(*this, j);

/* Only external commands require a new fishd barrier */
if (!was_builtin)
Expand Down
6 changes: 3 additions & 3 deletions parser.h
Expand Up @@ -352,6 +352,9 @@ class parser_t
void print_errors(wcstring &target, const wchar_t *prefix);
void print_errors_stderr();

/** Create a job */
job_t *job_create();

public:
std::vector<profile_item_t*> profile_items;

Expand Down Expand Up @@ -457,9 +460,6 @@ class parser_t
/** Return a description of the given blocktype */
const wchar_t *get_block_desc(int block) const;

/** Create a job */
job_t *job_create();

/** Removes a job */
bool job_remove(job_t *job);

Expand Down
14 changes: 7 additions & 7 deletions postfork.cpp
Expand Up @@ -292,7 +292,7 @@ static int handle_child_io(const io_chain_t &io_chain)
}


int setup_child_process(job_t *j, process_t *p)
int setup_child_process(job_t *j, process_t *p, const io_chain_t &io_chain)
{
bool ok=true;

Expand All @@ -303,7 +303,7 @@ int setup_child_process(job_t *j, process_t *p)

if (ok)
{
ok = (0 == handle_child_io(j->io_chain()));
ok = (0 == handle_child_io(io_chain));
if (p != 0 && ! ok)
{
exit_without_destructors(1);
Expand Down Expand Up @@ -379,7 +379,7 @@ pid_t execute_fork(bool wait_for_threads_to_die)
}

#if FISH_USE_POSIX_SPAWN
bool fork_actions_make_spawn_properties(posix_spawnattr_t *attr, posix_spawn_file_actions_t *actions, job_t *j, process_t *p)
bool fork_actions_make_spawn_properties(posix_spawnattr_t *attr, posix_spawn_file_actions_t *actions, job_t *j, process_t *p, const io_chain_t &io_chain)
{
/* Initialize the output */
if (posix_spawnattr_init(attr) != 0)
Expand Down Expand Up @@ -444,19 +444,19 @@ bool fork_actions_make_spawn_properties(posix_spawnattr_t *attr, posix_spawn_fil
err = posix_spawnattr_setsigmask(attr, &sigmask);

/* Make sure that our pipes don't use an fd that the redirection itself wants to use */
free_redirected_fds_from_pipes(j->io_chain());
free_redirected_fds_from_pipes(io_chain);

/* Close unused internal pipes */
std::vector<int> files_to_close;
get_unused_internal_pipes(files_to_close, j->io_chain());
get_unused_internal_pipes(files_to_close, io_chain);
for (size_t i = 0; ! err && i < files_to_close.size(); i++)
{
err = posix_spawn_file_actions_addclose(actions, files_to_close.at(i));
}

for (size_t idx = 0; idx < j->io_chain().size(); idx++)
for (size_t idx = 0; idx < io_chain.size(); idx++)
{
shared_ptr<const io_data_t> io = j->io_chain().at(idx);
const shared_ptr<const io_data_t> io = io_chain.at(idx);

if (io->io_mode == IO_FD)
{
Expand Down
6 changes: 3 additions & 3 deletions postfork.h
Expand Up @@ -53,13 +53,13 @@ int set_child_group(job_t *j, process_t *p, int print_errors);
\param j the job to set up the IO for
\param p the child process to set up
\param io_chain the IO chain to use (ignores the job's iochain)
\param io_chain the IO chain to use
\return 0 on sucess, -1 on failiure. When this function returns,
signals are always unblocked. On failiure, signal handlers, io
redirections and process group of the process is undefined.
*/
int setup_child_process(job_t *j, process_t *p);
int setup_child_process(job_t *j, process_t *p, const io_chain_t &io_chain);

/* Call fork(), optionally waiting until we are no longer multithreaded. If the forked child doesn't do anything that could allocate memory, take a lock, etc. (like call exec), then it's not necessary to wait for threads to die. If the forked child may do those things, it should wait for threads to die.
*/
Expand All @@ -73,7 +73,7 @@ void safe_report_exec_error(int err, const char *actual_cmd, const char * const

#if FISH_USE_POSIX_SPAWN
/* Initializes and fills in a posix_spawnattr_t; on success, the caller should destroy it via posix_spawnattr_destroy */
bool fork_actions_make_spawn_properties(posix_spawnattr_t *attr, posix_spawn_file_actions_t *actions, job_t *j, process_t *p);
bool fork_actions_make_spawn_properties(posix_spawnattr_t *attr, posix_spawn_file_actions_t *actions, job_t *j, process_t *p, const io_chain_t &io_chain);
#endif

#endif
25 changes: 19 additions & 6 deletions proc.cpp
Expand Up @@ -537,10 +537,10 @@ process_t::~process_t()
delete this->next;
}

job_t::job_t(job_id_t jobid) :
job_t::job_t(job_id_t jobid, const io_chain_t &bio) :
command_str(),
command_narrow(),
io(),
block_io(bio),
first_process(NULL),
pgid(0),
tmodes(),
Expand All @@ -556,6 +556,17 @@ job_t::~job_t()
release_job_id(job_id);
}

/* Return all the IO redirections. Start with the block IO, then walk over the processes */
io_chain_t job_t::all_io_redirections() const
{
io_chain_t result = this->block_io;
for (process_t *p = this->first_process; p != NULL; p = p->next)
{
result.append(p->io_chain());
}
return result;
}

/* This is called from a signal handler */
void job_handle_signal(int signal, siginfo_t *info, void *con)
{
Expand Down Expand Up @@ -874,9 +885,10 @@ static int select_try(job_t *j)

FD_ZERO(&fds);

for (size_t idx = 0; idx < j->io_chain().size(); idx++)
const io_chain_t chain = j->all_io_redirections();
for (size_t idx = 0; idx < chain.size(); idx++)
{
const io_data_t *io = j->io_chain().at(idx).get();
const io_data_t *io = chain.at(idx).get();
if (io->io_mode == IO_BUFFER)
{
CAST_INIT(const io_pipe_t *, io_pipe, io);
Expand Down Expand Up @@ -915,9 +927,10 @@ static void read_try(job_t *j)
/*
Find the last buffer, which is the one we want to read from
*/
for (size_t idx = 0; idx < j->io_chain().size(); idx++)
const io_chain_t chain = j->all_io_redirections();
for (size_t idx = 0; idx < chain.size(); idx++)
{
io_data_t *d = j->io_chain().at(idx).get();
io_data_t *d = chain.at(idx).get();
if (d->io_mode == IO_BUFFER)
{
buff = static_cast<io_buffer_t *>(d);
Expand Down
26 changes: 20 additions & 6 deletions proc.h
Expand Up @@ -134,6 +134,8 @@ class process_t
/* narrow copy of argv0 so we don't have to convert after fork */
narrow_string_rep_t argv0_narrow;

io_chain_t process_io_chain;

/* No copying */
process_t(const process_t &rhs);
void operator=(const process_t &rhs);
Expand Down Expand Up @@ -190,6 +192,17 @@ class process_t
return argv0_narrow.get();
}

/* IO chain getter and setter */
const io_chain_t &io_chain() const
{
return process_io_chain;
}

void set_io_chain(const io_chain_t &chain)
{
this->process_io_chain = chain;
}

/** actual command to pass to exec in case of EXTERNAL or INTERNAL_EXEC. */
wcstring actual_cmd;

Expand Down Expand Up @@ -285,17 +298,16 @@ class job_t
/* narrow copy so we don't have to convert after fork */
narrow_string_rep_t command_narrow;

/** List of all IO redirections for this job. */
io_chain_t io;
friend void exec(parser_t &parser, job_t *j);
/* The IO chain associated with the block */
const io_chain_t block_io;

/* No copying */
job_t(const job_t &rhs);
void operator=(const job_t &);

public:

job_t(job_id_t jobid);
job_t(job_id_t jobid, const io_chain_t &bio);
~job_t();

/** Returns whether the command is empty. */
Expand Down Expand Up @@ -360,9 +372,11 @@ class job_t
*/
unsigned int flags;

const io_chain_t &io_chain() const { return this->io; }
/* Returns the block IO redirections associated with the job. These are things like the IO redirections associated with the begin...end statement. */
const io_chain_t &block_io_chain() const { return this->block_io; }

void append_io(const shared_ptr<io_data_t> & new_io) { this->io.push_back(new_io); }
/* Fetch all the IO redirections associated with the job */
io_chain_t all_io_redirections() const;
};

/**
Expand Down
24 changes: 12 additions & 12 deletions reader.cpp
Expand Up @@ -1274,11 +1274,11 @@ static void run_pager(const wcstring &prefix, int is_quoted, const std::vector<c
wcstring prefix_esc;
char *foo;

shared_ptr<io_buffer_t> in(io_buffer_t::create(true, 3));
shared_ptr<io_buffer_t> out(io_buffer_t::create(false, 4));
shared_ptr<io_buffer_t> in_buff(io_buffer_t::create(true, 3));
shared_ptr<io_buffer_t> out_buff(io_buffer_t::create(false, 4));

// The above may fail e.g. if we have too many open fds
if (in.get() == NULL || out.get() == NULL)
if (in_buff.get() == NULL || out_buff.get() == NULL)
return;

wchar_t *escaped_separator;
Expand Down Expand Up @@ -1350,26 +1350,26 @@ static void run_pager(const wcstring &prefix, int is_quoted, const std::vector<c
free(escaped_separator);

foo = wcs2str(msg.c_str());
in->out_buffer_append(foo, strlen(foo));
in_buff->out_buffer_append(foo, strlen(foo));
free(foo);

term_donate();
parser_t &parser = parser_t::principal_parser();
io_chain_t io_chain;
io_chain.push_back(out);
io_chain.push_back(in);
io_chain.push_back(out_buff);
io_chain.push_back(in_buff);
parser.eval(cmd, io_chain, TOP);
term_steal();

out->read();
out_buff->read();

int nil=0;
out->out_buffer_append((char *)&nil, 1);
const char zero = 0;
out_buff->out_buffer_append(&zero, 1);

const char *outbuff = out->out_buffer_ptr();
if (outbuff)
const char *out_data = out_buff->out_buffer_ptr();
if (out_data)
{
const wcstring str = str2wcstring(outbuff);
const wcstring str = str2wcstring(out_data);
size_t idx = str.size();
while (idx--)
{
Expand Down

0 comments on commit 4899086

Please sign in to comment.