Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Improve fb_call_user_func_async

Summary:
This diff improves fb_call_user_func_async to make it more suitable as an
alternative to call_user_func_async.

When I talked with various devs about using fb_call_user_func_async instead
of call_user_func_async, it sounded like switching was painful for a few
reasons:

  1) Xbox is not enabled by default, the user must set config options to
     enable it.
  2) The config options for xbox are global, meaning that multiple
     developers that use xbox must 'share' a single initial document and
     a single xbox_process_message function. This can require a scheme for
     multiplexing different kinds of xbox messages if there are multiple
     developers using xbox in different ways.

fb_call_user_func_async now takes an additional argument called the
'initial document'. The user provides an initial document that is used to
initialize the worker thread. At a minimum, the initial document needs to
pull in definitions for the function being called and the classes of all
the arguments. The idea here is that a developer who wants to use
fb_call_user_func_async would need to do the one-time setup of writing an
initial document that pulls in everything their workers need, and then they
could write a handy wrapper like so:

  function start_foo_worker($fn, $args) {
    return fb_call_user_func_array_async('foo_worker_init.php', $fn, $args);
  }

This diff tries to address the pain points listed above by doing the
following:

  1) Make xbox enabled by default. To avoid the perf hit of creating a
     bunch of xbox worker threads at process start time, I changed the
     JobQueueDispatcher to lazily create worker threads. This way, if
     xbox is not used then we don't create any xbox worker threads.
  2) Add logic to xbox so that each xbox task can have a different
     initial document.
  3) Fix a bug where hphp_process_exit() was returning while xbox worker
     threads were still running.

Blame Rev:

Reviewers: myang, mwilliams, qigao, kma, je

CC:

Test Plan:
make fast_tests
make slow_tests
Run www in prod
Write some command line scripts that use fb_call_user_func_async

Revert Plan:

Differential Revision: 320810
  • Loading branch information...
