Skip to content

Commit

Permalink
Revert "Improve fb_call_user_func_async"
Browse files Browse the repository at this point in the history
This reverts commit b06552b0fac63c074f3003d7cfac0be10663028f.

Conflicts:

	src/system/function.inc
  • Loading branch information
paroski authored and macvicar committed Oct 18, 2011
1 parent 23b99ae commit d5c2d1c
Show file tree
Hide file tree
Showing 15 changed files with 24 additions and 147 deletions.
2 changes: 1 addition & 1 deletion src/idl/function.idl.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
DefineFunction(
array(
'name' => "call_user_func_array",
'desc' => "Call a user defined function with the parameters in params.",
'desc' => "Call a user defined function with the parameters in param_arr.",
'flags' => HasDocComment | HasOptFunction | NoProfile | ContextSensitive,
'opt' => "hphp_opt_call_user_func",
'return' => array(
Expand Down
18 changes: 0 additions & 18 deletions src/idl/server.idl.php
Original file line number Diff line number Diff line change
Expand Up @@ -322,24 +322,6 @@
),
));

DefineFunction(
array(
'name' => "xbox_process_call_message",
'desc' => "This function is invoked by the xbox facility to start an xbox call task. This function is not intended to be called directly by user code.",
'flags' => HasDocComment | HipHopSpecific,
'return' => array(
'type' => Variant,
'desc' => "The return value of the xbox call task.",
),
'args' => array(
array(
'name' => "msg",
'type' => String,
'desc' => "The call message.",
),
),
));

