Streams3 adaptor(s) #35

Closed
wants to merge 12 commits into from

Conversation

@Fishrock123 Fishrock123 commented Jun 13, 2019

It works!!

ReadableSink to turn BOB flow to streams3, WritableSource to turn streams3 flow into BOB.

Implements #18

Test cases: run npm test!
(Manually verify the output file, readme_ or readme.gz; there is an ungzip helper in tests/.)
(May require Node 13.)

Use NODE_DEBUG=bob to see what is happening.
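
For readers unfamiliar with the NODE_DEBUG switch: it is normally consumed through Node's util.debuglog. The sketch below assumes the section name is 'bob' (taken from the command above); the describeNext helper and its message format are purely illustrative and are not taken from this PR.

```javascript
'use strict'
const util = require('util')

// Assumption: the logger is created with the section name 'bob'.
const debug = util.debuglog('bob')

// Hypothetical helper, only here to give the logger something to print.
function describeNext (status, bytes) {
  // Writes to stderr only when NODE_DEBUG includes 'bob'; otherwise a no-op.
  debug('next() status=%s bytes=%d', status, bytes)
  return `status=${status} bytes=${bytes}`
}

describeNext('continue', 1024)
```

Running the file as NODE_DEBUG=bob node example.js (the file name is a placeholder) would show the debug line; without the variable nothing is printed.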

Replace readme.md with your choice of file.

R=@mcollina

@mcollina mcollina left a comment

Good work!!!! I've left some notes.

this.cork()
}

_write(chunk, encoding, callback) {

I think we should add support for _writev as well.

Owner Author

I think that should probably wait until bob supports multi-buffer next() calls or something. Otherwise it seems more reasonable to just have WritableStream manage it?

@mcollina mcollina Jun 14, 2019

+1. We should add multi-buffer support.

Owner Author

Should be discussed at #30
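
For context on what _writev support means on the Streams3 side, here is a minimal sketch of a plain Writable that receives corked writes as a single batch. BatchingWritable is a hypothetical class for illustration only; it is not part of this PR and does not touch the BOB API.

```javascript
'use strict'
const { Writable } = require('stream')

// Hypothetical Writable (not from this PR) that records how chunks arrive.
class BatchingWritable extends Writable {
  constructor (options) {
    super(options)
    this.batches = []
  }

  // Used when only a single chunk is pending.
  _write (chunk, encoding, callback) {
    this.batches.push([chunk])
    callback()
  }

  // Used instead of _write() when several chunks are buffered.
  // Each entry has the shape { chunk, encoding }.
  _writev (chunks, callback) {
    this.batches.push(chunks.map(({ chunk }) => chunk))
    callback()
  }
}

const writable = new BatchingWritable()
writable.cork()
writable.write('a')
writable.write('b')
writable.write('c')
// uncork() flushes the buffered chunks; with _writev defined they arrive together.
writable.uncork()
```

An adaptor could only pass such a batch along in one step if next() accepted more than one buffer, which is the open question in this thread.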

if (this.source !== null) {
this.source.pull(err)
this[kDestroyCallback] = cb
}
Collaborator

Need an else here. If there is no source BUT sink exists then we are responsible for calling this.sink.next(error, status.error, ...)

Collaborator

Actually you have to destroy both the sink and the source always and wait for them both to be finished destroying ...

Owner Author

I think it should just explode if a source doesn't exist, tbh.
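
The reviewer's suggestion of tearing down both sides and waiting for both to finish could be sketched roughly as follows. This is only an illustration under stated assumptions: destroyAll and the step functions are hypothetical names, and how a real BOB source or sink reports that it has finished is not shown in this thread.

```javascript
'use strict'

// Hypothetical helper: run several teardown steps and call back once all are done.
// Each step has the shape (error, done) and must call done(err) when finished.
function destroyAll (steps, error, callback) {
  let pending = steps.length
  let firstError = error || null
  if (pending === 0) return process.nextTick(callback, firstError)

  for (const step of steps) {
    let called = false
    step(error, (err) => {
      if (called) return // ignore duplicate completions from one step
      called = true
      if (err && firstError === null) firstError = err
      if (--pending === 0) callback(firstError)
    })
  }
}

// Usage with two stand-in teardown steps (assumptions, not BOB calls).
const finished = []
destroyAll([
  (err, done) => { finished.push('source'); done(null) },
  (err, done) => { finished.push('sink'); done(null) }
], null, (err) => {
  finished.push(err === null ? 'all done' : 'failed')
})
```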

const fileSource = new FileSource(process.argv[2])
const bobDuplex = new BobDuplex({ highWaterMark: 1024, name: '1' })

new Stream(fileSource, bobDuplex)
Collaborator

This BobDuplex API is confusing to me. I think a more common API would be new BobDuplex({ hwm, name, sink, source }).

Calling bindSource() should error because it should be in the constructor instead. Calling bindSink() is fine because that's part of the implementor's API and is necessary to be a valid bob.Source.

Owner Author

Yeah agree it's not ideal

this.sink.next(status_type.continue, null, chunk, chunk.length)
}

_read (size) {
Collaborator

I'm worried _read() will not get called again if we do not call push() with enough data ...

Owner Author

@Raynos I don't really understand... Could you elaborate?

Collaborator

The documentation of Readable seems to imply that if you do not push at least size bytes it will not call _read again.

The current implementation seems to do a single pull call, which might return fewer than size bytes.

Owner Author

@mcollina could you possibly confirm this?

Owner Author

The docs seem to say:

The size argument is advisory.
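
One way to check this concern empirically is a small Readable that always pushes far less than the requested size and counts how often _read() is invoked. TinyReadable is a hypothetical test class, not code from this PR; the asynchronous push is meant to mimic an adaptor that answers from a later pull().

```javascript
'use strict'
const { Readable } = require('stream')

// Hypothetical source that pushes 1 byte per _read(), well under highWaterMark.
class TinyReadable extends Readable {
  constructor (total) {
    super({ highWaterMark: 16 })
    this.remaining = total
    this.readCalls = 0
  }

  _read (size) {
    this.readCalls++
    if (this.remaining === 0) {
      this.push(null) // signal end of stream
      return
    }
    this.remaining--
    // Push asynchronously, as an adaptor waiting on its source would.
    setImmediate(() => this.push(Buffer.from('x')))
  }
}

const tiny = new TinyReadable(5)
let received = 0
tiny.on('data', (chunk) => { received += chunk.length })
tiny.on('end', () => {
  console.log(`received ${received} bytes over ${tiny.readCalls} _read() calls`)
})
```

If the stream reaches 'end' with all bytes delivered, _read() was called again after each short push, which is consistent with size being advisory.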


// Regular data case.
if (status === status_type.continue) {
this.push(buffer.slice(0, bytes))
Collaborator

The return value of push() indicates whether we need to call pull() again, and it's currently ignored. I'm worried this stream will get stuck in a weird state where Readable will not call _read again because you didn't push enough data.

The behavior of Readable is different in the use case where pipe() is called vs read() + readable.on('readable', ...) etc
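
As a point of comparison, here is a minimal sketch of a Readable that does honor the return value of push(): it keeps producing while push() returns true and stops until the next _read() once it returns false. The class and the in-memory chunk list are hypothetical stand-ins for a BOB source; this is not the adaptor's actual code.

```javascript
'use strict'
const { Readable } = require('stream')

// Hypothetical synchronous producer standing in for a BOB source.
function makeChunks (count) {
  const chunks = []
  for (let i = 0; i < count; i++) chunks.push(Buffer.alloc(4, i))
  return chunks
}

class BackpressureAwareReadable extends Readable {
  constructor (chunks) {
    super({ highWaterMark: 8 })
    this.chunks = chunks
    this.stoppedEarly = 0
  }

  _read () {
    while (this.chunks.length > 0) {
      // push() returns false once the internal buffer reaches highWaterMark.
      if (!this.push(this.chunks.shift())) {
        this.stoppedEarly++
        return // wait for the next _read() before producing more
      }
    }
    this.push(null) // nothing left: end the stream
  }
}

const readable = new BackpressureAwareReadable(makeChunks(6))
let total = 0
readable.on('data', (chunk) => { total += chunk.length })
readable.on('end', () => console.log(`total bytes: ${total}`))
```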

this.sink = sink
}

next (status, error, buffer, bytes) {
Collaborator

I'm pretty sure that if bytes < buffer.length we are supposed to call pull() again for this to be a correctly implemented Readable with _read

}

// If we have an error and have not already been in error state, emit it Streams3 style.
if (error !== null) {
Collaborator

Checking status === status_type.error is nicer than checking whether error !== null.

Raynos commented Jun 19, 2019

I started reading this, readable-stream, fs-sink, fs-source & pull-stream-to-stream

I read some comments in readable-stream that I knew were there 5 years ago, and I read them in isaacs' voice. There are many modes and many tests to write.

Please use https://github.com/dominictarr/stream-spec and friends. These date back to 2015, the npm streams era when readable-stream did not exist and we were out of luck: we had to implement Readable, Writable & Duplex by hand, using the really old streams1 API of 'data', 'end', write, end, pipe, etc.

There is no stream-spec module for streams2 or streams3 that I can find.

Raynos commented Jun 19, 2019

Talking of spec, can we port these to bob

I believe Verify is a partial implementation of it.

@Fishrock123
Owner Author

This BobDuplex API is confusing to me. I think a more common API would be new BobDuplex({ hwm, name, sink, source }).

(#35 (comment))

@Raynos What if this extended / was the (bob) Stream() utility?

It looks like, e.g., const stream = new Stream(streamSource, new PassThrough(), streamSink), and maybe it, or some version of it, could be streams3-able? (Note: I don't know if that would actually work, but it's worth thinking about.)

@Fishrock123 Fishrock123 changed the base branch from master to automated-tests July 11, 2019 22:50
@Fishrock123
Owner Author

Updated with added automated tests!

@Fishrock123
Owner Author

Ok so since there's some comments here about the API design in general, here's The Dream:

  • get rid of BobDuplex
  • build core streams on top of bob
  • make 'the old' core Stream() api take source, ...sinks arguments

Basically this isn't ideal but it is an important intermediate step.

@Fishrock123 Fishrock123 changed the base branch from master to verification-passthrough July 16, 2019 20:27
Fishrock123 added a commit that referenced this pull request Jul 16, 2019
Raynos commented Jul 17, 2019

We will need BobDuplex for backcompat.

I imagine for node core we want to expose new APIs that use bob streams and deprecate streams3 APIs.

I don't think we can make a breaking change, we will always need streams3 and thus BobDuplex.

Also for things like http it might just be too little, too late and we can only implement bob streams APIs for http2

Fishrock123 commented Jul 17, 2019

I think you're maybe missing the idea.

The idea is and always has been to implement Readable and Writable on top of "bob streams".

Which is probably the only way we can actually get a reasonable transition to happen.

Raynos commented Jul 17, 2019

The idea is and always has been to implement Readable and Writable on top of "bob streams".

BobDuplex implements the Readable and Writable API; I guess we could implement a BobReadable and BobWritable if we wanted to.

@Fishrock123
Owner Author

Correct, but it is technically then the wrong way around - it takes a Duplex and implements Bob connectors on top.

Using that then, you can stick it "in between" a bob stream and a streams3 stream to connect them.

Ideally though, that wouldn't be necessary: you'd just set your stream up and it would be a streams3 stream, and streams3 streams would also have the bob api "underneath".

@Fishrock123 Fishrock123 changed the base branch from verification-passthrough to master July 17, 2019 17:47
Fishrock123 added a commit that referenced this pull request Jul 17, 2019
Fishrock123 added a commit that referenced this pull request Jul 18, 2019
Tests pass, seems unnecessary as per comments.

Just in case, though, I want to keep these in-place.
yeah, there's no standard way to do this
for version pinning stability
Fishrock123 commented Jul 22, 2019

@Raynos @mcollina Updated, and split from BobDuplex into ReadableSink and WritableSource.

Hopefully the api is more clear and less confusing, and also more separated and reasonable to understand. I still need to do some state cleanup, I think.

Should handle things a bit better.
This error handling seems more correct to me.
@Fishrock123 Fishrock123 requested a review from jasnell July 23, 2019 19:24
@Fishrock123
Owner Author

Ok, clean up pushed. Error handling and some other state stuff should be more correct now, I think.

const writeCb = this[kWriteCallback]
this[kWriteCallback] = null

// If we don't yet have a write callback from an attempted Streams3 write, just bail.
Owner Author

One thing I don't know what to do about though, is this.

I can't get it to work without bailing in this case no matter how the streams are composed in user code...

@Raynos Raynos left a comment

Readable class looks legit, a bunch of comments for the Writable class.

this[kPulling] = false

// Send data to our sink.
this.sink.next(status_type.continue, null, chunk, chunk.length)
Collaborator

Do we want to implement buffering here? Same in the read case. We may already be "piping" data before we have hooked up the sinks and sources.

Collaborator

If not, add a check for this.sink and throw some kind of readable error about writing to the stream before new Stream() or source.bindSink() was called.
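
The guard suggested here might look something like the sketch below. GuardedSource, its send() method, the error message, and the status_type stand-in are all assumptions made for illustration; only bindSink() and the next(status, error, buffer, bytes) call shape come from the code quoted in this review.

```javascript
'use strict'

// Stand-in for the status enum used in this PR; the real values are not shown here.
const status_type = { continue: 'continue' }

// Hypothetical minimal source showing only the guard.
class GuardedSource {
  constructor () {
    this.sink = null
  }

  bindSink (sink) {
    this.sink = sink
  }

  send (chunk) {
    if (this.sink === null) {
      // Fail loudly instead of hitting "cannot read property 'next' of null".
      throw new Error('Cannot send data before bindSink() has been called')
    }
    this.sink.next(status_type.continue, null, chunk, chunk.length)
  }
}

const guarded = new GuardedSource()
let message = null
try {
  guarded.send(Buffer.from('too early'))
} catch (err) {
  message = err.message
}
```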

const encoding = this._readableState.encoding || this._readableState.defaultEncoding
chunk = Buffer.from(chunk, encoding)
}

Collaborator

I suspect this needs an if check here: if (this[kPulling]) { callback() } else { this[kWriteCallback] = callback }

this[kPulling] = false

// Send data to our sink.
this.sink.next(status_type.continue, null, chunk, chunk.length)
Collaborator

If this[kPulling] is false here then we do not want to call this.sink.next() yet; we want to store the current chunk in this[kPendingChunk]
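
A rough sketch of that idea, holding a chunk until the sink asks for it, is shown below. It is a simplified stand-in rather than the PR's WritableSource: the class name, the _deliver helper, and the status_type object are assumptions, and error and end handling are left out entirely.

```javascript
'use strict'
const { Writable } = require('stream')

const kPulling = Symbol('pulling')
const kPendingChunk = Symbol('pendingChunk')
const kWriteCallback = Symbol('writeCallback')
// Stand-in for the status enum used in this PR; the real values are not shown here.
const status_type = { continue: 'continue' }

// Hypothetical, simplified source: one chunk is held until the sink pulls.
class PendingChunkSource extends Writable {
  constructor (options) {
    super(options)
    this.sink = null
    this[kPulling] = false
    this[kPendingChunk] = null
    this[kWriteCallback] = null
  }

  bindSink (sink) {
    this.sink = sink
  }

  // Called by the sink when it wants data.
  pull (error) {
    const chunk = this[kPendingChunk]
    if (chunk === null) {
      this[kPulling] = true // remember the request until a write arrives
      return
    }
    this._deliver(chunk)
  }

  _write (chunk, encoding, callback) {
    this[kWriteCallback] = callback
    if (this[kPulling]) {
      this._deliver(chunk)
    } else {
      this[kPendingChunk] = chunk // hold until the sink pulls
    }
  }

  _deliver (chunk) {
    const callback = this[kWriteCallback]
    this[kPulling] = false
    this[kPendingChunk] = null
    this[kWriteCallback] = null
    this.sink.next(status_type.continue, null, chunk, chunk.length)
    callback() // let Writable hand over the next chunk
  }
}

const received = []
const source = new PendingChunkSource()
source.bindSink({
  next: (status, error, buffer, bytes) => received.push(buffer.slice(0, bytes).toString())
})
source.write('early') // arrives before any pull(): held, not sent
const heldBeforePull = received.length === 0
source.pull(null) // the sink asks for data: the held chunk is delivered
```

Because the write callback is withheld until delivery, Writable's own buffer absorbs any further writes, so only one chunk needs to be stored on the adaptor itself.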

@Fishrock123 Fishrock123 changed the title from "Streams3 adaptor" to "Streams3 adaptor(s)" Dec 11, 2019
Fishrock123 added a commit that referenced this pull request Dec 11, 2019
These two classes are a way of interop between BOB and Streams3.

Closes: #18
PR-URL: #35
@Fishrock123
Owner Author

Landed ReadableSink and WritableSource in 57a78d1


3 participants