Skip to content

Commit

Permalink
[FLINK-2648] [FLINK-2717] [runtime] Harden memory release in sorters …
Browse files Browse the repository at this point in the history
…against asynchronous canceling
  • Loading branch information
StephanEwen committed Sep 21, 2015
1 parent bd74bae commit 435ee4e
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 185 deletions.
Expand Up @@ -74,10 +74,10 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin

private QuickSort sortAlgo = new QuickSort();

private MemoryManager memManager;

private Collector<OUT> output;

private List<MemorySegment> memory;

private long oversizedRecordCount;

private volatile boolean running = true;
Expand Down Expand Up @@ -115,9 +115,8 @@ public void prepare() throws Exception {
if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
throw new Exception("Invalid strategy " + driverStrategy + " for group reduce combiner.");
}

this.memManager = this.taskContext.getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());



final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
this.serializer = serializerFactory.getSerializer();
Expand All @@ -128,8 +127,9 @@ public void prepare() throws Exception {
this.combiner = this.taskContext.getStub();
this.output = this.taskContext.getOutputCollector();

final List<MemorySegment> memory = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(),
numMemoryPages);
MemoryManager memManager = this.taskContext.getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
this.memory = memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages);

// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
if (sortingComparator.supportsSerializationWithKeyNormalization() &&
Expand Down Expand Up @@ -218,16 +218,26 @@ private void sortAndCombine() throws Exception {
@Override
public void cleanup() throws Exception {
if (this.sorter != null) {
this.memManager.release(this.sorter.dispose());
this.sorter.dispose();
}

this.taskContext.getMemoryManager().release(this.memory);
}

@Override
public void cancel() {
this.running = false;

if (this.sorter != null) {
this.memManager.release(this.sorter.dispose());
try {
this.sorter.dispose();
}
catch (Exception e) {
// may happen during concurrent modification
}
}

this.taskContext.getMemoryManager().release(this.memory);
}

/**
Expand Down
Expand Up @@ -63,12 +63,12 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>

private Collector<T> output;

private MemoryManager memManager;

private InMemorySorter<T> sorter;

private QuickSort sortAlgo = new QuickSort();

private List<MemorySegment> memory;

private boolean running;

private boolean objectReuseEnabled = false;
Expand Down Expand Up @@ -105,18 +105,17 @@ public void prepare() throws Exception {
throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for reduce combiner.");
}

this.memManager = this.taskContext.getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig()
.getRelativeMemoryDriver());

// instantiate the serializer / comparator
final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
this.comparator = this.taskContext.getDriverComparator(0);
this.serializer = serializerFactory.getSerializer();
this.reducer = this.taskContext.getStub();
this.output = this.taskContext.getOutputCollector();

final List<MemorySegment> memory = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages);
MemoryManager memManager = this.taskContext.getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(
this.taskContext.getTaskConfig().getRelativeMemoryDriver());
this.memory = memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages);

// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
if (this.comparator.supportsSerializationWithKeyNormalization() &&
Expand Down Expand Up @@ -241,12 +240,21 @@ private void sortAndCombine() throws Exception {

@Override
public void cleanup() {
this.memManager.release(this.sorter.dispose());
this.sorter.dispose();
this.taskContext.getMemoryManager().release(this.memory);
}

@Override
public void cancel() {
this.running = false;
this.memManager.release(this.sorter.dispose());

try {
this.sorter.dispose();
}
catch (Exception e) {
// may happen during concurrent modifications
}

this.taskContext.getMemoryManager().release(this.memory);
}
}
Expand Up @@ -71,15 +71,13 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {

private TypeSerializer<IN> serializer;

private TypeComparator<IN> sortingComparator;

private TypeComparator<IN> groupingComparator;

private AbstractInvokable parent;

private QuickSort sortAlgo = new QuickSort();

private MemoryManager memManager;
private List<MemorySegment> memory;

private volatile boolean running = true;

Expand All @@ -102,28 +100,28 @@ public void openTask() throws Exception {
final Configuration stubConfig = this.config.getStubParameters();
RegularPactTask.openUserCode(this.reducer, stubConfig);

// ----------------- Set up the asynchronous sorter -------------------------

this.memManager = this.parent.getEnvironment().getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
// ----------------- Set up the sorter -------------------------

// instantiate the serializer / comparator
final TypeSerializerFactory<IN> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
final TypeComparatorFactory<IN> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
final TypeComparatorFactory<IN> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
this.serializer = serializerFactory.getSerializer();
this.sortingComparator = sortingComparatorFactory.createComparator();

TypeComparator<IN> sortingComparator = sortingComparatorFactory.createComparator();
this.groupingComparator = groupingComparatorFactory.createComparator();

final List<MemorySegment> memory = this.memManager.allocatePages(this.parent, numMemoryPages);
MemoryManager memManager = this.parent.getEnvironment().getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
this.memory = memManager.allocatePages(this.parent, numMemoryPages);

// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
if (sortingComparator.supportsSerializationWithKeyNormalization() &&
this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
{
this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, memory);
} else {
this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), memory);
}

if (LOG.isDebugEnabled()) {
Expand All @@ -133,19 +131,30 @@ public void openTask() throws Exception {

@Override
public void closeTask() throws Exception {
this.memManager.release(this.sorter.dispose());

if (!this.running) {
return;
if (this.sorter != null) {
this.sorter.dispose();
}
this.parent.getEnvironment().getMemoryManager().release(this.memory);

RegularPactTask.closeUserCode(this.reducer);
if (this.running) {
RegularPactTask.closeUserCode(this.reducer);
}
}

@Override
public void cancelTask() {
this.running = false;
this.memManager.release(this.sorter.dispose());

if (this.sorter != null) {
try {
this.sorter.dispose();
}
catch (Exception e) {
// may happen during concurrent modification
}
}

this.parent.getEnvironment().getMemoryManager().release(this.memory);
}

// --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -58,9 +58,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
private static final Logger LOG = LoggerFactory.getLogger(SynchronousChainedCombineDriver.class);


/**
* Fix length records with a length below this threshold will be in-place sorted, if possible.
*/
/** Fix length records with a length below this threshold will be in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

// --------------------------------------------------------------------------------------------
Expand All @@ -71,16 +69,14 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,

private TypeSerializer<IN> serializer;

private TypeComparator<IN> sortingComparator;

private TypeComparator<IN> groupingComparator;

private AbstractInvokable parent;

private QuickSort sortAlgo = new QuickSort();

private MemoryManager memManager;
private final QuickSort sortAlgo = new QuickSort();

private List<MemorySegment> memory;

private volatile boolean running = true;

// --------------------------------------------------------------------------------------------
Expand All @@ -102,28 +98,29 @@ public void openTask() throws Exception {
final Configuration stubConfig = this.config.getStubParameters();
RegularPactTask.openUserCode(this.combiner, stubConfig);

// ----------------- Set up the asynchronous sorter -------------------------

this.memManager = this.parent.getEnvironment().getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
// ----------------- Set up the sorter -------------------------

// instantiate the serializer / comparator
final TypeSerializerFactory<IN> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
final TypeComparatorFactory<IN> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
final TypeComparatorFactory<IN> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);

this.serializer = serializerFactory.getSerializer();
this.sortingComparator = sortingComparatorFactory.createComparator();
this.groupingComparator = groupingComparatorFactory.createComparator();

final List<MemorySegment> memory = this.memManager.allocatePages(this.parent, numMemoryPages);
TypeComparator<IN> sortingComparator = sortingComparatorFactory.createComparator();
this.groupingComparator = groupingComparatorFactory.createComparator();

MemoryManager memManager = this.parent.getEnvironment().getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
this.memory = memManager.allocatePages(this.parent, numMemoryPages);

// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
if (sortingComparator.supportsSerializationWithKeyNormalization() &&
this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
{
this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, this.memory);
} else {
this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), this.memory);
}

if (LOG.isDebugEnabled()) {
Expand All @@ -133,19 +130,25 @@ public void openTask() throws Exception {

@Override
public void closeTask() throws Exception {
this.memManager.release(this.sorter.dispose());
this.sorter.dispose();
this.parent.getEnvironment().getMemoryManager().release(this.memory);

if (!this.running) {
return;
if (this.running) {
RegularPactTask.closeUserCode(this.combiner);
}

RegularPactTask.closeUserCode(this.combiner);
}

@Override
public void cancelTask() {
this.running = false;
this.memManager.release(this.sorter.dispose());
try {
this.sorter.dispose();
}
catch (Exception e) {
// may happen during concurrent modification when canceling
}

this.parent.getEnvironment().getMemoryManager().release(this.memory);
}

// --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -121,13 +121,7 @@ public FixedLengthRecordSorter(TypeSerializer<T> serializer, TypeComparator<T> c
this.lastEntryOffset = (this.recordsPerSegment - 1) * this.recordSize;
this.swapBuffer = new byte[this.recordSize];

if (memory instanceof ArrayList<?>) {
this.freeMemory = (ArrayList<MemorySegment>) memory;
}
else {
this.freeMemory = new ArrayList<MemorySegment>(memory.size());
this.freeMemory.addAll(memory);
}
this.freeMemory = new ArrayList<MemorySegment>(memory);

// create the buffer collections
this.sortBuffer = new ArrayList<MemorySegment>(16);
Expand Down Expand Up @@ -174,16 +168,10 @@ public boolean isEmpty() {
return this.numRecords == 0;
}

/**
* Collects all memory segments from this sorter.
*
* @return All memory segments from this sorter.
*/
@Override
public List<MemorySegment> dispose() {
this.freeMemory.addAll(this.sortBuffer);
public void dispose() {
this.freeMemory.clear();
this.sortBuffer.clear();
return this.freeMemory;
}

@Override
Expand All @@ -195,23 +183,16 @@ public long getCapacity() {
public long getOccupancy() {
return this.sortBufferBytes;
}

@Override
public long getNumRecordBytes() {
return this.sortBufferBytes;
}

// -------------------------------------------------------------------------
// Retrieving and Writing
// -------------------------------------------------------------------------

/**
* Gets the record at the given logical position.
*
* @param reuse The reuse object to deserialize the record into.
* @param logicalPosition The logical position of the record.
* @throws IOException Thrown, if an exception occurred during deserialization.
*/

@Override
public T getRecord(int logicalPosition) throws IOException {
return getRecord(serializer.createInstance(), logicalPosition);
}

@Override
public T getRecord(T reuse, int logicalPosition) throws IOException {
final int buffer = logicalPosition / this.recordsPerSegment;
Expand Down

0 comments on commit 435ee4e

Please sign in to comment.