Skip to content

Commit

Permalink
Added return value to TrackedNatsRequest::wait()
Browse files Browse the repository at this point in the history
  • Loading branch information
Donal Byrne committed Nov 23, 2017
1 parent 55bd37f commit 0534485
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 17 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ $c->connect();
$r = $c->publish('special.subject', 'some serialized payload...');

// optionally wait for the ack
$r->wait();
$gotAck = $r->wait();
if (!$gotAck) {
...
}

$c->close();

Expand All @@ -69,7 +72,10 @@ If publishing many messages at a time, you might at first do this:
```php
foreach ($req as $data){
$r = $c->publish(...);
$r->wait();
$gotAck = $r->wait();
if (!$gotAck) {
...
}
}
```

Expand Down
10 changes: 8 additions & 2 deletions src/NatsStreaming/TrackedNatsRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ private function dispatchCachedMessages()
if ($cachedMsgs) {
foreach ($cachedMsgs as $msg) {
$cb($msg);
$this->consumed = true;
}
// should only get 1 so get out
return true;
Expand All @@ -79,16 +80,19 @@ private function dispatchCachedMessages()
return false;
}

public function gotAck() {
return $this->consumed > 0;
}

public function wait()
{

if ($this->consumed) {
return;
return $this->gotAck();
}

if ($this->dispatchCachedMessages()) {
return;
return $this->gotAck();
} else {
$this->waiting = true;

Expand All @@ -102,8 +106,10 @@ public function wait()
}

$this->waiting = false;
return $this->gotAck();
}


/**
* @return mixed
*/
Expand Down
5 changes: 4 additions & 1 deletion tests/Integration/Commands/Producer
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ $subOpts = new \NatsStreaming\SubscriptionOptions();
$count = 0;
for(; $count < $numMessages; $count++) {
$ack = $con->publish($subject, 'foobar '.$count);
$ack->wait();
$gotAck = $ack->wait();
echo "count : $count\n";
if (!$gotAck) {
error_log("ERROR: failed to get ACK");
}
}


Expand Down
2 changes: 1 addition & 1 deletion tests/Integration/MultipleConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class MultipleConsumerTest extends PHPUnit_Framework_TestCase

public function setUp()
{
parent::setUp(); // TODO: Change the autogenerated stub
parent::setUp();
$this->phpCmd = str_replace("\n", '', `which php`);
$this->logPrefix = $this->getName();
}
Expand Down
33 changes: 22 additions & 11 deletions tests/Unit/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public function testPublish()
{
$this->c->reconnect();
$r = $this->c->publish('foo', 'bar');
$r->wait();
$ack = $r->wait();
$this->assertTrue($ack);
$count = $this->c->pubsCount();
$this->assertInternalType('int', $count);
$this->assertGreaterThan(0, $count);
Expand Down Expand Up @@ -121,7 +122,8 @@ public function testSubscribe()
}

foreach ($rs as $r) {
$r->wait();
$gotAck = $r->wait();
$this->assertTrue($gotAck);
}


Expand Down Expand Up @@ -162,7 +164,8 @@ public function testGlobalWait()
}

foreach ($rs as $r) {
$r->wait();
$gotAck = $r->wait();
$this->assertTrue($gotAck);
}


Expand Down Expand Up @@ -230,7 +233,8 @@ public function testMultipleSubscriptions()
}

foreach ($rs as $r) {
$r->wait(1);
$gotAck = $r->wait(1);
$this->assertTrue($gotAck);
}


Expand Down Expand Up @@ -279,7 +283,8 @@ public function testDurableSubscription()
}

foreach ($rs as $r) {
$r->wait();
$gotAck = $r->wait();
$this->assertTrue($gotAck);
}

$sub->wait($toSend);
Expand All @@ -290,7 +295,8 @@ public function testDurableSubscription()

$this->c->connect();
$r = $this->c->publish($subject, 'foobarnew');
$r->wait();
$gotAck = $r->wait();
$this->assertTrue($gotAck);

$subOptions = new \NatsStreaming\SubscriptionOptions();

Expand Down Expand Up @@ -367,7 +373,8 @@ public function testMultipleDurableSubscription()

// quicker
foreach ($rs as $r) {
$r->wait();
$gotAck = $r->wait();
$this->assertTrue($gotAck);
}

$sub1->wait($toSend);
Expand All @@ -381,7 +388,8 @@ public function testMultipleDurableSubscription()
$this->c->connect();

$r = $this->c->publish($subject, 'foobarnew');
$r->wait();
$gotAck = $r->wait();
$this->assertTrue($gotAck);


$subOptions = new \NatsStreaming\SubscriptionOptions();
Expand Down Expand Up @@ -452,7 +460,8 @@ public function testQueueGroupSubscribe()
}

foreach ($rs as $r) {
$r->wait();
$gotAck = $r->wait();
$this->assertTrue($gotAck);
}

$sub->wait($toSend);
Expand Down Expand Up @@ -487,7 +496,8 @@ public function testUnsubscribe($close = false)
}, $subOptions);

$r = $this->c->publish($subject, 'foobar');
$r->wait();
$gotAck = $r->wait();
$this->assertTrue($gotAck);

$sub->wait(1);

Expand All @@ -501,7 +511,8 @@ public function testUnsubscribe($close = false)

$this->c->natsCon->setStreamTimeout(5);
$r = $this->c->publish($subject, 'foobar');
$r->wait();
$gotAck = $r->wait();
$this->assertTrue($gotAck);


$this->assertEquals(1, $got);
Expand Down

0 comments on commit 0534485

Please sign in to comment.