Skip to content

Commit

Permalink
[FLINK-2406] [streaming] Abstract and improve stream alignment via th…
Browse files Browse the repository at this point in the history
…e BarrierBuffer

 - Add an interface for the functionaliy of the barrier buffer (for later addition of other implementatiions)
 - Add broader tests for the BarrierBuffer, inluding trailing data and barrier races.
 - Checkpoint barriers are handled by the buffer directly, rather than being returned and re-injected.
 - Simplify logic in the BarrierBuffer and fix certain corner cases.
 - Give access to spill directories properly via I/O manager, rather than via GlobalConfiguration singleton.
 - Rename the "BarrierBufferIOTest" to "BarrierBufferMassiveRandomTest"
 - A lot of code style/robustness fixes (proplery define constants, visibility, exception signatures)
  • Loading branch information
StephanEwen committed Jul 28, 2015
1 parent ed30ff4 commit 0579f90
Show file tree
Hide file tree
Showing 16 changed files with 1,175 additions and 564 deletions.
Expand Up @@ -301,9 +301,19 @@ public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChanne
* *
* @return The number of temporary file directories. * @return The number of temporary file directories.
*/ */
public int getNumberOfTempDirs() { public int getNumberOfSpillingDirectories() {
return this.paths.length; return this.paths.length;
} }

/**
* Gets the directories that the I/O manager spills to.
*
* @return The directories that the I/O manager spills to.
*/
public File[] getSpillingDirectories() {
return this.paths;
}



protected int getNextPathNum() { protected int getNextPathNum() {
final int next = this.nextPath; final int next = this.nextPath;
Expand Down
Expand Up @@ -22,19 +22,34 @@
import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.task.RuntimeEvent; import org.apache.flink.runtime.event.task.RuntimeEvent;


import java.io.IOException;


public class EndOfPartitionEvent extends RuntimeEvent { public class EndOfPartitionEvent extends RuntimeEvent {


public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent(); public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent();



@Override @Override
public void read(DataInputView in) throws IOException { public void read(DataInputView in) {
// Nothing to do here // Nothing to do here
} }


@Override @Override
public void write(DataOutputView out) throws IOException { public void write(DataOutputView out) {
// Nothing to do here // Nothing to do here
} }

@Override
public int hashCode() {
return 1965146673;
}

@Override
public boolean equals(Object obj) {
return obj != null && obj.getClass() == EndOfPartitionEvent.class;
}

@Override
public String toString() {
return getClass().getSimpleName();
}
} }
Expand Up @@ -408,7 +408,7 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep


// Sanity check that notifications only happen when data is available // Sanity check that notifications only happen when data is available
if (buffer == null) { if (buffer == null) {
throw new IllegalStateException("Bug in input gate/channel logic: input gate got" + throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
"notified by channel about available data, but none was available."); "notified by channel about available data, but none was available.");
} }


Expand Down

0 comments on commit 0579f90

Please sign in to comment.