Skip to content

Commit

Permalink
Merge branch '2.0.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
byjg committed Aug 2, 2016
2 parents 64a44ad + 4cf3f99 commit ce5e3e9
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 81 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -2,3 +2,4 @@ nbproject/private/
composer.lock
vendor
config/cacheconfig.php
.idea/
3 changes: 2 additions & 1 deletion composer.json
Expand Up @@ -8,7 +8,8 @@
}
],
"require": {
"byjg/cache-engine": "~1.0"
"byjg/cache-engine": "~1.0",
"php": ">5.2.7"
},
"suggest": {
"ext-pcntl": "*",
Expand Down
33 changes: 19 additions & 14 deletions example.php
@@ -1,29 +1,34 @@
<?php
require_once('vendor/autoload.php');

// Method to be executed in a thread
function Foo($t)
class Foo
{
echo "Starting thread #$t" . PHP_EOL;

sleep(1 * rand(1, 5));
for ($i = 0; $i < 10; $i++) {
echo "Hello from thread #$t, i=$i" . PHP_EOL;
sleep(1);
// Method to be executed in a thread
public function bar($t)
{
echo "Starting thread #$t" . PHP_EOL;

sleep(1 * rand(1, 5));
for ($i = 0; $i < 10; $i++) {
echo "Hello from thread #$t, i=$i" . PHP_EOL;
sleep(1);
}
echo "Ending thread #$t" . PHP_EOL;

// Note: this line below require the file "config/cacheconfig.php" exists
return "$t: [[[[[[" . time() . "]]]]]]";
}
echo "Ending thread #$t" . PHP_EOL;

// Note: this line below require the file "config/cacheconfig.php" exists
return "$t: [[[[[[" . time() . "]]]]]]";
}

try {
$t = array();

$foo = new Foo();

// Create the threads
for ($i = 0; $i < 10; $i++) {
// Create a new instance of the Thread class, pointing to "Foo" function
$thr = new \ByJG\PHPThread\Thread('Foo');
$thr = new \ByJG\PHPThread\Thread([$foo, 'bar']);

// Started the method "Foo" in a tread
$thr->execute($i);
Expand All @@ -44,7 +49,7 @@ function Foo($t)
foreach ($t as $thread) {
echo "Result: " . $thread->getResult() . "\n";
}

} catch (Exception $e) {
echo 'Exception: ' . $e . PHP_EOL;
}
27 changes: 16 additions & 11 deletions example_pool.php
Expand Up @@ -2,29 +2,34 @@

require_once('vendor/autoload.php');

// Method to be executed in a thread
function Foo($t)
class Foo
{
echo "Starting thread #$t" . PHP_EOL;;
sleep(1 * rand(1, 5));
for ($i = 0; $i < 10; $i++) {
echo "Hello from thread #$t, i=$i" . PHP_EOL;
sleep(1);
// Method to be executed in a thread
public function bar($t)
{
echo "Starting thread #$t" . PHP_EOL;
sleep(1 * rand(1, 5));
for ($i = 0; $i < 10; $i++) {
echo "Hello from thread #$t, i=$i" . PHP_EOL;
sleep(1);
}
echo "Ending thread #$t" . PHP_EOL;

return uniqid("Thread_{$t}_");
}
echo "Ending thread #$t" . PHP_EOL;

return uniqid("Thread_{$t}_");
}


try {
// Create a instance of the ThreadPool
$threadPool = new \ByJG\PHPThread\ThreadPool();

$foo = new Foo();

// Create the threads
for ($i = 0; $i < 10; $i++) {
// Queue a worker pointing to "Foo" function and pass the required parameters
$threadPool->queueWorker('Foo', [$i]);
$threadPool->queueWorker([$foo, 'bar'], [$i]);
}

// Starts all the threads in the queue
Expand Down
56 changes: 35 additions & 21 deletions src/Handler/ForkHandler.php
Expand Up @@ -16,9 +16,9 @@
*/
class ForkHandler implements ThreadInterface
{
protected $_threadKey;
protected $threadKey;
private $callable;
private $_pid;
private $pid;

/**
* constructor method
Expand Down Expand Up @@ -54,33 +54,41 @@ public function setCallable(callable $callable)
*/
public function execute()
{
$this->_threadKey = 'thread_' . rand(1000, 9999) . rand(1000, 9999) . rand(1000, 9999) . rand(1000, 9999);
$this->threadKey = 'thread_' . rand(1000, 9999) . rand(1000, 9999) . rand(1000, 9999) . rand(1000, 9999);

if (($this->_pid = pcntl_fork()) == -1) {
if (($this->pid = pcntl_fork()) == -1) {
throw new RuntimeException('Couldn\'t fork the process');
}

if ($this->_pid) {
if ($this->pid) {
// Parent
//pcntl_wait($status); //Protect against Zombie children
} else {
// Child.
pcntl_signal(SIGTERM, array($this, 'signalHandler'));
$args = func_get_args();
if (!empty($args)) {
$return = call_user_func_array($this->callable, $args);
} else {
$return = call_user_func($this->callable);

$callable = $this->callable;
if (!is_string($callable)) {
$callable = (array) $this->callable;
}

if (!is_null($return)) {
$this->saveResult($return);
try {
$return = call_user_func_array($callable, (array)$args);

if (!is_null($return)) {
$this->saveResult($return);
}
// Executed only in PHP 7, will not match in PHP 5.x
} catch (\Throwable $t) {
$this->saveResult($t);
// Executed only in PHP 5. Remove when PHP 5.x is no longer necessary.
} catch (\Exception $ex) {
$this->saveResult($ex);
}

exit(0);
}

// Parent.
}

/**
Expand All @@ -91,27 +99,33 @@ public function execute()
protected function saveResult($object)
{
$cache = CacheContext::factory('phpthread');
$cache->set($this->_threadKey, $object);
$cache->set($this->threadKey, $object);
}

/**
* Get the thread result from the shared memory block and erase it
*
* @return mixed
* @throws \Error
* @throws object
*/
public function getResult()
{
if (is_null($this->_threadKey)) {
if (is_null($this->threadKey)) {
return null;
}

$key = $this->_threadKey;
$this->_threadKey = null;
$key = $this->threadKey;
$this->threadKey = null;

$cache = CacheContext::factory('phpthread');
$result = $cache->get($key);
$cache->release($key);

if (is_object($result) && (is_subclass_of($result, '\\Error') || is_subclass_of($result, '\\Exception'))) {
throw $result;
}

return $result;
}

Expand All @@ -124,11 +138,11 @@ public function getResult()
public function stop($signal = SIGKILL, $wait = false)
{
if ($this->isAlive()) {
posix_kill($this->_pid, $signal);
posix_kill($this->pid, $signal);

$status = null;
if ($wait) {
pcntl_waitpid($this->_pid, $status);
pcntl_waitpid($this->pid, $status);
}
}
}
Expand All @@ -140,7 +154,7 @@ public function stop($signal = SIGKILL, $wait = false)
public function isAlive()
{
$status = null;
return (pcntl_waitpid($this->_pid, $status, WNOHANG) === 0);
return (pcntl_waitpid($this->pid, $status, WNOHANG) === 0);
}

/**
Expand All @@ -158,6 +172,6 @@ private function signalHandler($signal)

public function waitFinish()
{
while ($this->isAlive()) {}
pcntl_wait($status);
}
}
41 changes: 20 additions & 21 deletions src/Handler/PThreadHandler.php
Expand Up @@ -20,8 +20,6 @@ class PThreadHandler extends \Thread implements ThreadInterface

private $result;

private $hasError;

/**
* Thread constructor.
*/
Expand Down Expand Up @@ -51,21 +49,27 @@ public function getLoader()
return $this->loader;
}

protected function threadError()
{
$this->hasError = error_get_last();
}

/**
* Here you are in a threaded environment
*/
public function run()
{
register_shutdown_function([$this, 'threadError']);

$this->getLoader()->register();

$this->result = call_user_func_array($this->callable, $this->args);
$callable = $this->callable;
if (!is_string($callable)) {
$callable = (array) $this->callable;
}

try {
$this->result = call_user_func_array($callable, (array)$this->args);
// Executed only in PHP 7, will not match in PHP 5.x
} catch (\Throwable $ex) {
$this->result = $ex;
// Executed only in PHP 5. Remove when PHP 5.x is no longer necessary.
} catch (\Exception $ex) {
$this->result = $ex;
}
}

/**
Expand All @@ -83,21 +87,16 @@ public function execute()
* Get the thread result
*
* @return mixed
* @throws \RuntimeException
* @throws \Exception
*/
public function getResult()
{
if ($this->hasError && ( $this->hasError['type'] == E_ERROR || $this->hasError['type'] == E_USER_ERROR )) {
throw new \RuntimeException(
sprintf(
'Thread error: "%s", in "%s" at line %d. <<--- ',
$this->hasError['message'],
$this->hasError['file'],
$this->hasError['line']
)
);
$result = $this->result;
if (is_object($result) && (is_subclass_of($result, '\\Error') || is_subclass_of($result, '\\Exception'))) {
throw $result;
}
return $this->result;

return $result;
}

/**
Expand Down
5 changes: 4 additions & 1 deletion src/Thread.php
Expand Up @@ -49,7 +49,10 @@ public function getThreadInstance()
} elseif (function_exists('pcntl_fork')) {
$this->threadInstance = new ForkHandler();
} else {
throw new RuntimeException('PHP need to be compiled with ZTS extension or compiled with the --enable-pcntl. Windows is not supported.');
throw new RuntimeException(
'PHP need to be compiled with ZTS extension or compiled with the --enable-pcntl. ' .
'Windows is not supported.'
);
}

return $this->threadInstance;
Expand Down

0 comments on commit ce5e3e9

Please sign in to comment.