Skip to content

Commit

Permalink
Added method for accessing the graph directly
Browse files Browse the repository at this point in the history
  • Loading branch information
jroper committed Jul 5, 2018
1 parent a26cbad commit faa2dde
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.eclipse.microprofile.reactive.streams;

import org.eclipse.microprofile.reactive.streams.spi.Graph;
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;

import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -58,6 +59,10 @@ public CompletionStage<T> run() {
* @return A completion stage that will be redeemed with the result of the stream, or an error if the stream fails.
*/
public CompletionStage<T> run(ReactiveStreamsEngine engine) {
return engine.buildCompletion(graphBuilder.build(false, false));
return engine.buildCompletion(toGraph());
}

Graph toGraph() {
return graphBuilder.build(false, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*******************************************************************************
* Copyright (c) 2018 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/

package org.eclipse.microprofile.reactive.streams;

import org.eclipse.microprofile.reactive.streams.spi.Graph;

/**
* Exists to allow access to the {@code toGraph} methods on each builder, so that these methods don't have to be
* exposed publicly in the API.
*
* This is intended only for use by implementations of the API, to get direct access to the graphs without having to
* build a publisher, processor or subscriber. This is particularly useful for cases where an implementation would
* like to do additional manipulation using its own API to the stream, but have those manipulations fused to the
* graph being manipulated.
*/
public class GraphAccessor {

private GraphAccessor() {
}

/**
* Build the graph for the given {@link PublisherBuilder}.
*
* @param publisherBuilder The builder to build the graph for.
* @return The built graph.
*/
public static Graph buildGraphFor(PublisherBuilder<?> publisherBuilder) {
return publisherBuilder.toGraph();
}

/**
* Build the graph for the given {@link ProcessorBuilder}.
*
* @param processorBuilder The builder to build the graph for.
* @return The built graph.
*/
public static Graph buildGraphFor(ProcessorBuilder<?, ?> processorBuilder) {
return processorBuilder.toGraph();
}

/**
* Build the graph for the given {@link SubscriberBuilder}.
*
* @param subscriberBuilder The builder to build the graph for.
* @return The built graph.
*/
public static Graph buildGraphFor(SubscriberBuilder<?, ?> subscriberBuilder) {
return subscriberBuilder.toGraph();
}

/**
* Build the graph for the given {@link CompletionRunner}.
*
* @param completionRunner The runner to build the graph for.
* @return The built graph.
*/
public static Graph buildGraphFor(CompletionRunner<?> completionRunner) {
return completionRunner.toGraph();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.eclipse.microprofile.reactive.streams;

import org.eclipse.microprofile.reactive.streams.spi.Graph;
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
import org.eclipse.microprofile.reactive.streams.spi.Stage;
import org.reactivestreams.Processor;
Expand Down Expand Up @@ -432,7 +433,11 @@ public Processor<T, R> buildRs() {
* @return A {@link Processor} that will run this stream.
*/
public Processor<T, R> buildRs(ReactiveStreamsEngine engine) {
return engine.buildProcessor(graphBuilder.build(true, true));
return engine.buildProcessor(toGraph());
}

Graph toGraph() {
return graphBuilder.build(true, true);
}

private <S> ProcessorBuilder<T, S> addStage(Stage stage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.eclipse.microprofile.reactive.streams;

import org.eclipse.microprofile.reactive.streams.spi.Graph;
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
import org.eclipse.microprofile.reactive.streams.spi.Stage;

Expand Down Expand Up @@ -62,7 +63,11 @@ public CompletionSubscriber<T, R> build() {
* @return A {@link CompletionSubscriber} that will run this stream.
*/
public CompletionSubscriber<T, R> build(ReactiveStreamsEngine engine) {
return engine.buildSubscriber(graphBuilder.build(true, false));
return engine.buildSubscriber(toGraph());
}

Graph toGraph() {
return graphBuilder.build(true, false);
}

ReactiveStreamsGraphBuilder getGraphBuilder() {
Expand Down

0 comments on commit faa2dde

Please sign in to comment.