Skip to content

Commit

Permalink
[multithreading] diff facebook#2-2, implemented fiberMarshal/Unmarsha…
Browse files Browse the repository at this point in the history
…l() coding

Summary:
So i'm correctly handling references and objects now. I'll need diff facebook#2-3 to
have globals, which really isn't different from params with all references and
objects. So, all marshaling unmarshaling code are done, just hooking up with
global states and thread locals.

Also fixed call_user_func() to respect references!

Test Plan:
the new unit tests

DiffCamp Revision: 122452
Reviewed By: iproctor
Commenters: qixin
CC: hphp-diffs@lists, iproctor, hzhao, qixin
Tasks:
#141384: add multi-threading support to PHP

Revert Plan:
OK
  • Loading branch information
haiping authored and macvicar committed Jun 12, 2010
1 parent 40b7cac commit 969132b
Show file tree
Hide file tree
Showing 22 changed files with 528 additions and 108 deletions.
6 changes: 3 additions & 3 deletions src/idl/function.idl.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

f('call_user_func', Variant,
array('function' => Variant),
VariableArguments);
ReferenceVariableArguments);

f('call_user_func_array_async', Object,
array('function' => Variant,
'params' => VariantVec));

f('call_user_func_async', Object,
array('function' => Variant),
VariableArguments);
ReferenceVariableArguments);

