Skip to content

Commit

Permalink
Hotfix 4 (#38)
Browse files Browse the repository at this point in the history
* hotfix Nr.4

* Apply fixes from StyleCI

* .

* Apply fixes from StyleCI

---------

Co-authored-by: StyleCI Bot <bot@styleci.io>
  • Loading branch information
ngaspari and StyleCIBot committed May 24, 2024
1 parent 31a0fee commit dc51d10
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 33 deletions.
12 changes: 9 additions & 3 deletions config/stomp.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@
'auto_tries' => env('STOMP_AUTO_TRIES', true),
'auto_backoff' => env('STOMP_AUTO_BACKOFF', true),

/*
* Will failed job be re-queued ?
* We experienced issues with pushing Jobs back to the topic/queue, so we're turning this OFF
*/
'fail_job_requeue' => env('STOMP_FAILED_JOB_REQUEUE', false),

/** If all messages should fail on timeout. Set to false in order to revert to default (looking in event payload) */
'fail_on_timeout' => env('STOMP_FAIL_ON_TIMEOUT', true),
/** Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly. */
'timeout' => env('STOMP_TIMEOUT', 10),
'timeout' => env('STOMP_TIMEOUT', 45),

/**
* Incremental multiplier for failed job redelivery.
Expand Down Expand Up @@ -70,15 +76,15 @@
/**
* Heartbeat which we will be sending to server at given millisecond period.
*/
'send_heartbeat' => env('STOMP_SEND_HEARTBEAT', 20000),
'send_heartbeat' => env('STOMP_SEND_HEARTBEAT', 50000),

/**
* Setting consumer-window-size to a value greater than 0 will allow it to receive messages until
* the cumulative bytes of those messages reaches the configured size.
* Once that happens the client will not receive any more messages until it sends the appropriate ACK or NACK
* frame for the messages it already has.
*/
'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 819200),
'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 8192000),

/**
* Subscribe mode: auto, client.
Expand Down
18 changes: 5 additions & 13 deletions src/Queue/Jobs/StompJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use Illuminate\Queue\Jobs\JobName;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Str;
use Psr\Log\LoggerInterface;
use Stomp\Transport\Frame;
Expand Down Expand Up @@ -108,7 +107,6 @@ public function fire()
{
$this->log->info("$this->session [STOMP] Executing event...");
$this->isNativeLaravelJob() ? $this->fireLaravelJob() : $this->fireExternalJob();
$this->ackIfNecessary();
}

protected function isNativeLaravelJob(): bool
Expand Down Expand Up @@ -202,9 +200,10 @@ public function release($delay = 0)
{
parent::release($delay);

$payload = $this->createStompPayload($delay);

$this->stompQueue->pushRaw($payload, $this->queue, []);
if (Config::get('fail_job_requeue')) {
$payload = $this->createStompPayload($delay);
$this->stompQueue->pushRaw($payload, $this->queue, []);
}
}

protected function createStompPayload(int $delay): Message
Expand Down Expand Up @@ -245,8 +244,6 @@ protected function getBackoff(int $attempts): int
*/
protected function failed($e)
{
$this->ackIfNecessary();

// External events don't have failed method to call.
if (!$this->payload || !$this->isNativeLaravelJob()) {
return;
Expand All @@ -259,12 +256,7 @@ protected function failed($e)
$this->instance->failed($this->payload['data'], $e, $this->payload['uuid']);
}
} catch (\Exception $e) {
Log::error('Exception in job failing: ' . $e->getMessage());
$this->log->error('Exception in job failing: ' . $e->getMessage());
}
}

protected function ackIfNecessary()
{
$this->stompQueue->ackLastFrameIfNecessary();
}
}
33 changes: 18 additions & 15 deletions src/Queue/StompQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class StompQueue extends Queue implements QueueInterface
protected array $subscribedTo = [];

