From d088f900c15b683c795520322ca10203108f2077 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Mon, 27 Oct 2025 14:49:45 -0400 Subject: [PATCH] Fix keepalive from hanging the connection when input completes. If input completes prior to close frame, emit error. --- src/MessageSubject.php | 11 +++++++++-- test/MessageSubjectTest.php | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/src/MessageSubject.php b/src/MessageSubject.php index ba73191..9c1f263 100644 --- a/src/MessageSubject.php +++ b/src/MessageSubject.php @@ -14,6 +14,7 @@ use Rx\Disposable\CompositeDisposable; use Rx\DisposableInterface; use Rx\Exception\TimeoutException; +use Rx\Notification\OnNextNotification; use Rx\Observable; use Rx\ObserverInterface; use Rx\Subject\Subject; @@ -100,7 +101,12 @@ function (FrameInterface $frame) { }); }) ->switch() - ->flatMapTo(Observable::never()); + ->flatMapTo(Observable::never()) + // This detects close or error notifications from the raw data input and stops the keepalive + ->takeUntil($this->rawDataIn + ->materialize() + ->filter(function($notification) { return ! $notification instanceof OnNextNotification; }) + ->take(1)); } $this->rawDataDisp = $this->rawDataIn @@ -108,7 +114,8 @@ function (FrameInterface $frame) { ->subscribe( [$messageBuffer, 'onData'], function (\Throwable $e) { parent::onError($e); }, - function () { parent::onCompleted(); } + // onCompleted needs to send an error. If a close frame comes in, this should be disposed already + function () { parent::onError(new WebsocketErrorException(Frame::CLOSE_ABNORMAL)); } ); $this->subProtocol = $subProtocol; diff --git a/test/MessageSubjectTest.php b/test/MessageSubjectTest.php index 1076c7d..1967f3f 100644 --- a/test/MessageSubjectTest.php +++ b/test/MessageSubjectTest.php @@ -183,4 +183,37 @@ public function testDisposeOnMessageSubjectClosesConnection() onCompleted(300) ], $dataOut->getMessages()); } -} \ No newline at end of file + + public function testMessageSubjectErrorsIfDataInStreamEndsClosesOrErrors() { + $dataIn = $this->createHotObservable([ + onNext(201, (new Frame('', true, Frame::OP_TEXT))->getContents()), + onCompleted(205) + ]); + + $dataOut = new MockObserver($this->scheduler); + + $ms = new MessageSubject( + $dataIn, + $dataOut, + true, + false, + '', + new Request('GET', '/ws'), + new Response(), + 300 + ); + + $result = $this->scheduler->startWithDispose(function () use ($ms) { + return $ms; + }, 500); + + $this->assertMessages([ + onNext(201, ''), + onError(205, new WebsocketErrorException(Frame::CLOSE_ABNORMAL)) + ], $result->getMessages()); + + $this->assertSubscriptions([ + subscribe(0,205) + ], $dataIn->getSubscriptions()); + } +}