Skip to content

Commit

Permalink
Ran cs
Browse files Browse the repository at this point in the history
  • Loading branch information
Donal Byrne committed Nov 22, 2017
1 parent d51f6a6 commit ad7e076
Show file tree
Hide file tree
Showing 17 changed files with 109 additions and 117 deletions.
11 changes: 6 additions & 5 deletions src/NatsStreaming/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ public function connect($timeout = null)
$this->timeout = $timeout;
$hbInbox = NatsHelper::newInboxSubject();

$this->natsCon->subscribe($hbInbox, function($message) { $this->processHeartbeat($message);});
$this->natsCon->subscribe($hbInbox, function ($message) {
$this->processHeartbeat($message);
});

$discoverPrefix = $this->options->getDiscoverPrefix() ? $this->options->getDiscoverPrefix() : self::DEFAULT_DISCOVER_PREFIX;

Expand Down Expand Up @@ -219,7 +221,7 @@ public function publish($subject, $data)
$ackSubject = NatsHelper::newInboxSubject(self::DEFAULT_ACK_PREFIX . '.');


$natsReq = new TrackedNatsRequest($this, $natsSubject, $bytes, function($message){
$natsReq = new TrackedNatsRequest($this, $natsSubject, $bytes, function ($message) {
/**
* @var $message Message
*/
Expand Down Expand Up @@ -402,7 +404,8 @@ public function close()
/**
* Wait until timeout. Dispatches any pending messages first.
*/
public function wait() {
public function wait()
{
$this->waiting = true;
foreach ($this->subs as $sub) {
/**
Expand Down Expand Up @@ -453,6 +456,4 @@ public function __destruct()
$this->close();
}
}


}
4 changes: 2 additions & 2 deletions src/NatsStreaming/Exceptions/ConnectException.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ class ConnectException extends Exception
*/
public function __construct($message = "error while connecting", $code = 0, Throwable $previous = null)
{
return parent::__construct($message,$code, $previous);
return parent::__construct($message, $code, $previous);
}
}
}
5 changes: 2 additions & 3 deletions src/NatsStreaming/Exceptions/DisconnectException.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class DisconnectException extends Exception
*/
public function __construct($message = "error while disconnecting", $code = 0, Throwable $previous = null)
{
return parent::__construct($message,$code, $previous);
return parent::__construct($message, $code, $previous);
}

}
}
5 changes: 2 additions & 3 deletions src/NatsStreaming/Exceptions/PublishException.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class PublishException extends Exception
*/
public function __construct($message = "error while publishing", $code = 0, Throwable $previous = null)
{
return parent::__construct($message,$code, $previous);
return parent::__construct($message, $code, $previous);
}

}
}
5 changes: 2 additions & 3 deletions src/NatsStreaming/Exceptions/SubscribeException.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class SubscribeException extends Exception
*/
public function __construct($message = "error while subscribing", $code = 0, Throwable $previous = null)
{
return parent::__construct($message,$code, $previous);
return parent::__construct($message, $code, $previous);
}

}
}
4 changes: 2 additions & 2 deletions src/NatsStreaming/Exceptions/TimeoutException.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ class TimeoutException extends Exception
*/
public function __construct($message = "timeout occured waiting for response", $code = 0, Throwable $previous = null)
{
return parent::__construct($message,$code, $previous);
return parent::__construct($message, $code, $previous);
}
}
}
5 changes: 2 additions & 3 deletions src/NatsStreaming/Exceptions/UnsubscribeException.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class UnsubscribeException extends Exception
*/
public function __construct($message = "error while unsubscribing", $code = 0, Throwable $previous = null)
{
return parent::__construct($message,$code, $previous);
return parent::__construct($message, $code, $previous);
}

}
}
11 changes: 5 additions & 6 deletions src/NatsStreaming/Helpers/NatsHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace NatsStreaming\Helpers;


use Nats\Connection;