DefineFunction(
array(
'name' => "xbox_get_thread_timeout",
Expand Down
3 changes: 0 additions & 3 deletions src/runtime/base/program_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ static bool handle_exception(ExecutionContext *context, std::string &errorMsg,
Logger::Error("%s", errorMsg.c_str());
error = true;
} catch (const Exception &e) {
errorMsg = "";
if (where == HandlerException) {
errorMsg = "Exception handler threw an exception: ";
}
Expand All @@ -303,7 +302,6 @@ static bool handle_exception(ExecutionContext *context, std::string &errorMsg,
Logger::Error("%s", errorMsg.c_str());
}
} catch (const Object &e) {
errorMsg = "";
if (where == HandlerException) {
errorMsg = "Exception handler threw an object exception: ";
}
Expand Down Expand Up @@ -1267,7 +1265,6 @@ void hphp_session_exit() {

void hphp_process_exit() {
FiberAsyncFunc::Stop();
XboxServer::Stop();
Eval::Debugger::Stop();
Extension::ShutdownModules();
LightProcess::Close();
Expand Down
8 changes: 4 additions & 4 deletions src/runtime/base/runtime_option.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ VirtualHostPtrVec RuntimeOption::VirtualHosts;
IpBlockMapPtr RuntimeOption::IpBlocks;
SatelliteServerInfoPtrVec RuntimeOption::SatelliteServerInfos;

int RuntimeOption::XboxServerThreadCount = 10;
int RuntimeOption::XboxServerThreadCount = 0;
int RuntimeOption::XboxServerMaxQueueLength = INT_MAX;
int RuntimeOption::XboxServerPort = 0;
int RuntimeOption::XboxDefaultLocalTimeoutMilliSeconds = 500;
Expand All @@ -165,7 +165,7 @@ std::string RuntimeOption::XboxServerInfoWarmupDoc;
std::string RuntimeOption::XboxServerInfoReqInitFunc;
std::string RuntimeOption::XboxServerInfoReqInitDoc;
bool RuntimeOption::XboxServerInfoAlwaysReset = false;
bool RuntimeOption::XboxServerLogInfo = false;
bool RuntimeOption::XboxServerLogInfo = true;
std::string RuntimeOption::XboxProcessMessageFunc = "xbox_process_message";
std::string RuntimeOption::XboxPassword;
std::set<std::string> RuntimeOption::XboxPasswords;
Expand Down Expand Up @@ -842,7 +842,7 @@ void RuntimeOption::Load(Hdf &config, StringVec *overwrites /* = NULL */) {
}
{
Hdf xbox = config["Xbox"];
XboxServerThreadCount = xbox["ServerInfo.ThreadCount"].getInt32(10);
XboxServerThreadCount = xbox["ServerInfo.ThreadCount"].getInt32(0);
XboxServerMaxQueueLength =
xbox["ServerInfo.MaxQueueLength"].getInt32(INT_MAX);
if (XboxServerMaxQueueLength < 0) XboxServerMaxQueueLength = INT_MAX;
Expand All @@ -857,7 +857,7 @@ void RuntimeOption::Load(Hdf &config, StringVec *overwrites /* = NULL */) {
XboxServerInfoReqInitFunc = xbox["ServerInfo.RequestInitFunction"].get("");
XboxServerInfoReqInitDoc = xbox["ServerInfo.RequestInitDocument"].get("");
XboxServerInfoAlwaysReset = xbox["ServerInfo.AlwaysReset"].getBool(false);
XboxServerLogInfo = xbox["ServerInfo.LogInfo"].getBool(false);
XboxServerLogInfo = xbox["ServerInfo.LogInfo"].getBool(true);
XboxProcessMessageFunc =
xbox["ProcessMessageFunc"].get("xbox_process_message");
}
Expand Down
2 changes: 0 additions & 2 deletions src/runtime/base/server/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ void HttpServer::onServerShutdown() {
Logger::Info("debugger server stopped");
}

XboxServer::Stop();

// When a new instance of HPHP has taken over our page server socket,
// stop our admin server and satellites so it can acquire those ports.
for (unsigned int i = 0; i < m_satellites.size(); i++) {
Expand Down
15 changes: 3 additions & 12 deletions src/runtime/base/server/rpc_request_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,7 @@ RPCRequestHandler::RPCRequestHandler(bool info /* = true */)
: m_count(0), m_reset(false),
m_returnEncodeType(Json) {
hphp_session_init();
bool isServer = (strcmp(RuntimeOption::ExecutionMode, "srv") == 0);
if (isServer) {
m_context = hphp_context_init();
} else {
// In command line mode, we want the xbox workers to
// output to STDOUT
m_context = g_context.getNoCheck();
m_context->obSetImplicitFlush(true);
}
m_context = hphp_context_init();
m_created = time(0);

Logger::ResetRequestCount();
Expand Down Expand Up @@ -189,8 +181,7 @@ bool RPCRequestHandler::executePHPFunction(Transport *transport,
Variant funcRet;
string errorMsg = "Internal Server Error";
string warmupDoc, reqInitFunc, reqInitDoc;
reqInitDoc = transport->getHeader("ReqInitDoc");
if (reqInitDoc.empty() && m_serverInfo) {
if (m_serverInfo) {
warmupDoc = m_serverInfo->getWarmupDoc();
reqInitFunc = m_serverInfo->getReqInitFunc();
reqInitDoc = m_serverInfo->getReqInitDoc();
Expand All @@ -202,7 +193,7 @@ bool RPCRequestHandler::executePHPFunction(Transport *transport,

if (!reqInitDoc.empty()) reqInitDoc = canonicalize_path(reqInitDoc, "", 0);
if (!reqInitDoc.empty()) {
reqInitDoc = getSourceFilename(reqInitDoc, sourceRootInfo);
reqInitDoc = getSourceFilename(reqInitDoc, sourceRootInfo);
}

bool runOnce = false;
Expand Down
59 changes: 11 additions & 48 deletions src/runtime/base/server/xbox_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <util/job_queue.h>
#include <util/lock.h>
#include <util/logger.h>
#include <system/lib/systemlib.h>

using namespace std;

Expand All @@ -32,12 +31,10 @@ namespace HPHP {

class XboxTransport : public Transport, public Synchronizable {
public:
XboxTransport(CStrRef message, CStrRef reqInitDoc = "")
: m_refCount(0), m_done(false), m_code(0) {
XboxTransport(CStrRef message) : m_refCount(0), m_done(false), m_code(0) {
gettime(CLOCK_MONOTONIC, &m_queueTime);

m_message.append(message.data(), message.size());
m_reqInitDoc.append(reqInitDoc.data(), reqInitDoc.size());
disableCompression(); // so we don't have to decompress during sendImpl()
}

Expand All @@ -47,9 +44,6 @@ class XboxTransport : public Transport, public Synchronizable {
* Implementing Transport...
*/
virtual const char *getUrl() {
if (!m_reqInitDoc.empty()) {
return "xbox_process_call_message";
}
return RuntimeOption::XboxProcessMessageFunc.c_str();
}
virtual const char *getRemoteHost() {
Expand All @@ -67,7 +61,6 @@ class XboxTransport : public Transport, public Synchronizable {
}
virtual std::string getHeader(const char *name) {
if (!strcasecmp(name, "Host")) return m_host;
if (!strcasecmp(name, "ReqInitDoc")) return m_reqInitDoc;
return "";
}
virtual void getHeaders(HeaderMap &headers) {
Expand Down Expand Up @@ -141,7 +134,6 @@ class XboxTransport : public Transport, public Synchronizable {
string m_response;
int m_code;
string m_host;
string m_reqInitDoc;
};

class XboxRequestHandler: public RPCRequestHandler {
Expand All @@ -156,37 +148,28 @@ bool XboxRequestHandler::Info = false;

static IMPLEMENT_THREAD_LOCAL(XboxServerInfoPtr, s_xbox_server_info);
static IMPLEMENT_THREAD_LOCAL(XboxRequestHandler, s_xbox_request_handler);
static IMPLEMENT_THREAD_LOCAL(string, s_xbox_prev_req_init_doc);
///////////////////////////////////////////////////////////////////////////////

class XboxWorker : public JobQueueWorker<XboxTransport*, true> {
public:
virtual void doJob(XboxTransport *job) {
try {
// If this job or the previous job that ran on this thread have
// a custom initial document, make sure we do a reset
string reqInitDoc = job->getHeader("ReqInitDoc");
bool needReset = !reqInitDoc.empty() ||
!s_xbox_prev_req_init_doc->empty();
*s_xbox_prev_req_init_doc = reqInitDoc;

job->onRequestStart(job->getStartTimer());
createRequestHandler(needReset)->handleRequest(job);
createRequestHandler()->handleRequest(job);
job->decRefCount();
} catch (...) {
Logger::Error("RpcRequestHandler leaked exceptions");
}
}
private:
RequestHandler *createRequestHandler(bool needReset = false) {
RequestHandler *createRequestHandler() {
if (!*s_xbox_server_info) {
*s_xbox_server_info = XboxServerInfoPtr(new XboxServerInfo());
}
if (RuntimeOption::XboxServerLogInfo) XboxRequestHandler::Info = true;
s_xbox_request_handler->setServerInfo(*s_xbox_server_info);
s_xbox_request_handler->setReturnEncodeType(RPCRequestHandler::Serialize);
if (needReset ||
s_xbox_request_handler->needReset() ||
if (s_xbox_request_handler->needReset() ||
s_xbox_request_handler->incRequest() >
(*s_xbox_server_info)->getMaxRequest()) {
Logger::Verbose("resetting xbox request handler");
Expand All @@ -197,12 +180,6 @@ class XboxWorker : public JobQueueWorker<XboxTransport*, true> {
}
return s_xbox_request_handler.get();
}

virtual void onThreadExit() {
if (!s_xbox_request_handler.isNull()) {
s_xbox_request_handler.destroy();
}
}
};

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -230,14 +207,6 @@ void XboxServer::Restart() {
}
}

void XboxServer::Stop() {
if (s_dispatcher) {
s_dispatcher->stop();
delete s_dispatcher;
s_dispatcher = NULL;
}
}

///////////////////////////////////////////////////////////////////////////////

static bool isLocalHost(CStrRef host) {
Expand Down Expand Up @@ -362,8 +331,8 @@ class XboxTask : public SweepableResourceData {
public:
DECLARE_OBJECT_ALLOCATION(XboxTask)

XboxTask(CStrRef message, CStrRef reqInitDoc = "") {
m_job = new XboxTransport(message, reqInitDoc);
XboxTask(CStrRef message) {
m_job = new XboxTransport(message);
m_job->incRefCount();
}

Expand Down Expand Up @@ -393,18 +362,12 @@ bool XboxServer::Available() {
RuntimeOption::XboxServerMaxQueueLength;
}

Object XboxServer::TaskStart(CStrRef msg, CStrRef reqInitDoc /* = "" */) {
bool xboxEnabled = (RuntimeOption::XboxServerThreadCount > 0);
if (!xboxEnabled || !Available()) {
const char* errMsg = (xboxEnabled ?
"Cannot create new Xbox task because the Xbox queue has "
"reached maximum capacity" :
"Cannot create new Xbox task because the Xbox is not enabled");
Object e = SystemLib::AllocExceptionObject(errMsg);
throw_exception(e);
return Object();
Object XboxServer::TaskStart(CStrRef message) {
if (RuntimeOption::XboxServerThreadCount <= 0 ||
!Available()) {
return null_object;
}
XboxTask *task = NEWOBJ(XboxTask)(msg, reqInitDoc);
XboxTask *task = NEWOBJ(XboxTask)(message);
Object ret(task);
XboxTransport *job = task->getJob();
job->incRefCount(); // paired with worker's decRefCount()
Expand Down
3 changes: 1 addition & 2 deletions src/runtime/base/server/xbox_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class XboxServer {
* Start or restart xbox server.
*/
static void Restart();
static void Stop();

public:
/**
Expand All @@ -52,7 +51,7 @@ class XboxServer {
/**
* Local tasklet for parallel processing.
*/
static Object TaskStart(CStrRef msg, CStrRef reqInitDoc = "");
static Object TaskStart(CStrRef message);
static bool TaskStatus(CObjRef task);
static int TaskResult(CObjRef task, int timeout_ms, Variant &ret);

Expand Down
17 changes: 0 additions & 17 deletions src/runtime/ext/ext_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,6 @@ int64 f_xbox_task_result(CObjRef task, int64 timeout_ms, VRefParam ret) {
return XboxServer::TaskResult(task, timeout_ms, ret);
}

Variant f_xbox_process_call_message(CStrRef msg) {
Variant v = f_unserialize(msg);
if (!v.isArray()) {
raise_error("Error decoding xbox call message");
}
Array arr = v.toArray();
if (arr.size() != 2 || !arr.exists(0) || !arr.exists(1)) {
raise_error("Error decoding xbox call message");
}
Variant fn = arr.rvalAt(0);
Variant args = arr.rvalAt(1);
if (!fn.isString() || !args.isArray()) {
raise_error("Error decoding xbox call message");
}
return f_call_user_func_array(fn, args.toArray());
}

int f_xbox_get_thread_timeout() {
XboxServerInfoPtr server_info = XboxServer::GetServerInfo();
if (server_info) {
Expand Down
1 change: 0 additions & 1 deletion src/runtime/ext/ext_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ bool f_xbox_post_message(CStrRef msg, CStrRef host = "localhost");
Object f_xbox_task_start(CStrRef message);
bool f_xbox_task_status(CObjRef task);
int64 f_xbox_task_result(CObjRef task, int64 timeout_ms, VRefParam ret);
Variant f_xbox_process_call_message(CStrRef msg);
int f_xbox_get_thread_timeout();
void f_xbox_set_thread_timeout(int timeout);
void f_xbox_schedule_thread_reset();
Expand Down
5 changes: 0 additions & 5 deletions src/runtime/ext/profile/extprofile_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ inline int64 x_xbox_task_result(CObjRef task, int64 timeout_ms, VRefParam ret) {
return f_xbox_task_result(task, timeout_ms, ret);
}

inline Variant x_xbox_process_call_message(CStrRef msg) {
FUNCTION_INJECTION_BUILTIN(xbox_process_call_message);
return f_xbox_process_call_message(msg);
}

inline int x_xbox_get_thread_timeout() {
FUNCTION_INJECTION_BUILTIN(xbox_get_thread_timeout);
return f_xbox_get_thread_timeout();
Expand Down
2 changes: 1 addition & 1 deletion src/system/function.inc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"get_defined_functions", T(Array), S(0), NULL, S(16384), "/**\n * ( excerpt from\n * http://php.net/manual/en/function.get-defined-functions.php )\n *\n * Gets an array of all defined functions.\n *\n * @return map Returns an multidimensional array containing a list\n * of all defined functions, both built-in (internal)\n * and user-defined. The internal functions will be\n * accessible via $arr[\"internal\"], and the user\n * defined ones using $arr[\"user\"] (see example below).\n */",
"function_exists", T(Boolean), S(0), "function_name", T(String), NULL, NULL, S(0), NULL, S(16384), "/**\n * ( excerpt from http://php.net/manual/en/function.function-exists.php )\n *\n * Checks the list of defined functions, both built-in (internal) and\n * user-defined, for function_name.\n *\n * @function_name\n * string The function name, as a string.\n *\n * @return bool Returns TRUE if function_name exists and is a\n * function, FALSE otherwise.\n *\n * This function will return FALSE for constructs,\n * such as include_once() and echo().\n */",
"is_callable", T(Boolean), S(0), "v", T(Some), NULL, NULL, S(0), "syntax", T(Boolean), "b:0;", "false", S(0), "name", T(Variant), "N;", "null", S(1), NULL, S(75513856), "/**\n * ( excerpt from http://php.net/manual/en/function.is-callable.php )\n *\n * Verify that the contents of a variable can be called as a function.\n * This can check that a simple variable contains the name of a valid\n * function, or that an array contains a properly encoded object and\n * function name.\n *\n * @v mixed Can be either the name of a function stored in a\n * string variable, or an object and the name of a\n * method within the object, like this:\n * array($SomeObject, 'MethodName')\n * @syntax bool If set to TRUE the function only verifies that name\n * might be a function or method. It will only reject\n * simple variables that are not strings, or an array\n * that does not have a valid structure to be used as a\n * callback. The valid ones are supposed to have only 2\n * entries, the first of which is an object or a\n * string, and the second a string.\n * @name mixed Receives the \"callable name\". In the example below\n * it is \"someClass::someMethod\". Note, however, that\n * despite the implication that someClass::SomeMethod()\n * is a callable static method, this is not the case.\n *\n * @return bool Returns TRUE if name is callable, FALSE otherwise.\n */", S(hphp_opt_is_callable),
"call_user_func_array", T(Variant), S(0), "function", T(Variant), NULL, NULL, S(0), "params", T(Array), NULL, NULL, S(0), NULL, S(109068288), "/**\n * ( excerpt from\n * http://php.net/manual/en/function.call-user-func-array.php )\n *\n * Call a user defined function with the parameters in params.\n *\n * @function mixed The function to be called.\n * @params vector The parameters to be passed to the function, as an\n * indexed array.\n *\n * @return mixed Returns the function result, or FALSE on error.\n */", S(hphp_opt_call_user_func),
"call_user_func_array", T(Variant), S(0), "function", T(Variant), NULL, NULL, S(0), "params", T(Array), NULL, NULL, S(0), NULL, S(109068288), "/**\n * ( excerpt from\n * http://php.net/manual/en/function.call-user-func-array.php )\n *\n * Call a user defined function with the parameters in param_arr.\n *\n * @function mixed The function to be called.\n * @params vector The parameters to be passed to the function, as an\n * indexed array.\n *\n * @return mixed Returns the function result, or FALSE on error.\n */", S(hphp_opt_call_user_func),
"call_user_func", T(Variant), S(0), "function", T(Variant), NULL, NULL, S(0), NULL, S(109199360), "/**\n * ( excerpt from http://php.net/manual/en/function.call-user-func.php )\n *\n * Call a user defined function given by the function parameter.\n *\n * @function mixed The function to be called. Class methods may also be\n * invoked statically using this function by passing\n * array($classname, $methodname) to this parameter.\n * Additionally class methods of an object instance may\n * be called by passing array($objectinstance,\n * $methodname) to this parameter.\n *\n * @return mixed Returns the function result, or FALSE on error.\n */", S(hphp_opt_call_user_func),
"call_user_func_array_async", T(Object), S(0), "function", T(Variant), NULL, NULL, S(0), "params", T(Array), NULL, NULL, S(0), NULL, S(67190784), "/**\n * ( HipHop specific )\n *\n * Same as call_user_func_array(), but returns an object immediately\n * without waiting for the function to finish. The object can be used with\n * end_user_func_async() to eventually retrieve function's return, if\n * needed.\n *\n * @function mixed The function to be called, same as in\n * call_user_func_array().\n * @params vector Parameters, same as in call_user_func_array().\n *\n * @return object An object end_user_func_async() uses for final\n * waiting of function's return.\n */",
"call_user_func_async", T(Object), S(0), "function", T(Variant), NULL, NULL, S(0), NULL, S(67715072), "/**\n * ( HipHop specific )\n *\n * Same as call_user_func(), but returns an object immediately without\n * waiting for the function to finish. The object can be used with\n * end_user_func_async() to eventually retrieve function's return, if\n * needed.\n *\n * @function mixed The function to be called, same as in\n * call_user_func_array().\n *\n * @return object An object end_user_func_async() uses for final\n * waiting of function's return.\n */",
Expand Down

0 comments on commit d5c2d1c

Please sign in to comment.