reading a stream returns 0 bytes when stream continuously hasBytesAvailable #1

Closed
leisurehound opened this issue Sep 1, 2021 · 9 comments


@leisurehound

I'm in the process of trying to fork and port https://github.com/arnecs/MQTTKit to Linux with Swift 5.4 on arm64 (Raspberry Pi). It uses Stream.getStreamsToHost(withName:port:inputStream:outputStream:), which I've translated into Stream.CC_getStreamPair(to:) { result in }. This works great on macOS, where the library's full MQTT functionality works. However, when I run it on Linux, the input stream reports bytes available but the read always returns 0 bytes and the buffer is never populated with data.

The offending call is at https://github.com/arnecs/MQTTKit/blob/ec894640f1ea27f758b3f4d22722d83159773cd0/Sources/MQTTSession.swift#L248
Any idea how to get this to work on Linux? Yes, I know NIO might be more appropriate for Linux, but for other reasons I can't use NIO.

TIA

@mickeyl
Contributor

mickeyl commented Sep 1, 2021

Thanks for opening this issue. I'm not familiar with MQTT at all. Could you perhaps upload your current state of MQTTKit somewhere, plus a short recipe for reproducing this bug on Linux? I'd like to have a look then.

@leisurehound
Author

leisurehound commented Sep 1, 2021

Thanks @mickeyl. Here is the fork I'm working with, and here are my diffs. The material change is in MQTTSession.swift line 187.

https://github.com/leisurehound/MQTTKit

Diff

To exercise the library add this to a main.swift:

import Foundation
import MQTTKit


var options = MQTTOptions(host: "029a34ae9dc449f89afd6e9deee13530.s1.eu.hivemq.cloud", port: 8883)

options.username = "TestUser"
options.password = "" //I'll PM this you through slack
options.useTLS = true

let client = MQTTSession(options: options)

client.didConnect = { connected in
  print("didConnect was called \(connected)")
}

client.didRecieveConack = { connect in
  print("didReceiveConnack was called")
}


print("About to connect")
client.connect { a in
  print("connect completion was called \(a) ")
}

RunLoop.current.run()

and the following to your Package.swift

.package(url: "https://github.com/leisurehound/MQTTKit.git", .branch("master")),

and add it as a dependency to the target.

On macOS, the read in MQTTSession.swift line 187 works fine (though I'm not sure why the original author reads 1 byte at a time). The read gets bytes, a packet is built, and the connection delegates get called in main.swift.

On arm64 with Swift 5.4.1-RELEASE, hasBytesAvailable is set but reading always returns zero bytes, so the code just loops with bytes reported as available but never reads them. Thus the packet never gets assembled and the delegates are never called.

DM me if that is more convenient, or on the swift-arm slack channel.

TIA

@mickeyl
Contributor

mickeyl commented Sep 2, 2021

Thanks for your explanation. I could reproduce this problem. I also have a theory about what the problem could be:

In https://github.com/Cornucopia-Swift/CornucopiaStreams/blob/master/Sources/CornucopiaStreams/FileHandleStreams/FileHandleInputStream.swift, we can see the following code:

override var hasBytesAvailable: Bool { true }

which unconditionally signals true to anyone who is actively polling for whether bytes are available.

Now most of the time, this is not a problem, since most stream delegates do not proactively check for hasBytesAvailable, but rather wait until the StreamEvent HasBytesAvailable comes in.

Obviously the MQTT library works differently. It seems it doesn't wait for the stream event to come in, but tries to read beforehand.

This might be fixable in the MQTT library, but then again I wonder whether it might be beneficial to rewrite hasBytesAvailable to check with the underlying socket (i.e. by calling select with a very short timeout).

Or – even more simple… just return false until we actually send out that event in

self._delegate?.stream(self, handle: .hasBytesAvailable)
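
For illustration, the first idea (asking the underlying socket whether it is readable right now) could look roughly like the sketch below. It uses poll with a zero timeout rather than select, and demonstrates the check on a plain pipe instead of a socket. hasReadableBytes is a hypothetical helper name, not something that exists in CornucopiaStreams.

```swift
#if canImport(Glibc)
import Glibc
#else
import Darwin
#endif

/// Hypothetical helper: returns true if `fd` can be read without blocking.
/// A timeout of 0 makes poll return immediately.
func hasReadableBytes(_ fd: Int32, timeoutMs: Int32 = 0) -> Bool {
    var pfd = pollfd(fd: fd, events: Int16(POLLIN), revents: 0)
    let ready = poll(&pfd, 1, timeoutMs)
    return ready > 0 && (pfd.revents & Int16(POLLIN)) != 0
}

// Demo on a pipe: nothing is readable until a byte has been written.
var fds: [Int32] = [0, 0]
precondition(pipe(&fds) == 0, "pipe() failed")

let before = hasReadableBytes(fds[0])

var byte: UInt8 = 0x20
_ = write(fds[1], &byte, 1)

let after = hasReadableBytes(fds[0])
print("readable before write: \(before), after write: \(after)")
```

On a socket, a positive result may also mean the peer has closed the connection, in which case the following read returns 0, so a caller would still need to handle that case.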

@leisurehound
Author

Thanks for your investigation @mickeyl, this is helpful.

> In https://github.com/Cornucopia-Swift/CornucopiaStreams/blob/master/Sources/CornucopiaStreams/FileHandleStreams/FileHandleInputStream.swift, we can see the following code:

> override var hasBytesAvailable: Bool { true }

> which unconditionally signals true to anyone who is actively polling for whether bytes are available.

Is removing this override an option (in a fork, obviously)? I see it's only referenced in a few places anyway.

> Now most of the time, this is not a problem, since most stream delegates do not proactively check for hasBytesAvailable, but rather wait until the StreamEvent HasBytesAvailable comes in.

> Obviously the MQTT library works differently. It seems it doesn't wait for the stream event to come in, but tries to read beforehand.

> This might be fixable in the MQTT library, but then again I wonder whether it might be beneficial to rewrite hasBytesAvailable to check with the underlying socket (i.e. by calling select with a very short timeout).

So, he does start the read upon the event, but then he reads one byte at a time for some reason. Here is his event handler:

    // MARK: - Stream Delegate
    public func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
        switch eventCode {
        case .hasBytesAvailable:
            if let input = aStream as? InputStream {
                readStream(input: input)
            }
        case .errorOccurred:
            //options.autoReconnect ? autoReconnect() : disconnect()
            break
        default:
            break
        }
    }

So another option would be to read all of the available bytes and then parse the buffer into the MQTT message chunks.
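
A rough sketch of that approach, assuming the standard MQTT fixed header (one type/flags byte followed by a variable-length "remaining length" field of up to four bytes, seven bits per byte with the top bit as a continuation flag). MQTTPacketFramer is a hypothetical type for illustration, not something in MQTTKit.

```swift
/// Hypothetical accumulator: collects whatever bytes a read returned and
/// hands back complete MQTT packets once enough data has arrived.
struct MQTTPacketFramer {
    private var buffer: [UInt8] = []

    mutating func append(_ bytes: [UInt8]) {
        buffer.append(contentsOf: bytes)
    }

    /// Returns the next complete packet, or nil if more bytes are needed.
    mutating func nextPacket() -> [UInt8]? {
        guard buffer.count >= 2 else { return nil }
        var multiplier = 1
        var remaining = 0
        var index = 1 // index 0 is the packet type/flags byte
        while true {
            // The length field itself has not fully arrived yet.
            guard index < buffer.count else { return nil }
            let byte = buffer[index]
            remaining += Int(byte & 0x7F) * multiplier
            index += 1
            if byte & 0x80 == 0 { break }
            multiplier *= 128
            // More than four length bytes is malformed; this sketch simply
            // drops the buffered data in that case.
            guard index <= 4 else { buffer.removeAll(); return nil }
        }
        let total = index + remaining
        guard buffer.count >= total else { return nil }
        let packet = Array(buffer[0..<total])
        buffer.removeFirst(total)
        return packet
    }
}

// Demo: a CONNACK split across two reads, followed by a PINGRESP.
var framer = MQTTPacketFramer()
framer.append([0x20, 0x02, 0x00])
let incomplete = framer.nextPacket()
framer.append([0x00, 0xD0, 0x00])
let connack = framer.nextPacket()
let pingresp = framer.nextPacket()
print(String(describing: incomplete), String(describing: connack), String(describing: pingresp))
```

With something like this, the event handler could read as many bytes as the stream offers per event and call nextPacket() in a loop until it returns nil.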

> Or – even more simple… just return false until we actually send out that event in

> self._delegate?.stream(self, handle: .hasBytesAvailable)

I'm not sure I follow here. I get that returning false until we send out the event is semantically equivalent to what Streams do and clients expect, but what change would effect that in CCStreams? Just return super.hasBytesAvailable instead?

Would another option be to just get the pair with the code in CC_getStreamsToHost and otherwise defer to the rest of the Foundation implementation?

@mickeyl
Contributor

mickeyl commented Sep 3, 2021

> Thanks for your investigation @mickeyl, this is helpful.

Glad to help, I'm confident we will solve that in one way or another.

> > In https://github.com/Cornucopia-Swift/CornucopiaStreams/blob/master/Sources/CornucopiaStreams/FileHandleStreams/FileHandleInputStream.swift, we can see the following code:

> > override var hasBytesAvailable: Bool { true }

> > which unconditionally signals true to anyone who is actively polling for whether bytes are available.

> Is removing this override an option (in a fork, obviously)? I see it's only referenced in a few places anyway.

Unfortunately not. It is one of the members that do not have a base class implementation in InputStream, so subclasses have to implement it.

> So another option would be to read all of the available bytes and then parse the buffer into the MQTT message chunks.

> > Or – even more simple… just return false until we actually send out that event in

> > self._delegate?.stream(self, handle: .hasBytesAvailable)

> I'm not sure I follow here. I get that returning false until we send out the event is semantically equivalent to what Streams do and clients expect, but what change would effect that in CCStreams? Just return super.hasBytesAvailable instead?

No, we can't do that. But since we know what is actually going on on the underlying socket, we can definitely do better than returning true unconditionally. Let me try to come up with something.
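
To make the "return false until the event has been sent" idea concrete, here is a minimal sketch of the bookkeeping only, without any real stream or socket. ReadinessTracker and its method names are hypothetical, and treating a short read as "drained" is an assumption that only fits non-blocking reads.

```swift
/// Hypothetical bookkeeping for an input stream: hasBytesAvailable stays
/// false until the source has reported readability, and drops back to
/// false once a read returns fewer bytes than were asked for.
final class ReadinessTracker {
    private(set) var hasBytesAvailable = false

    /// Stand-in for delivering the .hasBytesAvailable stream event.
    var onBytesAvailable: (() -> Void)?

    /// Call when the underlying descriptor is reported readable.
    func sourceBecameReadable() {
        hasBytesAvailable = true
        onBytesAvailable?()
    }

    /// Call after every read with the byte count actually returned.
    func didRead(_ count: Int, requested: Int) {
        // Assumption: a short read means the descriptor is drained for now.
        if count < requested { hasBytesAvailable = false }
    }
}

// Demo: the flag follows the event, not a hard-coded `true`.
let tracker = ReadinessTracker()
var eventCount = 0
tracker.onBytesAvailable = { eventCount += 1 }

let initially = tracker.hasBytesAvailable
tracker.sourceBecameReadable()
let afterEvent = tracker.hasBytesAvailable
tracker.didRead(4, requested: 16)
let afterShortRead = tracker.hasBytesAvailable
print(initially, afterEvent, afterShortRead, eventCount)
```

A client that polls hasBytesAvailable in a loop would then stop as soon as the data is consumed instead of spinning on zero-byte reads.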

> Would another option be to just get the pair with the code in CC_getStreamsToHost and otherwise defer to the rest of the Foundation implementation?

I would be glad if that worked, but the whole socket stream infrastructure is not implemented in Foundation for Linux, hence I had to do the whole CSocketHelper & FileHandleStream dance.

mickeyl added a commit that referenced this issue Sep 3, 2021
@mickeyl
Contributor

mickeyl commented Sep 3, 2021

@leisurehound Could you please check whether this branch improves anything on your end?

@leisurehound
Author

leisurehound commented Sep 3, 2021

> @leisurehound Could you please check whether this branch improves anything on your end?

Unfortunately not. Note that in this fork the guard on fileHandle.read(upToCount:) always fails (on arm64 Raspbian Buster).

Could this be due to the MQTT library upgrading the socket to TLS?

        if options.useTLS {
            input.setProperty(StreamSocketSecurityLevel.tlSv1, forKey: .socketSecurityLevelKey)
            output.setProperty(StreamSocketSecurityLevel.tlSv1, forKey: .socketSecurityLevelKey)
        }

@leisurehound
Author

I think this is unrelated to the issue, but is reading only 1 byte in FileHandleInputStream.read(_:maxLength:) really the intention?

    override func read(_ buffer: UnsafeMutablePointer<UInt8>, maxLength len: Int) -> Int {
        guard _streamStatus == .open else { return 0 }
        guard let data = try? self.fileHandle.read(upToCount: 1) else {

source
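
For comparison, one way to fill the caller's buffer in a single call, assuming direct access to the underlying file descriptor, is a plain POSIX read, which returns however many bytes are currently available up to the requested maximum. This is only a sketch (readChunk is a hypothetical name, demonstrated on a pipe) and not necessarily how the library handles it.

```swift
#if canImport(Glibc)
import Glibc
#else
import Darwin
#endif

/// Hypothetical helper: reads up to `maxLength` bytes from `fd` into `buffer`.
/// Returns the number of bytes read, 0 at end of file, or -1 on error.
func readChunk(from fd: Int32, into buffer: UnsafeMutablePointer<UInt8>, maxLength: Int) -> Int {
    guard maxLength > 0 else { return 0 }
    return read(fd, buffer, maxLength)
}

// Demo on a pipe: five bytes are written, then fetched in one call.
var pipeFds: [Int32] = [0, 0]
precondition(pipe(&pipeFds) == 0, "pipe() failed")

let payload: [UInt8] = [0x68, 0x65, 0x6C, 0x6C, 0x6F] // "hello"
_ = write(pipeFds[1], payload, payload.count)

var storage = [UInt8](repeating: 0, count: 16)
let bytesRead = storage.withUnsafeMutableBufferPointer { buf in
    readChunk(from: pipeFds[0], into: buf.baseAddress!, maxLength: buf.count)
}
print("read \(bytesRead) bytes in one call")
```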

@mickeyl
Contributor

mickeyl commented Sep 28, 2022

Since we last talked, I finally fixed reading. However, the actual issue of missing SSL support is still open.

I'd love to see this eventually, but I don't have any immediate need for it. It's unfortunately not trivial to implement, since, at least for !APPLE systems, this would mean depending on a (hopefully small) TLS library.

Since I believe the 0-byte read and the inefficient reads are fixed, I'll close this issue now, but create a new one for the SSL issue.


2 participants