From da59d9d19f54aa108d7de02ea7fe3323a7f1bf9b Mon Sep 17 00:00:00 2001 From: Eric Olson Date: Tue, 3 Sep 2013 13:01:44 -0400 Subject: [PATCH] Add support for returning results from child functions via IPC --- README.md | 1 + fork_daemon.php | 439 ++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 423 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 6f03670..333b642 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ A library to make setup and management of forking daemons in PHP easy. ## Features - Easy management of PHP forks +- Return result of children by callback or polling parent for results - Splitting work units into buckets - Preforking callbacks to manage resources before forking - Dynamic setting of number of children / work per child diff --git a/fork_daemon.php b/fork_daemon.php index 2f8d999..9a9ee90 100644 --- a/fork_daemon.php +++ b/fork_daemon.php @@ -14,6 +14,7 @@ class fork_daemon */ const WORKER = 0; const HELPER = 1; + const STOPPED = 2; /** * Bucket constants @@ -33,6 +34,13 @@ class fork_daemon const LOG_LEVEL_INFO = 6; const LOG_LEVEL_DEBUG = 7; + /** + * Socket constants + * + * @access public + */ + const SOCKET_HEADER_SIZE = 4; + /** * Variables */ @@ -109,6 +117,13 @@ class fork_daemon */ private $child_function_sighup = array(self::DEFAULT_BUCKET => ''); + /** + * Function the parent invokes when a child has results to post + * @access private + * @var integer $parent_function_results + */ + private $parent_function_results = array(self::DEFAULT_BUCKET => ''); + /** * Max number of seconds to wait for a child process * exit once it has been requested to exit @@ -178,6 +193,13 @@ class fork_daemon */ private $forked_children = array(); + /** + * number of tracked children (not stopped) + * @access private + * @var array $forked_children_count + */ + private $forked_children_count = 0; + /** * track the work units to process * @access private @@ -192,6 +214,13 @@ class fork_daemon */ private $buckets = array(0 => self::DEFAULT_BUCKET); + /** + * for the parent the track the results received from chilren + * @access private + * @var array $work_units + */ + private $results = array(self::DEFAULT_BUCKET => array()); + /** * within a child, track the bucket the child exists in. note, * this shouldn't be set or referenced in the parent process @@ -541,6 +570,24 @@ public function register_parent_child_exit($function_name, $bucket = self::DEFAU return false; } + /** + * Allows the app to set the call back function for when the a child has results + * @access public + * @param string name of function to be called. + * @return bool true if the callback was successfully registered, false if it failed + */ + public function register_parent_results($function_name, $bucket = self::DEFAULT_BUCKET) + { + // call parent function + if ( method_exists($this, $function_name) || function_exists($function_name) ) + { + $this->parent_function_results[$bucket] = $function_name; + return true; + } + + return false; + } + /** * Allows the app to set the call back function for logging * @access public @@ -621,6 +668,8 @@ public function signal_handler_sighup($signal_number) { foreach ($this->forked_children as $pid => $pid_info) { + if ($pid_info['status'] == self::STOPPED) + continue; $this->log('parent process [' . getmypid() . '] sending sighup to child ' . $pid, self::LOG_LEVEL_DEBUG); posix_kill($pid, SIGHUP); } @@ -675,8 +724,9 @@ public function signal_handler_sigchild($signal_number) if ($child['status'] == self::WORKER) $this->invoke_callback($this->parent_function_child_exited[ $this->forked_children[$child_pid]['bucket'] ], array($child_pid, $this->forked_children[$child_pid]['identifier']), true); - // stop tracking the childs PID - unset($this->forked_children[$child_pid]); + // stop the child pid + $this->forked_children[$child_pid]['status'] = self::STOPPED; + $this->forked_children_count--; // respawn helper processes if ($child['status'] == self::HELPER && $child['respawn'] === true) @@ -684,6 +734,9 @@ public function signal_handler_sigchild($signal_number) $this->log('Helper process ' . $child_pid . ' died, respawning', self::LOG_LEVEL_INFO); $this->helper_process_spawn($child['function'], $child['arguments'], $child['identifier'], true); } + + // Poll for results from any children + $this->post_results($child['bucket']); } elseif ($child_pid < 0) { @@ -714,6 +767,9 @@ public function signal_handler_sigint($signal_number) { foreach ($this->forked_children as $pid => &$pid_info) { + if ($pid_info['status'] == self::STOPPED) + continue; + // tell helpers not to respawn if ($pid_info['status'] == self::HELPER) $pid_info['respawn'] = false; @@ -730,22 +786,26 @@ public function signal_handler_sigint($signal_number) $start_time = time(); // wait for child processes to go away - while (count($this->forked_children) > 0) + while ($this->forked_children_count > 0) { if (time() > ($start_time + $this->children_max_timeout)) { foreach ($this->forked_children as $pid => $child) { + if ($child['status'] == self::STOPPED) + continue; + $this->log('force killing child pid: ' . $pid, self::LOG_LEVEL_INFO); posix_kill($pid, SIGKILL); - // remove the killed process from being tracked - unset($this->forked_children[$pid]); + // stop the child + $this->forked_children[$pid]['status'] = self::STOPPED; + $this->forked_children_count--; } } else { - $this->log('waiting ' . ($start_time + $this->children_max_timeout - time()) . ' seconds for ' . count($this->forked_children) . ' children to clean up', self::LOG_LEVEL_INFO); + $this->log('waiting ' . ($start_time + $this->children_max_timeout - time()) . ' seconds for ' . $this->forked_children_count . ' children to clean up', self::LOG_LEVEL_INFO); sleep(1); $this->housekeeping_check(); } @@ -759,6 +819,8 @@ public function signal_handler_sigint($signal_number) // invoke child cleanup callback if (isset($this->child_bucket)) $this->invoke_callback($this->child_function_exit[$this->child_bucket], $parameters = array($this->child_bucket), true); + + // close the socket } exit(-1); @@ -836,7 +898,7 @@ public function is_work_running($identifier, $bucket = self::DEFAULT_BUCKET) { foreach ($this->forked_children as $info) { - if (($info['identifier'] == $identifier) && ($info['bucket'] == $bucket)) + if (($info['status'] != self::STOPPED) && ($info['identifier'] == $identifier) && ($info['bucket'] == $bucket)) { return true; } @@ -848,13 +910,18 @@ public function is_work_running($identifier, $bucket = self::DEFAULT_BUCKET) /* * Return array of currently running children * - * @param string unique identifier for the work * @param int $bucket the bucket * @return bool true if child has work, false if not */ public function work_running($bucket = self::DEFAULT_BUCKET) { - return $this->forked_children; + $results = array(); + foreach ($this->forked_children as $pid => $child) + { + if ($child['status'] != self::STOPPED) + $results[$pid] = $child; + } + return $results; } /** @@ -935,27 +1002,50 @@ public function work_sets($bucket = self::DEFAULT_BUCKET) * Return the number of children running * * @param int $bucket the bucket to use + * @param bool $show_pending True to show children that are done, + * but not yet had their results retrieved * @return int the number of children running */ - public function children_running($bucket = self::DEFAULT_BUCKET) + public function children_running($bucket = self::DEFAULT_BUCKET, $show_pending = false) { // force reaping of children $this->signal_handler_sigchild(SIGCHLD); // return global count if bucket is default if ($bucket == self::DEFAULT_BUCKET) - return count($this->forked_children); + return ($show_pending ? count($this->forked_children) : $this->forked_children_count); // count within the specified bucket $count = 0; foreach ($this->forked_children as $child) { - if ($child['bucket'] == $bucket) + if ($show_pending) + { + if ($child['bucket'] == $bucket) + $count++; + } + else if (($child['bucket'] == $bucket) && ($child['status'] != self::STOPPED)) + { $count++; + } } + return $count; } + /** + * Returns the number of pending child items, including running children and + * work sets that have not been allocated. Children running includes those + * that have not had their results retrieved yet. + * + * @param type $bucket The bucket to check for pending children items + * @return int Number of pending children items + */ + public function children_pending($bucket = self::DEFAULT_BUCKET) + { + return $this->children_running($bucket, true) + $this->work_sets_count($bucket); + } + /** * Check if the current processes is a child * @@ -983,6 +1073,9 @@ public function helper_process_spawn($function_name, $arguments = array(), $iden { if ((is_array($function_name) && method_exists($function_name[0], $function_name[1])) || function_exists($function_name)) { + // init the IPC sockets + list($socket_child, $socket_parent) = $this->ipc_init(); + // do not process signals while we are forking declare(ticks = 0); $pid = pcntl_fork(); @@ -995,8 +1088,16 @@ public function helper_process_spawn($function_name, $arguments = array(), $iden { declare(ticks = 1); + // close our socket (we only need the one to the parent) + socket_close($socket_child); + + // execute the function $this->log('Calling function ' . $function_name, self::LOG_LEVEL_DEBUG); - call_user_func_array($function_name, $arguments); + $result = call_user_func_array($function_name, $arguments); + + // send the response to the parent + self::socket_send($socket_parent, $result); + exit(0); } else @@ -1004,6 +1105,10 @@ public function helper_process_spawn($function_name, $arguments = array(), $iden declare(ticks = 1); $this->log('Spawned new helper process with pid ' . $pid, self::LOG_LEVEL_INFO); + // close our socket (we only need the one to the child) + socket_close($socket_parent); + + // track the child $this->forked_children[$pid] = array( 'ctime' => time(), 'identifier' => $idenfifier, @@ -1012,12 +1117,14 @@ public function helper_process_spawn($function_name, $arguments = array(), $iden 'respawn' => true, 'function' => $function_name, 'arguments' => $arguments, + 'socket' => $socket_child, ); + $this->forked_children_count++; } } else { - $this->log("Unable to spawn undefined helper function '" . $function_name . "'", self::LOG_LEVEL_ERR); + $this->log("Unable to spawn undefined helper function '" . $function_name . "'", self::LOG_LEVEL_CRIT); } } @@ -1057,7 +1164,7 @@ public function kill_child_pid($pids, $kill_delay = 30) foreach ($pids as $index => $pid) { // make sure we own this pid - if (! array_key_exists($pid, $this->forked_children)) + if (! array_key_exists($pid, $this->forked_children) || $this->forked_children[$pid]['status'] == self::STOPPED) { $this->log('Skipping kill request on pid ' . $pid . ' because we dont own it', self::LOG_LEVEL_INFO); unset($pids[$index]); @@ -1078,7 +1185,7 @@ public function kill_child_pid($pids, $kill_delay = 30) foreach ($pids as $index => $pid) { // check if the pid exited gracefully - if (! array_key_exists($pid, $this->forked_children)) + if ($this->forked_children[$pid]['status'] == self::STOPPED) { $this->log('Pid ' . $pid . ' has exited gracefully', self::LOG_LEVEL_INFO); unset($pids[$index]); @@ -1172,6 +1279,181 @@ public function process_work($blocking = true, $bucket = self::DEFAULT_BUCKET, $ return true; } + /** + * Returns the first result available from the bucket. This will run + * a non-blocking poll of the children for updated results. + * + * @param string $bucket The bucket to check + * @return mixed The data retrieved from a child process on the buckets + */ + public function get_result($bucket = self::DEFAULT_BUCKET) + { + // check for additional results + $this->post_results($bucket); + + if (! $this->has_result($bucket)) + return null; + + return array_shift($this->results[$bucket]); + } + + /** + * Returns all the results currently in the results queue. This will + * run a non-blocking poll of the children for updated results. + * + * @param string $bucket The bucket to retrieves results + * @return mixed Array of results from each child that has finished. + */ + public function get_all_results($bucket = self::DEFAULT_BUCKET) + { + // check for additional results + $this->post_results($bucket); + + if (! $this->has_result($bucket)) + return array(); + + $results = $this->results[$bucket]; + unset($this->results[$bucket]); + + return $results; + } + + /** + * Checks if there is a result on the bucket. Before checking, + * runs a non-blocking poll of the children for updated results. + * + * @param string $bucket The bucket to check + * @return int Returns true if there is a result + */ + public function has_result($bucket = self::DEFAULT_BUCKET) + { + // check for additional results + $this->post_results($bucket); + + return (! empty($this->results[$bucket])); + } + + /** + * Checks if any changed child sockets are in the bucket. + * + * @param type $bucket The bucket to get results in + * @return type Returns the number of changed sockets for children workers in $bucket, + * or empty array if none. + */ + private function get_changed_sockets($bucket = self::DEFAULT_BUCKET) + { + $dummy = null; + // grab all the children sockets + $sockets = array(); + foreach ($this->forked_children as $pid => $child) + { + if ($child['bucket'] == $bucket) + $sockets[$pid] = $child['socket']; + } + + if (! empty($sockets)) + { + // find changed sockets and return the array of them + $result = socket_select($sockets, $dummy, $dummy, $timeout); + if ($result !== false && $result > 0) + return $sockets; + + } + + return null; + } + + /** + * Returns any pending results from the child sockets. If a + * child has no results and it has status self::STOPPED, this will remove + * the child record from $this->forked_children. + * + * NOTE: This must be polled to check for changed sockets. + * + * @param type $blocking Set to true to block until a result comes in + * @param type $bucket The bucket to look in + * @return type The result of the child worker + */ + private function fetch_results($blocking = true, $timeout = null, $bucket = self::DEFAULT_BUCKET) + { + $start = microtime(true); + $results = array(); + + // loop while there is pending children and pending sockets + do + { + $ready_sockets = $this->get_changed_sockets($bucket); + if (is_array($ready_sockets)) + { + foreach ($ready_sockets as $pid => $socket) + { + $result = $this->socket_receive($socket); + if ($result !== false && (! is_null($result))) + { + $this->forked_children[$pid]['last_active'] = $start; + $results[$pid] = $result; + } + } + } + + // clean up forked children that have stopped and did not have recently + // active sockets. + foreach ($this->forked_children as $pid => $child) + { + if (($child['last_active'] < $start) && ($child['status'] == self::STOPPED)) + { + unset($this->forked_children[$pid]); + } + } + + // check if timed out + if ($timeout && (microtime(true) - $start > $timeout)) + return $results; + + // return null if not blocking and we haven't seen results + if (! $blocking) + { + return $results; + } + } + while (count($this->forked_children) > 0); + + return $results; + } + + /** + * Posts any new results to a callback function if one is available, or stores + * them to the internal results storage if not. This does not block and will + * post any results that are available, so call while children are running + * to check and post more results. + * + * NOTE: This should be polled to update results. + * + * @param type $bucket The bucket to post the results in + * @return type Returns true on successfully posting results, even if none + * to post. Returns false on error from this function or error from + * the $this->parent_function_results callback. + */ + private function post_results($bucket = self::DEFAULT_BUCKET) + { + // fetch all the results up to this point + $results = $this->fetch_results(false, null, $bucket); + if (is_array($results) && empty($results)) + return true; + + if (! empty($this->parent_function_results[$bucket])) + { + if ($this->invoke_callback($this->parent_function_results[$bucket], array($results), true) === fale) + return false; + } + else + { + $this->results[$bucket] += $results; + } + + return true; + } + /** * Pulls items off the work queue for processing * @@ -1204,6 +1486,9 @@ private function process_work_unit($bucket = self::DEFAULT_BUCKET) $this->fork_work_unit(array($child_work_units), '', $bucket); } } + + // Poll for results from children + $this->post_results($bucket); } /** @@ -1225,6 +1510,9 @@ private function fork_work_unit($work_unit, $identifier = '', $bucket = self::DE $this->invoke_callback($function, array(), true); } + // init the IPC sockets + list($socket_child, $socket_parent) = $this->ipc_init(); + // turn off signals temporarily to prevent a SIGCHLD from interupting the parent before $this->forked_children is updated declare(ticks = 0); @@ -1252,11 +1540,16 @@ private function fork_work_unit($work_unit, $identifier = '', $bucket = self::DE 'identifier' => $identifier, 'bucket' => $bucket, 'status' => self::WORKER, + 'socket' => $socket_child, ); + $this->forked_children_count++; // turn back on signals now that $this->forked_children has been updated declare(ticks = 1); + // close our socket (we only need the one to the child) + socket_close($socket_parent); + // debug logging $this->log('forking child ' . $pid . ' for bucket ' . $bucket, self::LOG_LEVEL_DEBUG); @@ -1272,6 +1565,7 @@ private function fork_work_unit($work_unit, $identifier = '', $bucket = self::DE // free up unneeded parent memory for child process $this->work_units = null; $this->forked_children = null; + $this->results = null; // set child properties $this->child_bucket = $bucket; @@ -1279,11 +1573,17 @@ private function fork_work_unit($work_unit, $identifier = '', $bucket = self::DE // turn signals on for the child declare(ticks = 1); + // close our socket (we only need the one to the parent) + socket_close($socket_child); + // re-seed the random generator to prevent clone from parent srand(); // child run callback - $this->invoke_callback($this->child_function_run[$bucket], $work_unit, false); + $result = $this->invoke_callback($this->child_function_run[$bucket], $work_unit, false); + + // send the result to the parent + self::socket_send($socket_parent, $result); // delay the child's exit slightly to avoid race conditions usleep(500); @@ -1322,6 +1622,9 @@ private function kill_maxtime_violators() { foreach ($this->forked_children as $pid => $pid_info) { + if ($pid_info['status'] == self::STOPPED) + continue; + if ((time() - $pid_info['ctime']) > $this->child_max_run_time[$pid_info['bucket']]) { $this->log('Force kill $pid has run too long', self::LOG_LEVEL_INFO); @@ -1376,6 +1679,108 @@ private function invoke_callback($function_name, $parameters, $optional = false) } } + /** + * Initialize interprocess communication by setting up a pair + * of sockets and returning them as an array. + * + * @return type + */ + private function ipc_init() + { + // windows needs AF_INET + $domain = strtoupper(substr(PHP_OS, 0, 3)) == 'WIN' ? AF_INET : AF_UNIX; + + // create a socket pair for IPC + $sockets = array(); + if (socket_create_pair($domain, SOCK_STREAM, 0, $sockets) === false) + { + $this->log('socket_create_pair failed: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); + return false; + } + + // return the sockets + return $sockets; + } + + /** + * Sends a serializable message to the socket. + * + * @param type $socket The socket to send the message on + * @param type $message The serializable message to send + * @return type Returns true on success, false on failure + */ + private function socket_send($socket, $message) + { + $serialized_message = @serialize($message); + if ($serialized_message == false) + { + $this->log('socket_send failed to serialize message', self::LOG_LEVEL_CRIT); + return false; + } + + $header = pack('N', strlen($serialized_message)); + $data = $header . $serialized_message; + $bytes_left = strlen($data); + while ($bytes_left > 0) + { + $bytes_sent = @socket_write($socket, $data); + if ($bytes_sent === false) + { + $this->log('socket_send error: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); + return false; + } + + $bytes_left -= $bytes_sent; + $data = substr($data, $bytes_sent); + } + + return true; + } + + /** + * Receives a serialized message from the socket. + * + * @param type $socket Thes socket to receive the message from + * @return type Returns true on success, false on failure + */ + private function socket_receive($socket) + { + // initially read to the length of the header size, then + // expand to read more + $bytes_total = self::SOCKET_HEADER_SIZE; + $bytes_read = 0; + $have_header = false; + $socket_message = ''; + while ($bytes_read < $bytes_total) + { + $read = @socket_read($socket, $bytes_total - $bytes_read); + if ($read === false) + { + $this->log('socket_receive error: ' . socket_strerror(socket_last_error()), self::LOG_LEVEL_CRIT); + return false; + } + + // blank socket_read means done + if ($read == '') + break; + + $bytes_read += strlen($read); + $socket_message .= $read; + + if (!$have_header && $bytes_read >= self::SOCKET_HEADER_SIZE) + { + $have_header = true; + list($bytes_total) = array_values(unpack('N', $socket_message)); + $bytes_read = 0; + $socket_message = ''; + } + } + + $message = @unserialize($socket_message); + + return $message; + } + /** * Log a message *