f('end_user_func_async', Variant,
array('handle' => Object,
Expand All @@ -41,7 +41,7 @@

f('forward_static_call', Variant,
array('function' => Variant),
VariableArguments);
ReferenceVariableArguments);

f('create_function', String,
array('args' => String,
Expand Down
182 changes: 130 additions & 52 deletions src/runtime/base/fiber_async_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,54 @@
using namespace std;

namespace HPHP {
///////////////////////////////////////////////////////////////////////////////
/**
* This class provides synchronization between request thread and fiber thread
* so to make sure when fiber job finishes after request is finished, which
* means end_user_func_async() is forgotten, fiber job will not touch request
* thread's data. There is no need to restore any states in this case.
*/
class FiberAsyncFuncData {
public:
FiberAsyncFuncData() : m_reqId(0) {}
Mutex m_mutex;
int64 m_reqId;
};
static IMPLEMENT_THREAD_LOCAL(FiberAsyncFuncData, s_fiber_data);

void FiberAsyncFunc::OnRequestExit() {
Lock lock(s_fiber_data->m_mutex);
++s_fiber_data->m_reqId;
};

///////////////////////////////////////////////////////////////////////////////

class FiberJob : public Synchronizable {
public:
FiberJob(CVarRef function, CArrRef params)
: m_function(function), m_params(params), m_refCount(0),
m_async(true), m_ready(false), m_done(false), m_exit(false) {
FiberJob(FiberAsyncFuncData *thread, CVarRef function, CArrRef params)
: m_thread(thread),
m_unmarshaled_function(NULL), m_unmarshaled_params(NULL),
m_function(function), m_params(params), m_refCount(0),
m_async(true), m_ready(false), m_done(false), m_delete(false),
m_exit(false) {
m_reqId = m_thread->m_reqId;
}

~FiberJob() {
}

void cleanup() {
if (m_unmarshaled_function) {
Lock lock(m_thread->m_mutex);
if (m_thread->m_reqId == m_reqId) {
DELETE(Variant)(m_unmarshaled_function);
DELETE(Variant)(m_unmarshaled_params);
m_unmarshaled_function = NULL;
m_unmarshaled_params = NULL;
}
// else not safe to touch these members because thread has moved to
// next request after deleting/collecting all these dangling ones
}
}

void waitForReady() {
Expand All @@ -42,13 +83,22 @@ class FiberJob : public Synchronizable {
return m_done;
}

bool canDelete() {
return m_delete || m_refCount == 1;
}

void run(bool async) {
m_async = async;

// make local copy of m_function and m_params
if (async) {
m_function = m_function.fiberCopy();
m_params = m_params.fiberCopy();
m_unmarshaled_function = NEW(Variant)();
*m_unmarshaled_function = m_function;
m_function = m_function.fiberMarshal(m_refMap);

m_unmarshaled_params = NEW(Variant)();
*m_unmarshaled_params = m_params;
m_params = m_params.fiberMarshal(m_refMap);

Lock lock(this);
m_ready = true;
Expand All @@ -72,31 +122,59 @@ class FiberJob : public Synchronizable {
notify();
}

Variant getResults(FiberAsyncFunc::Strategy strategy, CVarRef resolver) {
{
Lock lock(this);
while (!m_done) wait();
}

Variant syncGetResults() {
if (m_exit) {
throw ExitException(0);
}
if (!m_fatal.isNull()) {
throw FatalErrorException("%s", m_fatal.data());
}
if (!m_exception.isNull()) {
if (m_async) {
throw m_exception.fiberCopy();
} else {
throw m_exception;
}
}
if (m_async) {
return m_return.fiberCopy();
throw m_exception;
}
return m_return;
}

Variant getResults(FiberAsyncFunc::Strategy strategy, CVarRef resolver) {
if (!m_async) return syncGetResults();

{
Lock lock(this);
while (!m_done) wait();
}

// these are needed in case they have references or objects
if (!m_refMap.empty()) {
m_function.fiberUnmarshal(m_refMap);
m_params.fiberUnmarshal(m_refMap);
}

Object unmarshaled_exception =
m_exception.fiberUnmarshal(m_refMap);
Variant unmarshaled_return =
m_return.fiberUnmarshal(m_refMap);

try {
if (m_exit) {
throw ExitException(0);
}
if (!m_fatal.isNull()) {
throw FatalErrorException("%s", m_fatal.data());
}
if (!m_exception.isNull()) {
throw unmarshaled_exception;
}
} catch (...) {
cleanup();
m_delete = true;
throw;
}

cleanup();
m_delete = true;
return unmarshaled_return;
}

// ref counting
void incRefCount() {
Lock lock(m_mutex);
Expand All @@ -113,6 +191,15 @@ class FiberJob : public Synchronizable {
}

private:
FiberAsyncFuncData *m_thread;

// holding references to them, so we can later restore their states safely
Variant *m_unmarshaled_function;
Variant *m_unmarshaled_params;

FiberReferenceMap m_refMap;
int64 m_reqId;

Variant m_function;
Array m_params;

Expand All @@ -122,6 +209,7 @@ class FiberJob : public Synchronizable {
bool m_async;
bool m_ready;
bool m_done;
bool m_delete;

bool m_exit;
String m_fatal;
Expand All @@ -133,10 +221,31 @@ class FiberJob : public Synchronizable {

class FiberWorker : public JobQueueWorker<FiberJob*> {
public:
~FiberWorker() {
cleanup();
}

virtual void doJob(FiberJob *job) {
job->run(true);
job->decRefCount();
m_jobs.push_back(job);
cleanup();
}

void cleanup() {
list<FiberJob*>::iterator iter = m_jobs.begin();
while (iter != m_jobs.end()) {
FiberJob *job = *iter;
if (job->canDelete()) {
job->decRefCount();
iter = m_jobs.erase(iter);
continue;
}
++iter;
}
}

private:
list<FiberJob*> m_jobs;
};

///////////////////////////////////////////////////////////////////////////////
Expand All @@ -146,7 +255,7 @@ class FiberAsyncFuncHandle : public ResourceData {
DECLARE_OBJECT_ALLOCATION(FiberAsyncFuncHandle);

FiberAsyncFuncHandle(CVarRef function, CArrRef params) {
m_job = new FiberJob(function, params);
m_job = new FiberJob(s_fiber_data.get(), function, params);
m_job->incRefCount();
}

Expand All @@ -166,7 +275,6 @@ class FiberAsyncFuncHandle : public ResourceData {
IMPLEMENT_OBJECT_ALLOCATION(FiberAsyncFuncHandle);

///////////////////////////////////////////////////////////////////////////////
// implementing PageletServer

static JobQueueDispatcher<FiberJob*, FiberWorker> *s_dispatcher;

Expand Down Expand Up @@ -211,35 +319,5 @@ Variant FiberAsyncFunc::Result(CObjRef func, Strategy strategy,
return handle->getJob()->getResults(strategy, resolver);
}

///////////////////////////////////////////////////////////////////////////////

static IMPLEMENT_THREAD_LOCAL(PointerMap, s_forward_references);
static IMPLEMENT_THREAD_LOCAL(PointerMap, s_reverse_references);

void *FiberReferenceMap::Lookup(void *src) {
PointerMap::iterator iter = s_forward_references->find(src);
if (iter != s_forward_references->end()) {
return iter->second;
}
return NULL;
}

void *FiberReferenceMap::ReverseLookup(void *copy) {
PointerMap::iterator iter = s_reverse_references->find(copy);
if (iter != s_reverse_references->end()) {
return iter->second;
}
return NULL;
}

void FiberReferenceMap::Insert(void *src, void *copy) {
ASSERT(Lookup(src) == NULL);
ASSERT(copy == NULL || ReverseLookup(copy) == NULL);
(*s_forward_references)[src] = copy;
if (copy) {
(*s_reverse_references)[copy] = src;
}
}

///////////////////////////////////////////////////////////////////////////////
}
31 changes: 15 additions & 16 deletions src/runtime/base/fiber_async_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,27 @@ class FiberAsyncFunc {
// These values have to be consistent with what's in system/globals/
// constants.php, which was generated by bin/gen_constants.php
enum Strategy {
GlobalStateIgnore = 0,
GlobalStateIgnore = 0,
GlobalStateOverwrite = 1,
GlobalStateSkip = 2,
GlobalStateResolveConflict = 3,
GlobalStateSkip = 2,
GlobalStateResolve = 3,
};

public:
/**
* Restart fiber job dispatcher. This is needed for any fiber job to run.
* Otherwise, everything runs in single thread.
*/
static void Restart();

/**
* hphp_session_exit() tells fiber engine that current request is complete,
* and do not try to access any leftover SmartAllocated pointers.
*/
static void OnRequestExit();

public:

/**
* Create an asynchronous function call. This always returns a handle.
*/
Expand All @@ -53,19 +65,6 @@ class FiberAsyncFunc {
static Variant Result(CObjRef func, Strategy strategy, CVarRef resolver);
};

///////////////////////////////////////////////////////////////////////////////

/**
* Referenced pointer (strongly bound variants and objects) mapping between
* mother thread and fiber.
*/
class FiberReferenceMap {
public:
static void *Lookup(void *src);
static void *ReverseLookup(void *copy);
static void Insert(void *src, void *copy);
};

///////////////////////////////////////////////////////////////////////////////
}

Expand Down
48 changes: 48 additions & 0 deletions src/runtime/base/fiber_reference_map.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
+----------------------------------------------------------------------+
| HipHop for PHP |
+----------------------------------------------------------------------+
| Copyright (c) 2010 Facebook, Inc. (http://www.facebook.com) |
+----------------------------------------------------------------------+
| This source file is subject to version 3.01 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_01.txt |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
*/

#include <runtime/base/fiber_reference_map.h>

namespace HPHP {
///////////////////////////////////////////////////////////////////////////////

void FiberReferenceMap::insert(void *src, void *copy) {
ASSERT(lookup(src) == NULL);
ASSERT(copy == NULL || reverseLookup(copy) == NULL);
m_forward_references[src] = copy;
if (copy) {
m_reverse_references[copy] = src;
}
}

void *FiberReferenceMap::lookup(void *src) {
PointerMap::iterator iter = m_forward_references.find(src);
if (iter != m_forward_references.end()) {
return iter->second;
}
return NULL;
}

void *FiberReferenceMap::reverseLookup(void *copy) {
PointerMap::iterator iter = m_reverse_references.find(copy);
if (iter != m_reverse_references.end()) {
return iter->second;
}
return NULL;
}

///////////////////////////////////////////////////////////////////////////////
}
Loading

0 comments on commit 969132b

Please sign in to comment.