[FLINK-2545] Add bucket member verification while building the bloom filter.
This closes apache#1067
chengxiang li authored and StephanEwen committed Sep 1, 2015
1 parent 4789366 commit 9af4800
Showing 2 changed files with 71 additions and 3 deletions.
@@ -1120,9 +1120,14 @@ protected int spillPartition() throws IOException {
 	final protected void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT, PT> partition) {
 		// Find all the buckets which belong to this partition, and build a bloom filter for each bucket (including its overflow buckets).
 		final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
-		for (MemorySegment segment : this.buckets) {
-			for (int i = 0; i < bucketsPerSegment; i++) {
-				final int bucketInSegmentOffset = i * HASH_BUCKET_SIZE;
+
+		int numSegs = this.buckets.length;
+		// go over all segments that are part of the table
+		for (int i = 0, bucket = 0; i < numSegs && bucket < numBuckets; i++) {
+			final MemorySegment segment = this.buckets[i];
+			// go over all buckets in the segment
+			for (int k = 0; k < bucketsPerSegment && bucket < numBuckets; k++, bucket++) {
+				final int bucketInSegmentOffset = k * HASH_BUCKET_SIZE;
 				byte partitionNumber = segment.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
 				if (partitionNumber == partNum) {
 					byte status = segment.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
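
The removed loops visited every bucket slot of every segment; the new loops additionally bound the scan by numBuckets, the number of buckets the table actually assigned. A minimal, self-contained sketch of why that bound matters (the single segment and the 256/250 figures are illustrative assumptions, taken from the example in the test comment further down):

// A standalone sketch of the bounding fix, not the Flink source.
public class BucketBoundSketch {

	static final int HASH_BUCKET_SIZE = 128; // bytes per bucket, as in the patch

	public static void main(String[] args) {
		int bucketsPerSegment = 256; // slots physically present per segment (assumed)
		int numBuckets = 250;        // buckets the hash table actually assigned (assumed)
		int numSegs = 1;

		int visited = 0;
		// Bounding both loops by numBuckets (the fix) skips the 6 trailing
		// slots whose bytes the hash table never initialized.
		for (int i = 0, bucket = 0; i < numSegs && bucket < numBuckets; i++) {
			for (int k = 0; k < bucketsPerSegment && bucket < numBuckets; k++, bucket++) {
				int bucketInSegmentOffset = k * HASH_BUCKET_SIZE; // same offset math as the patch
				visited++;
			}
		}
		// The old loops would have visited all 256 slots and read garbage headers.
		System.out.println(visited + " of " + bucketsPerSegment + " slots visited"); // 250 of 256
	}
}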
@@ -1140,6 +1145,10 @@ final protected void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT, PT> partition) {
 	 */
 	final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
 		final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
+		if (count <= 0) {
+			return;
+		}
+
 		int[] hashCodes = new int[count];
 		// As the hash codes and the bloom filter occupy the same bytes, we read all hash codes out first and then write the bloom filter back.
 		for (int i = 0; i < count; i++) {
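
The hunk is cut off here. To show why the new guard matters, here is a sketch of how the method plausibly continues, based on the comment above; the names BUCKET_HEADER_LENGTH, HASH_CODE_LEN, setBitsLocation, and addHash are assumptions not confirmed by this diff:

// Sketch only, not the verbatim Flink source. The hash codes and the bloom
// filter occupy the same bucket bytes, so the codes must be copied out
// before that region is overwritten with filter bits.
for (int i = 0; i < count; i++) {
	// BUCKET_HEADER_LENGTH / HASH_CODE_LEN are assumed constant names
	hashCodes[i] = bucket.getInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + i * HASH_CODE_LEN);
}
this.bloomFilter.setBitsLocation(bucket, bucketInSegmentPos + BUCKET_HEADER_LENGTH);
for (int hashCode : hashCodes) {
	this.bloomFilter.addHash(hashCode);
}

Without the guard, an uninitialized bucket header carrying count = -1 (exactly what the test below fabricates) would make `new int[count]` throw a NegativeArraySizeException.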
@@ -1476,6 +1476,65 @@ public void testInMemoryReOpenWithSmallMemory() throws Exception {

 		this.memManager.release(join.getFreedMemory());
 	}
+
+	@Test
+	public void testBucketsNotFulfillSegment() throws Exception {
+		final int NUM_KEYS = 10000;
+		final int BUILD_VALS_PER_KEY = 3;
+		final int PROBE_VALS_PER_KEY = 10;
+
+		// create a build input that gives 30000 pairs with 3 values sharing the same key
+		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+		// create a probe input that gives 100000 pairs with 10 values sharing a key
+		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			// 33 is the minimum number of pages required to perform the hash join with these inputs
+			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+
+		// For FLINK-2545, the bucket data may not fill its whole buffer: for example, the buffer may hold 256 buckets,
+		// while the hash table only assigns 250 buckets to it. The unused buffer bytes may contain arbitrary data, which
+		// can corrupt the hash table if we forget to skip them. To mock this, put invalid bucket data (partition=1,
+		// inMemory=true, count=-1) at the end of each buffer.
+		for (MemorySegment segment : memSegments) {
+			int newBucketOffset = segment.size() - 128;
+			// initialize the header fields of the invalid trailing bucket (see the layout sketch after this diff)
+			segment.put(newBucketOffset + 0, (byte)0);
+			segment.put(newBucketOffset + 1, (byte)0);
+			segment.putShort(newBucketOffset + 2, (short) -1);
+			segment.putLong(newBucketOffset + 4, ~0x0L);
+		}
+
+		// ----------------------------------------------------------------------------------------
+
+		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
+				this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+				this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+				memSegments, ioManager);
+		join.open(buildInput, probeInput);
+
+		final IntPair recordReuse = new IntPair();
+		int numRecordsInJoinResult = 0;
+
+		while (join.nextRecord()) {
+			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			while (buildSide.next(recordReuse) != null) {
+				numRecordsInJoinResult++;
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+
+		join.close();
+		this.memManager.release(join.getFreedMemory());
+	}

// ============================================================================================
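
For reference, the four put calls in the test fabricate one 128-byte bucket header. A sketch with named offsets: HEADER_PARTITION_OFFSET, HEADER_STATUS_OFFSET, and HEADER_COUNT_OFFSET appear in the hunks above, while their numeric values (0, 1, 2) and the forward-pointer field at offset 4 are inferred from the put offsets and should be read as assumptions:

// Assumed bucket header layout; offsets inferred from the test's put calls.
static final int HEADER_PARTITION_OFFSET = 0; // byte: partition that owns the bucket
static final int HEADER_STATUS_OFFSET    = 1; // byte: status (in-memory flag)
static final int HEADER_COUNT_OFFSET     = 2; // short: number of records in the bucket
static final int HEADER_FORWARD_OFFSET   = 4; // long: overflow-bucket pointer (assumed name)

static void poisonTrailingBucket(MemorySegment segment) {
	int off = segment.size() - 128; // the last HASH_BUCKET_SIZE bytes of the segment
	segment.put(off + HEADER_PARTITION_OFFSET, (byte) 0);
	segment.put(off + HEADER_STATUS_OFFSET, (byte) 0);
	segment.putShort(off + HEADER_COUNT_OFFSET, (short) -1); // impossible record count
	segment.putLong(off + HEADER_FORWARD_OFFSET, ~0x0L);     // all-ones garbage pointer
}

Before this commit, the partition scan could reach such a slot and hand count = -1 to buildBloomFilterForBucket; with the numBuckets bound and the count guard, the fabricated bucket is never consulted.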

