Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
@SuppressWarnings({"rawtypes", "unchecked"})
public class MutableSegmentImpl implements MutableSegment {

public static final String TOMBSTONE_KEY = "__tombstone_marker__";
private static final String RECORD_ID_MAP = "__recordIdMap__";
private static final int EXPECTED_COMPRESSION = 1000;
private static final int MIN_ROWS_TO_INDEX = 1000_000; // Min size of recordIdMap for updatable metrics.
Expand Down Expand Up @@ -518,7 +519,14 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
// Update number of documents indexed before handling the upsert metadata so that the record becomes queryable
// once validated
canTakeMore = numDocsIndexed++ < _capacity;
_partitionUpsertMetadataManager.addRecord(this, recordInfo);

if (row.getFieldToValueMap().containsKey(TOMBSTONE_KEY)) {
_partitionUpsertMetadataManager.deleteRecord(this, recordInfo);
} else {
if (_partitionUpsertMetadataManager.isValidForPartialUpsertInsert(row, recordInfo)) {
_partitionUpsertMetadataManager.addRecord(this, recordInfo);
}
}
} else {
// Update dictionary first
updateDictionary(row);
Expand Down Expand Up @@ -597,6 +605,7 @@ private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, G
}
Preconditions.checkState(comparableIndex != -1,
"Documents must have exactly 1 non-null comparison column value");

return new RecordInfo(primaryKey, docId, new ComparisonColumns(comparisonValues, comparableIndex));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected void doAddSegment(ImmutableSegment segment) {
if (validDocIds != null && validDocIds.isEmpty()) {
_logger.info("Skip adding segment: {} without valid doc, current primary key count: {}",
segment.getSegmentName(), getNumPrimaryKeys());
immutableSegmentImpl.enableUpsert(this, new ThreadSafeMutableRoaringBitmap());
immutableSegmentImpl.enableUpsert(this, new ThreadSafeMutableRoaringBitmap());
return;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,45 @@ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDoc
}
}

@Override
public void deleteRecord(MutableSegment segment, RecordInfo recordInfo) {
ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation == null) {
_logger.warn(
"Delete invoked for non existent record " + recordInfo.getPrimaryKey() + " " + recordInfo.getDocId()
+ " " + recordInfo.getComparisonValue());

return null;
} else {
// Existing primary key

// If the comparison value is lesser for the delete record, that signifies that the delete record
// is now invalid since a new row with the same primary key has arrived after the delete record.
if (recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue()) >= 0
&& !currentRecordLocation.getIsTombstoneMarker()) {
IndexSegment currentSegment = currentRecordLocation.getSegment();
int currentDocId = currentRecordLocation.getDocId();
// Delete involves two steps:
// 1. Removing the docID from validDocIds.
// 2. Marking the RecordLocation as a tombstone marker
if (segment == currentSegment) {
validDocIds.remove(currentDocId);
} else {
Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentDocId);
}
// The reason why we cannot put a null as the record location and need the entire record location of
// the last record
// since it might be needed to handle an out-of-order update request
return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue(), true);
} else {
return currentRecordLocation;
}
}
});
}

@Override
protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
Expand Down Expand Up @@ -219,6 +258,12 @@ protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
@Override
protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
assert _partialUpsertHandler != null;

if (record.getFieldToValueMap()
.containsKey(org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl.TOMBSTONE_KEY)) {
return record;
}

AtomicReference<GenericRow> previousRecordReference = new AtomicReference<>();
RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent(
HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> {
Expand All @@ -243,16 +288,42 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
}
}

@Override
public boolean isValidForPartialUpsertInsert(GenericRow row, RecordInfo recordInfo) {

if (_partialUpsertHandler == null) {
return true;
}

// Do not check for comparison value as this check will be done upstream anyway
RecordLocation currentRecordLocation =
_primaryKeyToRecordLocationMap.get(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction));

// If this is a new primary key or this is a full insert and the previous record has been deleted, this
// is a valid insert
if (currentRecordLocation != null && currentRecordLocation.getIsTombstoneMarker()) {
return _partialUpsertHandler.isValidInsertRecordForDeletedRow(row);
}

return true;
}

@VisibleForTesting
static class RecordLocation {
private final IndexSegment _segment;
private final int _docId;
private final Comparable _comparisonValue;
private final boolean _isTombstoneMarker;

public RecordLocation(IndexSegment indexSegment, int docId, Comparable comparisonValue) {
this(indexSegment, docId, comparisonValue, false);
}

public RecordLocation(IndexSegment indexSegment, int docId, Comparable comparisonValue, boolean isTombstoneMarker) {
_segment = indexSegment;
_docId = docId;
_comparisonValue = comparisonValue;
_isTombstoneMarker = isTombstoneMarker;
}

public IndexSegment getSegment() {
Expand All @@ -266,5 +337,9 @@ public int getDocId() {
public Comparable getComparisonValue() {
return _comparisonValue;
}

public boolean getIsTombstoneMarker() {
return _isTombstoneMarker;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ public class PartialUpsertHandler {
private final PartialUpsertMerger _defaultPartialUpsertMerger;
private final List<String> _comparisonColumns;
private final List<String> _primaryKeyColumns;
private final Schema _schema;

public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
_defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
_comparisonColumns = comparisonColumns;
_primaryKeyColumns = schema.getPrimaryKeyColumns();
_schema = schema;

for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
_column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
Expand Down Expand Up @@ -73,12 +75,34 @@ public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
newRecord.removeNullValueField(column);
} else {
PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
newRecord.putValue(column,
merger.merge(previousRecord.getValue(column), newRecord.getValue(column)));
newRecord.putValue(column, merger.merge(previousRecord.getValue(column), newRecord.getValue(column)));
}
}
}
}
return newRecord;
}

/**
* A deleted partial upsert table record can have another insert with the same primary key.
* We do not allow further updates on deleted records but do allow inserting a new record
* with the same primary key.
*
* This check sees if all partial upsert columns are present in the row. If this is true,
* then this is an insert record. There is a chance that this might be an update row which
* just happens to have all fields present.
*
* TODO: Is this correct? Should we introduce an op field in the table schema to distinguish?
*/
public boolean isValidInsertRecordForDeletedRow(GenericRow row) {
for (String column : _schema.getColumnNames()) {
if (!row.getFieldToValueMap().containsKey(column) || row.getFieldToValueMap().get(column) == null) {
if (!row.isNullValue(column)) {
return false;
}
}
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public interface PartitionUpsertMetadataManager extends Closeable {
*/
void addRecord(MutableSegment segment, RecordInfo recordInfo);

/**
* Delete a given record with a specified primary key. In the RecordInfo instance, set
* the primary key and the corresponding doc id to be deleted
*/
void deleteRecord(MutableSegment segment, RecordInfo recordInfo);

/**
* Replaces the upsert metadata for the old segment with the new immutable segment.
*/
Expand All @@ -84,6 +90,15 @@ public interface PartitionUpsertMetadataManager extends Closeable {
*/
GenericRow updateRecord(GenericRow record, RecordInfo recordInfo);

/**
* Check if given record is valid for partial update insert *if* the primary
* key already exists but the record is tombstoned
*
* We do not allow any modifications to a deleted partial upsert row but allow
* new records to be inserted with the same primary key.
*/
public boolean isValidForPartialUpsertInsert(GenericRow record, RecordInfo recordInfo);

/**
* Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
*/
Expand Down
Loading