From 676cb29b60c7b13645a780f8633b3a5a7c4a034d Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 31 Oct 2016 19:26:38 -0700 Subject: [PATCH] Add OnTimerInvoker(s), for invoking DoFn @OnTimer methods OnTimerInvoker encapsulates the dispatch from onTimer() to a call to the DoFn method annotated with @OnTimer(). --- .../sdk/transforms/reflect/DoFnInvokers.java | 4 +- .../transforms/reflect/OnTimerInvoker.java | 27 ++ .../transforms/reflect/OnTimerInvokers.java | 271 ++++++++++++++++++ .../reflect/OnTimerInvokersTest.java | 109 +++++++ 4 files changed, 409 insertions(+), 2 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index b9757118c3f7c..086ae7f61c23d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -377,7 +377,7 @@ private static Implementation delegateWithDowncastOrThrow(DoFnSignature.DoFnMeth * Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a * "target method" of the wrapped {@link DoFn}. */ - private static class DoFnMethodDelegation implements Implementation { + static class DoFnMethodDelegation implements Implementation { /** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. */ protected final MethodDescription targetMethod; /** Whether the target method returns non-void. */ @@ -529,7 +529,7 @@ private static StackManipulation simpleExtraContextParameter( MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName))); } - private static StackManipulation getExtraContextParameter( + static StackManipulation getExtraContextParameter( DoFnSignature.Parameter parameter, final StackManipulation pushExtraContextFactory) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java new file mode 100644 index 0000000000000..de9d6677231b8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import org.apache.beam.sdk.transforms.DoFn; + +/** Interface for invoking the {@link DoFn.OnTimer} method for a particular timer. */ +interface OnTimerInvoker { + + /** Invoke the {@link DoFn.OnTimer} method in the provided context. */ + void invokeOnTimer(DoFn.ExtraContextFactory extra); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java new file mode 100644 index 0000000000000..b2bace289b0b3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import com.google.common.base.CharMatcher; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.NamingStrategy; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.modifier.FieldManifestation; +import net.bytebuddy.description.modifier.Visibility; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.DynamicType; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.dynamic.scaffold.InstrumentedType; +import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; +import net.bytebuddy.implementation.Implementation; +import net.bytebuddy.implementation.bytecode.ByteCodeAppender; +import net.bytebuddy.implementation.bytecode.StackManipulation; +import net.bytebuddy.implementation.bytecode.member.FieldAccess; +import net.bytebuddy.implementation.bytecode.member.MethodInvocation; +import net.bytebuddy.implementation.bytecode.member.MethodReturn; +import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess; +import net.bytebuddy.jar.asm.MethodVisitor; +import net.bytebuddy.matcher.ElementMatchers; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.OnTimer; +import org.apache.beam.sdk.transforms.DoFn.TimerId; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers.DoFnMethodDelegation; + +/** + * Dynamically generates {@link OnTimerInvoker} instances for invoking a particular {@link TimerId} + * on a particular {@link DoFn}. + */ +class OnTimerInvokers { + public static final OnTimerInvokers INSTANCE = new OnTimerInvokers(); + + private OnTimerInvokers() {} + + /** + * The field name for the delegate of {@link DoFn} subclass that a bytebuddy invoker will call. + */ + private static final String FN_DELEGATE_FIELD_NAME = "delegate"; + + /** + * A cache of constructors of generated {@link OnTimerInvoker} classes, keyed by {@link DoFn} + * class and then by {@link TimerId}. + * + *

Needed because generating an invoker class is expensive, and to avoid generating an + * excessive number of classes consuming PermGen memory in Java's that still have PermGen. + */ + private final LoadingCache>, LoadingCache>> + constructorCache = + CacheBuilder.newBuilder() + .build( + new CacheLoader< + Class>, LoadingCache>>() { + @Override + public LoadingCache> load( + final Class> fnClass) throws Exception { + return CacheBuilder.newBuilder().build(new OnTimerConstructorLoader(fnClass)); + } + }); + + /** Creates invoker. */ + public OnTimerInvoker forTimer( + DoFn fn, String timerId) { + + @SuppressWarnings("unchecked") + Class> fnClass = (Class>) fn.getClass(); + + try { + Constructor constructor = constructorCache.get(fnClass).get(timerId); + @SuppressWarnings("unchecked") + OnTimerInvoker invoker = + (OnTimerInvoker) constructor.newInstance(fn); + return invoker; + } catch (InstantiationException + | IllegalAccessException + | IllegalArgumentException + | InvocationTargetException + | SecurityException + | ExecutionException e) { + throw new RuntimeException( + String.format( + "Unable to construct @%s invoker for %s", + DoFn.OnTimer.class.getSimpleName(), fn.getClass().getName()), + e); + } + } + + /** + * A cache loader fixed to a particular {@link DoFn} class that loads constructors for the + * invokers for its {@link OnTimer @OnTimer} methods. + */ + private static class OnTimerConstructorLoader extends CacheLoader> { + + private final DoFnSignature signature; + + public OnTimerConstructorLoader(Class> clazz) { + this.signature = DoFnSignatures.INSTANCE.getSignature(clazz); + } + + @Override + public Constructor load(String timerId) throws Exception { + Class> invokerClass = + generateOnTimerInvokerClass(signature, timerId); + try { + return invokerClass.getConstructor(signature.fnClass()); + } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Generates a {@link OnTimerInvoker} class for the given {@link DoFnSignature} and {@link + * TimerId}. + */ + private static Class> generateOnTimerInvokerClass( + DoFnSignature signature, String timerId) { + Class> fnClass = signature.fnClass(); + + final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass); + + final String className = + "auxiliary_OnTimer_" + CharMatcher.JAVA_LETTER_OR_DIGIT.retainFrom(timerId); + + DynamicType.Builder builder = + new ByteBuddy() + // Create subclasses inside the target class, to have access to + // private and package-private bits + .with( + new NamingStrategy.SuffixingRandom(className) { + @Override + public String subclass(TypeDescription.Generic superClass) { + return super.name(clazzDescription); + } + }) + // class implements OnTimerInvoker { + .subclass(OnTimerInvoker.class, ConstructorStrategy.Default.NO_CONSTRUCTORS) + + // private final delegate; + .defineField( + FN_DELEGATE_FIELD_NAME, fnClass, Visibility.PRIVATE, FieldManifestation.FINAL) + + // ( delegate) { this.delegate = delegate; } + .defineConstructor(Visibility.PUBLIC) + .withParameter(fnClass) + .intercept(new InvokerConstructor()) + + // public invokeOnTimer(ExtraContextFactory) { + // this.delegate.<@OnTimer method>(... pass the right args ...) + // } + .method(ElementMatchers.named("invokeOnTimer")) + .intercept(new InvokeOnTimerDelegation(signature.onTimerMethods().get(timerId))); + + DynamicType.Unloaded unloaded = builder.make(); + + @SuppressWarnings("unchecked") + Class> res = + (Class>) + unloaded + .load( + OnTimerInvokers.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION) + .getLoaded(); + return res; + } + + /** + * An "invokeOnTimer" method implementation akin to @ProcessElement, but simpler because no + * splitting-related parameters need to be handled. + */ + private static class InvokeOnTimerDelegation extends DoFnMethodDelegation { + + private final DoFnSignature.OnTimerMethod signature; + + public InvokeOnTimerDelegation(DoFnSignature.OnTimerMethod signature) { + super(signature.targetMethod()); + this.signature = signature; + } + + @Override + protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) { + // Parameters of the wrapper invoker method: + // ExtraContextFactory. + // Parameters of the wrapped DoFn method: + // a dynamic set of allowed "extra" parameters in any order subject to + // validation prior to getting the DoFnSignature + ArrayList parameters = new ArrayList<>(); + // Push the extra arguments in their actual order. + StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadOffset(1); + for (DoFnSignature.Parameter param : signature.extraParameters()) { + parameters.add(DoFnInvokers.getExtraContextParameter(param, pushExtraContextFactory)); + } + return new StackManipulation.Compound(parameters); + } + } + + /** + * A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code + * for a constructor that takes a single argument and assigns it to the delegate field. + */ + private static final class InvokerConstructor implements Implementation { + @Override + public InstrumentedType prepare(InstrumentedType instrumentedType) { + return instrumentedType; + } + + @Override + public ByteCodeAppender appender(final Target implementationTarget) { + return new ByteCodeAppender() { + @Override + public Size apply( + MethodVisitor methodVisitor, + Context implementationContext, + MethodDescription instrumentedMethod) { + StackManipulation.Size size = + new StackManipulation.Compound( + // Load the this reference + MethodVariableAccess.REFERENCE.loadOffset(0), + // Invoke the super constructor (default constructor of Object) + MethodInvocation.invoke( + new TypeDescription.ForLoadedType(Object.class) + .getDeclaredMethods() + .filter( + ElementMatchers.isConstructor() + .and(ElementMatchers.takesArguments(0))) + .getOnly()), + // Load the this reference + MethodVariableAccess.REFERENCE.loadOffset(0), + // Load the delegate argument + MethodVariableAccess.REFERENCE.loadOffset(1), + // Assign the delegate argument to the delegate field + FieldAccess.forField( + implementationTarget + .getInstrumentedType() + .getDeclaredFields() + .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME)) + .getOnly()) + .putter(), + // Return void. + MethodReturn.VOID) + .apply(methodVisitor, implementationContext); + return new Size(size.getMaximalSize(), instrumentedMethod.getStackSize()); + } + }; + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java new file mode 100644 index 0000000000000..f8275fa98b508 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link DoFnInvokers}. */ +@RunWith(JUnit4.class) +public class OnTimerInvokersTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Mock private BoundedWindow mockWindow; + + @Mock private ExtraContextFactory mockExtraContextFactory; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + when(mockExtraContextFactory.window()).thenReturn(mockWindow); + } + + private void invokeOnTimer(DoFn fn, String timerId) { + OnTimerInvokers.INSTANCE.forTimer(fn, timerId).invokeOnTimer(mockExtraContextFactory); + } + + @Test + public void testOnTimerHelloWord() throws Exception { + final String timerId = "my-timer-id"; + + class SimpleTimerDoFn extends DoFn { + + public String status = "not yet"; + + @TimerId(timerId) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void process(ProcessContext c) {} + + @OnTimer(timerId) + public void onMyTimer() { + status = "OK now"; + } + } + + SimpleTimerDoFn fn = new SimpleTimerDoFn(); + + invokeOnTimer(fn, timerId); + assertThat(fn.status, equalTo("OK now")); + } + + @Test + public void testOnTimerWithWindow() throws Exception { + WindowedTimerDoFn fn = new WindowedTimerDoFn(); + + invokeOnTimer(fn, WindowedTimerDoFn.TIMER_ID); + assertThat(fn.window, theInstance(mockWindow)); + } + + private static class WindowedTimerDoFn extends DoFn { + public static final String TIMER_ID = "my-timer-id"; + + public BoundedWindow window = null; + + @TimerId(TIMER_ID) + private final TimerSpec myTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void process(ProcessContext c) {} + + @OnTimer(TIMER_ID) + public void onMyTimer(BoundedWindow window) { + this.window = window; + } + } +}