Skip to content

Commit

Permalink
Add support for initializing processors with Hazelcast Managed Context
Browse files Browse the repository at this point in the history
  • Loading branch information
cangencer committed Dec 8, 2017
1 parent 7787b83 commit 29283df
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 1 deletion.
Expand Up @@ -114,6 +114,7 @@ private OutboxImpl createOutbox(OutboundCollector ssCollector) {

@Override
public void init() {
context.getSerializationService().getManagedContext().initialize(processor);
processor.init(outbox, context);
}

Expand Down
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.jet.core;

import com.hazelcast.core.ManagedContext;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.stream.IStreamList;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import static com.hazelcast.jet.core.Edge.between;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;

@Category(QuickTest.class)
@RunWith(HazelcastParallelClassRunner.class)
public class ManagedContextTest extends JetTestSupport {

public static final String INJECTED_VALUE = "injectedValue";
private JetInstance jet;

@Before
public void setup() {
JetConfig jetConfig = new JetConfig();
jetConfig.getHazelcastConfig().setManagedContext(new MockManagedContext());
jet = this.createJetMember(jetConfig);
}

@Test
public void when_managedContextSet_then_processorsInitWithContext() {
// Given
DAG dag = new DAG();
Vertex p = dag.newVertex("p", TestProcessor::new).localParallelism(1);
Vertex sink = dag.newVertex("sink", SinkProcessors.writeListP("sink"));
dag.edge(between(p, sink));

// When
jet.newJob(dag).join();

// Then
IStreamList<Object> list = jet.getList("sink");
assertEquals(singletonList(INJECTED_VALUE), list.subList(0, list.size()));
}


private static class MockManagedContext implements ManagedContext {

@Override
public Object initialize(Object obj) {
if (obj instanceof TestProcessor) {
return ((TestProcessor) obj).injectedValue = INJECTED_VALUE;
}
return obj;
}
}

private static class TestProcessor extends AbstractProcessor {

private String injectedValue;

@Override
public boolean complete() {
return tryEmit(injectedValue);
}
}
}
Expand Up @@ -16,11 +16,13 @@

package com.hazelcast.jet.impl.execution;

import com.hazelcast.core.ManagedContext;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.execution.init.Contexts.ProcCtx;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Before;
Expand Down Expand Up @@ -48,6 +50,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@Category(QuickTest.class)
@RunWith(HazelcastParallelClassRunner.class)
Expand All @@ -66,7 +69,10 @@ public class ProcessorTaskletTest {
public void setUp() {
this.mockInput = IntStream.range(0, MOCK_INPUT_SIZE).boxed().collect(toList());
this.processor = new PassThroughProcessor();
this.context = new ProcCtx(null, null, null, null, 0, NONE);
SerializationService service = mock(SerializationService.class);
ManagedContext managedContext = mock(ManagedContext.class);
when(service.getManagedContext()).thenReturn(managedContext);
this.context = new ProcCtx(null, service, null, null, 0, NONE);
this.instreams = new ArrayList<>();
this.outstreams = new ArrayList<>();
}
Expand Down

0 comments on commit 29283df

Please sign in to comment.