diff --git a/src/main/asciidoc/streams.adoc b/src/main/asciidoc/streams.adoc
index 94aac890d71..7912b4b0bac 100644
--- a/src/main/asciidoc/streams.adoc
+++ b/src/main/asciidoc/streams.adoc
@@ -170,3 +170,15 @@ returns `true` if the write queue is considered full.
Will be called if an exception occurs on the `WriteStream`.
- {@link io.vertx.core.streams.WriteStream#drainHandler}:
The handler will be called if the `WriteStream` is considered no longer full.
+
+=== Reducing streams
+
+Java collectors can reduce a `ReadStream` to a result in a similar fashion `java.util.Stream` does, yet in an asynchronous
+fashion.
+
+[source,$lang]
+----
+{@link examples.StreamsExamples#reduce1}
+----
+
+Note that `collect` overrides any previously handler set on the stream.
diff --git a/src/main/java/examples/StreamsExamples.java b/src/main/java/examples/StreamsExamples.java
index 1d13fb3a82d..a130eeb0c9d 100644
--- a/src/main/java/examples/StreamsExamples.java
+++ b/src/main/java/examples/StreamsExamples.java
@@ -11,7 +11,7 @@
package examples;
-import io.vertx.core.Handler;
+import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
@@ -20,6 +20,9 @@
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.streams.Pipe;
+import io.vertx.core.streams.ReadStream;
+
+import java.util.stream.Collectors;
/**
* @author Julien Viet
@@ -164,4 +167,11 @@ public void pipe9(AsyncFile src, AsyncFile dst) {
dst.end(Buffer.buffer("done"));
});
}
+
+ public void reduce1(ReadStream stream) {
+ // Count the number of elements
+ Future result = stream.collect(Collectors.counting());
+
+ result.onSuccess(count -> System.out.println("Stream emitted " + count + " elements"));
+ }
}
diff --git a/src/main/java/io/vertx/core/streams/ReadStream.java b/src/main/java/io/vertx/core/streams/ReadStream.java
index 731919d16f7..50eca7a4e21 100644
--- a/src/main/java/io/vertx/core/streams/ReadStream.java
+++ b/src/main/java/io/vertx/core/streams/ReadStream.java
@@ -12,14 +12,17 @@
package io.vertx.core.streams;
import io.vertx.codegen.annotations.Fluent;
+import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
-import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
+import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.impl.PipeImpl;
+import java.util.function.BiConsumer;
+
/**
* Represents a stream of items that can be read from.
*