Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add asInputStream to ByteString #1085

Merged
merged 17 commits into from
Feb 5, 2024

Conversation

pjfanning
Copy link
Contributor

@pjfanning pjfanning commented Jan 30, 2024

See #995

This needs tests but raising it for discussion.

This is probably not that useful for Pekko usage but general ByteString users might appreciate it.

This method gets an InputStream without cloning the array but is not unsafe like toArrayUnsafe which exposes the underlying array data in a way that it can be unsafely changed. The InputStream wraps this unsafe array in a way that doesn't allow it to be modified.

I've done some experimentation and ByteArrayInputStream works just as well as alternative implementations in commons-io and fastutil.

apache/pekko-http#424 is a good enough solution for Pekko HTTP.

@pjfanning pjfanning marked this pull request as draft January 30, 2024 15:31
@pjfanning pjfanning changed the title [DRAFT] add getInputStream to ByteString [DRAFT] add asInputStream to ByteString Jan 30, 2024
* @see [[asByteBuffer]]
* @since 1.1.0
*/
final def asInputStream: InputStream = new ByteArrayInputStream(toArrayUnsafe())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The toArrayUnsafe documentation states:

If the ByteString is backed by a single array it is returned without any copy. If it is backed by a rope
of multiple ByteString instances a new array will be allocated and the contents will be copied
into it before returning it

So we could avoid that allocation either by:

  • defining def asInputStream: InputStream as an abstract method, and add different implementations that avoid allocations
    • using new ByteArrayInputStream(bytes) for ByteString1C,
    • using new ByteArrayInputStream(bytes, start, length) for ByteString1,
    • using new SequenceInputStream(bytestrings.map(_.asInputStream)) for ByteStrings
  • Implement in the super class using SequenceInputStream(asByteBuffers.map(bb => new ByteBufferBackedInputStream(bb))

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was about to write the same, this should be the implementation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point - I have committed a change based on @jtjeferreira's suggestions

Copy link
Contributor

@jtjeferreira jtjeferreira left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but 2 notes:

  • mima is complaining
  • should some benchmarks be written?

@mdedetrich
Copy link
Contributor

  • mima is complaining
[error] pekko-actor: Failed binary compatibility check against org.apache.pekko:pekko-actor_2.12:1.0.0! Found 1 potential problems
[error]  * abstract method asInputStream()java.io.InputStream in class org.apache.pekko.util.ByteString is present only in current version
[error]    filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.util.ByteString.asInputStream")

This is a forward compatibility error and is expected, i.e. MiMa complains when you add a new method to a public interface

  • should some benchmarks be written?

Yes they should

@pjfanning
Copy link
Contributor Author

this is still WIP

  • more test coverage needed
  • basic benchmarks added

@mdedetrich
Copy link
Contributor

If it's not too pushy it would be good to get this out for M1, I suggested on mailing list we should do M1 around mid next week

@pjfanning pjfanning changed the title [DRAFT] add asInputStream to ByteString add asInputStream to ByteString Feb 1, 2024
@pjfanning pjfanning marked this pull request as ready for review February 1, 2024 09:46
@pjfanning
Copy link
Contributor Author

I have a basic benchmark that shows that ByteString.asInputStream is faster than new ByteArrayInputStream(ByteString.toArray). The latter is the current safe way to create an InputStream for a ByteString.

I have also tested ByteArrayInputStream(ByteString.toArrayUnsafe) which is about as fast as ByteString.asInputStream for simple ByteStrings (single arrays) but is slow like ByteArrayInputStream(ByteString.toArray) when you have byte strings composed of many smaller byte strings.

@@ -566,6 +572,9 @@ object ByteString {

def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = bytestrings.map { _.asByteBuffer }

override def asInputStream: InputStream =
new SequenceInputStream(bytestrings.map(_.asInputStream).iterator.asJavaEnumeration)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bytestrings.iterator.map(_.asInputString).asJavaEnumeration

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@pjfanning I will send a suggestion tomorrow about this line, currently on my phone.

@He-Pin
Copy link
Member

He-Pin commented Feb 1, 2024

I just checked in Netty, there is a public class ByteBufInputStream extends InputStream implements DataInput, Do you think we can have a class ByteStringInputString extends InputStream implements DataInput?

@pjfanning
Copy link
Contributor Author

I just checked in Netty, there is a public class ByteBufInputStream extends InputStream implements DataInput, Do you think we can have a class ByteStringInputString extends InputStream implements DataInput?

I checked that already. Unfortunately, Scala conversion to JavaEnumeration only seem to work on Scala Iterators and not on Scala Collections.

With this change:

[error] /Users/pj.fanning/code/incubator-pekko/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala:586:64: value asJavaEnumeration is not a member of scala.collection.immutable.Vector[java.io.InputStream]
[error]       new SequenceInputStream(bytestrings.map(_.asInputStream).asJavaEnumeration)

@He-Pin
Copy link
Member

He-Pin commented Feb 1, 2024

I checked that already. Unfortunately, Scala conversion to JavaEnumeration only seem to work on Scala Iterators and not on Scala Collections.

With this change:

[error] /Users/pj.fanning/code/incubator-pekko/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala:586:64: value asJavaEnumeration is not a member of scala.collection.immutable.Vector[java.io.InputStream]
[error]       new SequenceInputStream(bytestrings.map(_.asInputStream).asJavaEnumeration)

bytestrings.iterator.map(_.asInputStream).asJavaEnumeration

@pjfanning
Copy link
Contributor Author

I just checked in Netty, there is a public class ByteBufInputStream extends InputStream implements DataInput, Do you think we can have a class ByteStringInputString extends InputStream implements DataInput?

We are dealing with wrapping byte arrays - the perfect InputStream for wrapping a byte array in ByteArrayInputStream.

@pjfanning
Copy link
Contributor Author

@He-Pin I think bytestrings.iterator.map(_.asInputStream).asJavaEnumeration will have the same overhead as bytestrings.map(_.asInputStream).iterator.asJavaEnumeration. I'd prefer not to make changes like this and then have to rerun all the benchmarks.

@He-Pin
Copy link
Member

He-Pin commented Feb 1, 2024

It will defer the mapping if you don't read.

There is no need to rerun the benchmark.

Copy link
Member

@He-Pin He-Pin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm

@pjfanning
Copy link
Contributor Author

It will defer the mapping if you don't read.

it is being read - there is nothing gained here by thinking there is lazy eval anywhere here. SequenceInputStream takes an evaluated JavaEnumeration. For me, if we adjust this, we could write a custom function that maps a Vector to a JavaEnumeration without requiring the intermediate iterator.

@@ -579,6 +583,9 @@ object ByteString {

def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = bytestrings.map { _.asByteBuffer }

override def asInputStream: InputStream =
new SequenceInputStream(bytestrings.map(_.asInputStream).iterator.asJavaEnumeration)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
new SequenceInputStream(bytestrings.map(_.asInputStream).iterator.asJavaEnumeration)
new SequenceInputStream(bytestrings.iterator.map(_.asInputStream).asJavaEnumeration)

Copy link
Member

@He-Pin He-Pin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm

@pjfanning pjfanning added this to the 1.1.0-M1 milestone Feb 5, 2024
@pjfanning pjfanning merged commit ef628ea into apache:main Feb 5, 2024
17 of 18 checks passed
@pjfanning pjfanning deleted the byte-string-input-stream branch February 5, 2024 15:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants