Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent busy-loop when message are larger than available socket-buffer #92

Closed
wants to merge 5 commits into from

Conversation

AndreKlang
Copy link

Fixes #91

Please consider this a proof of concept, I've confirmed that this fixes my issue as described in #91. The example there now works, and the facilities.ts sample also work as expected. I've done a short 15min test-flight, with no issue. (That's unfortunately the extent of my testing so far)

The _onReadable method is a bit long and complicated now in my opinion, so I'd refactor that, but I don't have the time right now. And maybe you'd like to do that yourself any way. So I figured I'd keep this PR simple and to the point.

src/SimConnectSocket.ts Outdated Show resolved Hide resolved
@EvenAR
Copy link
Owner

EvenAR commented Dec 9, 2023

Thank you for looking into this 🙌 Great idea using the this._socket.readableLength.

I'm wondering if we could simplify this class even more. Instead of manually managing a separate buffer, what if we leveraged the internal buffer of the Socket? We could simply check this._socket.readableLength and, if it's insufficient, return to wait for the next _onReadable event. When it's long enough, we could read the data. This might streamline the logic further. What are your thoughts on this approach? 🤔 Like this (I added a check for header length too - just in case):

    private _onReadable() {
        while (!this._readingPaused) {
            if (this._incomingBodyLength === null) {
                if (HEADER_LENGTH > this._socket.readableLength) return; // Wait for more data

                // Read message length header
                const lenBuf = this._socket.read(HEADER_LENGTH);
                this._incomingBodyLength = lenBuf.readInt32LE() - HEADER_LENGTH;
            }

            // If expected body is longer than readable buffer
            if (this._incomingBodyLength > this._socket.readableLength) return; // Wait for more data

            // Read message body
            const body = this._socket.read(this._incomingBodyLength);

            this._pushCompleteMessage(body);

            this._incomingBodyLength = null;
        }
    }

@AndreKlang
Copy link
Author

AndreKlang commented Dec 11, 2023

That is a much cleaner solution, I was under the impression that the buffer wasn't appended until it was cleared. Which seam to be wrong.

I tried your solution, but ran in to some issues..

Your code (slightly modified):

    _incomingBodyLength: number|null = null;

    _onReadable() {
        while (!this._readingPaused) {
            if (this._incomingBodyLength === null) {
                if (HEADER_LENGTH > this._socket.readableLength) return; // Wait for more data

                // Read message length header
                const lenBuf = this._socket.read(HEADER_LENGTH);
                this._incomingBodyLength = lenBuf.readInt32LE() - HEADER_LENGTH;
            }

            // If expected body is longer than readable buffer
            console.log(this._incomingBodyLength, this._socket.readableLength);
            if (this._incomingBodyLength > this._socket.readableLength) return; // Wait for more data
            console.log("-- got a message --")

            // Read message body
            const body = this._socket.read(this._incomingBodyLength);

            this._pushMessage(body);

            this._incomingBodyLength = null;
        }
    }

gives this output:

npx ts-node samples/typescript/facilities-waypoints.ts
304 304
-- got a message --
Connected to sim!
41057 14476
41057 40540

Then the script exists with exit-code 0, no error, no output, no noting..

But ONCE out of ~10, it worked (and script stayed alive).. So I have no idea what's happening now.. (And my lunch is almost over for today)

I'll look into it more when I get a chance, but this is probably the correct path to go.

@EvenAR
Copy link
Owner

EvenAR commented Dec 12, 2023

Hmm, it seems the 'readable' event might stop when the socket's buffer reaches a certain length/“watermark” (Node.js Stream Documentation). Perhaps that it what happens. In that case it might be as simple as raising the watermark so it can buffer any SimConnect message size - the only concern is I don’t know if there’s any documented limit to how big a SimConnect message body can be 🤔

@AndreKlang
Copy link
Author

That looked really promising, finally got a few minutes tonight to try it out, no dice though..

First tried reading it: console.log(this.readableHighWaterMark), turns out the default is 16 (!), then tried to set it super({ objectMode: true, readableHighWaterMark: 1000000 }); (and verify that it got set), to any values between 1 and 1 000 000. Didn't notice any difference at all.

Odd thing though, tonight the "batches" tended to be smaller, ie. the _onReadable was generally (but not always) called more times (regardless of waterMark), like this:

304 304
-- got a message --
current waterMark: 1000000
Connected to sim!
41057 14476
41057 15924
41057 28956

It also tended to work correctly more than last time I tried this, maybe 4/10..

(Sidenote, the version currently in PR has worked without incident for several hours, even with the full list of 40 000 airports)

I'll look into it more when I get a chance!

... Couldn't let it go quite yet,, had a faint memory of NODE_DEBUG.. Not often I need to go that deep..

NODE_DEBUG=cluster,net,http,fs,tls,stream npx ts-node samples/typescript/facilities-waypoints.ts
<snip snip>
NET 33126: afterConnect
NET 33126: _read
NET 33126: Socket._handle.readStart
STREAM 33126: readableAddChunk <Buffer 34 01 00 00 05 00 00 00 02 00 00 00 4b 69 74 74 79 48 61 77 6b 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ... 258 more bytes>
STREAM 33126: emitReadable true false
STREAM 33126: emitReadable false
STREAM 33126: emitReadable_ false 308 false
STREAM 33126: read 4
STREAM 33126: need readable false
STREAM 33126: length less than watermark true
STREAM 33126: do read
NET 33126: _read
304 304
-- got a message --
current waterMark: 16
STREAM 33126: read 304
STREAM 33126: need readable false
STREAM 33126: length less than watermark true
STREAM 33126: reading, ended or constructing false
STREAM 33126: readableAddChunk {
  protocolVersion: 5,
  packetTypeId: 2,
  data: RawBuffer {
    readInt: [Function: readInt32],
    writeInt: [Function: writeInt32],
    readLong: [Function: readInt64],
    writeLong: [Function: writeInt64],
    readFloat: [Function: readFloat32],
    writeFloat: [Function: writeFloat32],
    readDouble: [Function: readFloat64],
    writeDouble: [Function: writeFloat64],
    buffer: ByteBuffer {
      buffer: <Buffer 4b 69 74 74 79 48 61 77 6b 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ... 246 more bytes>,
      offset: 0,
      markedOffset: -1,
      limit: 296,
      littleEndian: true,
      noAssert: false
    }
  }
}
STREAM 33126: flow false
STREAM 33126: maybeReadMore read 0
STREAM 33126: read 0
STREAM 33126: need readable true
STREAM 33126: length less than watermark true
STREAM 33126: do read
Connected to sim!
STREAM 33126: readableAddChunk <Buffer 65 a0 00 00 05 00 00 00 15 00 00 00 2a 00 00 00 54 04 00 00 00 00 00 00 03 00 00 00 56 50 43 54 45 00 4b 37 00 00 00 40 e2 9b 27 3a 40 00 00 60 7b 14 ... 14430 more bytes>
STREAM 33126: emitReadable true false
STREAM 33126: emitReadable false
STREAM 33126: emitReadable_ false 14480 false
STREAM 33126: read 4
STREAM 33126: need readable false
STREAM 33126: length less than watermark true
STREAM 33126: do read
NET 33126: _read
41057 14476
STREAM 33126: flow false
STREAM 33126: readableAddChunk <Buffer fd 53 c0 00 00 00 00 00 00 00 00 33 33 e3 40 50 54 4d 41 4e 00 4b 37 00 00 00 a0 93 d7 9b 3c 40 00 00 a0 5a 1d 4c 54 c0 00 00 00 00 00 00 00 00 66 66 ... 17326 more bytes>
STREAM 33126: emitReadable true false
STREAM 33126: emitReadable false
STREAM 33126: emitReadable_ false 31852 false
41057 31852
STREAM 33126: flow false
-- Dies --

I'll look into it more when I get a chance! (for real this time)

@EvenAR
Copy link
Owner

EvenAR commented Dec 16, 2023

Okay, I finally tried connecting to the sim from an external laptop, and I got the same result as you.

I'm leaning back towards your idea of keeping a local copy of the received data after all. It might be a safer solution than relying on the internal magic happening in the Socket class 😅

I tried to simplify it a bit - does this code work for you?

_onReadable() {
    while (!this._readingPaused) {
        const chunk: Buffer | null = this._socket.read();
        if (chunk === null) break;

        this._dataBuffer = Buffer.concat([this._dataBuffer, chunk]);

        while (this._dataBuffer.length >= HEADER_LENGTH) {
            const totalMessageSize: number = this._dataBuffer.readInt32LE(0);

            if (this._dataBuffer.length >= totalMessageSize) {
                const messageBody: Buffer = this._dataBuffer.slice(
                    HEADER_LENGTH,
                    totalMessageSize
                );

                const simConnectMessage: SimConnectMessage = {
                    protocolVersion: messageBody.readInt32LE(0),
                    packetTypeId: messageBody.readInt32LE(4),
                    data: new RawBuffer(messageBody.slice(8)),
                };

                const pushOk = this.push(simConnectMessage);

                if (!pushOk) {
                    this._readingPaused = true;
                    break; // Pause reading if consumer is slow
                }

                this._dataBuffer = this._dataBuffer.slice(totalMessageSize); // Remove processed message from the buffer
            } else {
                break; // Not enough data for a complete SimConnect message, break out of the loop
            }
        }
    }
}

@AndreKlang
Copy link
Author

Finally got some time to test this! Yes that seem to be working nicely! Haven't had a chance to do any testflights but my application works nicely with it.

I suggest you create a new branch/PR with that, then we'll close this one.

@EvenAR
Copy link
Owner

EvenAR commented Dec 22, 2023

@AndreKlang , thanks for your feedback! I have created #96 which uses that last snippet.

@EvenAR EvenAR closed this Dec 22, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Busy-loop caused by subscribeToFacilitiesEx1
2 participants