Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-4775] mv function pkg from fn-harness to java-core #7866

Merged
merged 2 commits into from Feb 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,7 +28,7 @@
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
Expand Down
Expand Up @@ -47,8 +47,8 @@
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
Expand Down
Expand Up @@ -40,8 +40,8 @@
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.graph.PipelineValidator;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.sdk.fn.function.ThrowingConsumer;
import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
import org.apache.beam.sdk.function.ThrowingConsumer;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
Expand Down Expand Up @@ -74,7 +74,7 @@ public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase implem
public static InMemoryJobService create(
Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
Function<String, String> stagingServiceTokenProvider,
ThrowingConsumer<String> cleanupJobFn,
ThrowingConsumer<Exception, String> cleanupJobFn,
JobInvoker invoker) {
return new InMemoryJobService(
stagingServiceDescriptor, stagingServiceTokenProvider, cleanupJobFn, invoker);
Expand All @@ -85,13 +85,13 @@ public static InMemoryJobService create(
private final ConcurrentMap<String, String> stagingSessionTokens;
private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;
private final Function<String, String> stagingServiceTokenProvider;
private final ThrowingConsumer<String> cleanupJobFn;
private final ThrowingConsumer<Exception, String> cleanupJobFn;
private final JobInvoker invoker;

private InMemoryJobService(
Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
Function<String, String> stagingServiceTokenProvider,
ThrowingConsumer<String> cleanupJobFn,
ThrowingConsumer<Exception, String> cleanupJobFn,
JobInvoker invoker) {
this.stagingServiceDescriptor = stagingServiceDescriptor;
this.stagingServiceTokenProvider = stagingServiceTokenProvider;
Expand Down
Expand Up @@ -15,7 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.fn.function;
package org.apache.beam.sdk.function;

/** A {@link ThrowingConsumer} that can be closed. */
public interface CloseableThrowingConsumer<T> extends AutoCloseable, ThrowingConsumer<T> {}
public interface CloseableThrowingConsumer<ExceptionT extends Exception, T>
extends AutoCloseable, ThrowingConsumer<ExceptionT, T> {}
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.fn.function;
package org.apache.beam.sdk.function;

import java.util.function.BiConsumer;

Expand Down
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.fn.function;
package org.apache.beam.sdk.function;

import java.util.function.BiFunction;

Expand Down
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.fn.function;
package org.apache.beam.sdk.function;

import java.util.function.Consumer;

Expand All @@ -26,6 +26,6 @@
* interfaces.
*/
@FunctionalInterface
public interface ThrowingConsumer<T> {
void accept(T t) throws Exception;
public interface ThrowingConsumer<ExceptionT extends Exception, T> {
void accept(T t) throws ExceptionT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case for this change? It looks like you only pass Exception as ExceptionT. Do you want to define contracts for other subclasses of Exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a note at the end of this comment about that part.

It's not used here, but I do have a motivating case for a ThrowingConsumer<IOException, T> in #7823.

}
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.fn.function;
package org.apache.beam.sdk.function;

import java.util.function.Function;

Expand Down
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.fn.function;
package org.apache.beam.sdk.function;

/**
* A {@link Runnable} which can throw {@link Exception}s.
Expand Down
Expand Up @@ -17,4 +17,4 @@
*/

/** Java 8 functional interface extensions. */
package org.apache.beam.sdk.fn.function;
package org.apache.beam.sdk.function;
Expand Up @@ -28,7 +28,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
Expand Down
Expand Up @@ -36,7 +36,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.util.SerializableUtils;
Expand Down
Expand Up @@ -36,8 +36,8 @@
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down
Expand Up @@ -31,7 +31,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
Expand Down
Expand Up @@ -25,7 +25,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.PCollectionViewTranslation;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.KV;
Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
Expand Down
Expand Up @@ -30,8 +30,8 @@
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
Expand Down
Expand Up @@ -56,7 +56,7 @@
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Message;
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.apache.beam.runners.core.metrics.SimpleExecutionState;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.runners.core.metrics.SimpleStateRegistry;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.metrics.MetricsEnvironment;

/**
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
Expand Down
Expand Up @@ -38,7 +38,7 @@
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand Down
Expand Up @@ -23,7 +23,7 @@
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand Down
Expand Up @@ -39,10 +39,10 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RegisterRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.function.ThrowingFunction;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.CallStreamObserver;
Expand Down
Expand Up @@ -47,7 +47,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
import org.apache.beam.sdk.fn.function.ThrowingConsumer;
import org.apache.beam.sdk.function.ThrowingConsumer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down Expand Up @@ -75,7 +75,7 @@ public class ProcessBundleHandlerTest {
@Rule public ExpectedException thrown = ExpectedException.none();

@Mock private BeamFnDataClient beamFnDataClient;
@Captor private ArgumentCaptor<ThrowingConsumer<WindowedValue<String>>> consumerCaptor;
@Captor private ArgumentCaptor<ThrowingConsumer<Exception, WindowedValue<String>>> consumerCaptor;

@Before
public void setUp() {
Expand Down
Expand Up @@ -24,7 +24,7 @@

import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down