commit 7917c1ec4dd7f6a00f706651dba70bcb5deaeede 1 parent 39a8fcf
@paroski paroski authored scottmac committed
View
2  src/idl/function.idl.php
@@ -117,7 +117,7 @@
DefineFunction(
array(
'name' => "call_user_func_array",
- 'desc' => "Call a user defined function with the parameters in param_arr.",
+ 'desc' => "Call a user defined function with the parameters in params.",
'flags' => HasDocComment | HasOptFunction | NoProfile | ContextSensitive,
'opt' => "hphp_opt_call_user_func",
'return' => array(
View
18 src/idl/server.idl.php
@@ -324,6 +324,24 @@
DefineFunction(
array(
+ 'name' => "xbox_process_call_message",
+ 'desc' => "This function is invoked by the xbox facility to start an xbox call task. This function is not intended to be called directly by user code.",
+ 'flags' => HasDocComment | HipHopSpecific,
+ 'return' => array(
+ 'type' => Variant,
+ 'desc' => "The return value of the xbox call task.",
+ ),
+ 'args' => array(
+ array(
+ 'name' => "msg",
+ 'type' => String,
+ 'desc' => "The call message.",
+ ),
+ ),
+ ));
+
+DefineFunction(
+ array(
'name' => "xbox_get_thread_timeout",
'desc' => "Gets the timeout (maximum duration), in seconds, of the current xbox thread. Throws for non-xbox threads.",
'flags' => HasDocComment | HipHopSpecific,
View
3  src/runtime/base/program_functions.cpp
@@ -284,6 +284,7 @@ static bool handle_exception(ExecutionContext *context, std::string &errorMsg,
Logger::Error("%s", errorMsg.c_str());
error = true;
} catch (const Exception &e) {
+ errorMsg = "";
if (where == HandlerException) {
errorMsg = "Exception handler threw an exception: ";
}
@@ -302,6 +303,7 @@ static bool handle_exception(ExecutionContext *context, std::string &errorMsg,
Logger::Error("%s", errorMsg.c_str());
}
} catch (const Object &e) {
+ errorMsg = "";
if (where == HandlerException) {
errorMsg = "Exception handler threw an object exception: ";
}
@@ -1265,6 +1267,7 @@ void hphp_session_exit() {
void hphp_process_exit() {
FiberAsyncFunc::Stop();
+ XboxServer::Stop();
Eval::Debugger::Stop();
Extension::ShutdownModules();
LightProcess::Close();
View
8 src/runtime/base/runtime_option.cpp
@@ -154,7 +154,7 @@ VirtualHostPtrVec RuntimeOption::VirtualHosts;
IpBlockMapPtr RuntimeOption::IpBlocks;
SatelliteServerInfoPtrVec RuntimeOption::SatelliteServerInfos;
-int RuntimeOption::XboxServerThreadCount = 0;
+int RuntimeOption::XboxServerThreadCount = 10;
int RuntimeOption::XboxServerMaxQueueLength = INT_MAX;
int RuntimeOption::XboxServerPort = 0;
int RuntimeOption::XboxDefaultLocalTimeoutMilliSeconds = 500;
@@ -165,7 +165,7 @@ std::string RuntimeOption::XboxServerInfoWarmupDoc;
std::string RuntimeOption::XboxServerInfoReqInitFunc;
std::string RuntimeOption::XboxServerInfoReqInitDoc;
bool RuntimeOption::XboxServerInfoAlwaysReset = false;
-bool RuntimeOption::XboxServerLogInfo = true;
+bool RuntimeOption::XboxServerLogInfo = false;
std::string RuntimeOption::XboxProcessMessageFunc = "xbox_process_message";
std::string RuntimeOption::XboxPassword;
std::set<std::string> RuntimeOption::XboxPasswords;
@@ -842,7 +842,7 @@ void RuntimeOption::Load(Hdf &config, StringVec *overwrites /* = NULL */) {
}
{
Hdf xbox = config["Xbox"];
- XboxServerThreadCount = xbox["ServerInfo.ThreadCount"].getInt32(0);
+ XboxServerThreadCount = xbox["ServerInfo.ThreadCount"].getInt32(10);
XboxServerMaxQueueLength =
xbox["ServerInfo.MaxQueueLength"].getInt32(INT_MAX);
if (XboxServerMaxQueueLength < 0) XboxServerMaxQueueLength = INT_MAX;
@@ -857,7 +857,7 @@ void RuntimeOption::Load(Hdf &config, StringVec *overwrites /* = NULL */) {
XboxServerInfoReqInitFunc = xbox["ServerInfo.RequestInitFunction"].get("");
XboxServerInfoReqInitDoc = xbox["ServerInfo.RequestInitDocument"].get("");
XboxServerInfoAlwaysReset = xbox["ServerInfo.AlwaysReset"].getBool(false);
- XboxServerLogInfo = xbox["ServerInfo.LogInfo"].getBool(true);
+ XboxServerLogInfo = xbox["ServerInfo.LogInfo"].getBool(false);
XboxProcessMessageFunc =
xbox["ProcessMessageFunc"].get("xbox_process_message");
}
View
2  src/runtime/base/server/http_server.cpp
@@ -172,6 +172,8 @@ void HttpServer::onServerShutdown() {
Logger::Info("debugger server stopped");
}
+ XboxServer::Stop();
+
// When a new instance of HPHP has taken over our page server socket,
// stop our admin server and satellites so it can acquire those ports.
for (unsigned int i = 0; i < m_satellites.size(); i++) {
View
15 src/runtime/base/server/rpc_request_handler.cpp
@@ -35,7 +35,15 @@ RPCRequestHandler::RPCRequestHandler(bool info /* = true */)
: m_count(0), m_reset(false),
m_returnEncodeType(Json) {
hphp_session_init();
- m_context = hphp_context_init();
+ bool isServer = (strcmp(RuntimeOption::ExecutionMode, "srv") == 0);
+ if (isServer) {
+ m_context = hphp_context_init();
+ } else {
+ // In command line mode, we want the xbox workers to
+ // output to STDOUT
+ m_context = g_context.getNoCheck();
+ m_context->obSetImplicitFlush(true);
+ }
m_created = time(0);
Logger::ResetRequestCount();
@@ -181,7 +189,8 @@ bool RPCRequestHandler::executePHPFunction(Transport *transport,
Variant funcRet;
string errorMsg = "Internal Server Error";
string warmupDoc, reqInitFunc, reqInitDoc;
- if (m_serverInfo) {
+ reqInitDoc = transport->getHeader("ReqInitDoc");
+ if (reqInitDoc.empty() && m_serverInfo) {
warmupDoc = m_serverInfo->getWarmupDoc();
reqInitFunc = m_serverInfo->getReqInitFunc();
reqInitDoc = m_serverInfo->getReqInitDoc();
@@ -193,7 +202,7 @@ bool RPCRequestHandler::executePHPFunction(Transport *transport,
if (!reqInitDoc.empty()) reqInitDoc = canonicalize_path(reqInitDoc, "", 0);
if (!reqInitDoc.empty()) {
- reqInitDoc = getSourceFilename(reqInitDoc, sourceRootInfo);
+ reqInitDoc = getSourceFilename(reqInitDoc, sourceRootInfo);
}
bool runOnce = false;
View
59 src/runtime/base/server/xbox_server.cpp
@@ -23,6 +23,7 @@
#include <util/job_queue.h>
#include <util/lock.h>
#include <util/logger.h>
+#include <system/lib/systemlib.h>
using namespace std;
@@ -31,10 +32,12 @@ namespace HPHP {
class XboxTransport : public Transport, public Synchronizable {
public:
- XboxTransport(CStrRef message) : m_refCount(0), m_done(false), m_code(0) {
+ XboxTransport(CStrRef message, CStrRef reqInitDoc = "")
+ : m_refCount(0), m_done(false), m_code(0) {
gettime(CLOCK_MONOTONIC, &m_queueTime);
m_message.append(message.data(), message.size());
+ m_reqInitDoc.append(reqInitDoc.data(), reqInitDoc.size());
disableCompression(); // so we don't have to decompress during sendImpl()
}
@@ -44,6 +47,9 @@ class XboxTransport : public Transport, public Synchronizable {
* Implementing Transport...
*/
virtual const char *getUrl() {
+ if (!m_reqInitDoc.empty()) {
+ return "xbox_process_call_message";
+ }
return RuntimeOption::XboxProcessMessageFunc.c_str();
}
virtual const char *getRemoteHost() {
@@ -61,6 +67,7 @@ class XboxTransport : public Transport, public Synchronizable {
}
virtual std::string getHeader(const char *name) {
if (!strcasecmp(name, "Host")) return m_host;
+ if (!strcasecmp(name, "ReqInitDoc")) return m_reqInitDoc;
return "";
}
virtual void getHeaders(HeaderMap &headers) {
@@ -134,6 +141,7 @@ class XboxTransport : public Transport, public Synchronizable {
string m_response;
int m_code;
string m_host;
+ string m_reqInitDoc;
};
class XboxRequestHandler: public RPCRequestHandler {
@@ -148,28 +156,37 @@ bool XboxRequestHandler::Info = false;
static IMPLEMENT_THREAD_LOCAL(XboxServerInfoPtr, s_xbox_server_info);
static IMPLEMENT_THREAD_LOCAL(XboxRequestHandler, s_xbox_request_handler);
+static IMPLEMENT_THREAD_LOCAL(string, s_xbox_prev_req_init_doc);
///////////////////////////////////////////////////////////////////////////////
class XboxWorker : public JobQueueWorker<XboxTransport*, true> {
public:
virtual void doJob(XboxTransport *job) {
try {
+ // If this job or the previous job that ran on this thread have
+ // a custom initial document, make sure we do a reset
+ string reqInitDoc = job->getHeader("ReqInitDoc");
+ bool needReset = !reqInitDoc.empty() ||
+ !s_xbox_prev_req_init_doc->empty();
+ *s_xbox_prev_req_init_doc = reqInitDoc;
+
job->onRequestStart(job->getStartTimer());
- createRequestHandler()->handleRequest(job);
+ createRequestHandler(needReset)->handleRequest(job);
job->decRefCount();
} catch (...) {
Logger::Error("RpcRequestHandler leaked exceptions");
}
}
private:
- RequestHandler *createRequestHandler() {
+ RequestHandler *createRequestHandler(bool needReset = false) {
if (!*s_xbox_server_info) {
*s_xbox_server_info = XboxServerInfoPtr(new XboxServerInfo());
}
if (RuntimeOption::XboxServerLogInfo) XboxRequestHandler::Info = true;
s_xbox_request_handler->setServerInfo(*s_xbox_server_info);
s_xbox_request_handler->setReturnEncodeType(RPCRequestHandler::Serialize);
- if (s_xbox_request_handler->needReset() ||
+ if (needReset ||
+ s_xbox_request_handler->needReset() ||
s_xbox_request_handler->incRequest() >
(*s_xbox_server_info)->getMaxRequest()) {
Logger::Verbose("resetting xbox request handler");
@@ -180,6 +197,12 @@ class XboxWorker : public JobQueueWorker<XboxTransport*, true> {
}
return s_xbox_request_handler.get();
}
+
+ virtual void onThreadExit() {
+ if (!s_xbox_request_handler.isNull()) {
+ s_xbox_request_handler.destroy();
+ }
+ }
};
///////////////////////////////////////////////////////////////////////////////
@@ -207,6 +230,14 @@ void XboxServer::Restart() {
}
}
+void XboxServer::Stop() {
+ if (s_dispatcher) {
+ s_dispatcher->stop();
+ delete s_dispatcher;
+ s_dispatcher = NULL;
+ }
+}
+
///////////////////////////////////////////////////////////////////////////////
static bool isLocalHost(CStrRef host) {
@@ -331,8 +362,8 @@ class XboxTask : public SweepableResourceData {
public:
DECLARE_OBJECT_ALLOCATION(XboxTask)
- XboxTask(CStrRef message) {
- m_job = new XboxTransport(message);
+ XboxTask(CStrRef message, CStrRef reqInitDoc = "") {
+ m_job = new XboxTransport(message, reqInitDoc);
m_job->incRefCount();
}
@@ -362,12 +393,18 @@ bool XboxServer::Available() {
RuntimeOption::XboxServerMaxQueueLength;
}
-Object XboxServer::TaskStart(CStrRef message) {
- if (RuntimeOption::XboxServerThreadCount <= 0 ||
- !Available()) {
- return null_object;
+Object XboxServer::TaskStart(CStrRef msg, CStrRef reqInitDoc /* = "" */) {
+ bool xboxEnabled = (RuntimeOption::XboxServerThreadCount > 0);
+ if (!xboxEnabled || !Available()) {
+ const char* errMsg = (xboxEnabled ?
+ "Cannot create new Xbox task because the Xbox queue has "
+ "reached maximum capacity" :
+ "Cannot create new Xbox task because the Xbox is not enabled");
+ Object e = SystemLib::AllocExceptionObject(errMsg);
+ throw_exception(e);
+ return Object();
}
- XboxTask *task = NEWOBJ(XboxTask)(message);
+ XboxTask *task = NEWOBJ(XboxTask)(msg, reqInitDoc);
Object ret(task);
XboxTransport *job = task->getJob();
job->incRefCount(); // paired with worker's decRefCount()
View
3  src/runtime/base/server/xbox_server.h
@@ -34,6 +34,7 @@ class XboxServer {
* Start or restart xbox server.
*/
static void Restart();
+ static void Stop();
public:
/**
@@ -51,7 +52,7 @@ class XboxServer {
/**
* Local tasklet for parallel processing.
*/
- static Object TaskStart(CStrRef message);
+ static Object TaskStart(CStrRef msg, CStrRef reqInitDoc = "");
static bool TaskStatus(CObjRef task);
static int TaskResult(CObjRef task, int timeout_ms, Variant &ret);
View
17 src/runtime/ext/ext_server.cpp
@@ -183,6 +183,23 @@ int64 f_xbox_task_result(CObjRef task, int64 timeout_ms, VRefParam ret) {
return XboxServer::TaskResult(task, timeout_ms, ret);
}
+Variant f_xbox_process_call_message(CStrRef msg) {
+ Variant v = f_unserialize(msg);
+ if (!v.isArray()) {
+ raise_error("Error decoding xbox call message");
+ }
+ Array arr = v.toArray();
+ if (arr.size() != 2 || !arr.exists(0) || !arr.exists(1)) {
+ raise_error("Error decoding xbox call message");
+ }
+ Variant fn = arr.rvalAt(0);
+ Variant args = arr.rvalAt(1);
+ if (!fn.isString() || !args.isArray()) {
+ raise_error("Error decoding xbox call message");
+ }
+ return f_call_user_func_array(fn, args.toArray());
+}
+
int f_xbox_get_thread_timeout() {
XboxServerInfoPtr server_info = XboxServer::GetServerInfo();
if (server_info) {
View
1  src/runtime/ext/ext_server.h
@@ -49,6 +49,7 @@ bool f_xbox_post_message(CStrRef msg, CStrRef host = "localhost");
Object f_xbox_task_start(CStrRef message);
bool f_xbox_task_status(CObjRef task);
int64 f_xbox_task_result(CObjRef task, int64 timeout_ms, VRefParam ret);
+Variant f_xbox_process_call_message(CStrRef msg);
int f_xbox_get_thread_timeout();
void f_xbox_set_thread_timeout(int timeout);
void f_xbox_schedule_thread_reset();
View
5 src/runtime/ext/profile/extprofile_server.h
@@ -85,6 +85,11 @@ inline int64 x_xbox_task_result(CObjRef task, int64 timeout_ms, VRefParam ret) {
return f_xbox_task_result(task, timeout_ms, ret);
}
+inline Variant x_xbox_process_call_message(CStrRef msg) {
+ FUNCTION_INJECTION_BUILTIN(xbox_process_call_message);
+ return f_xbox_process_call_message(msg);
+}
+
inline int x_xbox_get_thread_timeout() {
FUNCTION_INJECTION_BUILTIN(xbox_get_thread_timeout);
return f_xbox_get_thread_timeout();
View
2  src/system/function.inc
@@ -4,7 +4,7 @@
"get_defined_functions", T(Array), S(0), NULL, S(16384), "/**\n * ( excerpt from\n * http://php.net/manual/en/function.get-defined-functions.php )\n *\n * Gets an array of all defined functions.\n *\n * @return map Returns an multidimensional array containing a list\n * of all defined functions, both built-in (internal)\n * and user-defined. The internal functions will be\n * accessible via $arr[\"internal\"], and the user\n * defined ones using $arr[\"user\"] (see example below).\n */",
"function_exists", T(Boolean), S(0), "function_name", T(String), NULL, NULL, S(0), NULL, S(16384), "/**\n * ( excerpt from http://php.net/manual/en/function.function-exists.php )\n *\n * Checks the list of defined functions, both built-in (internal) and\n * user-defined, for function_name.\n *\n * @function_name\n * string The function name, as a string.\n *\n * @return bool Returns TRUE if function_name exists and is a\n * function, FALSE otherwise.\n *\n * This function will return FALSE for constructs,\n * such as include_once() and echo().\n */",
"is_callable", T(Boolean), S(0), "v", T(Some), NULL, NULL, S(0), "syntax", T(Boolean), "b:0;", "false", S(0), "name", T(Variant), "N;", "null", S(1), NULL, S(75513856), "/**\n * ( excerpt from http://php.net/manual/en/function.is-callable.php )\n *\n * Verify that the contents of a variable can be called as a function.\n * This can check that a simple variable contains the name of a valid\n * function, or that an array contains a properly encoded object and\n * function name.\n *\n * @v mixed Can be either the name of a function stored in a\n * string variable, or an object and the name of a\n * method within the object, like this:\n * array($SomeObject, 'MethodName')\n * @syntax bool If set to TRUE the function only verifies that name\n * might be a function or method. It will only reject\n * simple variables that are not strings, or an array\n * that does not have a valid structure to be used as a\n * callback. The valid ones are supposed to have only 2\n * entries, the first of which is an object or a\n * string, and the second a string.\n * @name mixed Receives the \"callable name\". In the example below\n * it is \"someClass::someMethod\". Note, however, that\n * despite the implication that someClass::SomeMethod()\n * is a callable static method, this is not the case.\n *\n * @return bool Returns TRUE if name is callable, FALSE otherwise.\n */", S(hphp_opt_is_callable),
-"call_user_func_array", T(Variant), S(0), "function", T(Variant), NULL, NULL, S(0), "params", T(Array), NULL, NULL, S(0), NULL, S(109068288), "/**\n * ( excerpt from\n * http://php.net/manual/en/function.call-user-func-array.php )\n *\n * Call a user defined function with the parameters in param_arr.\n *\n * @function mixed The function to be called.\n * @params vector The parameters to be passed to the function, as an\n * indexed array.\n *\n * @return mixed Returns the function result, or FALSE on error.\n */", S(hphp_opt_call_user_func),
+"call_user_func_array", T(Variant), S(0), "function", T(Variant), NULL, NULL, S(0), "params", T(Array), NULL, NULL, S(0), NULL, S(109068288), "/**\n * ( excerpt from\n * http://php.net/manual/en/function.call-user-func-array.php )\n *\n * Call a user defined function with the parameters in params.\n *\n * @function mixed The function to be called.\n * @params vector The parameters to be passed to the function, as an\n * indexed array.\n *\n * @return mixed Returns the function result, or FALSE on error.\n */", S(hphp_opt_call_user_func),
"call_user_func", T(Variant), S(0), "function", T(Variant), NULL, NULL, S(0), NULL, S(109592576), "/**\n * ( excerpt from http://php.net/manual/en/function.call-user-func.php )\n *\n * Call a user defined function given by the function parameter.\n *\n * @function mixed The function to be called. Class methods may also be\n * invoked statically using this function by passing\n * array($classname, $methodname) to this parameter.\n * Additionally class methods of an object instance may\n * be called by passing array($objectinstance,\n * $methodname) to this parameter.\n *\n * @return mixed Returns the function result, or FALSE on error.\n */", S(hphp_opt_call_user_func),
"call_user_func_array_async", T(Object), S(0), "function", T(Variant), NULL, NULL, S(0), "params", T(Array), NULL, NULL, S(0), NULL, S(67190784), "/**\n * ( HipHop specific )\n *\n * Same as call_user_func_array(), but returns an object immediately\n * without waiting for the function to finish. The object can be used with\n * end_user_func_async() to eventually retrieve function's return, if\n * needed.\n *\n * @function mixed The function to be called, same as in\n * call_user_func_array().\n * @params vector Parameters, same as in call_user_func_array().\n *\n * @return object An object end_user_func_async() uses for final\n * waiting of function's return.\n */",
"call_user_func_async", T(Object), S(0), "function", T(Variant), NULL, NULL, S(0), NULL, S(67715072), "/**\n * ( HipHop specific )\n *\n * Same as call_user_func(), but returns an object immediately without\n * waiting for the function to finish. The object can be used with\n * end_user_func_async() to eventually retrieve function's return, if\n * needed.\n *\n * @function mixed The function to be called, same as in\n * call_user_func_array().\n *\n * @return object An object end_user_func_async() uses for final\n * waiting of function's return.\n */",
View
15 src/system/gen/sys/dynamic_table_func.no.cpp
@@ -12287,6 +12287,14 @@ Variant ifa_fnmatch(void *extra, int count, INVOKE_FEW_ARGS_IMPL_ARGS) {
Variant i_fnmatch(void *extra, CArrRef params) {
return invoke_func_few_handler(extra, params, &ifa_fnmatch);
}
+Variant ifa_xbox_process_call_message(void *extra, int count, INVOKE_FEW_ARGS_IMPL_ARGS) {
+ if (UNLIKELY(count != 1)) return throw_wrong_arguments("xbox_process_call_message", count, 1, 1, 1);
+ CVarRef arg0(a0);
+ return (x_xbox_process_call_message(arg0));
+}
+Variant i_xbox_process_call_message(void *extra, CArrRef params) {
+ return invoke_func_few_handler(extra, params, &ifa_xbox_process_call_message);
+}
Variant ifa_forward_static_call_array(void *extra, int count, INVOKE_FEW_ARGS_IMPL_ARGS) {
if (UNLIKELY(count != 2)) return throw_wrong_arguments("forward_static_call_array", count, 2, 2, 1);
CVarRef arg0(a0);
@@ -22578,6 +22586,7 @@ CallInfo ci_rename_function((void*)&i_rename_function, (void*)&ifa_rename_functi
CallInfo ci_apd_set_pprof_trace((void*)&i_apd_set_pprof_trace, (void*)&ifa_apd_set_pprof_trace, 2, 0, 0x0000000000000000LL);
CallInfo ci_openssl_public_encrypt((void*)&i_openssl_public_encrypt, (void*)&ifa_openssl_public_encrypt, 4, 0, 0x0000000000000002LL);
CallInfo ci_fnmatch((void*)&i_fnmatch, (void*)&ifa_fnmatch, 3, 0, 0x0000000000000000LL);
+CallInfo ci_xbox_process_call_message((void*)&i_xbox_process_call_message, (void*)&ifa_xbox_process_call_message, 1, 0, 0x0000000000000000LL);
CallInfo ci_forward_static_call_array((void*)&i_forward_static_call_array, (void*)&ifa_forward_static_call_array, 2, 0, 0x0000000000000000LL);
CallInfo ci_xmlwriter_write_dtd_element((void*)&i_xmlwriter_write_dtd_element, (void*)&ifa_xmlwriter_write_dtd_element, 3, 0, 0x0000000000000000LL);
CallInfo ci_intval((void*)&i_intval, (void*)&ifa_intval, 2, 0, 0x0000000000000000LL);
@@ -31780,6 +31789,12 @@ bool get_call_info_builtin(const CallInfo *&ci, void *&extra, const char *s, int
return true;
}
break;
+ case 5338:
+ HASH_GUARD(0x1E6E2E538A0754DALL, xbox_process_call_message) {
+ ci = &ci_xbox_process_call_message;
+ return true;
+ }
+ break;
case 5343:
HASH_GUARD(0x747A7F585CD694DFLL, zend_version) {
ci = &ci_zend_version;
View
1  src/system/server.inc
@@ -13,6 +13,7 @@
"xbox_task_start", T(Object), S(0), "message", T(String), NULL, NULL, S(0), NULL, S(81920), "/**\n * ( HipHop specific )\n *\n * Starts a local xbox task.\n *\n * @message string A message to send to xbox's message processing\n * function.\n *\n * @return resource\n * A task handle xbox_task_status() and\n * xbox_task_result() can use.\n */",
"xbox_task_status", T(Boolean), S(0), "task", T(Object), NULL, NULL, S(0), NULL, S(81920), "/**\n * ( HipHop specific )\n *\n * Checks an xbox task's status.\n *\n * @task resource\n * The xbox task object created by xbox_task_start().\n *\n * @return bool TRUE if finished, FALSE otherwise.\n */",
"xbox_task_result", T(Int64), S(0), "task", T(Object), NULL, NULL, S(0), "timeout_ms", T(Int64), NULL, NULL, S(0), "ret", T(Variant), NULL, NULL, S(1), NULL, S(81920), "/**\n * ( HipHop specific )\n *\n * Block and wait for xbox task's result.\n *\n * @task resource\n * The xbox task object created by xbox_task_start().\n * @timeout_ms int How many milli-seconds to wait.\n * @ret mixed xbox message processing function's return value.\n *\n * @return int Response code following HTTP's responses. For\n * example, 200 for success and 500 for server error.\n */",
+"xbox_process_call_message", T(Variant), S(0), "msg", T(String), NULL, NULL, S(0), NULL, S(81920), "/**\n * ( HipHop specific )\n *\n * This function is invoked by the xbox facility to start an xbox call\n * task. This function is not intended to be called directly by user code.\n *\n * @msg string The call message.\n *\n * @return mixed The return value of the xbox call task.\n */",
"xbox_get_thread_timeout", T(Int32), S(0), NULL, S(81920), "/**\n * ( HipHop specific )\n *\n * Gets the timeout (maximum duration), in seconds, of the current xbox\n * thread. Throws for non-xbox threads.\n *\n * @return int The current timeout (maximum duration).\n */",
"xbox_set_thread_timeout", T(Void), S(0), "timeout", T(Int32), NULL, NULL, S(0), NULL, S(81920), "/**\n * ( HipHop specific )\n *\n * Sets the timeout (maximum duration), in seconds, of the current xbox\n * thread. The xbox thread would reset when this amount of time has passed\n * since the previous reset. Throws for non-xbox threads.\n *\n * @timeout int The new timeout (maximum duration).\n */",
"xbox_schedule_thread_reset", T(Void), S(0), NULL, S(81920), "/**\n * ( HipHop specific )\n *\n * Schedules a reset of the current xbox thread, when the next request\n * comes in. Throws for non-xbox threads.\n *\n */",
View
20 src/util/job_queue.h
@@ -301,12 +301,10 @@ class JobQueueDispatcher {
int dropCacheTimeout, bool dropStack, void *opaque,
bool lifo = false)
: m_stopped(true), m_id(0), m_opaque(opaque),
+ m_maxThreadCount(threadCount),
m_queue(threadCount, threadRoundRobin, dropCacheTimeout, dropStack,
lifo) {
ASSERT(threadCount >= 1);
- for (int i = 0; i < threadCount; i++) {
- addWorkerImpl(false);
- }
}
~JobQueueDispatcher() {
@@ -329,12 +327,21 @@ class JobQueueDispatcher {
int getQueuedJobs() {
return m_queue.getQueuedJobs();
}
+ int getTargetNumWorkers() {
+ int target = getActiveWorker() + getQueuedJobs();
+ return (target > m_maxThreadCount) ? m_maxThreadCount : target;
+ }
/**
* Creates worker threads and start running them. This is non-blocking.
*/
void start() {
Lock lock(m_mutex);
+ // Spin up more worker threads if appropriate
+ int target = getTargetNumWorkers();
+ for (int n = m_workers.size(); n < target; ++n) {
+ addWorkerImpl(false);
+ }
for (typename
std::set<AsyncFunc<TWorker>*>::iterator iter = m_funcs.begin();
iter != m_funcs.end(); ++iter) {
@@ -348,6 +355,12 @@ class JobQueueDispatcher {
*/
void enqueue(TJob job) {
m_queue.enqueue(job);
+ // Spin up another worker thread if appropriate
+ int target = getTargetNumWorkers();
+ int n = m_workers.size();
+ if (n < target) {
+ addWorker();
+ }
}
/**
@@ -439,6 +452,7 @@ class JobQueueDispatcher {
bool m_stopped;
int m_id;
void *m_opaque;
+ int m_maxThreadCount;
JobQueue<TJob, TWorker::Waitable> m_queue;
Mutex m_mutex;
Please sign in to comment.
Something went wrong with that request. Please try again.