[FLINK-2525] Add configuration support in Storm-compatibility
ffbin committed Aug 25, 2015
1 parent fb1235d commit 5dcf493
Showing 8 changed files with 201 additions and 11 deletions.
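Taken together, these changes thread a user-supplied Storm configuration map from topology submission through the JobGraph's job configuration and into every spout's open() and bolt's prepare() call. A minimal usage sketch of the new behavior (hypothetical user code, not part of this commit; FlinkTopologyBuilder, its createTopology() method, and the no-arg FlinkLocalCluster constructor are assumed from the compatibility layer's api package):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

public class SubmitWithStormConf {
	public static void main(final String[] args) throws Exception {
		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
		// builder.setSpout(...) / builder.setBolt(...) wiring elided

		final Map<Object, Object> conf = new HashMap<Object, Object>();
		conf.put("path", "/home/user/file.txt");

		final FlinkLocalCluster cluster = new FlinkLocalCluster();
		// the conf map is merged into the JobGraph's job configuration
		cluster.submitTopology("exampleTopology", conf, builder.createTopology());
	}
}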
FlinkClient.java
@@ -67,7 +67,6 @@
public class FlinkClient {

/** The client's configuration */
@SuppressWarnings("unused")
private final Map<?,?> conf;
/** The jobmanager's host name */
private final String jobManagerHost;
@@ -183,6 +182,11 @@ public void submitTopologyWithOpts(final String name, final String uploadedJarLocation,

final Configuration configuration = jobGraph.getJobConfiguration();

/* set the Storm configuration */
if (this.conf != null) {
configuration.putStormConf(this.conf);
}

final Client client;
try {
client = new Client(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort), configuration,
FlinkLocalCluster.java
@@ -25,6 +25,8 @@
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologyInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.util.ClusterUtil;

import java.util.Map;
@@ -41,8 +43,14 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)

public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
ClusterUtil
.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
Configuration jobConfiguration = jobGraph.getJobConfiguration();

if (conf != null) {
jobConfiguration.putStormConf(conf);
}

ClusterUtil.startOnMiniCluster(jobGraph, topology.getNumberOfTasks());
}

public void killTopology(final String topologyName) {
AbstractStormSpoutWrapper.java
@@ -17,18 +17,20 @@

package org.apache.flink.stormcompatibility.wrappers;

import java.util.Collection;
import java.util.HashMap;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.IRichSpout;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* A {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm spout within a Flink
* Streaming program. It takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
@@ -57,6 +59,10 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
* Indicates, if the source is still running or was canceled.
*/
protected volatile boolean isRunning = true;
/**
* The job configuration, which includes the Storm configuration.
*/
protected Map stormConf;

/**
* Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
Expand Down Expand Up @@ -96,10 +102,24 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
}

@Override
public void open(Configuration parameters) throws Exception {
/* 'parameters' holds only the task configuration; the Storm configuration must be read from the job configuration */
RuntimeContext ctx = super.getRuntimeContext();
if (ctx instanceof StreamingRuntimeContext) {
Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();

if (jobConfiguration != null) {
stormConf = new HashMap<Object, Object>();
stormConf.putAll(jobConfiguration.getStormConf());
}
}
}

@Override
public final void run(final SourceContext<OUT> ctx) throws Exception {
this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
this.spout.open(null,
this.spout.open(stormConf,
StormWrapperSetupHelper
.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
new SpoutOutputCollector(this.collector));
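With the open() override above, a wrapped spout finally receives the submitted configuration map instead of null. A hedged sketch of a user spout consuming it (illustrative class; the "path" key mirrors the tests below):

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class ConfiguredSpout extends BaseRichSpout {
	private String path;
	private SpoutOutputCollector collector;

	@SuppressWarnings("rawtypes")
	@Override
	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
		// with this commit, 'conf' carries the map given at submission time
		this.path = (String) conf.get("path");
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		this.collector.emit(new Values(this.path));
		Utils.sleep(1000);
	}

	@Override
	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("path"));
	}
}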
StormBoltWrapper.java
@@ -18,12 +18,14 @@

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
@@ -36,6 +38,7 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import com.google.common.collect.Sets;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

/**
* A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
@@ -205,7 +208,18 @@ public void open(final Configuration parameters) throws Exception {
this.numberOfAttributes, flinkCollector));
}

this.bolt.prepare(null, topologyContext, stormCollector);
Map stormConf = null;
RuntimeContext ctx = super.getRuntimeContext();
if (ctx instanceof StreamingRuntimeContext) {
Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();

if (jobConfiguration != null) {
stormConf = new HashMap<Object, Object>();
stormConf.putAll(jobConfiguration.getStormConf());
}
}

this.bolt.prepare(stormConf, topologyContext, stormCollector);
}

@Override
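The bolt wrapper takes the same route: open() rebuilds the map from the job configuration and hands it to prepare(), which previously received null. An illustrative bolt-side counterpart (assumed user code, not part of this commit):

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ConfiguredBolt extends BaseRichBolt {
	private String path;
	private OutputCollector collector;

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
		// 'stormConf' is the map assembled by StormBoltWrapper.open() above
		this.path = (String) stormConf.get("path");
		this.collector = collector;
	}

	@Override
	public void execute(final Tuple input) {
		this.collector.emit(new Values(this.path + ": " + input.getValue(0)));
		this.collector.ack(input);
	}

	@Override
	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}
}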
StormBoltWrapperTest.java
@@ -25,9 +25,26 @@
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.stormcompatibility.util.SplitStreamType;
import org.apache.flink.streaming.api.operators.Output;
@@ -40,6 +57,7 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

@@ -220,6 +238,36 @@ public void testOpenSink() throws Exception {
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
}

@SuppressWarnings("unchecked")
@Test
public void testOpenWithStormConf() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);

Map stormConf = new HashMap();
stormConf.put("path", "/home/user/file.txt");
stormConf.put(1, 1024);
Configuration jobConfiguration = new Configuration();
jobConfiguration.putStormConf(stormConf);
Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
"", "", 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
mock(TaskManagerRuntimeInfo.class));
StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
mock(KeySelector.class),
mock(StateHandleProvider.class), mock(Map.class));

wrapper.setup(mock(Output.class), ctx);
wrapper.open(mock(Configuration.class));

Map mapExpect = new HashMap();
mapExpect.put("path", "/home/user/file.txt");
mapExpect.put(1, 1024);
verify(bolt).prepare(eq(mapExpect), any(TopologyContext.class), any(OutputCollector.class));
}

@SuppressWarnings("unchecked")
@Test
public void testClose() throws Exception {
StormSpoutWrapperTest.java
@@ -17,18 +17,43 @@

package org.apache.flink.stormcompatibility.wrappers;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@@ -51,6 +76,38 @@ public void testRunExecuteCancelInfinite() throws Exception {
Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
}

@Test
public void testOpen() throws Exception {
final IRichSpout spout = mock(IRichSpout.class);
final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);

Map stormConf = new HashMap();
stormConf.put("path", "/home/user/file.txt");
stormConf.put(1, 1024);
Configuration jobConfiguration = new Configuration();
jobConfiguration.putStormConf(stormConf);
Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
"", "", 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
mock(TaskManagerRuntimeInfo.class));
StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(env, new ExecutionConfig(),
mock(KeySelector.class),
mock(StateHandleProvider.class), mock(Map.class));

spoutWrapper.setRuntimeContext(runtimeContext);
spoutWrapper.open(mock(Configuration.class));
final SourceFunction.SourceContext ctx = mock(SourceFunction.SourceContext.class);
spoutWrapper.cancel();
spoutWrapper.run(ctx);

Map mapExpect = new HashMap();
mapExpect.put("path", "/home/user/file.txt");
mapExpect.put(1, 1024);
verify(spout).open(eq(mapExpect), any(TopologyContext.class), any(SpoutOutputCollector.class));
}

@Test
public void testClose() throws Exception {
final IRichSpout spout = mock(IRichSpout.class);
Configuration.java
@@ -55,14 +55,18 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters

/** Stores the concrete key/value pairs of this configuration object. */
private final HashMap<String, Object> confData;


/** Stores the Storm configuration. */
private final Map stormConf;

// --------------------------------------------------------------------------------------------

/**
* Creates a new empty configuration.
*/
public Configuration() {
this.confData = new HashMap<String, Object>();
this.stormConf = new HashMap<Object, Object>();
}

/**
@@ -72,6 +76,7 @@ public Configuration() {
*/
public Configuration(Configuration other) {
this.confData = new HashMap<String, Object>(other.confData);
this.stormConf = new HashMap<Object, Object>(other.stormConf);
}

// --------------------------------------------------------------------------------------------
@@ -418,6 +423,31 @@ public Set<String> keySet() {
}
}

/**
* Adds the given Storm configuration to this configuration object.
*
* @param map
* The map holding the Storm configuration; ignored if {@code null}.
*/
public void putStormConf(Map map) {
synchronized (this.stormConf) {
if (map != null) {
stormConf.putAll(map);
}
}
}

/**
* Returns the Storm configuration stored inside this configuration object.
*
* @return a copy of the Storm configuration stored inside this configuration object
*/
public Map getStormConf() {
synchronized (this.stormConf) {
return new HashMap<Object, Object>(this.stormConf);
}
}

public void addAll(Configuration other) {
synchronized (this.confData) {
synchronized (other.confData) {
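A small standalone sketch of the two new accessors' semantics: putStormConf(Map) merges entries into the internal map, while getStormConf() hands back a defensive copy, so callers cannot mutate the stored configuration ("topology.debug" is just an example key):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;

public class StormConfRoundTrip {
	@SuppressWarnings({ "rawtypes", "unchecked" })
	public static void main(final String[] args) {
		final Configuration config = new Configuration();

		final Map<Object, Object> conf = new HashMap<Object, Object>();
		conf.put("topology.debug", Boolean.TRUE);
		config.putStormConf(conf);

		// getStormConf() returns a copy, so this put does not write back
		final Map copy = config.getStormConf();
		copy.put("topology.debug", Boolean.FALSE);

		System.out.println(config.getStormConf().get("topology.debug")); // prints: true
	}
}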
StreamingRuntimeContext.java
@@ -91,7 +91,16 @@ public InputSplitProvider getInputSplitProvider() {
public Configuration getTaskStubParameters() {
return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
}


/**
* Returns the job configuration.
*
* @return The job configuration.
*/
public Configuration getJobConfiguration() {
return env.getJobConfiguration();
}

public StateHandleProvider<Serializable> getStateHandleProvider() {
return provider;
}
