diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index 3839613d1a99..3bd6501c97d2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -120,6 +120,7 @@ @SuppressWarnings({"rawtypes", "unchecked"}) public class MutableSegmentImpl implements MutableSegment { + public static final String TOMBSTONE_KEY = "__tombstone_marker__"; private static final String RECORD_ID_MAP = "__recordIdMap__"; private static final int EXPECTED_COMPRESSION = 1000; private static final int MIN_ROWS_TO_INDEX = 1000_000; // Min size of recordIdMap for updatable metrics. @@ -518,7 +519,14 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) // Update number of documents indexed before handling the upsert metadata so that the record becomes queryable // once validated canTakeMore = numDocsIndexed++ < _capacity; - _partitionUpsertMetadataManager.addRecord(this, recordInfo); + + if (row.getFieldToValueMap().containsKey(TOMBSTONE_KEY)) { + _partitionUpsertMetadataManager.deleteRecord(this, recordInfo); + } else { + if (_partitionUpsertMetadataManager.isValidForPartialUpsertInsert(row, recordInfo)) { + _partitionUpsertMetadataManager.addRecord(this, recordInfo); + } + } } else { // Update dictionary first updateDictionary(row); @@ -597,6 +605,7 @@ private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, G } Preconditions.checkState(comparableIndex != -1, "Documents must have exactly 1 non-null comparison column value"); + return new RecordInfo(primaryKey, docId, new ComparisonColumns(comparisonValues, comparableIndex)); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index e979823c2e05..1e452b7a042e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -124,7 +124,7 @@ protected void doAddSegment(ImmutableSegment segment) { if (validDocIds != null && validDocIds.isEmpty()) { _logger.info("Skip adding segment: {} without valid doc, current primary key count: {}", segment.getSegmentName(), getNumPrimaryKeys()); - immutableSegmentImpl.enableUpsert(this, new ThreadSafeMutableRoaringBitmap()); + immutableSegmentImpl.enableUpsert(this, new ThreadSafeMutableRoaringBitmap()); return; } } else { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index f7529a001161..6bac6042cd6e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -181,6 +181,45 @@ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDoc } } + @Override + public void deleteRecord(MutableSegment segment, RecordInfo recordInfo) { + ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); + _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), + (primaryKey, currentRecordLocation) -> { + if (currentRecordLocation == null) { + _logger.warn( + "Delete invoked for non existent record " + recordInfo.getPrimaryKey() + " " + recordInfo.getDocId() + + " " + recordInfo.getComparisonValue()); + + return null; + } else { + // Existing primary key + + // If the comparison value is lesser for the delete record, that signifies that the delete record + // is now invalid since a new row with the same primary key has arrived after the delete record. + if (recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue()) >= 0 + && !currentRecordLocation.getIsTombstoneMarker()) { + IndexSegment currentSegment = currentRecordLocation.getSegment(); + int currentDocId = currentRecordLocation.getDocId(); + // Delete involves two steps: + // 1. Removing the docID from validDocIds. + // 2. Marking the RecordLocation as a tombstone marker + if (segment == currentSegment) { + validDocIds.remove(currentDocId); + } else { + Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentDocId); + } + // The reason why we cannot put a null as the record location and need the entire record location of + // the last record + // since it might be needed to handle an out-of-order update request + return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue(), true); + } else { + return currentRecordLocation; + } + } + }); + } + @Override protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) { ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); @@ -219,6 +258,12 @@ protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) { @Override protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) { assert _partialUpsertHandler != null; + + if (record.getFieldToValueMap() + .containsKey(org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl.TOMBSTONE_KEY)) { + return record; + } + AtomicReference previousRecordReference = new AtomicReference<>(); RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent( HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> { @@ -243,16 +288,42 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) { } } + @Override + public boolean isValidForPartialUpsertInsert(GenericRow row, RecordInfo recordInfo) { + + if (_partialUpsertHandler == null) { + return true; + } + + // Do not check for comparison value as this check will be done upstream anyway + RecordLocation currentRecordLocation = + _primaryKeyToRecordLocationMap.get(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction)); + + // If this is a new primary key or this is a full insert and the previous record has been deleted, this + // is a valid insert + if (currentRecordLocation != null && currentRecordLocation.getIsTombstoneMarker()) { + return _partialUpsertHandler.isValidInsertRecordForDeletedRow(row); + } + + return true; + } + @VisibleForTesting static class RecordLocation { private final IndexSegment _segment; private final int _docId; private final Comparable _comparisonValue; + private final boolean _isTombstoneMarker; public RecordLocation(IndexSegment indexSegment, int docId, Comparable comparisonValue) { + this(indexSegment, docId, comparisonValue, false); + } + + public RecordLocation(IndexSegment indexSegment, int docId, Comparable comparisonValue, boolean isTombstoneMarker) { _segment = indexSegment; _docId = docId; _comparisonValue = comparisonValue; + _isTombstoneMarker = isTombstoneMarker; } public IndexSegment getSegment() { @@ -266,5 +337,9 @@ public int getDocId() { public Comparable getComparisonValue() { return _comparisonValue; } + + public boolean getIsTombstoneMarker() { + return _isTombstoneMarker; + } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index f18a95ca00a4..060dd79cd8a2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -37,12 +37,14 @@ public class PartialUpsertHandler { private final PartialUpsertMerger _defaultPartialUpsertMerger; private final List _comparisonColumns; private final List _primaryKeyColumns; + private final Schema _schema; public PartialUpsertHandler(Schema schema, Map partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy, List comparisonColumns) { _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy); _comparisonColumns = comparisonColumns; _primaryKeyColumns = schema.getPrimaryKeyColumns(); + _schema = schema; for (Map.Entry entry : partialUpsertStrategies.entrySet()) { _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); @@ -73,12 +75,34 @@ public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) { newRecord.removeNullValueField(column); } else { PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); - newRecord.putValue(column, - merger.merge(previousRecord.getValue(column), newRecord.getValue(column))); + newRecord.putValue(column, merger.merge(previousRecord.getValue(column), newRecord.getValue(column))); } } } } return newRecord; } + + /** + * A deleted partial upsert table record can have another insert with the same primary key. + * We do not allow further updates on deleted records but do allow inserting a new record + * with the same primary key. + * + * This check sees if all partial upsert columns are present in the row. If this is true, + * then this is an insert record. There is a chance that this might be an update row which + * just happens to have all fields present. + * + * TODO: Is this correct? Should we introduce an op field in the table schema to distinguish? + */ + public boolean isValidInsertRecordForDeletedRow(GenericRow row) { + for (String column : _schema.getColumnNames()) { + if (!row.getFieldToValueMap().containsKey(column) || row.getFieldToValueMap().get(column) == null) { + if (!row.isNullValue(column)) { + return false; + } + } + } + + return true; + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java index dd07143fd37d..d7fbb0c1044e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java @@ -69,6 +69,12 @@ public interface PartitionUpsertMetadataManager extends Closeable { */ void addRecord(MutableSegment segment, RecordInfo recordInfo); + /** + * Delete a given record with a specified primary key. In the RecordInfo instance, set + * the primary key and the corresponding doc id to be deleted + */ + void deleteRecord(MutableSegment segment, RecordInfo recordInfo); + /** * Replaces the upsert metadata for the old segment with the new immutable segment. */ @@ -84,6 +90,15 @@ public interface PartitionUpsertMetadataManager extends Closeable { */ GenericRow updateRecord(GenericRow record, RecordInfo recordInfo); + /** + * Check if given record is valid for partial update insert *if* the primary + * key already exists but the record is tombstoned + * + * We do not allow any modifications to a deleted partial upsert row but allow + * new records to be inserted with the same primary key. + */ + public boolean isValidForPartialUpsertInsert(GenericRow record, RecordInfo recordInfo); + /** * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted. */ diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java index 4f5ad8bc6c85..d772f08c7055 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java @@ -42,6 +42,7 @@ import org.testng.Assert; import org.testng.annotations.Test; +import static org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl.TOMBSTONE_KEY; import static org.mockito.Mockito.mock; @@ -73,6 +74,7 @@ private void setup(UpsertConfig upsertConfigWithHash) throws Exception { URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH); URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH); + _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash) @@ -115,6 +117,50 @@ public void testMultipleComparisonColumns() testUpsertIngestion(createPartialUpsertConfig(HashFunction.MURMUR3)); } + @Test + public void testUpsertDeletion() + throws Exception { + testUpsertDeletion(createFullUpsertConfig(HashFunction.NONE)); + testUpsertDeletion(createFullUpsertConfig(HashFunction.MD5)); + testUpsertDeletion(createFullUpsertConfig(HashFunction.MURMUR3)); + + testUpsertDeletion(createPartialUpsertConfig(HashFunction.NONE)); + testUpsertDeletion(createPartialUpsertConfig(HashFunction.MD5)); + testUpsertDeletion(createPartialUpsertConfig(HashFunction.MURMUR3)); + } + + @Test + public void testUpsertDeletionWithInsertion() + throws Exception { + testUpsertDeletion(createPartialUpsertConfig(HashFunction.NONE)); + testUpsertDeletion(createPartialUpsertConfig(HashFunction.MD5)); + testUpsertDeletion(createPartialUpsertConfig(HashFunction.MURMUR3)); + } + + @Test + public void testUpsertDeletionWithUpdate() + throws Exception { + testUpsertDeletion(createPartialUpsertConfig(HashFunction.NONE)); + testUpsertDeletion(createPartialUpsertConfig(HashFunction.MD5)); + testUpsertDeletion(createPartialUpsertConfig(HashFunction.MURMUR3)); + } + + @Test + public void testUpsertDeletionWithInvalidUpdate() + throws Exception { + testUpsertDeletion(createPartialUpsertConfig(HashFunction.NONE)); + testUpsertDeletion(createPartialUpsertConfig(HashFunction.MD5)); + testUpsertDeletion(createPartialUpsertConfig(HashFunction.MURMUR3)); + } + + @Test + public void testUpsertDeletionWithInvalidDelete() + throws Exception { + testUpsertDeletion(createPartialUpsertConfig(HashFunction.NONE)); + testUpsertDeletion(createPartialUpsertConfig(HashFunction.MD5)); + testUpsertDeletion(createPartialUpsertConfig(HashFunction.MURMUR3)); + } + private void testUpsertIngestion(UpsertConfig upsertConfig) throws Exception { setup(upsertConfig); @@ -141,4 +187,146 @@ private void testUpsertIngestion(UpsertConfig upsertConfig) Assert.assertFalse(bitmap.contains(6)); } } + + private void testUpsertDeletion(UpsertConfig upsertConfig) + throws Exception { + setup(upsertConfig); + + GenericRow row = new GenericRow(); + + int docId = _mutableSegmentImpl.getNumDocsIndexed(); + row.putValue("event_id", "pp"); + row.putValue("description", "foobar"); + row.putValue("secondsSinceEpoch", System.currentTimeMillis()); + row.addNullValueField("otherComparisonColumn"); + + _mutableSegmentImpl.index(row, null); + ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap(); + + Assert.assertTrue(bitmap.contains(docId)); + + row.putValue(TOMBSTONE_KEY, "true"); + _mutableSegmentImpl.index(row, null); + bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap(); + Assert.assertFalse(bitmap.contains(docId)); + } + + private void testUpsertDeletionWithInsertion(UpsertConfig upsertConfig) + throws Exception { + setup(upsertConfig); + GenericRow row = new GenericRow(); + + row.putValue("event_id", "pp"); + row.putValue("description", "testest"); + row.putValue("secondsSinceEpoch", System.currentTimeMillis()); + row.addNullValueField("otherComparisonColumn"); + + _mutableSegmentImpl.index(row, null); + + row.putValue(TOMBSTONE_KEY, "true"); + + _mutableSegmentImpl.index(row, null); + + row = new GenericRow(); + + int docId = _mutableSegmentImpl.getNumDocsIndexed(); + row.putValue("event_id", "pp"); + row.putValue("description", "foobar"); + row.putValue("secondsSinceEpoch", System.currentTimeMillis()); + row.addNullValueField("otherComparisonColumn"); + + _mutableSegmentImpl.index(row, null); + + ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap(); + + Assert.assertTrue(bitmap.contains(docId)); + } + + private void testUpsertDeletionWithUpdate(UpsertConfig upsertConfig) + throws Exception { + setup(upsertConfig); + GenericRow row = new GenericRow(); + row.putValue("event_id", "pp"); + row.putValue("description", "testest"); + row.putValue("secondsSinceEpoch", System.currentTimeMillis()); + row.addNullValueField("otherComparisonColumn"); + + _mutableSegmentImpl.index(row, null); + + row.putValue(TOMBSTONE_KEY, "true"); + + _mutableSegmentImpl.index(row, null); + + row = new GenericRow(); + + int docId = _mutableSegmentImpl.getNumDocsIndexed(); + row.putValue("event_id", "pp"); + row.putValue("description", "foobar"); + row.putValue("secondsSinceEpoch", System.currentTimeMillis()); + row.addNullValueField("otherComparisonColumn"); + + _mutableSegmentImpl.index(row, null); + + ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap(); + + Assert.assertTrue(bitmap.contains(docId)); + } + + private void testUpsertDeletionWithInvalidUpdate(UpsertConfig upsertConfig) + throws Exception { + setup(upsertConfig); + GenericRow row = new GenericRow(); + row.putValue("event_id", "pp"); + row.putValue("description", "testest"); + row.putValue("secondsSinceEpoch", System.currentTimeMillis()); + row.addNullValueField("otherComparisonColumn"); + + _mutableSegmentImpl.index(row, null); + + row.putValue(TOMBSTONE_KEY, "true"); + + _mutableSegmentImpl.index(row, null); + + row = new GenericRow(); + + int docId = _mutableSegmentImpl.getNumDocsIndexed(); + row.putValue("event_id", "pp"); + row.putValue("secondsSinceEpoch", System.currentTimeMillis()); + row.addNullValueField("otherComparisonColumn"); + + _mutableSegmentImpl.index(row, null); + + ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap(); + + Assert.assertFalse(bitmap.contains(docId)); + } + + private void testUpsertDeletionWithInvalidDelete(UpsertConfig upsertConfig) + throws Exception { + setup(upsertConfig); + GenericRow row = new GenericRow(); + row.putValue("event_id", "pp"); + row.putValue("description", "testest"); + row.putValue("secondsSinceEpoch", System.currentTimeMillis()); + row.addNullValueField("otherComparisonColumn"); + + _mutableSegmentImpl.index(row, null); + + row.putValue(TOMBSTONE_KEY, "true"); + + _mutableSegmentImpl.index(row, null); + + row = new GenericRow(); + + int docId = _mutableSegmentImpl.getNumDocsIndexed(); + row.putValue("event_id", "pp"); + row.putValue("secondsSinceEpoch", System.currentTimeMillis()); + row.addNullValueField("otherComparisonColumn"); + + _mutableSegmentImpl.index(row, null); + + ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap(); + + Assert.assertFalse(bitmap.contains(docId)); + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index 1e3f130059b6..d535949a4d75 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -101,9 +101,9 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator()); // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} assertEquals(recordLocationMap.size(), 3); - checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction); + checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction, false); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5}); // Add the second segment @@ -129,10 +129,10 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en // segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} assertEquals(recordLocationMap.size(), 4); - checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction); + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction, false); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); @@ -142,10 +142,10 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en // segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} assertEquals(recordLocationMap.size(), 4); - checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction); + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction, false); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); @@ -157,10 +157,10 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} assertEquals(recordLocationMap.size(), 4); - checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction); + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction, false); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); @@ -171,10 +171,10 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} assertEquals(recordLocationMap.size(), 4); - checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction); + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction, false); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); @@ -185,10 +185,10 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} assertEquals(recordLocationMap.size(), 4); - checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction); + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction, false); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); @@ -197,7 +197,7 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map) // new segment1: 1 -> {4, 120} assertEquals(recordLocationMap.size(), 1); - checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); + checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction, false); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); @@ -208,7 +208,7 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en upsertMetadataManager.removeSegment(newSegment1); // new segment1: 1 -> {4, 120} assertEquals(recordLocationMap.size(), 1); - checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); + checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction, false); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); // Close the metadata manager @@ -218,8 +218,7 @@ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean en private List getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) { List recordInfoList = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { - recordInfoList.add( - new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i]))); + recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i]))); } return recordInfoList; } @@ -282,14 +281,21 @@ private static PrimaryKey makePrimaryKey(int value) { } private static void checkRecordLocation(Map recordLocationMap, int keyValue, - IndexSegment segment, int docId, int comparisonValue, HashFunction hashFunction) { + IndexSegment segment, int docId, int comparisonValue, HashFunction hashFunction, boolean isDeleted) { RecordLocation recordLocation = recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), hashFunction)); assertNotNull(recordLocation); - assertSame(recordLocation.getSegment(), segment); + + if (!isDeleted) { + assertSame(recordLocation.getSegment(), segment); + } + assertEquals(recordLocation.getDocId(), docId); - assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, - comparisonValue); + assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, comparisonValue); + + if (isDeleted) { + assertTrue(recordLocation.getIsTombstoneMarker()); + } } @Test @@ -325,10 +331,10 @@ private void verifyAddRecord(HashFunction hashFunction) // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100} // segment2: 3 -> {0, 100} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction, false); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0}); @@ -336,10 +342,10 @@ private void verifyAddRecord(HashFunction hashFunction) // segment1: 0 -> {0, 100}, 1 -> {1, 120} // segment2: 2 -> {1, 120}, 3 -> {0, 100} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction, false); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); @@ -347,10 +353,10 @@ private void verifyAddRecord(HashFunction hashFunction) // segment1: 0 -> {0, 100}, 1 -> {1, 120} // segment2: 2 -> {1, 120}, 3 -> {0, 100} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction, false); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); @@ -358,13 +364,76 @@ private void verifyAddRecord(HashFunction hashFunction) // segment1: 1 -> {1, 120} // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100} - checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction, false); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3}); + upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(4), 5, new IntWrapper(100))); + upsertMetadataManager.deleteRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100))); + + // segment1: 1 -> {1, 120} + // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100, }, 4 -> {5, 100} + checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction, true); + checkRecordLocation(recordLocationMap, 4, segment2, 5, 100, hashFunction, false); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1}); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3, 5}); + + // Out of order delete + upsertMetadataManager.deleteRecord(segment2, new RecordInfo(makePrimaryKey(4), 7, new IntWrapper(80))); + + // segment1: 1 -> {1, 120} + // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100, }, 4 -> {5, 100} + checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction, true); + checkRecordLocation(recordLocationMap, 4, segment2, 5, 100, hashFunction, false); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1}); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3, 5}); + + upsertMetadataManager.deleteRecord(segment2, new RecordInfo(makePrimaryKey(1), 1, new IntWrapper(130))); + + // segment1: 1 -> {1, 130, } + // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100, }, 4 -> {5, 100} + checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 130, hashFunction, true); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction, true); + checkRecordLocation(recordLocationMap, 4, segment2, 5, 100, hashFunction, false); + assertTrue(validDocIds1.getMutableRoaringBitmap().getCardinality() == 0); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3, 5}); + + // Already deleted record + upsertMetadataManager.deleteRecord(segment2, new RecordInfo(makePrimaryKey(1), 50, new IntWrapper(190))); + + // segment1: 1 -> {1, 130, } + // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100, }, 4 -> {5, 100} + checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 130, hashFunction, true); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction, true); + checkRecordLocation(recordLocationMap, 4, segment2, 5, 100, hashFunction, false); + assertTrue(validDocIds1.getMutableRoaringBitmap().getCardinality() == 0); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3, 5}); + + upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 12, new IntWrapper(170))); + + // segment1: + // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100, }, 4 -> {5, 100}, 1 -> {12, 170} + checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment2, 12, 170, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction, true); + checkRecordLocation(recordLocationMap, 4, segment2, 5, 100, hashFunction, false); + assertTrue(validDocIds1.getMutableRoaringBitmap().getCardinality() == 0); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3, 5, 12}); + // Stop the metadata manager upsertMetadataManager.stop(); @@ -372,12 +441,13 @@ private void verifyAddRecord(HashFunction hashFunction) upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, new IntWrapper(120))); // segment1: 1 -> {1, 120} // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100} - checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction); - checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction); - checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); - assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1}); - assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3}); + checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction, false); + checkRecordLocation(recordLocationMap, 1, segment2, 12, 170, hashFunction, false); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction, false); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction, true); + checkRecordLocation(recordLocationMap, 4, segment2, 5, 100, hashFunction, false); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{}); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3, 5, 12}); // Close the metadata manager upsertMetadataManager.close();