class NatsHelper
Expand All @@ -13,7 +12,8 @@ class NatsHelper
* @param string $prefix
* @return string
*/
public static function newInboxSubject($prefix = '_INBOX.') {
public static function newInboxSubject($prefix = '_INBOX.')
{
return uniqid($prefix);
}

Expand All @@ -22,7 +22,8 @@ public static function newInboxSubject($prefix = '_INBOX.') {
* @param $natsCon Connection
* @return bool
*/
public static function socketInGoodHealth($natsCon){
public static function socketInGoodHealth($natsCon)
{

$streamSocket = $natsCon->getStreamSocket();
if (!$streamSocket) {
Expand All @@ -31,7 +32,5 @@ public static function socketInGoodHealth($natsCon){
$info = stream_get_meta_data($streamSocket);
$ok = is_resource($streamSocket) === true && feof($streamSocket) === false && empty($info['timed_out']) === true;
return $ok;

}

}
}
7 changes: 3 additions & 4 deletions src/NatsStreaming/Helpers/TimeHelpers.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace NatsStreaming\Helpers;


class TimeHelpers
{

Expand All @@ -12,11 +11,11 @@ class TimeHelpers
* could use `date +%S` instead, need to benchmark
* @return int
*/
public static function unixTimeNanos(){
public static function unixTimeNanos()
{
list ($micro, $secs) = explode(" ", microtime());
$nanosOffset = $micro * 1000000000;
$totalNanos = $secs * 1000000000 + $nanosOffset;
return (int) $totalNanos;
}

}
}
11 changes: 5 additions & 6 deletions src/NatsStreaming/MessageCache.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

namespace NatsStreaming;


class MessageCache
{


private static $msgsBySidMap = null;

public static function popMessages($sid, $numMessages = 0){
public static function popMessages($sid, $numMessages = 0)
{
if (!isset(self::$msgsBySidMap[$sid])) {
return [];
}
Expand All @@ -25,13 +25,12 @@ public static function popMessages($sid, $numMessages = 0){
}


public static function pushMessage($sid, $msg) {
public static function pushMessage($sid, $msg)
{
if (!is_array(self::$msgsBySidMap)) {
self::$msgsBySidMap = [];
}

self::$msgsBySidMap[$sid][] = $msg;
}


}
}
4 changes: 1 addition & 3 deletions src/NatsStreaming/Msg.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace NatsStreaming;


use NatsStreamingProtos\Ack;
use NatsStreamingProtos\MsgProto;
use Protobuf\Configuration;
Expand Down Expand Up @@ -57,5 +56,4 @@ public function ack()
$stanCon = $this->sub->getStanCon();
$stanCon->natsCon()->publish($this->sub->getAckInbox(), $data);
}

}
}
31 changes: 13 additions & 18 deletions src/NatsStreaming/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public function __construct($subject, $qGroup, $inbox, $opts, $msgCb, $stanCon)
$this->qGroup = $qGroup;
$this->inbox = $inbox;
$this->opts = $opts;
$this->cb = function($message)use($msgCb){
$this->cb = function ($message) use ($msgCb) {
$this->processedMessages ++;
$msgCb($message);
};
Expand All @@ -96,7 +96,6 @@ public function __construct($subject, $qGroup, $inbox, $opts, $msgCb, $stanCon)
$subRequest->setStartSequence($this->opts->getStartSequence());
break;
case StartPosition::TimeDeltaStart_VALUE:

$nowNano = TimeHelpers::unixTimeNanos();
$subRequest->setStartTimeDelta($nowNano - $this->opts->getStartMicroTime() * 1000);
break;
Expand All @@ -114,7 +113,6 @@ public function __construct($subject, $qGroup, $inbox, $opts, $msgCb, $stanCon)
});

$natsReq->wait();

} catch (\Exception $e) {
$this->stanCon->natsCon()->unsubscribe($this->getSid());
throw $e;
Expand All @@ -132,7 +130,6 @@ public function __construct($subject, $qGroup, $inbox, $opts, $msgCb, $stanCon)


$this->setAckInbox($resp->getAckInbox());

}


Expand Down Expand Up @@ -232,7 +229,8 @@ public function getStanCon()
* the server. Restarting a durable with the same name will not resume
* the subscription, it will be considered a new one.
*/
public function unsubscribe(){
public function unsubscribe()
{

$this->closeOrUnsubscribe(false);
}
Expand All @@ -243,10 +241,10 @@ public function unsubscribe(){
* for which this feature is not available, Close() will return a ErrNoServerSupport
* error.
*/
public function close() {
public function close()
{

$this->closeOrUnsubscribe(true);

}

/**
Expand All @@ -255,7 +253,8 @@ public function close() {
* @throws TimeoutException
* @throws UnsubscribeException
*/
private function closeOrUnsubscribe($doClose) {
private function closeOrUnsubscribe($doClose)
{

$this->stanCon->natsCon()->unsubscribe($this->sid);

Expand All @@ -279,7 +278,7 @@ private function closeOrUnsubscribe($doClose) {
*/
$resp = null;

$natsReq = new TrackedNatsRequest($this->stanCon, $reqSubject, $req->toStream()->getContents(), function($message) use (&$resp) {
$natsReq = new TrackedNatsRequest($this->stanCon, $reqSubject, $req->toStream()->getContents(), function ($message) use (&$resp) {
/**
* @var $message Message
*/
Expand Down Expand Up @@ -326,18 +325,18 @@ private function processMsg($rawMessage)
}
}

public function dispatchCachedMessages($messages = 0) {
public function dispatchCachedMessages($messages = 0)
{

$cachedMsgs = MessageCache::popMessages($this->getSid(), $messages);
$msgsDone = 0;

if ($cachedMsgs){
if ($cachedMsgs) {
$cb = $this->getCb();
foreach($cachedMsgs as $msg) {
foreach ($cachedMsgs as $msg) {
$cb($msg);
$msgsDone ++;
}

}

return $msgsDone;
Expand All @@ -353,23 +352,19 @@ public function wait($messages = 1)
$this->active = true;

if ($msgsDone < $messages) {

$messagesLeft = $messages - $msgsDone;

$quota = $this->processedMessages + $messagesLeft;

while(NatsHelper::socketInGoodHealth($this->stanCon->natsCon()) && $this->active) {

while (NatsHelper::socketInGoodHealth($this->stanCon->natsCon()) && $this->active) {
$this->stanCon->natsCon()->wait(1);

if ($this->processedMessages >= $quota) {
break;
}
}

}

$this->active = false;
}

}

0 comments on commit ad7e076

Please sign in to comment.