Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-2076] [runtime] Fix memory leakage in MutableHashTable #751

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,10 @@ protected boolean prepareNextPartition() throws IOException {

List<MemorySegment> memory = new ArrayList<MemorySegment>();
memory.add(getNextBuffer());
memory.add(getNextBuffer());
MemorySegment nextBuffer = getNextBuffer();
if (nextBuffer != null) {
memory.add(nextBuffer);
}

ChannelReaderInputViewIterator<PT> probeReader = new ChannelReaderInputViewIterator<PT>(this.currentSpilledProbeSide,
returnQueue, memory, this.availableMemory, this.probeSideSerializer, p.getProbeSideBlockCount());
Expand Down Expand Up @@ -652,6 +655,7 @@ public void close() {
throw new RuntimeException("Hashtable closing was interrupted");
}
}
this.writeBehindBuffersAvailable = 0;
}

public void abort() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1400,6 +1400,82 @@ public void testInMemoryReOpen() throws IOException

this.memManager.release(join.getFreedMemory());
}

/*
* This test is same as `testInMemoryReOpen()` but only number of keys and pages are different. This test
* validates a bug fix MutableHashTable memory leakage with small memory segments.
*/
@Test
public void testInMemoryReOpenWithSmallMemory() throws Exception {
final int NUM_KEYS = 10000;
final int BUILD_VALS_PER_KEY = 3;
final int PROBE_VALS_PER_KEY = 10;

// create a build input that gives 30000 pairs with 3 values sharing the same key
MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);

// create a probe input that gives 100000 pairs with 10 values sharing a key
MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);

// allocate the memory for the HashTable
List<MemorySegment> memSegments;
try {
// 33 is minimum number of pages required to perform hash join this inputs
memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
}
catch (MemoryAllocationException maex) {
fail("Memory for the Join could not be provided.");
return;
}

// ----------------------------------------------------------------------------------------

final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
memSegments, ioManager);
join.open(buildInput, probeInput);

final IntPair recordReuse = new IntPair();
int numRecordsInJoinResult = 0;

while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
}
Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);

join.close();

// ----------------------------------------------------------------------------------------
// recreate the inputs

// create a build input that gives 30000 pairs with 3 values sharing the same key
buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);

// create a probe input that gives 100000 pairs with 10 values sharing a key
probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);

join.open(buildInput, probeInput);

numRecordsInJoinResult = 0;

while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
}
Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);

join.close();

// ----------------------------------------------------------------------------------------

this.memManager.release(join.getFreedMemory());
}

// ============================================================================================

Expand Down