Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-2462] [streaming] Major cleanup of streaming task structure #1017

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,13 @@ public void flush() throws IOException {
}

public void clearBuffers() {
if (serializers != null) {
for (RecordSerializer<?> s : serializers) {
synchronized (s) {
Buffer b = s.getCurrentBuffer();
s.clear();

if (b != null) {
b.recycle();
}
for (RecordSerializer<?> s : serializers) {
synchronized (s) {
Buffer b = s.getCurrentBuffer();
s.clear();

if (b != null) {
b.recycle();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class AbstractInvokable {
/**
* Must be overwritten by the concrete task to instantiate the required record reader and record writer.
*/
public abstract void registerInputOutput();
public abstract void registerInputOutput() throws Exception;

/**
* Must be overwritten by the concrete task. This method is called by the task manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,18 @@

package org.apache.flink.runtime.jobgraph.tasks;


/**
* This interface needs to be implemented by runtime tasks that want to be able to receive
* notifications about completed checkpoints.
*/
public interface CheckpointNotificationOperator {


/**
* Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
* the notification from all participating tasks.
*
* @param checkpointId The ID of the checkpoint that is complete.
* @throws Exception The notification method may forward its exceptions.
*/
void notifyCheckpointComplete(long checkpointId) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
* and as a setup method on the TaskManager.
*/
@Override
public void registerInputOutput() {
public void registerInputOutput() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(formatLogString("Start registering input and output."));
}
Expand All @@ -239,26 +239,13 @@ public void registerInputOutput() {
final Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver();
this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);

// initialize the readers. this is necessary for nephele to create the input gates
// however, this does not trigger any local processing.
try {
initInputReaders();
initBroadcastInputReaders();
} catch (Exception e) {
throw new RuntimeException("Initializing the input streams failed in Task " + getEnvironment().getTaskName() +
(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
}
// initialize the readers.
// this does not yet trigger any stream consuming or processing.
initInputReaders();
initBroadcastInputReaders();

// initialize the writers. this is necessary for nephele to create the output gates.
// because in the presence of chained tasks, the tasks writers depend on the last task in the chain,
// we need to initialize the chained tasks as well. the chained tasks are only set up, but no work
// (such as setting up a sorter, etc.) starts
try {
initOutputs();
} catch (Exception e) {
throw new RuntimeException("Initializing the output handlers failed" +
(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
}
// initialize the writers.
initOutputs();

if (LOG.isDebugEnabled()) {
LOG.debug(formatLogString("Finished registering input and output."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@

public class MockEnvironment implements Environment {

private final String taskName;

private final MemoryManager memManager;

private final IOManager ioManager;
Expand All @@ -85,7 +87,8 @@ public class MockEnvironment implements Environment {

private final int bufferSize;

public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
this.taskName = taskName;
this.jobConfiguration = new Configuration();
this.taskConfiguration = new Configuration();
this.inputs = new LinkedList<InputGate>();
Expand Down Expand Up @@ -214,12 +217,12 @@ public InputSplitProvider getInputSplitProvider() {

@Override
public String getTaskName() {
return null;
return taskName;
}

@Override
public String getTaskNameWithSubtasks() {
return null;
return taskName + "(0/1)";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ public abstract class TaskTestBase {
public void initEnvironment(long memorySize, int bufferSize) {
this.memorySize = memorySize;
this.inputSplitProvider = new MockInputSplitProvider();
this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize);
this.mockEnv = new MockEnvironment("mock task", this.memorySize, this.inputSplitProvider, bufferSize);
}

public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
final IteratorWrappingTestSingleInputGate<Record> reader = addInput(input, groupId, true);

return reader;
return addInput(input, groupId, true);
}

public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
Expand Down Expand Up @@ -89,19 +87,32 @@ public Configuration getConfiguration() {
return this.mockEnv.getTaskConfiguration();
}

public void registerTask(AbstractInvokable task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends RichFunction> stubClass) {
public void registerTask(AbstractInvokable task,
@SuppressWarnings("rawtypes") Class<? extends PactDriver> driver,
Class<? extends RichFunction> stubClass) {

final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
config.setDriver(driver);
config.setStubWrapper(new UserCodeClassWrapper<RichFunction>(stubClass));

task.setEnvironment(this.mockEnv);

task.registerInputOutput();
try {
task.registerInputOutput();
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

public void registerTask(AbstractInvokable task) {
task.setEnvironment(this.mockEnv);
task.registerInputOutput();
try {
task.registerInputOutput();
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat> stubClass, String outPath) {
Expand All @@ -118,7 +129,12 @@ public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat o

outTask.setEnvironment(this.mockEnv);

outTask.registerInputOutput();
try {
outTask.registerInputOutput();
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

public void registerFileInputTask(AbstractInvokable inTask,
Expand All @@ -142,7 +158,12 @@ public void registerFileInputTask(AbstractInvokable inTask,

inTask.setEnvironment(this.mockEnv);

inTask.registerInputOutput();
try {
inTask.registerInputOutput();
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

public MemoryManager getMemoryManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class StreamEdge implements Serializable {
* A list of output names that the target vertex listens to (if there is
* output selection).
*/
final private List<String> selectedNames;
private final List<String> selectedNames;
private StreamPartitioner<?> outputPartitioner;

public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
Expand Down Expand Up @@ -108,11 +108,7 @@ public boolean equals(Object o) {

StreamEdge that = (StreamEdge) o;

if (!edgeId.equals(that.edgeId)) {
return false;
}

return true;
return edgeId.equals(that.edgeId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, Op
public Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp)
throws Exception {
// Get all the states for the operator
Map<String, StreamOperatorState> operatorStates = runtimeContext.getOperatorStates();
Map<String, StreamOperatorState<?, ?>> operatorStates = runtimeContext.getOperatorStates();

Map<String, OperatorStateHandle> operatorStateSnapshots;
if (operatorStates.isEmpty()) {
Expand All @@ -108,7 +108,7 @@ public Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> getSt
// Checkpoint the states and store the handles in a map
Map<String, OperatorStateHandle> snapshots = new HashMap<String, OperatorStateHandle>();

for (Entry<String, StreamOperatorState> state : operatorStates.entrySet()) {
for (Entry<String, StreamOperatorState<?, ?>> state : operatorStates.entrySet()) {
boolean isPartitioned = state.getValue() instanceof PartitionedStreamOperatorState;
snapshots.put(state.getKey(),
new OperatorStateHandle(state.getValue().snapshotState(checkpointId, timestamp),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@
* of this interface that can be used to emit elements and other messages, such as barriers
* and watermarks, from an operator.
*
* @param <T> The type of the elments that can be emitted.
* @param <T> The type of the elements that can be emitted.
*/
public interface Output<T> extends Collector<T> {

/**
* Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
* operators.
*
* <p>
* A watermark specifies that no element with a timestamp older or equal to the watermark
* timestamp will be emitted in the future.
* <p>A watermark specifies that no element with a timestamp older or equal to the watermark
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change this? The Oracle Javadoc literature (http://www.oracle.com/technetwork/articles/java/index-137868.html) and style guides (http://blog.joda.org/2012/11/javadoc-coding-standards.html) have it like it was. Javadoc is not HTML, so tags like <li>, <p> and so on are not closed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I think I saw it differently in some of Sun's classes, and copied the style.

It seems the changes do not hurt (JavaDocs interpret the HTML properly), but I'll stick with the official style in the future. Thanks for pointing that out.

* timestamp will be emitted in the future.</p>
*/
void emitWatermark(Watermark mark);
}