protected LoggerInterface $log;
protected static int $circuitBreaker = 0;
protected int $circuitBreaker = 0;
protected string $session;

protected $_lastFrame = null;
Expand Down Expand Up @@ -238,7 +238,7 @@ protected function writeToMultipleQueues(array $writeQueues, Message $payload):
return $allEventsSent;
}

protected function write($queue, Message $payload): bool
protected function write($queue, Message $payload, $tryAgain = true): bool
{
// This will write all the events received in a single batch, then send disconnect frame
try {
Expand All @@ -247,13 +247,17 @@ protected function write($queue, Message $payload): bool
$this->log->info("$this->session [STOMP] Message sent successfully? " . ($sent ? 't' : 'f'));

return $sent;
} catch (Exception) {
$this->log->error("$this->session [STOMP] PUSH failed. Reconnecting...");
} catch (Exception $e) {
$this->log->error("$this->session [STOMP] PUSH failed. Reconnecting... " . $e->getMessage());
$this->reconnect(false);

$this->log->info("$this->session [STOMP] Trying to send again...");
if ($tryAgain) {
$this->log->info("$this->session [STOMP] Trying to send again...");

return $this->write($queue, $payload, false);
}

return $this->write($queue, $payload);
return false;
}
}

Expand Down Expand Up @@ -453,25 +457,25 @@ protected function reconnect(bool $subscribe = true)

try {
$this->client->getClient()->connect();

$this->log->info("$this->session [STOMP] Reconnected successfully.");
} catch (Exception $e) {
self::$circuitBreaker++;
$cb = self::$circuitBreaker;
$this->circuitBreaker++;

$this->log->error("$this->session [STOMP] Failed reconnecting (tries: $cb), retrying in 2s..." . print_r($e->getMessage(), true));
$this->log->error("$this->session [STOMP] Failed reconnecting (tries: {$this->circuitBreaker}),
retrying..." . print_r($e->getMessage(), true));

if (self::$circuitBreaker <= 5) {
if ($this->circuitBreaker <= 5) {
usleep(100);
$this->reconnect($subscribe);
}

$this->log->error("$this->session [STOMP] Circuit breaker executed after $cb tries, exiting.");
$this->log->error("$this->session [STOMP] Circuit breaker executed after {$this->circuitBreaker} tries, exiting.");

return;
}

// By this point it should be connected, so it is safe to subscribe
if ($subscribe) {
if ($this->client->getClient()->isConnected() && $subscribe) {
$this->log->info("$this->session [STOMP] Connected, subscribing...");
$this->subscribedTo = [];
$this->subscribeToQueues();
Expand All @@ -486,7 +490,6 @@ public function disconnect()

try {
$this->ackLastFrameIfNecessary();

$this->log->info("$this->session [STOMP] Disconnecting...");
$this->client->getClient()->disconnect();
} catch (Exception $e) {
Expand All @@ -503,7 +506,7 @@ protected function subscribeToQueues(): void
continue;
}

$winSize = Config::get('consumer_window_size') ?? 819200;
$winSize = Config::get('consumer_window_size') ?: 8192000;
if ($this->_ackMode != self::ACK_MODE_CLIENT) {
$winSize = -1;
}
Expand Down
2 changes: 0 additions & 2 deletions src/StompServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class StompServiceProvider extends ServiceProvider
public function register()
{
$this->mergeConfigFrom(__DIR__ . '/../config/asseco-stomp.php', 'asseco-stomp');

$this->mergeConfigFrom(__DIR__ . '/../config/stomp.php', 'queue.connections.stomp');
}

Expand All @@ -35,7 +34,6 @@ public function boot()
app()->singleton(Config::class);
app()->singleton(ConnectionWrapper::class);
app()->singleton(ClientWrapper::class);

app()->singleton(StompQueue::class);

/** @var QueueManager $queue */
Expand Down

0 comments on commit dc51d10

Please sign in to comment.