Skip to content

Commit

Permalink
[multithreading] diff facebook#2-1, implemented call_user_func_async(…
Browse files Browse the repository at this point in the history
…) with deep copy

Summary:
I'll have diff facebook#2-2 to handle global variables and ThreadLocal<T>s, including
ExecutionContext and extension's thread locals.

Test Plan:
the new unit tests, nothing existing is altered

DiffCamp Revision: 120935
Reviewed By: iproctor
CC: hphp-diffs@lists, iproctor, hzhao
Tasks:
#141384: add multi-threading support to PHP

Revert Plan:
OK
  • Loading branch information
haiping authored and macvicar committed Jun 7, 2010
1 parent ff0a9a8 commit 5cae76d
Show file tree
Hide file tree
Showing 23 changed files with 502 additions and 34 deletions.
10 changes: 10 additions & 0 deletions doc/options.compiled
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,16 @@ A pagelet server is essentially the same as local CURL, except it's more
efficient. This allows parallel execution of a web page, preparing two panels
or iframes at the same time.

Fiber {
ThreadCount = 0
}

- Fiber Asynchronous Functions

A "fiber" is a sub-thread of a request thread, mainly for implementing
call_user_func_async(). This thread count specifies totally number of physical
threads allocated for executing fiber asynchronous function calls.

= Proxy Server

Proxy {
Expand Down
9 changes: 4 additions & 5 deletions src/idl/function.idl.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@
array('function' => Variant),
VariableArguments);

