Skip to content

Commit

Permalink
Merge pull request #13 from Superbalist/rest-subscriber-issue
Browse files Browse the repository at this point in the history
pull return immediately, with sleep
  • Loading branch information
geyer-za committed Mar 19, 2019
2 parents 47b8be5 + 70f32ba commit d15d1f8
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 2 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 5.2.0 - 2019-03-19

* Add ability to use returnImmediately flag when pulling messages

## 5.1.0 - 2019-02-28

* Bump up google/cloud requirement to ^0.95.0
Expand Down
57 changes: 55 additions & 2 deletions src/GoogleCloudPubSubAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ class GoogleCloudPubSubAdapter implements PubSubAdapterInterface
*/
protected $maxMessages;

/**
* @var bool
*/
protected $returnImmediately;

/**
* @var int
*/
protected $returnImmediatelyPause;

/**
* @param PubSubClient $client
* @param string $clientIdentifier
Expand All @@ -54,14 +64,18 @@ public function __construct(
$autoCreateTopics = true,
$autoCreateSubscriptions = true,
$backgroundBatching = false,
$maxMessages = 1000
$maxMessages = 1000,
$returnImmediately = false,
$returnImmediatelyPause = 500000
) {
$this->client = $client;
$this->clientIdentifier = $clientIdentifier;
$this->autoCreateTopics = $autoCreateTopics;
$this->autoCreateSubscriptions = $autoCreateSubscriptions;
$this->backgroundBatching = $backgroundBatching;
$this->maxMessages = $maxMessages;
$this->returnImmediately = $returnImmediately;
$this->returnImmediatelyPause = (int) $returnImmediatelyPause;
}

/**
Expand Down Expand Up @@ -140,6 +154,40 @@ public function areSubscriptionsAutoCreated()
return $this->autoCreateSubscriptions;
}

/**
* Set if a pull should return immediately if there are no messages
* @param bool $returnImmediately
*/
public function setReturnImmediately($returnImmediately) {
$this->returnImmediately = $returnImmediately;
}

/**
* Return the return immediately configuration
* @return bool
*/
public function getReturnImmediately() {
return $this->returnImmediately;
}

/**
* Set the amount of time to pause between attempts to pull messages if return immediately is enabled.
* Value is in microseconds
*
* @param int $returnImmediatelyPause
*/
public function setReturnImmediatelyPause($returnImmediatelyPause) {
$this->returnImmediatelyPause = (int) $returnImmediatelyPause;
}

/**
* Return the return immediately pause configuration
* @return int
*/
public function getReturnImmediatelyPause() {
return $this->returnImmediatelyPause;
}

/**
* Set whether or not background batching is enabled.
*
Expand Down Expand Up @@ -191,15 +239,20 @@ public function subscribe($channel, callable $handler)
$subscription = $this->getSubscriptionForChannel($channel);

$isSubscriptionLoopActive = true;
$isPauseEnabled = $this->returnImmediately && ($this->returnImmediatelyPause > 0);

while ($isSubscriptionLoopActive) {
$messages = $subscription->pull([
'grpcOptions' => [
'timeoutMillis' => null,
],
'maxMessages' => $this->maxMessages,
'returnImmediately' => $this->returnImmediately,
]);

if ($isPauseEnabled && empty($messages)) {
usleep($this->returnImmediatelyPause);
continue;
}
foreach ($messages as $message) {
/** @var Message $message */
$payload = Utils::unserializeMessagePayload($message->data());
Expand Down
103 changes: 103 additions & 0 deletions tests/GoogleCloudPubSubAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ public function testGetSetBackgroundBatching()
$this->assertTrue($adapter->isBackgroundBatchingEnabled());
}

public function testGetSetReturnImmediately()
{
$client = Mockery::mock(PubSubClient::class);
$adapter = new GoogleCloudPubSubAdapter($client);
$this->assertFalse($adapter->getReturnImmediately());

$adapter->setReturnImmediately(true);
$this->assertTrue($adapter->getReturnImmediately());
}

public function testGetSetReturnImmediatelyPause()
{
$client = Mockery::mock(PubSubClient::class);
$adapter = new GoogleCloudPubSubAdapter($client);
$this->assertEquals(500000, $adapter->getReturnImmediatelyPause());

$adapter->setReturnImmediatelyPause(1000000);
$this->assertEquals(1000000, $adapter->getReturnImmediatelyPause());
}

