[FLINK-2525] Add configuration support in Storm-compatibility
ffbin committed Aug 31, 2015
1 parent 91069d1 commit 28c81dc
Showing 16 changed files with 437 additions and 24 deletions.

FlinkClient.java
@@ -47,6 +47,7 @@
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;

import org.apache.flink.util.InstantiationUtil;
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -67,7 +68,6 @@
public class FlinkClient {

/** The client's configuration */
@SuppressWarnings("unused")
private final Map<?,?> conf;
/** The jobmanager's host name */
private final String jobManagerHost;
@@ -183,6 +183,16 @@ public void submitTopologyWithOpts(final String name, final String uploadedJarLo

final Configuration configuration = jobGraph.getJobConfiguration();

/* attach the Storm configuration to the job configuration */
if (this.conf != null) {
try {
byte[] bytes = InstantiationUtil.serializeObject(this.conf);
configuration.setBytes(ConfigConstants.STORM_DEFAULT_CONFIG, bytes);
} catch (final IOException e) {
throw new RuntimeException("Problem with serialize storm configuration", e);
}
}

final Client client;
try {
client = new Client(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort), configuration,
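
For reference, a minimal sketch (not part of this commit) of the serialize/deserialize round trip the hunk above sets up. It assumes only Flink's InstantiationUtil and Configuration APIs plus the ConfigConstants.STORM_DEFAULT_CONFIG key this commit introduces; the class name is illustrative.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.InstantiationUtil;

public class StormConfRoundTrip {
	public static void main(final String[] args) throws Exception {
		final Map<String, Object> conf = new HashMap<String, Object>();
		conf.put("path", "/home/user/file.txt");

		// client side: serialize the user configuration into the job configuration
		final Configuration jobConfiguration = new Configuration();
		jobConfiguration.setBytes(ConfigConstants.STORM_DEFAULT_CONFIG, InstantiationUtil.serializeObject(conf));

		// runtime side: read the bytes back, as the spout/bolt wrappers do in open()/prepare()
		final byte[] bytes = jobConfiguration.getBytes(ConfigConstants.STORM_DEFAULT_CONFIG, null);
		@SuppressWarnings("unchecked")
		final Map<String, Object> restored = (Map<String, Object>) InstantiationUtil.deserializeObject(
				bytes, StormConfRoundTrip.class.getClassLoader());
		System.out.println(restored); // {path=/home/user/file.txt}
	}
}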

FlinkLocalCluster.java
@@ -25,7 +25,11 @@
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologyInfo;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.util.ClusterUtil;
import org.apache.flink.util.InstantiationUtil;

import java.util.Map;

@@ -41,7 +45,15 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina

public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
ClusterUtil.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
Configuration jobConfiguration = jobGraph.getJobConfiguration();

if (conf != null) {
byte[] bytes = InstantiationUtil.serializeObject(conf);
jobConfiguration.setBytes(ConfigConstants.STORM_DEFAULT_CONFIG, bytes);
}

ClusterUtil.startOnMiniCluster(jobGraph, topology.getNumberOfTasks());
}

public void killTopology(final String topologyName) {
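
A hedged usage sketch of the local-cluster path above: the conf map handed to submitTopology() now reaches the Storm code at runtime. SentenceSpout is a hypothetical IRichSpout (defined in a later sketch); the cluster and builder calls mirror Storm's LocalCluster/TopologyBuilder API.

import backtype.storm.Config;

import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

public class LocalSubmitExample {
	public static void main(final String[] args) throws Exception {
		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
		builder.setSpout("source", new SentenceSpout()); // hypothetical IRichSpout, see sketch below

		// backtype.storm.Config is just a HashMap; with this commit its entries
		// are serialized into the job configuration and handed to open()/prepare()
		final Config conf = new Config();
		conf.put("path", "/home/user/file.txt");

		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
		cluster.submitTopology("configured-topology", conf, builder.createTopology());
		cluster.shutdown();
	}
}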

AbstractStormSpoutWrapper.java
@@ -17,17 +17,22 @@

package org.apache.flink.stormcompatibility.wrappers;

import java.util.Collection;
import java.util.HashMap;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.IRichSpout;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.util.InstantiationUtil;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* A {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm spout within a Flink
@@ -57,6 +62,10 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
* Indicates, if the source is still running or was canceled.
*/
protected volatile boolean isRunning = true;
/**
* The job configuration, which includes the Storm configuration.
*/
protected Map stormConf;

/**
* Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
@@ -96,10 +105,43 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
}

@Override
public void open(Configuration parameters) throws Exception {
stormConf = new HashMap();

/* 'parameters' only holds the task configuration; the Storm configuration is available only from the job configuration */
RuntimeContext ctx = super.getRuntimeContext();
if (ctx instanceof StreamingRuntimeContext) {
Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();

if (jobConfiguration != null) {
byte[] bytes = jobConfiguration.getBytes(ConfigConstants.STORM_DEFAULT_CONFIG, null);

/* topology mode: the Storm configuration was attached by FlinkClient or FlinkLocalCluster */
if (bytes != null) {
Map map = (Map) InstantiationUtil.deserializeObject(bytes, Map.class.getClassLoader());
stormConf.putAll(map);
}
/* embedded mode: fall back to Flink's global job parameters */
else {
bytes = jobConfiguration.getBytes(ExecutionConfig.CONFIG_KEY, null);
if (bytes != null) {
ExecutionConfig executionConfig = (ExecutionConfig) InstantiationUtil.deserializeObject(bytes, Map.class.getClassLoader());
Configuration jobParameters = (Configuration) executionConfig.getGlobalJobParameters();
if (jobParameters != null) {
stormConf.putAll(jobParameters.getConfDataClone());
}
}
}
}
}
}

@Override
public final void run(final SourceContext<OUT> ctx) throws Exception {
this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
this.spout.open(null,
this.spout.open(stormConf,
StormWrapperSetupHelper
.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
new SpoutOutputCollector(this.collector));
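
On the Storm side, the effect of the change above is that open() no longer receives null. A hedged sketch of a spout that reads its configuration; the spout class and the "path" key are illustrative, not part of this commit.

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SentenceSpout extends BaseRichSpout {
	private static final long serialVersionUID = 1L;

	private String path;
	private SpoutOutputCollector collector;

	@SuppressWarnings("rawtypes")
	@Override
	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
		// populated by AbstractStormSpoutWrapper.open() from the job configuration
		this.path = (String) conf.get("path");
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		// a real spout would read records from 'path'; here we just emit it
		this.collector.emit(new Values(this.path));
	}

	@Override
	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}
}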

StormBoltWrapper.java
@@ -18,15 +18,19 @@

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.util.SplitStreamType;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -36,6 +40,8 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import com.google.common.collect.Sets;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.util.InstantiationUtil;

/**
* A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
@@ -205,7 +211,35 @@ public void open(final Configuration parameters) throws Exception {
this.numberOfAttributes, flinkCollector));
}

this.bolt.prepare(null, topologyContext, stormCollector);
Map stormConf = new HashMap();
RuntimeContext ctx = super.getRuntimeContext();
if (ctx instanceof StreamingRuntimeContext) {
Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();

if (jobConfiguration != null) {
byte[] bytes = jobConfiguration.getBytes(ConfigConstants.STORM_DEFAULT_CONFIG, null);

/* topology mode: the Storm configuration was attached by FlinkClient or FlinkLocalCluster */
if (bytes != null) {
Map map = (Map) InstantiationUtil.deserializeObject(bytes, Map.class.getClassLoader());
stormConf.putAll(map);
}
/* embedded mode: fall back to Flink's global job parameters */
else {
bytes = jobConfiguration.getBytes(ExecutionConfig.CONFIG_KEY, null);
if (bytes != null) {
ExecutionConfig executionConfig = (ExecutionConfig) InstantiationUtil.deserializeObject(bytes, Map.class.getClassLoader());
Configuration jobParameters = (Configuration) executionConfig.getGlobalJobParameters();
if (jobParameters != null) {
stormConf.putAll(jobParameters.getConfDataClone());
}
}
}
}
}

this.bolt.prepare(stormConf, topologyContext, stormCollector);
}

@Override
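
The embedded-mode branch reads Flink's global job parameters, so purely embedded jobs (spouts/bolts used via the wrappers, without FlinkClient or FlinkLocalCluster) can be configured as well. A hedged sketch of the user-facing side, assuming a Flink Configuration can serve as GlobalJobParameters, as the cast in the branch above implies:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmbeddedConfExample {
	public static void main(final String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// these parameters travel with the ExecutionConfig; the embedded-mode branch
		// above copies them into the Map handed to IRichBolt.prepare()
		final Configuration jobParameters = new Configuration();
		jobParameters.setString("path", "/home/user/file.txt");
		env.getConfig().setGlobalJobParameters(jobParameters);

		// ... build the pipeline with StormBoltWrapper via transform(), then env.execute()
	}
}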

StormBoltWrapperTest.java
@@ -25,21 +25,41 @@
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.stormcompatibility.util.SplitStreamType;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

@@ -220,6 +240,64 @@ public void testOpenSink() throws Exception {
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
}

@SuppressWarnings("unchecked")
@Test
public void testOpenWithStormConf() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);

Map stormConf = new HashMap();
stormConf.put("path", "/home/user/file.txt");
stormConf.put(1, 1024);
byte[] bytes = InstantiationUtil.serializeObject(stormConf);
Configuration jobConfiguration = new Configuration();
jobConfiguration.setBytes(ConfigConstants.STORM_DEFAULT_CONFIG, bytes);
jobConfiguration.setInteger("port", 5566);
Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
"", "", 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
mock(TaskManagerRuntimeInfo.class));
StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
mock(KeySelector.class),
mock(StateHandleProvider.class), mock(Map.class));

wrapper.setup(mock(Output.class), ctx);
wrapper.open(mock(Configuration.class));

verify(bolt).prepare(eq(stormConf), any(TopologyContext.class), any(OutputCollector.class));
}

@SuppressWarnings("unchecked")
@Test
public void testOpenWithJobConf() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);

ExecutionConfig executionConfig = new ExecutionConfig();
Configuration jobParameters = new Configuration();
jobParameters.setString("path", "/home/user/file.txt");
executionConfig.setGlobalJobParameters(jobParameters);
byte[] bytes = InstantiationUtil.serializeObject(executionConfig);
Configuration jobConfiguration = new Configuration();
jobConfiguration.setBytes(ExecutionConfig.CONFIG_KEY, bytes);
Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
"", "", 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
mock(TaskManagerRuntimeInfo.class));
StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
mock(KeySelector.class),
mock(StateHandleProvider.class), mock(Map.class));

wrapper.setup(mock(Output.class), ctx);
wrapper.open(mock(Configuration.class));

verify(bolt).prepare(eq(jobParameters.toMap()), any(TopologyContext.class), any(OutputCollector.class));
}

@SuppressWarnings("unchecked")
@Test
public void testClose() throws Exception {