From 4303d6541cd539715d64780b1c4dc2b883ead99e Mon Sep 17 00:00:00 2001 From: HuangWHWHW <404823056@qq.com> Date: Mon, 17 Aug 2015 11:31:02 +0800 Subject: [PATCH 1/3] [FLINK-2534][RUNTIME]Improve in CompactingHashTable.java --- .../operators/hash/CompactingHashTable.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java index d07c7e38deb62..b5c16713f32ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java @@ -223,7 +223,7 @@ public CompactingHashTable(TypeSerializer buildSideSerializer, // check the size of the first buffer and record it. all further buffers must have the same size. // the size must also be a power of 2 this.segmentSize = memorySegments.get(0).size(); - if ( (this.segmentSize & this.segmentSize - 1) != 0) { + if ((this.segmentSize & this.segmentSize - 1) != 0) { throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2."); } @@ -380,8 +380,6 @@ public void insertOrReplaceRecord(T record) throws IOException { int countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET); int numInSegment = 0; int posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH; - - long currentForwardPointer = BUCKET_FORWARD_POINTER_NOT_SET; // loop over all segments that are involved in the bucket (original bucket plus overflow buckets) while (true) { @@ -390,13 +388,13 @@ public void insertOrReplaceRecord(T record) throws IOException { final int thisCode = bucket.getInt(posInSegment); posInSegment += HASH_CODE_LEN; + numInSegment++; // check if the hash code matches if (thisCode == searchHashCode) { // get the pointer to the pair final int pointerOffset 
= bucketInSegmentOffset + BUCKET_POINTER_START_OFFSET + (numInSegment * POINTER_LEN); final long pointer = bucket.getLong(pointerOffset); - numInSegment++; // deserialize the key to check whether it is really equal, or whether we had only a hash collision T valueAtPosition = partition.readRecordAt(pointer); @@ -406,9 +404,6 @@ public void insertOrReplaceRecord(T record) throws IOException { return; } } - else { - numInSegment++; - } } // this segment is done. check if there is another chained bucket @@ -436,7 +431,6 @@ public void insertOrReplaceRecord(T record) throws IOException { countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET); posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH; numInSegment = 0; - currentForwardPointer = newForwardPointer; } } @@ -869,9 +863,7 @@ private static int getInitialTableSize(int numBuffers, int bufferSize, int numPa final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES); final long bucketBytes = numRecordsStorable * RECORD_OVERHEAD_BYTES; long numBuckets = bucketBytes / (2 * HASH_BUCKET_SIZE) + 1; - while(numBuckets % numPartitions != 0) { - numBuckets++; - } + numBuckets += numPartitions - numBuckets % numPartitions; return numBuckets > Integer.MAX_VALUE ? 
Integer.MAX_VALUE : (int) numBuckets; } From 3bc7a124718e618656079cca8f78bf58b61eb5ec Mon Sep 17 00:00:00 2001 From: HuangWHWHW <404823056@qq.com> Date: Mon, 17 Aug 2015 14:20:58 +0800 Subject: [PATCH 2/3] [FLINK-2534][runtime]fix a bug and rerun CI --- .../flink/runtime/operators/hash/CompactingHashTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java index b5c16713f32ba..247606af54a5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java @@ -388,7 +388,6 @@ public void insertOrReplaceRecord(T record) throws IOException { final int thisCode = bucket.getInt(posInSegment); posInSegment += HASH_CODE_LEN; - numInSegment++; // check if the hash code matches if (thisCode == searchHashCode) { @@ -404,6 +403,7 @@ public void insertOrReplaceRecord(T record) throws IOException { return; } } + numInSegment++; } // this segment is done. 
check if there is another chained bucket From ffab7428815e9050f61e6a1a0921551f1f4185ec Mon Sep 17 00:00:00 2001 From: HuangWHWHW <404823056@qq.com> Date: Mon, 17 Aug 2015 16:49:12 +0800 Subject: [PATCH 3/3] [FLINK-2534]Apply minor follow-up change Restore a space that was removed by mistake --- .../flink/runtime/operators/hash/CompactingHashTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java index 247606af54a5d..ff6548e059288 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java @@ -223,7 +223,7 @@ public CompactingHashTable(TypeSerializer buildSideSerializer, // check the size of the first buffer and record it. all further buffers must have the same size. // the size must also be a power of 2 this.segmentSize = memorySegments.get(0).size(); - if ((this.segmentSize & this.segmentSize - 1) != 0) { + if ( (this.segmentSize & this.segmentSize - 1) != 0) { throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2."); }