diff --git a/hazelcast-jet-core/src/main/java/com/hazelcast/jet/impl/execution/ProcessorTasklet.java b/hazelcast-jet-core/src/main/java/com/hazelcast/jet/impl/execution/ProcessorTasklet.java index 36d9ddf5df6..2ff7748bc9f 100644 --- a/hazelcast-jet-core/src/main/java/com/hazelcast/jet/impl/execution/ProcessorTasklet.java +++ b/hazelcast-jet-core/src/main/java/com/hazelcast/jet/impl/execution/ProcessorTasklet.java @@ -114,6 +114,7 @@ private OutboxImpl createOutbox(OutboundCollector ssCollector) { @Override public void init() { + context.getSerializationService().getManagedContext().initialize(processor); processor.init(outbox, context); } diff --git a/hazelcast-jet-core/src/test/java/com/hazelcast/jet/core/ManagedContextTest.java b/hazelcast-jet-core/src/test/java/com/hazelcast/jet/core/ManagedContextTest.java new file mode 100644 index 00000000000..1e06a4eb194 --- /dev/null +++ b/hazelcast-jet-core/src/test/java/com/hazelcast/jet/core/ManagedContextTest.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.jet.core; + +import com.hazelcast.core.ManagedContext; +import com.hazelcast.jet.JetInstance; +import com.hazelcast.jet.config.JetConfig; +import com.hazelcast.jet.core.processor.SinkProcessors; +import com.hazelcast.jet.stream.IStreamList; +import com.hazelcast.test.HazelcastParallelClassRunner; +import com.hazelcast.test.annotation.QuickTest; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import static com.hazelcast.jet.core.Edge.between; +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; + +@Category(QuickTest.class) +@RunWith(HazelcastParallelClassRunner.class) +public class ManagedContextTest extends JetTestSupport { + + public static final String INJECTED_VALUE = "injectedValue"; + private JetInstance jet; + + @Before + public void setup() { + JetConfig jetConfig = new JetConfig(); + jetConfig.getHazelcastConfig().setManagedContext(new MockManagedContext()); + jet = this.createJetMember(jetConfig); + } + + @Test + public void when_managedContextSet_then_processorsInitWithContext() { + // Given + DAG dag = new DAG(); + Vertex p = dag.newVertex("p", TestProcessor::new).localParallelism(1); + Vertex sink = dag.newVertex("sink", SinkProcessors.writeListP("sink")); + dag.edge(between(p, sink)); + + // When + jet.newJob(dag).join(); + + // Then + IStreamList list = jet.getList("sink"); + assertEquals(singletonList(INJECTED_VALUE), list.subList(0, list.size())); + } + + + private static class MockManagedContext implements ManagedContext { + + @Override + public Object initialize(Object obj) { + if (obj instanceof TestProcessor) { + return ((TestProcessor) obj).injectedValue = INJECTED_VALUE; + } + return obj; + } + } + + private static class TestProcessor extends AbstractProcessor { + + private String injectedValue; + + @Override + public boolean complete() { + return tryEmit(injectedValue); + } + } +} diff --git a/hazelcast-jet-core/src/test/java/com/hazelcast/jet/impl/execution/ProcessorTaskletTest.java b/hazelcast-jet-core/src/test/java/com/hazelcast/jet/impl/execution/ProcessorTaskletTest.java index 92dd3d96c6b..049415f4ceb 100644 --- a/hazelcast-jet-core/src/test/java/com/hazelcast/jet/impl/execution/ProcessorTaskletTest.java +++ b/hazelcast-jet-core/src/test/java/com/hazelcast/jet/impl/execution/ProcessorTaskletTest.java @@ -16,11 +16,13 @@ package com.hazelcast.jet.impl.execution; +import com.hazelcast.core.ManagedContext; import com.hazelcast.jet.core.Inbox; import com.hazelcast.jet.core.Outbox; import com.hazelcast.jet.core.Processor; import com.hazelcast.jet.impl.execution.init.Contexts.ProcCtx; import com.hazelcast.jet.impl.util.ProgressState; +import com.hazelcast.spi.serialization.SerializationService; import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.annotation.QuickTest; import org.junit.Before; @@ -48,6 +50,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @Category(QuickTest.class) @RunWith(HazelcastParallelClassRunner.class) @@ -66,7 +69,10 @@ public class ProcessorTaskletTest { public void setUp() { this.mockInput = IntStream.range(0, MOCK_INPUT_SIZE).boxed().collect(toList()); this.processor = new PassThroughProcessor(); - this.context = new ProcCtx(null, null, null, null, 0, NONE); + SerializationService service = mock(SerializationService.class); + ManagedContext managedContext = mock(ManagedContext.class); + when(service.getManagedContext()).thenReturn(managedContext); + this.context = new ProcCtx(null, service, null, null, 0, NONE); this.instreams = new ArrayList<>(); this.outstreams = new ArrayList<>(); }