Skip to content

Commit

Permalink
Allow internal Threaded structures to outlive their creators, closes p…
Browse files Browse the repository at this point in the history
  • Loading branch information
dktapps committed Mar 27, 2020
1 parent 39e8ad1 commit f5f17d2
Show file tree
Hide file tree
Showing 16 changed files with 460 additions and 339 deletions.
4 changes: 2 additions & 2 deletions classes/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,13 @@ PHP_METHOD(Pool, collect) {
RETURN_LONG(0);

ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(workers), worker) {
pthreads_object_t *thread =
pthreads_zend_object_t *thread =
PTHREADS_FETCH_FROM(Z_OBJ_P(worker));
if (!ZEND_NUM_ARGS())
PTHREADS_WORKER_COLLECTOR_INIT(call, Z_OBJ_P(worker));
collectable += pthreads_stack_collect(
&thread->std,
thread->stack,
thread->ts_obj->stack,
&call,
pthreads_worker_running_function,
pthreads_worker_collect_function);
Expand Down
14 changes: 7 additions & 7 deletions classes/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ zend_function_entry pthreads_thread_methods[] = {
$options should be a mask of inheritance constants */
PHP_METHOD(Thread, start)
{
pthreads_object_t* thread = PTHREADS_FETCH;
pthreads_zend_object_t* thread = PTHREADS_FETCH;
zend_long options = PTHREADS_INHERIT_ALL;

if (ZEND_NUM_ARGS()) {
if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &options) != SUCCESS) {
return;
}

thread->options = options;
thread->ts_obj->options = options;
}

RETURN_BOOL(pthreads_start(thread));
Expand All @@ -101,7 +101,7 @@ PHP_METHOD(Thread, start)
Will return true if a Thread has been started */
PHP_METHOD(Thread, isStarted)
{
pthreads_object_t* thread = PTHREADS_FETCH;
pthreads_object_t* thread = PTHREADS_FETCH_TS;

RETURN_BOOL(pthreads_monitor_check(thread->monitor, PTHREADS_MONITOR_STARTED));
} /* }}} */
Expand All @@ -110,7 +110,7 @@ PHP_METHOD(Thread, isStarted)
Will return true if a Thread has been joined already */
PHP_METHOD(Thread, isJoined)
{
pthreads_object_t* thread = PTHREADS_FETCH;
pthreads_object_t* thread = PTHREADS_FETCH_TS;

RETURN_BOOL(pthreads_monitor_check(thread->monitor, PTHREADS_MONITOR_JOINED));
} /* }}} */
Expand All @@ -119,7 +119,7 @@ PHP_METHOD(Thread, isJoined)
Will return a boolean indication of success */
PHP_METHOD(Thread, join)
{
pthreads_object_t* thread = PTHREADS_FETCH;
pthreads_zend_object_t* thread = PTHREADS_FETCH;

RETURN_BOOL(pthreads_join(thread));
} /* }}} */
Expand All @@ -128,7 +128,7 @@ PHP_METHOD(Thread, join)
Will return the identifier of the referenced Thread */
PHP_METHOD(Thread, getThreadId)
{
ZVAL_LONG(return_value, (PTHREADS_FETCH_FROM(Z_OBJ_P(getThis())))->local.id);
ZVAL_LONG(return_value, (PTHREADS_FETCH_TS_FROM(Z_OBJ_P(getThis())))->local.id);
} /* }}} */

/* {{{ proto long Thread::getCurrentThreadId()
Expand All @@ -149,7 +149,7 @@ PHP_METHOD(Thread, getCurrentThread)
Will return the identifier of the thread ( or process ) that created the referenced Thread */
PHP_METHOD(Thread, getCreatorId)
{
ZVAL_LONG(return_value, (PTHREADS_FETCH_FROM(Z_OBJ_P(getThis())))->creator.id);
ZVAL_LONG(return_value, (PTHREADS_FETCH_TS_FROM(Z_OBJ_P(getThis())))->creator.id);
} /* }}} */
# endif
#endif
12 changes: 6 additions & 6 deletions classes/threaded.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ PHP_METHOD(Threaded, getRefCount) { RETURN_LONG(Z_REFCOUNT_P(getThis())); } /*
Otherwise returns a boolean indication of success */
PHP_METHOD(Threaded, wait)
{
pthreads_object_t* threaded = PTHREADS_FETCH;
pthreads_object_t* threaded = PTHREADS_FETCH_TS;
zend_long timeout = 0L;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "|l", &timeout)==SUCCESS) {
Expand All @@ -141,7 +141,7 @@ PHP_METHOD(Threaded, wait)
Will return a boolean indication of success */
PHP_METHOD(Threaded, notify)
{
pthreads_object_t* threaded = PTHREADS_FETCH;
pthreads_object_t* threaded = PTHREADS_FETCH_TS;

RETURN_BOOL(pthreads_monitor_notify(threaded->monitor) == SUCCESS);
} /* }}} */
Expand All @@ -151,7 +151,7 @@ PHP_METHOD(Threaded, notify)
Will return a boolean indication of success */
PHP_METHOD(Threaded, notifyOne)
{
pthreads_object_t* threaded = PTHREADS_FETCH;
pthreads_object_t* threaded = PTHREADS_FETCH_TS;

RETURN_BOOL(pthreads_monitor_notify_one(threaded->monitor) == SUCCESS);
} /* }}} */
Expand All @@ -160,7 +160,7 @@ PHP_METHOD(Threaded, notifyOne)
Will return true while the referenced Threaded is being executed by a Worker */
PHP_METHOD(Threaded, isRunning)
{
pthreads_object_t* threaded = PTHREADS_FETCH;
pthreads_object_t* threaded = PTHREADS_FETCH_TS;

RETURN_BOOL(pthreads_monitor_check(threaded->monitor, PTHREADS_MONITOR_RUNNING));
} /* }}} */
Expand All @@ -169,7 +169,7 @@ PHP_METHOD(Threaded, isRunning)
Will return true if the referenced Threaded suffered fatal errors or uncaught exceptions */
PHP_METHOD(Threaded, isTerminated)
{
pthreads_object_t* threaded = PTHREADS_FETCH;
pthreads_object_t* threaded = PTHREADS_FETCH_TS;

RETURN_BOOL(pthreads_monitor_check(threaded->monitor, PTHREADS_MONITOR_ERROR));
} /* }}} */
Expand All @@ -182,7 +182,7 @@ PHP_METHOD(Threaded, synchronized)
pthreads_call_t call = PTHREADS_CALL_EMPTY;
uint argc = 0;
zval *argv = NULL;
pthreads_object_t* threaded= PTHREADS_FETCH;
pthreads_object_t* threaded= PTHREADS_FETCH_TS;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "f|+", &call.fci, &call.fcc, &argv, &argc) != SUCCESS) {
return;
Expand Down
34 changes: 17 additions & 17 deletions classes/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ zend_function_entry pthreads_worker_methods[] = {
Pushes an item onto the stack, returns the size of stack */
PHP_METHOD(Worker, stack)
{
pthreads_object_t* thread = PTHREADS_FETCH;
pthreads_zend_object_t* thread = PTHREADS_FETCH;
zval *work;

if (!PTHREADS_IN_CREATOR(thread) || PTHREADS_IS_CONNECTION(thread)) {
if (!PTHREADS_IN_CREATOR(thread)) {
zend_throw_exception_ex(spl_ce_RuntimeException,
0, "only the creator of this %s may call stack",
thread->std.ce->name->val);
Expand All @@ -105,34 +105,34 @@ PHP_METHOD(Worker, stack)
return;
}

RETURN_LONG(pthreads_stack_add(thread->stack, work));
RETURN_LONG(pthreads_stack_add(thread->ts_obj->stack, work));
} /* }}} */

/* {{{ proto Collectable Worker::unstack()
Removes the first item from the stack */
PHP_METHOD(Worker, unstack)
{
pthreads_object_t* thread = PTHREADS_FETCH;
pthreads_zend_object_t* thread = PTHREADS_FETCH;

if (zend_parse_parameters_none() != SUCCESS) {
return;
}

if (!PTHREADS_IN_CREATOR(thread) || PTHREADS_IS_CONNECTION(thread)) {
if (!PTHREADS_IN_CREATOR(thread)) {
zend_throw_exception_ex(spl_ce_RuntimeException,
0, "only the creator of this %s may call unstack",
thread->std.ce->name->val);
return;
}

pthreads_stack_del(thread->stack, return_value);
pthreads_stack_del(thread->ts_obj->stack, return_value);
}

/* {{{ proto int Worker::getStacked()
Returns the current size of the stack */
PHP_METHOD(Worker, getStacked)
{
pthreads_object_t* thread = PTHREADS_FETCH;
pthreads_object_t* thread = PTHREADS_FETCH_TS;

RETURN_LONG(pthreads_stack_size(thread->stack));
}
Expand All @@ -141,7 +141,7 @@ PHP_METHOD(Worker, getStacked)
Will return true if the Worker has been shutdown */
PHP_METHOD(Worker, isShutdown)
{
pthreads_object_t* thread = PTHREADS_FETCH;
pthreads_object_t* thread = PTHREADS_FETCH_TS;

RETURN_BOOL(pthreads_monitor_check(thread->monitor, PTHREADS_MONITOR_JOINED));
} /* }}} */
Expand All @@ -150,7 +150,7 @@ PHP_METHOD(Worker, isShutdown)
Will wait for execution of all Stackables to complete before shutting down the Worker */
PHP_METHOD(Worker, shutdown)
{
pthreads_object_t* thread = PTHREADS_FETCH;
pthreads_zend_object_t* thread = PTHREADS_FETCH;

RETURN_BOOL(pthreads_join(thread));
} /* }}} */
Expand All @@ -159,27 +159,27 @@ PHP_METHOD(Worker, shutdown)
Will return the identifier of the referenced Worker */
PHP_METHOD(Worker, getThreadId)
{
ZVAL_LONG(return_value, (PTHREADS_FETCH_FROM(Z_OBJ_P(getThis())))->local.id);
ZVAL_LONG(return_value, PTHREADS_FETCH_TS->local.id);
} /* }}} */

/* {{{ proto long Worker::getCreatorId()
Will return the identifier of the thread ( or process ) that created the referenced Worker */
PHP_METHOD(Worker, getCreatorId)
{
ZVAL_LONG(return_value, (PTHREADS_FETCH_FROM(Z_OBJ_P(getThis())))->creator.id);
ZVAL_LONG(return_value, PTHREADS_FETCH_TS->creator.id);
} /* }}} */

/* {{{ */
static zend_bool pthreads_worker_running_function(zend_object *std, zval *value) {
pthreads_object_t *worker = PTHREADS_FETCH_FROM(std),
pthreads_object_t *worker = PTHREADS_FETCH_TS_FROM(std),
*running = NULL,
*checking = NULL;
zend_bool result = 0;

if (pthreads_monitor_lock(worker->monitor)) {
if (*worker->running) {
running = PTHREADS_FETCH_FROM(*worker->running);
checking = PTHREADS_FETCH_FROM(Z_OBJ_P(value));
running = PTHREADS_FETCH_TS_FROM(*worker->running);
checking = PTHREADS_FETCH_TS_FROM(Z_OBJ_P(value));

if (running->monitor == checking->monitor)
result = 1;
Expand Down Expand Up @@ -232,7 +232,7 @@ PHP_METHOD(Worker, collector) {
/* {{{ proto int Worker::collect([callable collector]) */
PHP_METHOD(Worker, collect)
{
pthreads_object_t *thread = PTHREADS_FETCH;
pthreads_zend_object_t *thread = PTHREADS_FETCH;
pthreads_call_t call = PTHREADS_CALL_EMPTY;

if (!ZEND_NUM_ARGS()) {
Expand All @@ -241,14 +241,14 @@ PHP_METHOD(Worker, collect)
return;
}

if (!PTHREADS_IN_CREATOR(thread) || PTHREADS_IS_CONNECTION(thread)) {
if (!PTHREADS_IN_CREATOR(thread)) {
zend_throw_exception_ex(spl_ce_RuntimeException, 0,
"only the creator of this %s may call collect",
thread->std.ce->name->val);
return;
}

RETVAL_LONG(pthreads_stack_collect(&thread->std, thread->stack, &call, pthreads_worker_running_function, pthreads_worker_collect_function));
RETVAL_LONG(pthreads_stack_collect(&thread->std, thread->ts_obj->stack, &call, pthreads_worker_running_function, pthreads_worker_collect_function));

if (!ZEND_NUM_ARGS()) {
PTHREADS_WORKER_COLLECTOR_DTOR(call);
Expand Down
4 changes: 2 additions & 2 deletions php_pthreads.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ static inline zend_bool pthreads_verify_type(zend_execute_data *execute_data, zv
}

if (ZEND_TYPE_IS_CLASS(info->type)) {
pthreads_object_t *threaded;
pthreads_zend_object_t *threaded;

if (!var ||
Z_TYPE_P(var) != IS_OBJECT ||
Expand Down Expand Up @@ -745,7 +745,7 @@ PHP_MINIT_FUNCTION(pthreads)

memcpy(&pthreads_handlers, zend_handlers, sizeof(zend_object_handlers));

pthreads_handlers.offset = XtOffsetOf(pthreads_object_t, std);
pthreads_handlers.offset = XtOffsetOf(pthreads_zend_object_t, std);

pthreads_handlers.free_obj = pthreads_base_free;
pthreads_handlers.cast_object = pthreads_cast_object;
Expand Down
47 changes: 5 additions & 42 deletions src/globals.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ struct _pthreads_globals pthreads_globals;
# define PTHREADS_G () ? : (void***) &pthreads_globals
#endif

extern int pthreads_connect(pthreads_object_t* source, pthreads_object_t* destination);

/* {{{ */
zend_bool pthreads_globals_init(){
if (!PTHREADS_G(init)&&!PTHREADS_G(failed)) {
Expand Down Expand Up @@ -75,8 +73,8 @@ void pthreads_globals_unlock() {
} /* }}} */

/* {{{ */
void* pthreads_globals_object_alloc(size_t length) {
void *bucket = (void*) ecalloc(1, length);
pthreads_zend_object_t* pthreads_globals_object_alloc(size_t length) {
pthreads_zend_object_t *bucket = (pthreads_zend_object_t*) ecalloc(1, length);

if (pthreads_globals_lock()) {
zend_hash_index_update_ptr(
Expand All @@ -91,59 +89,24 @@ void* pthreads_globals_object_alloc(size_t length) {
} /* }}} */

/* {{{ */
zend_bool pthreads_globals_object_connect(zend_ulong address, zend_class_entry *ce, zval *object) {
zend_bool pthreads_globals_object_valid(pthreads_zend_object_t *address) {
zend_bool valid = 0;

if (!address)
return valid;

if (pthreads_globals_lock()) {
if (zend_hash_index_exists(&PTHREADS_G(objects), address)) {
if (zend_hash_index_exists(&PTHREADS_G(objects), (zend_ulong) address)) {
valid = 1;
}
pthreads_globals_unlock();
}

if (valid) {
pthreads_object_t *pthreads = (pthreads_object_t*) address;

/*
* This can be done outside of a critical section because there are only two possibilities:
* We own the object: no possible pathway to fault (read free'd memory)
* We don't own the object: possibly pathway to fault whether we use critical section or not:
* We use a critical section: we create the connection knowing that address cannot be freed while doing so
* however, as soon as we leave the section, and before the conext that called this routine can reference the connection
* object the creating context may have free'd the object.
* We don't use a critical section: the object may be freed while we are creating the connection, causing a fault.
*
* As always, it's necessary for the programmer to retain the appropriate references so that this does not fault, creating connections
* in a critical section would be unecessarily slow, not to mention recursively lock mutex (which is fine, but not ideal).
*/

if (PTHREADS_IN_CREATOR(pthreads)) {
/* we own the object in this context */
ZVAL_OBJ(object, &pthreads->std);
Z_ADDREF_P(object);
} else {
/* we do not own the object, create a connection */
if (!ce) {
/* we may not know the class, can't use ce directly
from zend_object because it is from another context */
PTHREADS_ZG(hard_copy_interned_strings) = 1;
ce = pthreads_prepared_entry(pthreads, pthreads->std.ce);
PTHREADS_ZG(hard_copy_interned_strings) = 0;
}
object_init_ex(object, ce);
pthreads_connect(pthreads, PTHREADS_FETCH_FROM(Z_OBJ_P(object)));
}
}

return valid;
} /* }}} */


/* {{{ */
zend_bool pthreads_globals_object_delete(void *address) {
zend_bool pthreads_globals_object_delete(pthreads_zend_object_t *address) {
zend_bool deleted = 0;

if (!address)
Expand Down
6 changes: 3 additions & 3 deletions src/globals.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ ZEND_EXTERN_MODULE_GLOBALS(pthreads)
/* }}} */

/* {{{ */
zend_bool pthreads_globals_object_delete(void *address); /* }}} */
zend_bool pthreads_globals_object_valid(pthreads_zend_object_t *address); /* }}} */

/* {{{ */
zend_bool pthreads_globals_object_connect(zend_ulong address, zend_class_entry *ce, zval *object); /* }}} */
zend_bool pthreads_globals_object_delete(pthreads_zend_object_t *address); /* }}} */

/* {{{ */
void* pthreads_globals_object_alloc(size_t length); /* }}} */
pthreads_zend_object_t* pthreads_globals_object_alloc(size_t length); /* }}} */

/* {{{ initialize (true) globals */
zend_bool pthreads_globals_init(); /* }}} */
Expand Down
Loading

0 comments on commit f5f17d2

Please sign in to comment.