diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java index 09d4aeef499e7..e3bf4ca5acbd7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java @@ -20,9 +20,6 @@ import java.io.Serializable; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; - public class GenericPairComparator extends TypePairComparator implements Serializable { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java index 3b2eda53215e7..f326d89aff9bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java @@ -20,7 +20,6 @@ import java.io.IOException; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.operators.hash.CompactingHashTable; import org.apache.flink.util.Collector; @@ -38,23 +37,20 @@ public class SolutionSetFastUpdateOutputCollector implements Collector { private final Collector delegate; private final CompactingHashTable solutionSet; - - private final T tmpHolder; - public SolutionSetFastUpdateOutputCollector(CompactingHashTable solutionSet, TypeSerializer serializer) { - this(solutionSet, serializer, null); + public SolutionSetFastUpdateOutputCollector(CompactingHashTable solutionSet) { + this(solutionSet, null); } - public SolutionSetFastUpdateOutputCollector(CompactingHashTable solutionSet, TypeSerializer serializer, Collector delegate) { + public SolutionSetFastUpdateOutputCollector(CompactingHashTable solutionSet, Collector delegate) { this.solutionSet = solutionSet; this.delegate = delegate; - this.tmpHolder = serializer.createInstance(); } @Override public void collect(T record) { try { - solutionSet.insertOrReplaceRecord(record, tmpHolder); + solutionSet.insertOrReplaceRecord(record); if (delegate != null) { delegate.collect(record); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java index f400b7e6eba40..c39efa558075d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java @@ -16,12 +16,10 @@ * limitations under the License. 
*/ - package org.apache.flink.runtime.iterative.io; import java.io.IOException; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.operators.hash.CompactingHashTable; import org.apache.flink.util.Collector; @@ -40,23 +38,20 @@ public class SolutionSetUpdateOutputCollector implements Collector { private final Collector delegate; private final CompactingHashTable solutionSet; - - private final T tmpHolder; - public SolutionSetUpdateOutputCollector(CompactingHashTable solutionSet, TypeSerializer serializer) { - this(solutionSet, serializer, null); + public SolutionSetUpdateOutputCollector(CompactingHashTable solutionSet) { + this(solutionSet, null); } - public SolutionSetUpdateOutputCollector(CompactingHashTable solutionSet, TypeSerializer serializer, Collector delegate) { + public SolutionSetUpdateOutputCollector(CompactingHashTable solutionSet, Collector delegate) { this.solutionSet = solutionSet; this.delegate = delegate; - this.tmpHolder = serializer.createInstance(); } @Override public void collect(T record) { try { - solutionSet.insertOrReplaceRecord(record, tmpHolder); + solutionSet.insertOrReplaceRecord(record); if (delegate != null) { delegate.collect(record); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java index 72434e67ef826..67d8f56373bbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java @@ -329,8 +329,7 @@ protected Collector createSolutionSetUpdateOutputCollector(Collector del if (ss instanceof CompactingHashTable) { @SuppressWarnings("unchecked") CompactingHashTable solutionSet = (CompactingHashTable) ss; - TypeSerializer serializer = getOutputSerializer(); - return new SolutionSetUpdateOutputCollector(solutionSet, serializer, delegate); + return new SolutionSetUpdateOutputCollector(solutionSet, delegate); } else if (ss instanceof JoinHashMap) { @SuppressWarnings("unchecked") diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java index f3e8a22bce69d..86ce0135e9396 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java @@ -61,15 +61,13 @@ public TypeComparator getBuildSideComparator() { public abstract void abort(); - public abstract void buildTable(final MutableObjectIterator input) throws IOException; - public abstract List getFreeMemory(); // ------------- Modifier ------------- public abstract void insert(T record) throws IOException; - public abstract void insertOrReplaceRecord(T record, T tempHolder) throws IOException; + public abstract void insertOrReplaceRecord(T record) throws IOException; // ------------- Accessors ------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java index 6533e194a6f1a..d07c7e38deb62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java @@ -22,10 +22,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -37,41 +37,20 @@ import org.apache.flink.util.MutableObjectIterator; /** - * An implementation of an in-memory Hash Table for variable-length records. - *

- * The design of this class follows on many parts the design presented in
- * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al..
- *
- *
- * The layout of the buckets inside a memory segment is as follows:
+ * A hash table that uses Flink's managed memory and supports replacement of records or
+ * updates to records. For an overview of the general data structure of the hash table, please
+ * refer to the description of the {@link org.apache.flink.runtime.operators.hash.MutableHashTable}.
+ *
+ * <p>The hash table is internally divided into two parts: The hash index, and the partition buffers
+ * that store the actual records. When records are inserted or updated, the hash table appends the
+ * records to its corresponding partition, and inserts or updates the entry in the hash index.
+ * In the case that the hash table runs out of memory, it compacts a partition by walking through the
+ * hash index and copying all reachable elements into a fresh partition. After that, it releases the
+ * memory of the partition to compact.</p>
 *
- * <pre>
- * +----------------------------- Bucket x ----------------------------
- * |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) |
- * |
- * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
- * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
- * |
- * |pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
- * | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
- * |
- * +---------------------------- Bucket x + 1--------------------------
- * |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) |
- * |
- * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
- * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
- * |
- * |pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
- * | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
- * +-------------------------------------------------------------------
- * | ...
- * |
- * </pre>
* @param Record type stored in hash table */ -public class CompactingHashTable extends AbstractMutableHashTable{ +public class CompactingHashTable extends AbstractMutableHashTable { private static final Logger LOG = LoggerFactory.getLogger(CompactingHashTable.class); @@ -79,11 +58,10 @@ public class CompactingHashTable extends AbstractMutableHashTable{ // Internal Constants // ------------------------------------------------------------------------ + /** The minimum number of memory segments that the compacting hash table needs to work properly */ private static final int MIN_NUM_MEMORY_SEGMENTS = 33; - /** - * The maximum number of partitions - */ + /** The maximum number of partitions */ private static final int MAX_NUM_PARTITIONS = 32; /** @@ -91,7 +69,7 @@ public class CompactingHashTable extends AbstractMutableHashTable{ * used to determine the ratio of the number of memory segments intended for partition * buffers and the number of memory segments in the hash-table structure. */ - private static final int DEFAULT_RECORD_LEN = 24; //FIXME maybe find a better default + private static final int DEFAULT_RECORD_LEN = 24; /** * The length of the hash code stored in the bucket. @@ -155,14 +133,14 @@ public class CompactingHashTable extends AbstractMutableHashTable{ // Members // ------------------------------------------------------------------------ - /** - * The free memory segments currently available to the hash join. - */ + /** The lock to synchronize state changes on */ + private final Object stateLock = new Object(); + + /** The free memory segments currently available to the hash join. */ private final ArrayList availableMemory; - /** - * The size of the segments used by the hash join buckets. All segments must be of equal size to ease offset computations. - */ + /** The size of the segments used by the hash join buckets. + * All segments must be of equal size to ease offset computations. */ private final int segmentSize; /** @@ -173,63 +151,59 @@ public class CompactingHashTable extends AbstractMutableHashTable{ */ private final int bucketsPerSegmentMask; - /** - * The number of bits that describe the position of a bucket in a memory segment. Computed as log2(bucketsPerSegment). - */ + /** The number of bits that describe the position of a bucket in a memory segment. + * Computed as log2(bucketsPerSegment). */ private final int bucketsPerSegmentBits; - /** - * An estimate for the average record length. - */ + /** An estimate for the average record length. */ private final int avgRecordLen; + + private final int pageSizeInBits; // ------------------------------------------------------------------------ - /** - * The partitions of the hash table. - */ + /** The partitions of the hash table. */ private final ArrayList> partitions; - /** - * The array of memory segments that contain the buckets which form the actual hash-table - * of hash-codes and pointers to the elements. - */ + /** The array of memory segments that contain the buckets which form the actual hash-table + * of hash-codes and pointers to the elements. */ private MemorySegment[] buckets; - /** - * temporary storage for partition compaction (always attempts to allocate as many segments as the largest partition) - */ + /** Temporary storage for partition compaction (always attempts to allocate + * as many segments as the largest partition) */ private InMemoryPartition compactionMemory; - /** - * The number of buckets in the current table. 
The bucket array is not necessarily fully - * used, when not all buckets that would fit into the last segment are actually used. - */ + /** The number of buckets in the current table. The bucket array is not necessarily fully + * used, when not all buckets that would fit into the last segment are actually used. */ private int numBuckets; - /** - * flag necessary so a resize is never triggered during a resize since the code paths are interleaved - */ - private boolean isResizing = false; + /** Flag to interrupt closed loops */ + private boolean running = true; + + /** Flag to mark the table as open / closed */ + private boolean closed; - private AtomicBoolean closed = new AtomicBoolean(); + /** Flag necessary so a resize is never triggered during a resize since the code paths are interleaved */ + private boolean isResizing; - private boolean running = true; - - private int pageSizeInBits; // ------------------------------------------------------------------------ // Construction and Teardown // ------------------------------------------------------------------------ - public CompactingHashTable(TypeSerializer buildSideSerializer, TypeComparator buildSideComparator, List memorySegments) - { + public CompactingHashTable(TypeSerializer buildSideSerializer, + TypeComparator buildSideComparator, + List memorySegments) { this(buildSideSerializer, buildSideComparator, memorySegments, DEFAULT_RECORD_LEN); } - public CompactingHashTable(TypeSerializer buildSideSerializer, TypeComparator buildSideComparator, List memorySegments, int avgRecordLen) - { + public CompactingHashTable(TypeSerializer buildSideSerializer, + TypeComparator buildSideComparator, + List memorySegments, + int avgRecordLen) { + super(buildSideSerializer, buildSideComparator); + // some sanity checks first if (memorySegments == null) { throw new NullPointerException(); @@ -252,6 +226,9 @@ public CompactingHashTable(TypeSerializer buildSideSerializer, TypeComparator if ( (this.segmentSize & this.segmentSize - 1) != 0) { throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2."); } + + this.pageSizeInBits = MathUtils.log2strict(this.segmentSize); + int bucketsPerSegment = this.segmentSize >> NUM_INTRA_BUCKET_BITS; if (bucketsPerSegment == 0) { throw new IllegalArgumentException("Hash Table requires buffers of at least " + HASH_BUCKET_SIZE + " bytes."); @@ -262,22 +239,26 @@ public CompactingHashTable(TypeSerializer buildSideSerializer, TypeComparator this.partitions = new ArrayList>(); // because we allow to open and close multiple times, the state is initially closed - this.closed.set(true); + this.closed = true; + // so far no partition has any MemorySegments } // ------------------------------------------------------------------------ - // Life-Cycle + // life cycle // ------------------------------------------------------------------------ /** - * Build the hash table + * Initialize the hash table */ + @Override public void open() { - // sanity checks - if (!this.closed.compareAndSet(true, false)) { - throw new IllegalStateException("Hash Table cannot be opened, because it is currently not closed."); + synchronized (stateLock) { + if (!closed) { + throw new IllegalStateException("currently not closed."); + } + closed = false; } // create the partitions @@ -290,7 +271,6 @@ public void open() { initTable(numBuckets, (byte) partitionFanOut); } - /** * Closes the hash table. 
This effectively releases all internal structures and closes all @@ -299,10 +279,14 @@ public void open() { * all resources that are currently held by the hash join. If another process still access the hash * table after close has been called no operations will be performed. */ + @Override public void close() { // make sure that we close only once - if (!this.closed.compareAndSet(false, true)) { - return; + synchronized (this.stateLock) { + if (this.closed) { + return; + } + this.closed = true; } LOG.debug("Closing hash table and releasing resources."); @@ -313,45 +297,41 @@ public void close() { // clear the memory in the partitions clearPartitions(); } - + + @Override public void abort() { this.running = false; - LOG.debug("Cancelling hash table operations."); } - + + @Override public List getFreeMemory() { - if (!this.closed.get()) { + if (!this.closed) { throw new IllegalStateException("Cannot return memory while join is open."); } return this.availableMemory; } - - - public void buildTable(final MutableObjectIterator input) throws IOException { - T record = this.buildSideSerializer.createInstance(); - - // go over the complete input and insert every element into the hash table - while (this.running && ((record = input.next(record)) != null)) { - insert(record); - } - } + // ------------------------------------------------------------------------ + // adding data to the hash table + // ------------------------------------------------------------------------ + public void buildTableWithUniqueKey(final MutableObjectIterator input) throws IOException { - T record = this.buildSideSerializer.createInstance(); - T tmp = this.buildSideSerializer.createInstance(); - // go over the complete input and insert every element into the hash table - while (this.running && ((record = input.next(record)) != null)) { - insertOrReplaceRecord(record, tmp); + + T value; + while (this.running && (value = input.next()) != null) { + insertOrReplaceRecord(value); } } - + + @Override public final void insert(T record) throws IOException { - if(this.closed.get()) { + if (this.closed) { return; } + final int hashCode = hash(this.buildSideComparator.hash(record)); final int posHashCode = hashCode % this.numBuckets; @@ -364,60 +344,8 @@ public final void insert(T record) throws IOException { final int partitionNumber = bucket.get(bucketInSegmentPos + HEADER_PARTITION_OFFSET); InMemoryPartition partition = this.partitions.get(partitionNumber); - - long pointer; - try { - pointer = partition.appendRecord(record); - if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) { - this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits)); - } - } catch (EOFException e) { - try { - compactPartition(partitionNumber); - // retry append - partition = this.partitions.get(partitionNumber); // compaction invalidates reference - pointer = partition.appendRecord(record); - } catch (EOFException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } catch (IndexOutOfBoundsException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. 
" + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } - } catch (IndexOutOfBoundsException e1) { - try { - compactPartition(partitionNumber); - // retry append - partition = this.partitions.get(partitionNumber); // compaction invalidates reference - pointer = partition.appendRecord(record); - } catch (EOFException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } catch (IndexOutOfBoundsException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } - } - insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hashCode, pointer); - } - - - @Override - public HashTableProber getProber(TypeComparator probeSideComparator, TypePairComparator pairComparator) { - return new HashTableProber(probeSideComparator, pairComparator); - } - - /** - * - * @return Iterator over hash table - * @see EntryIterator - */ - public MutableObjectIterator getEntryIterator() { - return new EntryIterator(this); + long pointer = insertRecordIntoPartition(record, partition, false); + insertBucketEntryFromStart(bucket, bucketInSegmentPos, hashCode, pointer, partitionNumber); } /** @@ -425,11 +353,10 @@ public MutableObjectIterator getEntryIterator() { * May trigger expensive compaction. * * @param record record to insert or replace - * @param tempHolder instance of T that will be overwritten * @throws IOException */ - public void insertOrReplaceRecord(T record, T tempHolder) throws IOException { - if(this.closed.get()) { + public void insertOrReplaceRecord(T record) throws IOException { + if (this.closed) { return; } @@ -437,14 +364,15 @@ public void insertOrReplaceRecord(T record, T tempHolder) throws IOException { final int posHashCode = searchHashCode % this.numBuckets; // get the bucket for the given hash code - MemorySegment originalBucket = this.buckets[posHashCode >> this.bucketsPerSegmentBits]; - int originalBucketOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS; + final MemorySegment originalBucket = this.buckets[posHashCode >> this.bucketsPerSegmentBits]; + final int originalBucketOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS; + MemorySegment bucket = originalBucket; int bucketInSegmentOffset = originalBucketOffset; // get the basic characteristics of the bucket final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET); - InMemoryPartition partition = this.partitions.get(partitionNumber); + final InMemoryPartition partition = this.partitions.get(partitionNumber); final MemorySegment[] overflowSegments = partition.overflowSegments; this.buildSideComparator.setReference(record); @@ -471,58 +399,12 @@ public void insertOrReplaceRecord(T record, T tempHolder) throws IOException { numInSegment++; // deserialize the key to check whether it is really equal, or whether we had only a hash collision - try { - tempHolder = partition.readRecordAt(pointer, tempHolder); - if (this.buildSideComparator.equalToReference(tempHolder)) { - long newPointer = partition.appendRecord(record); - bucket.putLong(pointerOffset, newPointer); - partition.setCompaction(false); - if((newPointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) { - this.compactionMemory.allocateSegments((int)(newPointer >> this.pageSizeInBits)); - } - return; - } - } catch (EOFException e) { - // system is out of memory so we attempt to 
reclaim memory with a copy compact run - long newPointer; - try { - compactPartition(partition.getPartitionNumber()); - // retry append - partition = this.partitions.get(partitionNumber); // compaction invalidates reference - newPointer = partition.appendRecord(record); - } catch (EOFException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } catch (IndexOutOfBoundsException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } - bucket.putLong(pointerOffset, newPointer); - return; - } catch (IndexOutOfBoundsException e) { - // system is out of memory so we attempt to reclaim memory with a copy compact run - long newPointer; - try { - compactPartition(partition.getPartitionNumber()); - // retry append - partition = this.partitions.get(partitionNumber); // compaction invalidates reference - newPointer = partition.appendRecord(record); - } catch (EOFException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } catch (IndexOutOfBoundsException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } + T valueAtPosition = partition.readRecordAt(pointer); + if (this.buildSideComparator.equalToReference(valueAtPosition)) { + long newPointer = insertRecordIntoPartition(record, partition, true); bucket.putLong(pointerOffset, newPointer); return; - } catch (IOException e) { - throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e); - } + } } else { numInSegment++; @@ -532,28 +414,89 @@ public void insertOrReplaceRecord(T record, T tempHolder) throws IOException { // this segment is done. check if there is another chained bucket long newForwardPointer = bucket.getLong(bucketInSegmentOffset + HEADER_FORWARD_OFFSET); if (newForwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) { + // nothing found. 
append and insert - long pointer = partition.appendRecord(record); - //insertBucketEntryFromStart(partition, originalBucket, originalBucketOffset, searchHashCode, pointer); - insertBucketEntryFromSearch(partition, originalBucket, bucket, originalBucketOffset, bucketInSegmentOffset, countInSegment, currentForwardPointer, searchHashCode, pointer); - if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) { - this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits)); + long pointer = insertRecordIntoPartition(record, partition, false); + + if (countInSegment < NUM_ENTRIES_PER_BUCKET) { + // we are good in our current bucket, put the values + bucket.putInt(bucketInSegmentOffset + BUCKET_HEADER_LENGTH + (countInSegment * HASH_CODE_LEN), searchHashCode); // hash code + bucket.putLong(bucketInSegmentOffset + BUCKET_POINTER_START_OFFSET + (countInSegment * POINTER_LEN), pointer); // pointer + bucket.putInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET, countInSegment + 1); // update count + } + else { + insertBucketEntryFromStart(originalBucket, originalBucketOffset, searchHashCode, pointer, partitionNumber); } return; } final int overflowSegNum = (int) (newForwardPointer >>> 32); bucket = overflowSegments[overflowSegNum]; - bucketInSegmentOffset = (int) (newForwardPointer & 0xffffffff); + bucketInSegmentOffset = (int) newForwardPointer; countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET); posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH; numInSegment = 0; currentForwardPointer = newForwardPointer; } } + + private long insertRecordIntoPartition(T record, InMemoryPartition partition, + boolean fragments) throws IOException { + try { + long pointer = partition.appendRecord(record); + if (fragments) { + partition.setIsCompacted(false); + } + if ((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) { + this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits)); + } + return pointer; + } + catch (Exception e) { + if (e instanceof EOFException || e instanceof IndexOutOfBoundsException) { + // this indicates an out of memory situation + try { + final int partitionNumber = partition.getPartitionNumber(); + compactPartition(partitionNumber); + + // retry append + partition = this.partitions.get(partitionNumber); // compaction invalidates reference + long newPointer = partition.appendRecord(record); + if ((newPointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) { + this.compactionMemory.allocateSegments((int)(newPointer >> this.pageSizeInBits)); + } + return newPointer; + } + catch (EOFException ex) { + throw new RuntimeException("Memory ran out. Compaction failed. " + + getMemoryConsumptionString() + " Message: " + ex.getMessage()); + } + catch (IndexOutOfBoundsException ex) { + throw new RuntimeException("Memory ran out. Compaction failed. " + + getMemoryConsumptionString() + " Message: " + ex.getMessage()); + } + } + else if (e instanceof IOException) { + throw (IOException) e; + } + else //noinspection ConstantConditions + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + else { + throw new RuntimeException("Writing record to compacting hash table failed", e); + } + } + } - private void insertBucketEntryFromStart(InMemoryPartition p, MemorySegment bucket, - int bucketInSegmentPos, int hashCode, long pointer) + + /** + * IMPORTANT!!! We pass only the partition number, because we must make sure we get a fresh + * partition reference. 
The partition reference used during search for the key may have become + * invalid during the compaction. + */ + private void insertBucketEntryFromStart(MemorySegment bucket, int bucketInSegmentPos, + int hashCode, long pointer, int partitionNumber) throws IOException { boolean checkForResize = false; @@ -564,8 +507,11 @@ private void insertBucketEntryFromStart(InMemoryPartition p, MemorySegment bu bucket.putInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + (count * HASH_CODE_LEN), hashCode); // hash code bucket.putLong(bucketInSegmentPos + BUCKET_POINTER_START_OFFSET + (count * POINTER_LEN), pointer); // pointer bucket.putInt(bucketInSegmentPos + HEADER_COUNT_OFFSET, count + 1); // update count - } else { + } + else { // we need to go to the overflow buckets + final InMemoryPartition p = this.partitions.get(partitionNumber); + final long originalForwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET); final long forwardForNewBucket; @@ -573,7 +519,7 @@ private void insertBucketEntryFromStart(InMemoryPartition p, MemorySegment bu // forward pointer set final int overflowSegNum = (int) (originalForwardPointer >>> 32); - final int segOffset = (int) (originalForwardPointer & 0xffffffff); + final int segOffset = (int) originalForwardPointer; final MemorySegment seg = p.overflowSegments[overflowSegNum]; final int obCount = seg.getInt(segOffset + HEADER_COUNT_OFFSET); @@ -635,31 +581,42 @@ private void insertBucketEntryFromStart(InMemoryPartition p, MemorySegment bu bucket.putLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET, pointerToNewBucket); // finally, insert the values into the overflow buckets - overflowSeg.putInt(overflowBucketOffset + BUCKET_HEADER_LENGTH, hashCode); // hash code + overflowSeg.putInt(overflowBucketOffset + BUCKET_HEADER_LENGTH, hashCode); // hash code overflowSeg.putLong(overflowBucketOffset + BUCKET_POINTER_START_OFFSET, pointer); // pointer // set the count to one - overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1); - if(checkForResize && !this.isResizing) { + overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1); + + if (checkForResize && !this.isResizing) { // check if we should resize buckets - if(this.buckets.length <= getOverflowSegmentCount()) { + if (this.buckets.length <= getOverflowSegmentCount()) { resizeHashTable(); } } } } - - private void insertBucketEntryFromSearch(InMemoryPartition partition, MemorySegment originalBucket, MemorySegment currentBucket, int originalBucketOffset, int currentBucketOffset, int countInCurrentBucket, long currentForwardPointer, int hashCode, long pointer) throws IOException { + + /** + * IMPORTANT!!! We pass only the partition number, because we must make sure we get a fresh + * partition reference. The partition reference used during search for the key may have become + * invalid during the compaction. 
+ */ + private void insertBucketEntryFromSearch(MemorySegment originalBucket, MemorySegment currentBucket, + int originalBucketOffset, int currentBucketOffset, + int countInCurrentBucket, long originalForwardPointer, + int hashCode, long pointer, int partitionNumber) throws IOException { boolean checkForResize = false; if (countInCurrentBucket < NUM_ENTRIES_PER_BUCKET) { // we are good in our current bucket, put the values - currentBucket.putInt(currentBucketOffset + BUCKET_HEADER_LENGTH + (countInCurrentBucket * HASH_CODE_LEN), hashCode); // hash code + currentBucket.putInt(currentBucketOffset + BUCKET_HEADER_LENGTH + (countInCurrentBucket * HASH_CODE_LEN), hashCode); // hash code currentBucket.putLong(currentBucketOffset + BUCKET_POINTER_START_OFFSET + (countInCurrentBucket * POINTER_LEN), pointer); // pointer currentBucket.putInt(currentBucketOffset + HEADER_COUNT_OFFSET, countInCurrentBucket + 1); // update count - } else { - // we need a new overflow bucket + } + else { + // we go to a new overflow bucket + final InMemoryPartition partition = this.partitions.get(partitionNumber); MemorySegment overflowSeg; - final int overflowBucketNum; + final int overflowSegmentNum; final int overflowBucketOffset; // first, see if there is space for an overflow bucket remaining in the last overflow segment @@ -667,7 +624,7 @@ private void insertBucketEntryFromSearch(InMemoryPartition partition, MemoryS // no space left in last bucket, or no bucket yet, so create an overflow segment overflowSeg = getNextBuffer(); overflowBucketOffset = 0; - overflowBucketNum = partition.numOverflowSegments; + overflowSegmentNum = partition.numOverflowSegments; // add the new overflow segment if (partition.overflowSegments.length <= partition.numOverflowSegments) { @@ -678,10 +635,11 @@ private void insertBucketEntryFromSearch(InMemoryPartition partition, MemoryS partition.overflowSegments[partition.numOverflowSegments] = overflowSeg; partition.numOverflowSegments++; checkForResize = true; - } else { + } + else { // there is space in the last overflow segment - overflowBucketNum = partition.numOverflowSegments - 1; - overflowSeg = partition.overflowSegments[overflowBucketNum]; + overflowSegmentNum = partition.numOverflowSegments - 1; + overflowSeg = partition.overflowSegments[overflowSegmentNum]; overflowBucketOffset = partition.nextOverflowBucket << NUM_INTRA_BUCKET_BITS; } @@ -690,10 +648,11 @@ private void insertBucketEntryFromSearch(InMemoryPartition partition, MemoryS partition.nextOverflowBucket = (partition.nextOverflowBucket == this.bucketsPerSegmentMask ? 
0 : partition.nextOverflowBucket + 1); // insert the new overflow bucket in the chain of buckets + // 1) set the old forward pointer // 2) let the bucket in the main table point to this one - overflowSeg.putLong(overflowBucketOffset + HEADER_FORWARD_OFFSET, currentForwardPointer); - final long pointerToNewBucket = (((long) overflowBucketNum) << 32) | ((long) overflowBucketOffset); + overflowSeg.putLong(overflowBucketOffset + HEADER_FORWARD_OFFSET, originalForwardPointer); + final long pointerToNewBucket = (((long) overflowSegmentNum) << 32) | ((long) overflowBucketOffset); originalBucket.putLong(originalBucketOffset + HEADER_FORWARD_OFFSET, pointerToNewBucket); // finally, insert the values into the overflow buckets @@ -710,16 +669,33 @@ private void insertBucketEntryFromSearch(InMemoryPartition partition, MemoryS } } } + + // -------------------------------------------------------------------------------------------- + // Access to the entries + // -------------------------------------------------------------------------------------------- + + @Override + public HashTableProber getProber(TypeComparator probeSideComparator, TypePairComparator pairComparator) { + return new HashTableProber(probeSideComparator, pairComparator); + } + + /** + * + * @return Iterator over hash table + * @see EntryIterator + */ + public MutableObjectIterator getEntryIterator() { + return new EntryIterator(this); + } // -------------------------------------------------------------------------------------------- - // Setup and Tear Down of Structures + // Setup and Tear Down of Structures // -------------------------------------------------------------------------------------------- private void createPartitions(int numPartitions) { this.partitions.clear(); ListMemorySegmentSource memSource = new ListMemorySegmentSource(this.availableMemory); - this.pageSizeInBits = MathUtils.log2strict(this.segmentSize); for (int i = 0; i < numPartitions; i++) { this.partitions.add(new InMemoryPartition(this.buildSideSerializer, i, memSource, this.segmentSize, pageSizeInBits)); @@ -728,8 +704,7 @@ private void createPartitions(int numPartitions) { } private void clearPartitions() { - for (int i = 0; i < this.partitions.size(); i++) { - InMemoryPartition p = this.partitions.get(i); + for (InMemoryPartition p : this.partitions) { p.clearAllMemory(this.availableMemory); } this.partitions.clear(); @@ -768,14 +743,14 @@ private void releaseTable() { // set the counters back this.numBuckets = 0; if (this.buckets != null) { - for (int i = 0; i < this.buckets.length; i++) { - this.availableMemory.add(this.buckets[i]); + for (MemorySegment bucket : this.buckets) { + this.availableMemory.add(bucket); } this.buckets = null; } } - private final MemorySegment getNextBuffer() { + private MemorySegment getNextBuffer() { // check if the list directly offers memory int s = this.availableMemory.size(); if (s > 0) { @@ -799,7 +774,7 @@ private final MemorySegment getNextBuffer() { * @param numBuffers The number of buffers available. * @return The number of partitions to use. 
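A recurring change in this patch replaces decodes of the form (int) (forwardPointer & 0xffffffff) with a plain (int) cast. The short standalone sketch below is illustrative only (the class and method names are not part of the patch); it models the packing of the overflow-bucket forward pointer used in the hunks above, with the overflow segment index in the upper 32 bits and the bucket byte offset in the lower 32 bits, and shows why the truncating cast is equivalent to masking.

// Illustrative sketch: models the 64-bit forward pointer that chains overflow buckets.
public class ForwardPointerSketch {

    // pack: overflow segment index in the upper 32 bits, bucket byte offset in the lower 32 bits
    static long encode(int overflowSegmentNum, int overflowBucketOffset) {
        return (((long) overflowSegmentNum) << 32) | ((long) overflowBucketOffset);
    }

    static int segmentOf(long forwardPointer) {
        return (int) (forwardPointer >>> 32);
    }

    static int offsetOf(long forwardPointer) {
        // a truncating cast keeps the low 32 bits, so it equals (int) (forwardPointer & 0xffffffffL)
        return (int) forwardPointer;
    }

    public static void main(String[] args) {
        long fp = encode(3, 4096);
        System.out.println(segmentOf(fp));                            // 3
        System.out.println(offsetOf(fp));                             // 4096
        System.out.println(offsetOf(fp) == (int) (fp & 0xffffffffL)); // true
    }
}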
*/ - private static final int getPartitioningFanOutNoEstimates(int numBuffers) { + private static int getPartitioningFanOutNoEstimates(int numBuffers) { return Math.max(10, Math.min(numBuffers / 10, MAX_NUM_PARTITIONS)); } @@ -807,14 +782,13 @@ private static final int getPartitioningFanOutNoEstimates(int numBuffers) { * @return String containing a summary of the memory consumption for error messages */ private String getMemoryConsumptionString() { - String result = new String("numPartitions: " + this.partitions.size() + + return "numPartitions: " + this.partitions.size() + " minPartition: " + getMinPartition() + " maxPartition: " + getMaxPartition() + " number of overflow segments: " + getOverflowSegmentCount() + " bucketSize: " + this.buckets.length + - " Overall memory: " + getSize() + - " Partition memory: " + getPartitionSize()); - return result; + " Overall memory: " + getSize() + + " Partition memory: " + getPartitionSize(); } /** @@ -878,7 +852,7 @@ private int getMinPartition() { */ private int getOverflowSegmentCount() { int result = 0; - for(InMemoryPartition p : this.partitions) { + for (InMemoryPartition p : this.partitions) { result += p.numOverflowSegments; } return result; @@ -890,7 +864,7 @@ private int getOverflowSegmentCount() { * * @return number of buckets */ - private static final int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) { + private static int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) { final long totalSize = ((long) bufferSize) * numBuffers; final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES); final long bucketBytes = numRecordsStorable * RECORD_OVERHEAD_BYTES; @@ -908,7 +882,7 @@ private static final int getInitialTableSize(int numBuffers, int bufferSize, int * @param numPartitions number of partitions * @return The hash code for the integer. 
*/ - private static final byte assignPartition(int bucket, byte numPartitions) { + private static byte assignPartition(int bucket, byte numPartitions) { return (byte) (bucket % numPartitions); } @@ -924,26 +898,27 @@ private boolean resizeHashTable() throws IOException { final int newNumSegments = (newNumBuckets + (bucketsPerSegment-1)) / bucketsPerSegment; final int additionalSegments = newNumSegments-this.buckets.length; final int numPartitions = this.partitions.size(); - if(this.availableMemory.size() < additionalSegments) { - for(int i = 0; i < numPartitions; i++) { + + if (this.availableMemory.size() < additionalSegments) { + for (int i = 0; i < numPartitions; i++) { compactPartition(i); if(this.availableMemory.size() >= additionalSegments) { break; } } } - if(this.availableMemory.size() < additionalSegments || this.closed.get()) { + + if (this.availableMemory.size() < additionalSegments || this.closed) { return false; - } else { + } + else { this.isResizing = true; // allocate new buckets final int startOffset = (this.numBuckets * HASH_BUCKET_SIZE) % this.segmentSize; - MemorySegment[] newBuckets = new MemorySegment[additionalSegments]; final int oldNumBuckets = this.numBuckets; final int oldNumSegments = this.buckets.length; MemorySegment[] mergedBuckets = new MemorySegment[newNumSegments]; System.arraycopy(this.buckets, 0, mergedBuckets, 0, this.buckets.length); - System.arraycopy(newBuckets, 0, mergedBuckets, this.buckets.length, newBuckets.length); this.buckets = mergedBuckets; this.numBuckets = newNumBuckets; // initialize all new buckets @@ -951,7 +926,7 @@ private boolean resizeHashTable() throws IOException { final int startSegment = oldSegment ? (oldNumSegments-1) : oldNumSegments; for (int i = startSegment, bucket = oldNumBuckets; i < newNumSegments && bucket < this.numBuckets; i++) { MemorySegment seg; - int bucketOffset = 0; + int bucketOffset; if(oldSegment) { // the first couple of new buckets may be located on an old segment seg = this.buckets[i]; for (int k = (oldNumBuckets % bucketsPerSegment) ; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) { @@ -965,7 +940,7 @@ private boolean resizeHashTable() throws IOException { seg = getNextBuffer(); // go over all buckets in the segment for (int k = 0; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) { - bucketOffset = k * HASH_BUCKET_SIZE; + bucketOffset = k * HASH_BUCKET_SIZE; // initialize the header fields seg.put(bucketOffset + HEADER_PARTITION_OFFSET, assignPartition(bucket, (byte)numPartitions)); seg.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0); @@ -975,19 +950,21 @@ private boolean resizeHashTable() throws IOException { this.buckets[i] = seg; oldSegment = false; // we write on at most one old segment } - int hashOffset = 0; - int hash = 0; - int pointerOffset = 0; - long pointer = 0; + int hashOffset; + int hash; + int pointerOffset; + long pointer; IntArrayList hashList = new IntArrayList(NUM_ENTRIES_PER_BUCKET); LongArrayList pointerList = new LongArrayList(NUM_ENTRIES_PER_BUCKET); IntArrayList overflowHashes = new IntArrayList(64); LongArrayList overflowPointers = new LongArrayList(64); + // go over all buckets and split them between old and new buckets - for(int i = 0; i < numPartitions; i++) { + for (int i = 0; i < numPartitions; i++) { InMemoryPartition partition = this.partitions.get(i); final MemorySegment[] overflowSegments = partition.overflowSegments; - int posHashCode = 0; + + int posHashCode; for (int j = 0, bucket = i; j < this.buckets.length && bucket < oldNumBuckets; 
j++) { MemorySegment segment = this.buckets[j]; // go over all buckets in the segment belonging to the partition @@ -1021,7 +998,7 @@ private boolean resizeHashTable() throws IOException { } final int overflowSegNum = (int) (forwardPointer >>> 32); segment = overflowSegments[overflowSegNum]; - bucketOffset = (int)(forwardPointer & 0xffffffff); + bucketOffset = (int) forwardPointer; countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET); pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET; hashOffset = bucketOffset + BUCKET_HEADER_LENGTH; @@ -1038,25 +1015,29 @@ private boolean resizeHashTable() throws IOException { } int newSegmentIndex = (bucket + oldNumBuckets) / bucketsPerSegment; MemorySegment newSegment = this.buckets[newSegmentIndex]; + // we need to avoid overflows in the first run int oldBucketCount = 0; int newBucketCount = 0; - while(!hashList.isEmpty()) { + while (!hashList.isEmpty()) { hash = hashList.removeLast(); pointer = pointerList.removeLong(pointerList.size()-1); posHashCode = hash % this.numBuckets; - if(posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) { + if (posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) { bucketOffset = (bucket % bucketsPerSegment) * HASH_BUCKET_SIZE; - insertBucketEntryFromStart(partition, segment, bucketOffset, hash, pointer); + insertBucketEntryFromStart(segment, bucketOffset, hash, pointer, partition.getPartitionNumber()); oldBucketCount++; - } else if(posHashCode == (bucket + oldNumBuckets) && newBucketCount < NUM_ENTRIES_PER_BUCKET) { + } + else if (posHashCode == (bucket + oldNumBuckets) && newBucketCount < NUM_ENTRIES_PER_BUCKET) { bucketOffset = ((bucket + oldNumBuckets) % bucketsPerSegment) * HASH_BUCKET_SIZE; - insertBucketEntryFromStart(partition, newSegment, bucketOffset, hash, pointer); + insertBucketEntryFromStart(newSegment, bucketOffset, hash, pointer, partition.getPartitionNumber()); newBucketCount++; - } else if(posHashCode == (bucket + oldNumBuckets) || posHashCode == bucket) { + } + else if (posHashCode == (bucket + oldNumBuckets) || posHashCode == bucket) { overflowHashes.add(hash); overflowPointers.add(pointer); - } else { + } + else { throw new IOException("Accessed wrong bucket. 
Target: " + bucket + " or " + (bucket + oldNumBuckets) + " Hit: " + posHashCode); } } @@ -1067,9 +1048,9 @@ private boolean resizeHashTable() throws IOException { // reset partition's overflow buckets and reclaim their memory this.availableMemory.addAll(partition.resetOverflowBuckets()); // clear overflow lists - int bucketArrayPos = 0; - int bucketInSegmentPos = 0; - MemorySegment bucket = null; + int bucketArrayPos; + int bucketInSegmentPos; + MemorySegment bucket; while(!overflowHashes.isEmpty()) { hash = overflowHashes.removeLast(); pointer = overflowPointers.removeLong(overflowPointers.size()-1); @@ -1077,7 +1058,7 @@ private boolean resizeHashTable() throws IOException { bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits; bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS; bucket = this.buckets[bucketArrayPos]; - insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hash, pointer); + insertBucketEntryFromStart(bucket, bucketInSegmentPos, hash, pointer, partition.getPartitionNumber()); } overflowHashes.clear(); overflowPointers.clear(); @@ -1095,7 +1076,7 @@ private boolean resizeHashTable() throws IOException { */ private void compactPartition(final int partitionNumber) throws IOException { // do nothing if table was closed, parameter is invalid or no garbage exists - if(this.closed.get() || partitionNumber >= this.partitions.size() || this.partitions.get(partitionNumber).isCompacted()) { + if (this.closed || partitionNumber >= this.partitions.size() || this.partitions.get(partitionNumber).isCompacted()) { return; } // release all segments owned by compaction partition @@ -1106,9 +1087,9 @@ private void compactPartition(final int partitionNumber) throws IOException { final int numPartitions = this.partitions.size(); InMemoryPartition partition = this.partitions.remove(partitionNumber); MemorySegment[] overflowSegments = partition.overflowSegments; - long pointer = 0L; - int pointerOffset = 0; - int bucketOffset = 0; + long pointer; + int pointerOffset; + int bucketOffset; final int bucketsPerSegment = this.bucketsPerSegmentMask + 1; for (int i = 0, bucket = partitionNumber; i < this.buckets.length && bucket < this.numBuckets; i++) { MemorySegment segment = this.buckets[i]; @@ -1138,7 +1119,7 @@ private void compactPartition(final int partitionNumber) throws IOException { } final int overflowSegNum = (int) (forwardPointer >>> 32); segment = overflowSegments[overflowSegNum]; - bucketOffset = (int)(forwardPointer & 0xffffffff); + bucketOffset = (int) forwardPointer; countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET); pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET; numInSegment = 0; @@ -1152,7 +1133,7 @@ private void compactPartition(final int partitionNumber) throws IOException { this.partitions.get(partitionNumber).overflowSegments = partition.overflowSegments; this.partitions.get(partitionNumber).numOverflowSegments = partition.numOverflowSegments; this.partitions.get(partitionNumber).nextOverflowBucket = partition.nextOverflowBucket; - this.partitions.get(partitionNumber).setCompaction(true); + this.partitions.get(partitionNumber).setIsCompacted(true); //this.partitions.get(partitionNumber).pushDownPages(); this.compactionMemory = partition; this.compactionMemory.resetRecordCounter(); @@ -1168,22 +1149,6 @@ private void compactPartition(final int partitionNumber) throws IOException { this.compactionMemory.pushDownPages(); } - /** - * Compacts partition but may not reclaim all garbage - * - * 
@param partitionNumber partition number - * @throws IOException - */ - @SuppressWarnings("unused") - private void fastCompactPartition(int partitionNumber) throws IOException { - // stop if no garbage exists - if(this.partitions.get(partitionNumber).isCompacted()) { - return; - } - //TODO IMPLEMENT ME - return; - } - /** * This function hashes an integer value. It is adapted from Bob Jenkins' website * http://www.burtleburtle.net/bob/hash/integer.html. @@ -1193,7 +1158,7 @@ private void fastCompactPartition(int partitionNumber) throws IOException { * @param code The integer to be hashed. * @return The hash code for the integer. */ - private static final int hash(int code) { + private static int hash(int code) { code = (code + 0x7ed55d16) + (code << 12); code = (code ^ 0xc761c23c) ^ (code >>> 19); code = (code + 0x165667b1) + (code << 5); @@ -1235,7 +1200,7 @@ public T next(T reuse) throws IOException { @Override public T next() throws IOException { - if(done || this.table.closed.get()) { + if (done || this.table.closed) { return null; } else if(!cache.isEmpty()) { return cache.remove(cache.size()-1); @@ -1294,7 +1259,7 @@ private boolean fillCache() throws IOException { } final int overflowSegNum = (int) (forwardPointer >>> 32); bucket = overflowSegments[overflowSegNum]; - bucketOffset = (int)(forwardPointer & 0xffffffff); + bucketOffset = (int) forwardPointer; countInSegment = bucket.getInt(bucketOffset + HEADER_COUNT_OFFSET); posInSegment = bucketOffset + BUCKET_POINTER_START_OFFSET; numInSegment = 0; @@ -1326,7 +1291,7 @@ private HashTableProber(TypeComparator probeTypeComparator, TypePairComparat } public T getMatchFor(PT probeSideRecord, T reuse) { - if(closed.get()) { + if (closed) { return null; } final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord)); @@ -1391,7 +1356,7 @@ public T getMatchFor(PT probeSideRecord, T reuse) { final int overflowSegNum = (int) (forwardPointer >>> 32); bucket = overflowSegments[overflowSegNum]; - bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff); + bucketInSegmentOffset = (int) forwardPointer; countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET); posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH; numInSegment = 0; @@ -1399,7 +1364,7 @@ public T getMatchFor(PT probeSideRecord, T reuse) { } public T getMatchFor(PT probeSideRecord) { - if(closed.get()) { + if (closed) { return null; } final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord)); @@ -1464,7 +1429,7 @@ public T getMatchFor(PT probeSideRecord) { final int overflowSegNum = (int) (forwardPointer >>> 32); bucket = overflowSegments[overflowSegNum]; - bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff); + bucketInSegmentOffset = (int) forwardPointer; countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET); posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH; numInSegment = 0; @@ -1472,49 +1437,11 @@ public T getMatchFor(PT probeSideRecord) { } public void updateMatch(T record) throws IOException { - if(closed.get()) { + if (closed) { return; } - long newPointer; - try { - newPointer = this.partition.appendRecord(record); - } catch (EOFException e) { - // system is out of memory so we attempt to reclaim memory with a copy compact run - try { - int partitionNumber = this.partition.getPartitionNumber(); - compactPartition(partitionNumber); - // retry append - this.partition = partitions.get(partitionNumber); - newPointer = this.partition.appendRecord(record); - } catch 
(EOFException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } catch (IndexOutOfBoundsException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } - } catch (IndexOutOfBoundsException e) { - // system is out of memory so we attempt to reclaim memory with a copy compact run - try { - int partitionNumber = this.partition.getPartitionNumber(); - compactPartition(partitionNumber); - // retry append - this.partition = partitions.get(partitionNumber); - newPointer = this.partition.appendRecord(record); - } catch (EOFException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } catch (IndexOutOfBoundsException ex) { - throw new RuntimeException("Memory ran out. Compaction failed. " + - getMemoryConsumptionString() + - " Message: " + ex.getMessage()); - } - } + long newPointer = insertRecordIntoPartition(record, this.partition, true); this.bucket.putLong(this.pointerOffsetInBucket, newPointer); - this.partition.setCompaction(false); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java index 7fb997e0fa9c8..ffb66fc3b85e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java @@ -199,7 +199,7 @@ public boolean isCompacted() { * * @param compacted compaction status */ - public void setCompaction(boolean compacted) { + public void setIsCompacted(boolean compacted) { this.compacted = compacted; } @@ -281,9 +281,9 @@ public void clearAllMemory(List target) { * @param numberOfSegments allocation count */ public void allocateSegments(int numberOfSegments) { - while(getBlockCount() < numberOfSegments) { + while (getBlockCount() < numberOfSegments) { MemorySegment next = this.availableMemory.nextSegment(); - if(next != null) { + if (next != null) { this.partitionPages.add(next); } else { return; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java index 94167969abd18..7f07cfbf2c692 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java @@ -886,7 +886,7 @@ final void insertBucketEntry(final HashPartition p, final MemorySegment // forward pointer set final int overflowSegNum = (int) (originalForwardPointer >>> 32); - final int segOffset = (int) (originalForwardPointer & 0xffffffff); + final int segOffset = (int) originalForwardPointer; final MemorySegment seg = p.overflowSegments[overflowSegNum]; final short obCount = seg.getShort(segOffset + HEADER_COUNT_OFFSET); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java index 999c4b0aa26e0..27d958aa2602d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java @@ -71,10 +71,12 @@ 
private void grow(final int length) {
 	public static final IntArrayList EMPTY = new IntArrayList(0) {
+		@Override
 		public boolean add(int number) {
 			throw new UnsupportedOperationException();
 		}
-		
+
+		@Override
 		public int removeLast() {
 			throw new UnsupportedOperationException();
 		};
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
new file mode 100644
index 0000000000000..e3b697e6d9e2d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+public class CompactingHashTableTest {
+
+	private final TypeSerializer<Tuple2<Long, String>> serializer;
+	private final TypeComparator<Tuple2<Long, String>> comparator;
+
+	private final TypeComparator<Long> probeComparator;
+
+	private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator;
+
+
+	public CompactingHashTableTest() {
+		TypeSerializer<?>[] fieldSerializers = { LongSerializer.INSTANCE, StringSerializer.INSTANCE };
+		@SuppressWarnings("unchecked")
+		Class<Tuple2<Long, String>> clazz = (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class;
+		this.serializer = new TupleSerializer<Tuple2<Long, String>>(clazz, fieldSerializers);
+
+		TypeComparator<?>[] comparators = { new LongComparator(true) };
+		TypeSerializer<?>[] comparatorSerializers = { LongSerializer.INSTANCE };
+
+		this.comparator = new TupleComparator<Tuple2<Long, String>>(new int[] {0}, comparators, comparatorSerializers);
+
+		this.probeComparator = new LongComparator(true);
+
+		this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() {
+
+			private long ref;
+
+			@Override
+			public void setReference(Long reference) {
+				ref = reference;
+			}
+
+			@Override
+			public boolean equalToReference(Tuple2<Long, String> candidate) {
+				//noinspection UnnecessaryUnboxing
+				return candidate.f0.longValue() == ref;
+			}
+
+			@Override
+			public int compareToReference(Tuple2<Long, String> candidate) {
+				long x = ref;
+				long y = candidate.f0;
+				return (x < y) ? -1 : ((x == y) ? 0 : 1);
+			}
+		};
+	}
+
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testHashTableGrowthWithInsert() {
+		try {
+			final int numElements = 1000000;
+
+			List<MemorySegment> memory = getMemory(10000, 32 * 1024);
+
+			// we create a hash table that thinks the records are super large. that makes it choose initially
+			// a lot of memory for the partition buffers, and start with a smaller hash table. that way
+			// we trigger a hash table growth early.
+			CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
+					serializer, comparator, memory, 10000);
+			table.open();
+
+			for (long i = 0; i < numElements; i++) {
+				table.insert(new Tuple2<Long, String>(i, String.valueOf(i)));
+			}
+
+			// make sure that all elements are contained via the entry iterator
+			{
+				BitSet bitSet = new BitSet(numElements);
+				MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
+				Tuple2<Long, String> next;
+				while ((next = iter.next()) != null) {
+					assertNotNull(next.f0);
+					assertNotNull(next.f1);
+					assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
+
+					bitSet.set(next.f0.intValue());
+				}
+
+				assertEquals(numElements, bitSet.cardinality());
+			}
+
+			// make sure all entries are contained via the prober
+			{
+				CompactingHashTable<Tuple2<Long, String>>.HashTableProber<Long> proper =
+						table.getProber(probeComparator, pairComparator);
+
+				for (long i = 0; i < numElements; i++) {
+					assertNotNull(proper.getMatchFor(i));
+					assertNull(proper.getMatchFor(i + numElements));
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * This test validates that records are not lost via "insertOrReplace()" as in bug [FLINK-2361]
+	 */
+	@Test
+	public void testHashTableGrowthWithInsertOrReplace() {
+		try {
+			final int numElements = 1000000;
+
+			List<MemorySegment> memory = getMemory(10000, 32 * 1024);
+
+			// we create a hash table that thinks the records are super large. that makes it choose initially
+			// a lot of memory for the partition buffers, and start with a smaller hash table. that way
+			// we trigger a hash table growth early.
+			CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
+					serializer, comparator, memory, 10000);
+			table.open();
+
+			for (long i = 0; i < numElements; i++) {
+				table.insertOrReplaceRecord(new Tuple2<Long, String>(i, String.valueOf(i)));
+			}
+
+			// make sure that all elements are contained via the entry iterator
+			{
+				BitSet bitSet = new BitSet(numElements);
+				MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
+				Tuple2<Long, String> next;
+				while ((next = iter.next()) != null) {
+					assertNotNull(next.f0);
+					assertNotNull(next.f1);
+					assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
+
+					bitSet.set(next.f0.intValue());
+				}
+
+				assertEquals(numElements, bitSet.cardinality());
+			}
+
+			// make sure all entries are contained via the prober
+			{
+				CompactingHashTable<Tuple2<Long, String>>.HashTableProber<Long> proper =
+						table.getProber(probeComparator, pairComparator);
+
+				for (long i = 0; i < numElements; i++) {
+					assertNotNull(proper.getMatchFor(i));
+					assertNull(proper.getMatchFor(i + numElements));
+				}
+			}
+
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * This test validates that new inserts (rather than updates) in "insertOrReplace()" properly
+	 * react to out of memory conditions.
+	 */
+	@Test
+	public void testInsertsWithInsertOrReplace() {
+		try {
+			final int numElements = 1000;
+
+			final String longString = getLongString(10000);
+			List<MemorySegment> memory = getMemory(1000, 32 * 1024);
+
+			// we create a hash table that thinks the records are super large. that makes it choose initially
+			// a lot of memory for the partition buffers, and start with a smaller hash table. that way
+			// we trigger a hash table growth early.
+			CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
+					serializer, comparator, memory, 100);
+			table.open();
+
+			// first, we insert some elements
+			for (long i = 0; i < numElements; i++) {
+				table.insertOrReplaceRecord(new Tuple2<Long, String>(i, longString));
+			}
+
+			// now, we replace the same elements, causing fragmentation
+			for (long i = 0; i < numElements; i++) {
+				table.insertOrReplaceRecord(new Tuple2<Long, String>(i, longString));
+			}
+
+			// now we insert an additional set of elements. without compaction during this insertion,
+			// the memory will run out
+			for (long i = 0; i < numElements; i++) {
+				table.insertOrReplaceRecord(new Tuple2<Long, String>(i + numElements, longString));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
+		ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
+		for (int i = 0; i < numSegments; i++) {
+			list.add(new MemorySegment(new byte[segmentSize]));
+		}
+		return list;
+	}
+
+	private static String getLongString(int length) {
+		StringBuilder bld = new StringBuilder(length);
+		for (int i = 0; i < length; i++) {
+			bld.append('a');
+		}
+		return bld.toString();
+	}
+}
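Note on the API change that this patch propagates to call sites: insertOrReplaceRecord() no longer takes a serializer-created reuse object, so callers simply drop the second argument (and, where it existed only for that purpose, the TypeSerializer). A minimal caller-side sketch under that assumption; the class, method, and variable names are illustrative and not part of the patch:

	import org.apache.flink.runtime.operators.hash.CompactingHashTable;
	import org.apache.flink.runtime.operators.testutils.types.IntPair;

	import java.io.IOException;

	class InsertOrReplaceMigrationSketch {
		// 'table' is assumed to be an already opened CompactingHashTable<IntPair>.
		static void upsert(CompactingHashTable<IntPair> table, IntPair record) throws IOException {
			// before: table.insertOrReplaceRecord(record, tempHolder);  // tempHolder = serializer.createInstance()
			table.insertOrReplaceRecord(record);                         // after: single-argument form
		}
	}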
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index a8941a4ba9958..0c656d6e00042 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -27,10 +27,6 @@
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
-import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-import org.apache.flink.runtime.operators.hash.MutableHashTable;
 import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
 import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.types.IntPair;
@@ -38,6 +34,7 @@
 import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -72,8 +69,8 @@ public void testCompactingHashMapPerformance() {
 			MutableObjectIterator<IntPair> updateTester = new UniformIntPairGenerator(NUM_PAIRS, 1, false);
 			
-			long start = 0L;
-			long end = 0L;
+			long start;
+			long end;
 			
 			long first = System.currentTimeMillis();
 			
@@ -105,7 +102,7 @@ public void testCompactingHashMapPerformance() {
 			start = System.currentTimeMillis();
 			while(updater.next(target) != null) {
 				target.setValue(target.getValue()*-1);
-				table.insertOrReplaceRecord(target, temp);
+				table.insertOrReplaceRecord(target);
 			}
 			end = System.currentTimeMillis();
 			System.out.println("Update done. Time: " + (end-start) + " ms");
@@ -147,8 +144,8 @@ public void testMutableHashMapPerformance() {
 			MutableObjectIterator<IntPair> updateTester = new UniformIntPairGenerator(NUM_PAIRS, 1, false);
 			
-			long start = 0L;
-			long end = 0L;
+			long start;
+			long end;
 			
 			long first = System.currentTimeMillis();
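The MemoryHashTableTest hunks that follow exercise both modifier methods of the table. A rough sketch of the distinction between the two calls, assuming an already opened table; the class and variable names are illustrative and not part of the patch:

	import org.apache.flink.runtime.operators.hash.CompactingHashTable;
	import org.apache.flink.runtime.operators.testutils.types.IntList;

	import java.io.IOException;

	class ModifierSketch {
		// Illustrative only: contrasts the two modifier calls used by the tests.
		static void modify(CompactingHashTable<IntList> table, IntList record) throws IOException {
			table.insert(record);                 // plain insert: no key lookup, duplicates for a key are possible
			table.insertOrReplaceRecord(record);  // upsert: replaces the record matching on the build-side key,
			                                      // otherwise inserts it (the growth/compaction paths of this
			                                      // call are what the FLINK-2361 tests above cover)
		}
	}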
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
index 2ebcd43c5600a..3dcf688c8999a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
@@ -27,9 +27,6 @@
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
-import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
 import org.apache.flink.runtime.operators.testutils.types.IntList;
 import org.apache.flink.runtime.operators.testutils.types.IntListComparator;
@@ -45,7 +42,9 @@
 import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.Test;
+
 import org.powermock.reflect.Whitebox;
 
 import static org.junit.Assert.*;
@@ -235,9 +234,8 @@ public void testVariableLengthBuildAndRetrieve() {
 			final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
 			
 			// test replacing
-			IntList tempHolder = new IntList();
 			for (int i = 0; i < NUM_LISTS; i++) {
-				table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+				table.insertOrReplaceRecord(overwriteLists[i]);
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
@@ -278,10 +276,9 @@ public void testVariableLengthBuildAndRetrieveMajorityUpdated() {
 			final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
 			
 			// test replacing
-			IntList tempHolder = new IntList();
 			for (int i = 0; i < NUM_LISTS; i++) {
 				if( i % 100 != 0) {
-					table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+					table.insertOrReplaceRecord(overwriteLists[i]);
 					lists[i] = overwriteLists[i];
 				}
 			}
@@ -327,10 +324,9 @@ public void testVariableLengthBuildAndRetrieveMinorityUpdated() {
 			final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS/STEP_SIZE, rnd);
 			
 			// test replacing
-			IntList tempHolder = new IntList();
 			for (int i = 0; i < NUM_LISTS; i += STEP_SIZE) {
 				overwriteLists[i/STEP_SIZE].setKey(overwriteLists[i/STEP_SIZE].getKey()*STEP_SIZE);
-				table.insertOrReplaceRecord(overwriteLists[i/STEP_SIZE], tempHolder);
+				table.insertOrReplaceRecord(overwriteLists[i/STEP_SIZE]);
 				lists[i] = overwriteLists[i/STEP_SIZE];
 			}
 			
@@ -379,9 +375,8 @@ public void testRepeatedBuildAndRetrieve() {
 			for(int k = 0; k < NUM_REWRITES; k++) {
 				overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
 				// test replacing
-				IntList tempHolder = new IntList();
 				for (int i = 0; i < NUM_LISTS; i++) {
-					table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+					table.insertOrReplaceRecord(overwriteLists[i]);
 				}
 				
 				for (int i = 0; i < NUM_LISTS; i++) {
@@ -409,11 +404,7 @@ public void testProberUpdate() {
 		table.open();
 		
 		for (int i = 0; i < NUM_LISTS; i++) {
-			try {
-				table.insert(lists[i]);
-			} catch (Exception e) {
-				throw e;
-			}
+			table.insert(lists[i]);
 		}
 		
 		final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
@@ -630,9 +621,8 @@ public void testResizeWithCompaction(){
 		final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
 		
 		// test replacing
-		IntList tempHolder = new IntList();
 		for (int i = 0; i < NUM_LISTS; i++) {
-			table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+			table.insertOrReplaceRecord(overwriteLists[i]);
 		}
 		
 		Field list = Whitebox.getField(CompactingHashTable.class, "partitions");
@@ -691,7 +681,7 @@ public void testVariableLengthStringBuildAndRetrieve() {
 		
 		while(updater.next(target) != null) {
 			target.setValue(target.getValue());
-			table.insertOrReplaceRecord(target, temp);
+			table.insertOrReplaceRecord(target);
 		}
 		
 		while (updateTester.next(target) != null) {