f('call_user_func_array_async', Variant,
f('call_user_func_array_async', Object,
array('function' => Variant,
'params' => VariantVec));

f('call_user_func_async', Variant,
f('call_user_func_async', Object,
array('function' => Variant),
VariableArguments);

f('end_user_func_async', Variant,
array('handle' => Variant,
array('handle' => Object,
'strategy' => array(Int32, 'k_GLOBAL_STATE_OVERWRITE'),
'resolver' => array(Variant, 'null')),
VariableArguments);
'resolver' => array(Variant, 'null')));

f('forward_static_call_array', Variant,
array('function' => Variant,
Expand Down
245 changes: 245 additions & 0 deletions src/runtime/base/fiber_async_func.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
+----------------------------------------------------------------------+
| HipHop for PHP |
+----------------------------------------------------------------------+
| Copyright (c) 2010 Facebook, Inc. (http://www.facebook.com) |
+----------------------------------------------------------------------+
| This source file is subject to version 3.01 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_01.txt |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
*/

#include <runtime/base/fiber_async_func.h>
#include <runtime/base/builtin_functions.h>
#include <runtime/base/resource_data.h>
#include <util/job_queue.h>
#include <util/lock.h>
#include <util/logger.h>

using namespace std;

namespace HPHP {
///////////////////////////////////////////////////////////////////////////////

class FiberJob : public Synchronizable {
public:
FiberJob(CVarRef function, CArrRef params)
: m_function(function), m_params(params), m_refCount(0),
m_async(true), m_ready(false), m_done(false), m_exit(false) {
}

void waitForReady() {
Lock lock(this);
while (!m_ready) wait();
}

bool isDone() {
return m_done;
}

void run(bool async) {
m_async = async;

// make local copy of m_function and m_params
if (async) {
m_function = m_function.fiberCopy();
m_params = m_params.fiberCopy();

Lock lock(this);
m_ready = true;
notify();
}

try {
m_return = f_call_user_func_array(m_function, m_params);
} catch (const ExitException &e) {
m_exit = true;
} catch (const Exception &e) {
m_fatal = String(e.getMessage());
} catch (Object e) {
m_exception = e;
} catch (...) {
m_fatal = String("unknown exception was thrown");
}

Lock lock(this);
m_done = true;
notify();
}

Variant getResults(FiberAsyncFunc::Strategy strategy, CVarRef resolver) {
{
Lock lock(this);
while (!m_done) wait();
}

if (m_exit) {
throw ExitException(0);
}
if (!m_fatal.isNull()) {
throw FatalErrorException("%s", m_fatal.data());
}
if (!m_exception.isNull()) {
if (m_async) {
throw m_exception.fiberCopy();
} else {
throw m_exception;
}
}
if (m_async) {
return m_return.fiberCopy();
}
return m_return;
}

// ref counting
void incRefCount() {
Lock lock(m_mutex);
++m_refCount;
}
void decRefCount() {
{
Lock lock(m_mutex);
--m_refCount;
}
if (m_refCount == 0) {
delete this;
}
}

private:
Variant m_function;
Array m_params;

Mutex m_mutex;
int m_refCount;

bool m_async;
bool m_ready;
bool m_done;

bool m_exit;
String m_fatal;
Object m_exception;
Variant m_return;
};

///////////////////////////////////////////////////////////////////////////////

class FiberWorker : public JobQueueWorker<FiberJob*> {
public:
virtual void doJob(FiberJob *job) {
job->run(true);
job->decRefCount();
}
};

///////////////////////////////////////////////////////////////////////////////

class FiberAsyncFuncHandle : public ResourceData {
public:
DECLARE_OBJECT_ALLOCATION(FiberAsyncFuncHandle);

FiberAsyncFuncHandle(CVarRef function, CArrRef params) {
m_job = new FiberJob(function, params);
m_job->incRefCount();
}

~FiberAsyncFuncHandle() {
m_job->decRefCount();
}

FiberJob *getJob() { return m_job;}

// overriding ResourceData
virtual const char *o_getClassName() const { return "FiberAsyncFuncHandle";}

private:
FiberJob *m_job;
};

IMPLEMENT_OBJECT_ALLOCATION(FiberAsyncFuncHandle);

///////////////////////////////////////////////////////////////////////////////
// implementing PageletServer

static JobQueueDispatcher<FiberJob*, FiberWorker> *s_dispatcher;

void FiberAsyncFunc::Restart() {
if (s_dispatcher) {
s_dispatcher->stop();
delete s_dispatcher;
s_dispatcher = NULL;
}
if (RuntimeOption::FiberCount > 0) {
s_dispatcher = new JobQueueDispatcher<FiberJob*, FiberWorker>
(RuntimeOption::FiberCount, NULL);
Logger::Info("fiber job dispatcher started");
s_dispatcher->start();
}
}

Object FiberAsyncFunc::Start(CVarRef function, CArrRef params) {
FiberAsyncFuncHandle *handle = NEW(FiberAsyncFuncHandle)(function, params);
Object ret(handle);

FiberJob *job = handle->getJob();
if (s_dispatcher) {
job->incRefCount(); // paired with worker's decRefCount()
s_dispatcher->enqueue(job);
job->waitForReady(); // until job data are copied into fiber
} else {
job->run(false); // immediately executing the job
}

return ret;
}

bool FiberAsyncFunc::Status(CObjRef func) {
FiberAsyncFuncHandle *handle = func.getTyped<FiberAsyncFuncHandle>();
return handle->getJob()->isDone();
}

Variant FiberAsyncFunc::Result(CObjRef func, Strategy strategy,
CVarRef resolver) {
FiberAsyncFuncHandle *handle = func.getTyped<FiberAsyncFuncHandle>();
return handle->getJob()->getResults(strategy, resolver);
}

///////////////////////////////////////////////////////////////////////////////

static IMPLEMENT_THREAD_LOCAL(PointerMap, s_forward_references);
static IMPLEMENT_THREAD_LOCAL(PointerMap, s_reverse_references);

void *FiberReferenceMap::Lookup(void *src) {
PointerMap::iterator iter = s_forward_references->find(src);
if (iter != s_forward_references->end()) {
return iter->second;
}
return NULL;
}

void *FiberReferenceMap::ReverseLookup(void *copy) {
PointerMap::iterator iter = s_reverse_references->find(copy);
if (iter != s_reverse_references->end()) {
return iter->second;
}
return NULL;
}

void FiberReferenceMap::Insert(void *src, void *copy) {
ASSERT(Lookup(src) == NULL);
ASSERT(copy == NULL || ReverseLookup(copy) == NULL);
(*s_forward_references)[src] = copy;
if (copy) {
(*s_reverse_references)[copy] = src;
}
}

///////////////////////////////////////////////////////////////////////////////
}
72 changes: 72 additions & 0 deletions src/runtime/base/fiber_async_func.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
+----------------------------------------------------------------------+
| HipHop for PHP |
+----------------------------------------------------------------------+
| Copyright (c) 2010 Facebook, Inc. (http://www.facebook.com) |
+----------------------------------------------------------------------+
| This source file is subject to version 3.01 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_01.txt |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
*/

#ifndef __HPHP_FIBER_H__
#define __HPHP_FIBER_H__

#include <runtime/base/complex_types.h>

namespace HPHP {
///////////////////////////////////////////////////////////////////////////////

class FiberAsyncFunc {
public:
// These values have to be consistent with what's in system/globals/
// constants.php, which was generated by bin/gen_constants.php
enum Strategy {
GlobalStateIgnore = 0,
GlobalStateOverwrite = 1,
GlobalStateSkip = 2,
GlobalStateResolveConflict = 3,
};

public:
static void Restart();

/**
* Create an asynchronous function call. This always returns a handle.
*/
static Object Start(CVarRef function, CArrRef params);

/**
* Query if an async call is finished. This is non-blocking and can be
* called as many times as desired.
*/
static bool Status(CObjRef func);

/**
* Get results of an async call. This is blocking until task is finished.
*/
static Variant Result(CObjRef func, Strategy strategy, CVarRef resolver);
};

///////////////////////////////////////////////////////////////////////////////

/**
* Referenced pointer (strongly bound variants and objects) mapping between
* mother thread and fiber.
*/
class FiberReferenceMap {
public:
static void *Lookup(void *src);
static void *ReverseLookup(void *copy);
static void Insert(void *src, void *copy);
};

///////////////////////////////////////////////////////////////////////////////
}

#endif // __HPHP_FIBER_H__
1 change: 0 additions & 1 deletion src/runtime/base/resource_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ String ResourceData::t___tostring() {
}

ObjectData* ResourceData::cloneImpl() {
ASSERT(false);
return NULL;
}

Expand Down
2 changes: 2 additions & 0 deletions src/runtime/base/runtime_option.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ std::string RuntimeOption::ServerPrimaryIP;
int RuntimeOption::ServerPort;
int RuntimeOption::ServerThreadCount = 50;
int RuntimeOption::PageletServerThreadCount = 0;
int RuntimeOption::FiberCount = 0;
int RuntimeOption::RequestTimeoutSeconds = 0;
int RuntimeOption::RequestMemoryMaxBytes = 0;
int RuntimeOption::ImageMemoryMaxBytes = 0;
Expand Down Expand Up @@ -629,6 +630,7 @@ void RuntimeOption::Load(Hdf &config) {
}
{
PageletServerThreadCount = config["PageletServer.ThreadCount"].getInt32(0);
FiberCount = config["Fiber.ThreadCount"].getInt32(0);
}
{
Hdf content = config["StaticFile"];
Expand Down
1 change: 1 addition & 0 deletions src/runtime/base/runtime_option.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class RuntimeOption {
static int ServerPort;
static int ServerThreadCount;
static int PageletServerThreadCount;
static int FiberCount;
static int RequestTimeoutSeconds;
static int RequestMemoryMaxBytes;
static int ImageMemoryMaxBytes;
Expand Down
Loading

0 comments on commit 5cae76d

Please sign in to comment.