[FLINK-2525] Add configuration support in Storm-compatibility #1046

Closed · wants to merge 1 commit
7 changes: 7 additions & 0 deletions docs/apis/storm_compatibility.md
@@ -169,6 +169,13 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.

See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.

## Configuring embedded Spouts/Bolts
Embedded Spouts/Bolts can be configured with user-defined parameters.
User-defined parameters are stored in a `Map` (as in Storm).
Contributor: ... are stored ...

This `Map` is provided as a parameter in the calls to `Spout.open(...)` and `Bolt.prepare(...)`.
The configuration can be used in whole Storm topology mode as well as in embedded Flink mode.
Contributor: capitalize Storm and Flink

If a whole topology is executed using `FlinkTopologyBuilder` etc., there is no special attention required – it works as in regular Storm.
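A minimal sketch of the bolt side (not part of this PR's diff; the `path` key is invented for illustration): the user-defined `Map` shows up as the `conf` argument of `prepare(...)`, just as in Storm.

```java
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

// Hypothetical bolt that reads a user-defined parameter in prepare().
public class ConfigurableBolt extends BaseRichBolt {
	private String path;

	@Override
	public void prepare(final Map conf, final TopologyContext context, final OutputCollector collector) {
		// "path" is an illustrative key; all entries of the submitted Map are visible here
		this.path = (String) conf.get("path");
	}

	@Override
	public void execute(final Tuple input) {
		// process tuples using this.path ...
	}

	@Override
	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
		// this sketch declares no output streams
	}
}
```

The `Map` itself is the one handed to `FlinkSubmitter.submitTopology(...)` or `FlinkLocalCluster.submitTopology(...)` at submission time, exactly as with Storm's own submitters.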

## Multiple Output Streams

Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
@@ -3,7 +3,6 @@
The Storm compatibility layer allows embedding unmodified spouts or bolts within a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few minor changes to the original submitting code are required. The code that builds the topology itself can be reused unmodified. See `flink-storm-examples` for a simple word-count example.

The following Storm features are not (yet/fully) supported by the compatibility layer right now:
* the spout/bolt configuration within `open()`/`prepare()` is not yet supported (ie, `Map conf` parameter)
* topology and tuple meta information (ie, `TopologyContext` not fully supported)
* no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring is ignored)
* for whole Storm topologies the following is not supported by Flink:
@@ -47,6 +47,8 @@
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;

import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.util.InstantiationUtil;
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -66,7 +68,6 @@
public class FlinkClient {

/** The client's configuration */
@SuppressWarnings("unused")
private final Map<?,?> conf;
/** The jobmanager's host name */
private final String jobManagerHost;
@@ -182,6 +183,15 @@ public void submitTopologyWithOpts(final String name, final String uploadedJarLo

final Configuration configuration = jobGraph.getJobConfiguration();

/* set the Storm configuration */
if (this.conf != null) {
try {
InstantiationUtil.writeObjectToConfig(this.conf, configuration, StormConfig.STORM_DEFAULT_CONFIG);
} catch (final IOException e) {
throw new RuntimeException("Problem with serialize storm configuration", e);
}
}

final Client client;

configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
@@ -25,7 +25,11 @@
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologyInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.streaming.util.ClusterUtil;
import org.apache.flink.util.InstantiationUtil;

import java.util.Map;

@@ -41,7 +45,14 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina

public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
ClusterUtil.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
Configuration jobConfiguration = jobGraph.getJobConfiguration();

if (conf != null) {
InstantiationUtil.writeObjectToConfig(conf, jobConfiguration, StormConfig.STORM_DEFAULT_CONFIG);
}

ClusterUtil.startOnMiniCluster(jobGraph, topology.getNumberOfTasks());
}

public void killTopology(final String topologyName) {
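For the client side, a minimal sketch (not part of this diff; it assumes `FlinkLocalCluster`'s no-arg constructor and `FlinkTopologyBuilder.createTopology()`, which mirror their Storm counterparts) of passing the `Map` at submission time:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

public class SubmitWithConf {
	public static void main(final String[] args) throws Exception {
		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
		// ... register spouts and bolts on the builder here ...

		final Map<String, Object> conf = new HashMap<String, Object>();
		conf.put("path", "/home/user/file.txt"); // illustrative user parameter

		// the conf Map is serialized into the job configuration (see the hunk above)
		final FlinkLocalCluster cluster = new FlinkLocalCluster();
		cluster.submitTopology("wordCount", conf, builder.createTopology());
	}
}
```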
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.util;

/**
* This class contains all constants for the Storm configuration used by the storm-compatibility layer.
*/
public final class StormConfig {

/**
* The job configuration key under which the Storm configuration is stored.
*/
public static final String STORM_DEFAULT_CONFIG = "storm.config";

/**
* Not instantiable.
*/
private StormConfig() {}
}
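To connect the pieces, a minimal sketch (not part of this diff) of the round-trip this key enables, using the same `InstantiationUtil` calls that appear in `FlinkClient` and `StormWrapperSetupHelper` elsewhere in this PR:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.util.InstantiationUtil;

public class StormConfigRoundTrip {
	public static void main(final String[] args) throws Exception {
		final Map stormConf = new HashMap();
		stormConf.put("path", "/home/user/file.txt"); // illustrative entry

		// client side: serialize the user Map into the job configuration
		final Configuration jobConfiguration = new Configuration();
		InstantiationUtil.writeObjectToConfig(stormConf, jobConfiguration,
				StormConfig.STORM_DEFAULT_CONFIG);

		// wrapper side: deserialize it again before open()/prepare() is called
		final Map restored = (Map) InstantiationUtil.readObjectFromConfig(
				jobConfiguration, StormConfig.STORM_DEFAULT_CONFIG,
				StormConfigRoundTrip.class.getClassLoader());

		System.out.println(restored.get("path")); // prints /home/user/file.txt
	}
}
```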
@@ -17,18 +17,18 @@

package org.apache.flink.stormcompatibility.wrappers;

import java.util.Collection;
import java.util.HashMap;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.IRichSpout;

import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* A {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm bolt within a Flink
* Streaming program. It takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
@@ -99,7 +99,15 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
@Override
public final void run(final SourceContext<OUT> ctx) throws Exception {
this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
this.spout.open(null,

/* the task configuration does not carry the Storm configuration; it must be read from the job configuration */
Map config = new HashMap();
Map stormConf = StormWrapperSetupHelper.getStormConfFromContext(super.getRuntimeContext());
if (stormConf != null) {
config.putAll(stormConf);
}

this.spout.open(config,
StormWrapperSetupHelper
.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
new SpoutOutputCollector(this.collector));
@@ -18,6 +18,7 @@

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
@@ -205,7 +206,12 @@ public void open(final Configuration parameters) throws Exception {
this.numberOfAttributes, flinkCollector));
}

this.bolt.prepare(null, topologyContext, stormCollector);
Map config = new HashMap();
Map stormConf = StormWrapperSetupHelper.getStormConfFromContext(super.getRuntimeContext());
if (stormConf != null) {
config.putAll(stormConf);
}
this.bolt.prepare(config, topologyContext, stormCollector);
}

@Override
@@ -26,8 +26,13 @@
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.util.InstantiationUtil;

import java.util.Collection;
import java.util.HashMap;
@@ -107,4 +112,39 @@ public static TopologyContext convertToTopologyContext(final StreamingRuntimeCon
return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
}

/**
 * Gets the Storm configuration from the given {@link RuntimeContext}.
 *
 * @param ctx The RuntimeContext of the operator.
 * @return The Storm configuration map, or {@code null} if none is available.
 * @throws Exception If reading or deserializing the configuration fails; if the
 *         configuration contains classes from the user code, this may lead to a
 *         {@code ClassNotFoundException}.
 */

Member: JavaDoc incomplete
public static Map getStormConfFromContext(final RuntimeContext ctx)
throws Exception {
Map stormConf = null;
Member: Please add a test for this method.

if (ctx instanceof StreamingRuntimeContext) {
Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();

if (jobConfiguration != null) {
/* topologies mode */
stormConf = (Map) InstantiationUtil.readObjectFromConfig(jobConfiguration, StormConfig.STORM_DEFAULT_CONFIG, StormWrapperSetupHelper.class.getClassLoader());

/* embedded mode */
if (stormConf == null) {
byte[] bytes = (byte[])jobConfiguration.getBytes(ExecutionConfig.CONFIG_KEY, null);
if (bytes != null) {
ExecutionConfig executionConfig;
executionConfig = (ExecutionConfig)InstantiationUtil.deserializeObject(bytes, StormWrapperSetupHelper.class.getClassLoader());
Configuration jobParameters = (Configuration)executionConfig.getGlobalJobParameters();
if (jobParameters != null) {
stormConf = jobParameters.getConfDataClone();
}
}
}
}
}

return stormConf;
}

}
@@ -25,25 +25,47 @@
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.stormcompatibility.util.AbstractTest;
import org.apache.flink.stormcompatibility.util.SplitStreamType;
import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

import org.apache.flink.util.InstantiationUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.*;

@RunWith(PowerMockRunner.class)
@@ -218,6 +240,64 @@ public void testOpenSink() throws Exception {
verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
}

@SuppressWarnings("unchecked")
@Test
public void testOpenWithStormConf() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);

Map stormConf = new HashMap();
stormConf.put("path", "/home/user/file.txt");
stormConf.put(1, 1024);
byte[] bytes = InstantiationUtil.serializeObject(stormConf);
Configuration jobConfiguration = new Configuration();
jobConfiguration.setBytes(StormConfig.STORM_DEFAULT_CONFIG, bytes);
jobConfiguration.setInteger("port", 5566);
Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
mock(TaskManagerRuntimeInfo.class));
StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
mock(KeySelector.class),
mock(StateHandleProvider.class), mock(Map.class));

wrapper.setup(mock(Output.class), ctx);
wrapper.open(mock(Configuration.class));
Member: Why do you mock here and not use jobConfiguration?

Contributor (author): open() is usually called by openAllOperators(), and its Configuration parameter is the task configuration, not the job configuration. So I mock it.

Member: Now I understand. This is the unused TaskConfiguration.


verify(bolt).prepare(eq(stormConf), any(TopologyContext.class), any(OutputCollector.class));
}

@SuppressWarnings("unchecked")
@Test
public void testOpenWithJobConf() throws Exception {
final IRichBolt bolt = mock(IRichBolt.class);
final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);

ExecutionConfig executionConfig = new ExecutionConfig();
Configuration jobParameters = new Configuration();
jobParameters.setString("path", "/home/user/file.txt");
executionConfig.setGlobalJobParameters(jobParameters);
byte[] bytes = InstantiationUtil.serializeObject(executionConfig);
Configuration jobConfiguration = new Configuration();
jobConfiguration.setBytes(ExecutionConfig.CONFIG_KEY, bytes);
Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
mock(TaskManagerRuntimeInfo.class));
StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
mock(KeySelector.class),
mock(StateHandleProvider.class), mock(Map.class));

wrapper.setup(mock(Output.class), ctx);
wrapper.open(mock(Configuration.class));

verify(bolt).prepare(eq(jobParameters.toMap()), any(TopologyContext.class), any(OutputCollector.class));
}

@SuppressWarnings("unchecked")
@Test
public void testClose() throws Exception {