Skip to content

Commit

Permalink
readCallback may be called on EOF
Browse files Browse the repository at this point in the history
Due to a limitation in PHP, the code in readCallback may be called when
dealing with network sockets and EOF is received (ie. the peer disconnected).

To work around this, when fread() is called in readCallback and the stream
is actually unusable due to EOF/disconnection, a special exception is thrown.
This exception is caught by the manager and closeCallback will be called.
  • Loading branch information
fpoirotte committed Aug 22, 2017
1 parent d9d9f79 commit 0751c85
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 20 deletions.
47 changes: 36 additions & 11 deletions src/StreamManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,24 +140,49 @@ public function loopOnce()
}

foreach ($r as $name => $stream) {
$cb = feof($stream) ? 'closeCallback' : 'readCallback';
$ctx = stream_context_get_options($streams[$name]);

if (!array_key_exists($cb, $ctx[self::WRAPPER_NAME])) {
throw new \RuntimeException("Invalid $cb");
// If the event supposedly says there is
// incoming data to read, try to do so.
if (!feof($stream)) {
if (!array_key_exists('readCallback', $ctx[self::WRAPPER_NAME])) {
throw new \RuntimeException("Undefined 'readCallback'");
}

if (null !== $ctx[self::WRAPPER_NAME]['readCallback']) {
if (!is_callable($ctx[self::WRAPPER_NAME]['readCallback'])) {
throw new \RuntimeException("Invalid 'readCallback'");
}

try {
call_user_func($ctx[self::WRAPPER_NAME]['readCallback'], $this, $streams[$name], $name);
continue;
} catch (\fpoirotte\StreamManager\EOFException $e) {
// Catch the exception:
// the closeCallback will be called instead
}
}
}

if (null !== $ctx[self::WRAPPER_NAME][$cb]) {
if (!is_callable($ctx[self::WRAPPER_NAME][$cb])) {
throw new \RuntimeException("Invalid $cb");
// If EOF had been signaled or no data could be read
// (meaning that EOF was reached), call 'closeCallback'.
if (!array_key_exists('closeCallback', $ctx[self::WRAPPER_NAME])) {
throw new \RuntimeException("Missing 'closeCallback'");
}

if (null !== $ctx[self::WRAPPER_NAME]['closeCallback']) {
if (!is_callable($ctx[self::WRAPPER_NAME]['closeCallback'])) {
throw new \RuntimeException("Invalid 'closeCallback'");
}
call_user_func($ctx[self::WRAPPER_NAME]['readCallback'], $this, $streams[$name], $name);
} elseif ('closeCallback' === $cb) {
// By default, close the stream upon EOF.

call_user_func($ctx[self::WRAPPER_NAME]['closeCallback'], $this, $streams[$name], $name);
} else {
// The default behaviour is to simply close the stream
fclose($streams[$name]);
unset($this->streams[$name]);
}

// Make sure the stream is not managed anymore
unset($this->streams[$name]);
}

foreach (array_keys($w) as $name) {
Expand Down
7 changes: 7 additions & 0 deletions src/StreamManager/EOFException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace fpoirotte\StreamManager;

class EOFException extends \Exception
{
}
5 changes: 5 additions & 0 deletions src/StreamManager/StreamWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace fpoirotte\StreamManager;

use fpoirotte\StreamManager;
use fpoirotte\StreamManager\EOFException;

class StreamWrapper implements \Countable
{
Expand Down Expand Up @@ -155,6 +156,10 @@ public function stream_read($count)
return false;
}

if ('' === $data && feof($this->rawStream)) {
throw new EOFException();
}

while (strlen($data) > 0) {
$written = fwrite($this->filteredStream[1], $data);
if (false === $written) {
Expand Down
37 changes: 28 additions & 9 deletions tests/StreamsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

class StreamsTest extends TestCase
{
protected $buffer = array();
protected $receivedData = array();
protected $closedStream = array();

public function testSingleStreamAndSingleFilter()
public function testSimpleScenario()
{
$data = 'Hello world!';

Expand All @@ -27,14 +28,14 @@ public function testSingleStreamAndSingleFilter()
$this->assertSame(str_rot13($data), fread($rawStream, 32));
}

public function inc($manager, $stream, $name)
public function onDataReceived($manager, $stream, $name)
{
$data = fread($stream, 32);

if (!isset($this->buffer[$name])) {
$this->buffer[$name] = array();
if (!isset($this->receivedData[$name])) {
$this->receivedData[$name] = array();
}
$this->buffer[$name][] = $data;
$this->receivedData[$name][] = $data;

$value = (int) $data;
if ($value < 9) {
Expand All @@ -48,11 +49,17 @@ public function inc($manager, $stream, $name)
}

if ($value >= 9) {
fclose($stream);
unset($manager[$name]);
}
}

public function testBidirectionalCommunications()
public function onStreamClosed($manager, $stream, $name)
{
$this->closedStream[] = $name;
}

public function testComplexScenario()
{
if (!in_array('convert.*', stream_get_filters())) {
$this->markTestSkipped('The convert.* stream filters are not available');
Expand All @@ -61,7 +68,18 @@ public function testBidirectionalCommunications()
$manager = new StreamManager();
$sock = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
foreach (array(0, 1) as $i) {
stream_context_set_option($sock[$i], StreamManager::WRAPPER_NAME, 'readCallback', array($this, 'inc'));
stream_context_set_option(
$sock[$i],
StreamManager::WRAPPER_NAME,
'readCallback',
array($this, 'onDataReceived')
);
stream_context_set_option(
$sock[$i],
StreamManager::WRAPPER_NAME,
'closeCallback',
array($this, 'onStreamClosed')
);
stream_set_blocking($sock[$i], false);
}

Expand Down Expand Up @@ -96,6 +114,7 @@ public function testBidirectionalCommunications()
"b2a" => array('0 ', '2 ', '4 ', '6 ', '8 '),
"a2b" => array('1 ', '3 ', '5 ', '7 ', '9 '),
);
$this->assertSame($expected, $this->buffer);
$this->assertSame($expected, $this->receivedData);
$this->assertSame(array('b2a'), $this->closedStream);
}
}

0 comments on commit 0751c85

Please sign in to comment.