public function testPublishWhenTopicMustBeCreated()
{
$topic = Mockery::mock(Topic::class);
Expand Down Expand Up @@ -320,6 +340,7 @@ public function testSubscribeWhenSubscriptionMustBeCreated()
'timeoutMillis' => null,
],
'maxMessages' => 1000,
'returnImmediately' => false
])
->once()
->andReturn($messageBatch1);
Expand All @@ -336,6 +357,7 @@ public function testSubscribeWhenSubscriptionMustBeCreated()
'timeoutMillis' => null,
],
'maxMessages' => 1000,
'returnImmediately' => false
])
->once()
->andReturn($messageBatch2);
Expand Down Expand Up @@ -397,6 +419,7 @@ public function testSubscribeWhenSubscriptionExists()
'timeoutMillis' => null,
],
'maxMessages' => 1000,
'returnImmediately' => false
])
->once()
->andReturn($messageBatch1);
Expand All @@ -412,6 +435,7 @@ public function testSubscribeWhenSubscriptionExists()
'timeoutMillis' => null,
],
'maxMessages' => 1000,
'returnImmediately' => false
])
->once()
->andReturn($messageBatch2);
Expand Down Expand Up @@ -471,6 +495,7 @@ public function testSubscribeWhenAutoTopicCreationIsDisabled()
'timeoutMillis' => null,
],
'maxMessages' => 1000,
'returnImmediately' => false
])
->once()
->andReturn($messageBatch1);
Expand All @@ -486,6 +511,7 @@ public function testSubscribeWhenAutoTopicCreationIsDisabled()
'timeoutMillis' => null,
],
'maxMessages' => 1000,
'returnImmediately' => false
])
->once()
->andReturn($messageBatch2);
Expand Down Expand Up @@ -521,4 +547,81 @@ public function testSubscribeWhenAutoTopicCreationIsDisabled()

$adapter->subscribe('channel_name', [$handler1, 'handle']);
}

public function testSubscribeWhenReturnImmediatelyIsEnabled()
{
$message1 = new Message(['data' => '{"hello":"world"}'], ['ackId' => 1]);
$message2 = new Message(['data' => '"this is a string"'], ['ackId' => 2]);
$message3 = new Message(['data' => '"unsubscribe"'], ['ackId' => 3]);

$messageBatch1 = [
$message1,
$message2,
];

$messageBatch2 = [
$message3,
];

$subscription = Mockery::mock(Subscription::class);
$subscription->shouldReceive('exists')
->once()
->andReturn(true);
$subscription->shouldNotHaveReceived('create');

$expectedPullOptions = [
'grpcOptions' => [
'timeoutMillis' => null,
],
'maxMessages' => 1000,
'returnImmediately' => true
];

$subscription->shouldReceive('pull')
->with($expectedPullOptions)
->once()
->andReturn($messageBatch1);
$subscription->shouldReceive('acknowledge')
->with($message1)
->once();
$subscription->shouldReceive('acknowledge')
->with($message2)
->once();

$subscription->shouldReceive('pull')
->with($expectedPullOptions)
->once()
->andReturn($messageBatch2);
$subscription->shouldReceive('acknowledge')
->with($message3)
->once();

$topic = Mockery::mock(Topic::class);
$topic->shouldReceive('exists')
->once()
->andReturn(true);
$topic->shouldNotHaveReceived('create');
$topic->shouldReceive('subscription')
->with('default.channel_name')
->once()
->andReturn($subscription);

$client = Mockery::mock(PubSubClient::class);
$client->shouldReceive('topic')
->with('channel_name')
->once()
->andReturn($topic);

$handler1 = Mockery::mock(\stdClass::class);
$handler1->shouldReceive('handle')
->with(['hello' => 'world'])
->once();
$handler1->shouldReceive('handle')
->with('this is a string')
->once();

$adapter = new GoogleCloudPubSubAdapter($client);
$adapter->setReturnImmediately(true);
$adapter->subscribe('channel_name', [$handler1, 'handle']);
}
}

0 comments on commit d15d1f8

Please sign in to comment.