Skip to content

Commit

Permalink
[hotfix] Code cleanups in the StreamConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Jul 29, 2015
1 parent 7bd57d7 commit 833862a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 21 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
Expand All @@ -37,6 +38,10 @@ public class StreamConfig implements Serializable {

private static final long serialVersionUID = 1L;

// ------------------------------------------------------------------------
// Config Keys
// ------------------------------------------------------------------------

private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
private static final String NUMBER_OF_INPUTS = "numberOfInputs";
private static final String CHAINED_OUTPUTS = "chainedOutputs";
Expand All @@ -59,16 +64,22 @@ public class StreamConfig implements Serializable {
private static final String EDGES_IN_ORDER = "edgesInOrder";
private static final String OUT_STREAM_EDGES = "outStreamEdges";
private static final String IN_STREAM_EDGES = "inStreamEdges";

private static final String CHECKPOINTING_ENABLED = "checkpointing";
private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
private static final String STATE_PARTITIONER = "statePartitioner";

// DEFAULT VALUES
// ------------------------------------------------------------------------
// Default Values
// ------------------------------------------------------------------------

private static final long DEFAULT_TIMEOUT = 100;
public static final String STATE_MONITORING = "STATE_MONITORING";

// CONFIG METHODS
// ------------------------------------------------------------------------
// Config
// ------------------------------------------------------------------------

private Configuration config;
private final Configuration config;

public StreamConfig(Configuration config) {
this.config = config;
Expand All @@ -78,6 +89,11 @@ public Configuration getConfiguration() {
return config;
}

// ------------------------------------------------------------------------
// Configured Properties
// ------------------------------------------------------------------------


public void setVertexID(Integer vertexID) {
config.setInteger(VERTEX_NAME, vertexID);
}
Expand Down Expand Up @@ -335,12 +351,12 @@ public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
}
}

public void setStateMonitoring(boolean stateMonitoring) {
config.setBoolean(STATE_MONITORING, stateMonitoring);
public void setCheckpointingEnabled(boolean enabled) {
config.setBoolean(CHECKPOINTING_ENABLED, enabled);
}

public boolean getStateMonitoring() {
return config.getBoolean(STATE_MONITORING, false);
public boolean isCheckpointingEnabled() {
return config.getBoolean(CHECKPOINTING_ENABLED, false);
}

public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
Expand Down Expand Up @@ -435,28 +451,29 @@ public String toString() {
builder.append("\n=======================");
builder.append("Stream Config");
builder.append("=======================");
builder.append("\nTask name: " + getVertexID());
builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
builder.append("\nOutput names: " + getNonChainedOutputs(cl));
builder.append("\nTask name: ").append(getVertexID());
builder.append("\nNumber of non-chained inputs: ").append(getNumberOfInputs());
builder.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
builder.append("\nOutput names: ").append(getNonChainedOutputs(cl));
builder.append("\nPartitioning:");
for (StreamEdge output : getNonChainedOutputs(cl)) {
int outputname = output.getTargetId();
builder.append("\n\t" + outputname + ": " + output.getPartitioner());
builder.append("\n\t").append(outputname).append(": ").append(output.getPartitioner());
}

builder.append("\nChained subtasks: " + getChainedOutputs(cl));
builder.append("\nChained subtasks: ").append(getChainedOutputs(cl));

try {
builder.append("\nOperator: " + getStreamOperator(cl).getClass().getSimpleName());
} catch (Exception e) {
builder.append("\nOperator: ").append(getStreamOperator(cl).getClass().getSimpleName());
}
catch (Exception e) {
builder.append("\nOperator: Missing");
}
builder.append("\nBuffer timeout: " + getBufferTimeout());
builder.append("\nState Monitoring: " + getStateMonitoring());
builder.append("\nBuffer timeout: ").append(getBufferTimeout());
builder.append("\nState Monitoring: ").append(isCheckpointingEnabled());
if (isChainStart() && getChainedOutputs(cl).size() > 0) {
builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
builder.append(getTransitiveChainedTaskConfigs(cl));
}

return builder.toString();
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -66,14 +67,19 @@
*/
public class StreamGraph extends StreamingPlan {

/** The default interval for checkpoints, in milliseconds */
public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000;

private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);

private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;

private final StreamExecutionEnvironment environemnt;
private final ExecutionConfig executionConfig;

private CheckpointingMode checkpointingMode;
private boolean checkpointingEnabled = false;
private long checkpointingInterval = 5000;
private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS;
private boolean chaining = true;

private Map<Integer, StreamNode> streamNodes;
Expand Down
Expand Up @@ -269,7 +269,7 @@ private void setVertexConfig(Integer vertexID, StreamConfig config,
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
config.setStateHandleProvider(streamGraph.getStateHandleProvider());
config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());

Expand Down

0 comments on commit 833862a

Please sign in to comment.