From a88498df3d495aaec935cf6f6ecee9050b413a3e Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Mon, 4 May 2026 16:21:42 -0400 Subject: [PATCH 1/3] Add UPSERT_NEW_KEYS_INSERTED and UPSERT_EXISTING_KEYS_UPDATED server metrics Emits per-table counters in both ConcurrentMapPartitionUpsertMetadataManager and its consistent-deletes variant to distinguish append (new PK) from update (existing PK) traffic in upsert tables. Out-of-order records are counted by neither metric, consistent with the existing UPSERT_OUT_OF_ORDER meter. Prometheus metric names: pinot_server_upsertNewKeysInserted pinot_server_upsertExistingKeysUpdated Co-authored-by: Priyen Patel --- .../java/org/apache/pinot/common/metrics/ServerMeter.java | 2 ++ .../ConcurrentMapPartitionUpsertMetadataManager.java | 7 +++++++ ...PartitionUpsertMetadataManagerForConsistentDeletes.java | 7 +++++++ 3 files changed, 16 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 2e40a399f988..806803ed24c8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -73,6 +73,8 @@ public enum ServerMeter implements AbstractMetrics.Meter { UPSERT_MISSED_VALID_DOC_ID_SNAPSHOT_COUNT("segments", false), UPSERT_MISSED_QUERYABLE_DOC_ID_SNAPSHOT_COUNT("segments", false), UPSERT_PRELOAD_FAILURE("count", false), + UPSERT_NEW_KEYS_INSERTED("rows", false), + UPSERT_EXISTING_KEYS_UPDATED("rows", false), ROWS_WITH_ERRORS("rows", false), LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true), LLC_CONTROLLER_RESPONSE_COMMIT("messages", true), diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index a06bd82395d9..7aa8d9f52e3a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -363,6 +363,7 @@ protected void clearPrevKeyToRecordLocation() { @Override protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false); + AtomicBoolean isNewKey = new AtomicBoolean(false); ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds(); int newDocId = recordInfo.getDocId(); @@ -404,11 +405,17 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } } else { // New primary key + isNewKey.set(true); addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } }); + if (isNewKey.get()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_NEW_KEYS_INSERTED, 1L); + } else if (!isOutOfOrderRecord.get()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_EXISTING_KEYS_UPDATED, 1L); + } updatePrimaryKeyGauge(); return !isOutOfOrderRecord.get(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java index 9a1c4f133cb8..992ade78d354 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java @@ -486,6 +486,7 @@ protected void clearPrevKeyToRecordLocation() { @Override protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false); + AtomicBoolean isNewKey = new AtomicBoolean(false); ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds(); int newDocId = recordInfo.getDocId(); @@ -537,11 +538,17 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } } else { // New primary key + isNewKey.set(true); addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue, 1); } }); + if (isNewKey.get()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_NEW_KEYS_INSERTED, 1L); + } else if (!isOutOfOrderRecord.get()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_EXISTING_KEYS_UPDATED, 1L); + } updatePrimaryKeyGauge(); return !isOutOfOrderRecord.get(); } From 03d4f3eb489c4329d31e51576039b17fb5e819bf Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Wed, 6 May 2026 10:56:42 -0400 Subject: [PATCH 2/3] Inline upsert metrics into compute() lambda, add UPSERT_KEYS_DELETED meter Address PR feedback: - Guard insert/update metrics against delete records by moving metric emission inline next to addDocId/replaceDocId inside the compute() lambda. - Add dedicated UPSERT_KEYS_DELETED counter for tombstone ingestion. - Remove isNewKey AtomicBoolean since it's no longer needed. Co-authored-by: Priyen Patel --- .../pinot/common/metrics/ServerMeter.java | 1 + ...rentMapPartitionUpsertMetadataManager.java | 17 ++++++++------ ...rtMetadataManagerForConsistentDeletes.java | 22 +++++++++++++------ 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 806803ed24c8..737a622de470 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -75,6 +75,7 @@ public enum ServerMeter implements AbstractMetrics.Meter { UPSERT_PRELOAD_FAILURE("count", false), UPSERT_NEW_KEYS_INSERTED("rows", false), UPSERT_EXISTING_KEYS_UPDATED("rows", false), + UPSERT_KEYS_DELETED("rows", false), ROWS_WITH_ERRORS("rows", false), LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true), LLC_CONTROLLER_RESPONSE_COMMIT("messages", true), diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 7aa8d9f52e3a..c69228dcc9d9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -363,7 +363,6 @@ protected void clearPrevKeyToRecordLocation() { @Override protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false); - AtomicBoolean isNewKey = new AtomicBoolean(false); ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds(); int newDocId = recordInfo.getDocId(); @@ -396,6 +395,11 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); } + if (recordInfo.isDeleteRecord()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); + } else { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_EXISTING_KEYS_UPDATED, 1L); + } return newRecordLocation; } else { // Out-of-order record @@ -405,17 +409,16 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } } else { // New primary key - isNewKey.set(true); addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); + if (recordInfo.isDeleteRecord()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); + } else { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_NEW_KEYS_INSERTED, 1L); + } return new RecordLocation(segment, newDocId, newComparisonValue); } }); - if (isNewKey.get()) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_NEW_KEYS_INSERTED, 1L); - } else if (!isOutOfOrderRecord.get()) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_EXISTING_KEYS_UPDATED, 1L); - } updatePrimaryKeyGauge(); return !isOutOfOrderRecord.get(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java index 992ade78d354..01e8523a1bc8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java @@ -486,7 +486,6 @@ protected void clearPrevKeyToRecordLocation() { @Override protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false); - AtomicBoolean isNewKey = new AtomicBoolean(false); ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds(); int newDocId = recordInfo.getDocId(); @@ -509,6 +508,11 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { int currentDocId = currentRecordLocation.getDocId(); if (segment == currentSegment) { replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); + if (recordInfo.isDeleteRecord()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); + } else { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_EXISTING_KEYS_UPDATED, 1L); + } return new RecordLocation(segment, newDocId, newComparisonValue, currentRecordLocation.getDistinctSegmentCount()); } else { @@ -520,6 +524,11 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { _previousKeyToRecordLocationMap.put(primaryKey, currentRecordLocation); } replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); + if (recordInfo.isDeleteRecord()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); + } else { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_EXISTING_KEYS_UPDATED, 1L); + } return new RecordLocation(segment, newDocId, newComparisonValue, RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount())); } @@ -538,17 +547,16 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } } else { // New primary key - isNewKey.set(true); addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); + if (recordInfo.isDeleteRecord()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); + } else { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_NEW_KEYS_INSERTED, 1L); + } return new RecordLocation(segment, newDocId, newComparisonValue, 1); } }); - if (isNewKey.get()) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_NEW_KEYS_INSERTED, 1L); - } else if (!isOutOfOrderRecord.get()) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_EXISTING_KEYS_UPDATED, 1L); - } updatePrimaryKeyGauge(); return !isOutOfOrderRecord.get(); } From bfb60fc406e523631b0a9dac3d4c1b3c3b752a55 Mon Sep 17 00:00:00 2001 From: Priyen Patel Date: Wed, 6 May 2026 11:47:33 -0400 Subject: [PATCH 3/3] Extract emitUpsertMetrics helper and rename metrics for consistency Consolidate duplicated upsert metric emission into a shared BasePartitionUpsertMetadataManager.emitUpsertMetrics() method. Rename UPSERT_NEW_KEYS_INSERTED/UPSERT_EXISTING_KEYS_UPDATED to UPSERT_KEYS_INSERTED/UPSERT_KEYS_UPDATED for consistent naming. Co-Authored-By: Claude Opus 4.6 Committed-By-Agent: claude --- .../pinot/common/metrics/ServerMeter.java | 4 ++-- .../BasePartitionUpsertMetadataManager.java | 10 ++++++++++ ...rrentMapPartitionUpsertMetadataManager.java | 12 ++---------- ...ertMetadataManagerForConsistentDeletes.java | 18 +++--------------- 4 files changed, 17 insertions(+), 27 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 737a622de470..f69256090725 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -73,8 +73,8 @@ public enum ServerMeter implements AbstractMetrics.Meter { UPSERT_MISSED_VALID_DOC_ID_SNAPSHOT_COUNT("segments", false), UPSERT_MISSED_QUERYABLE_DOC_ID_SNAPSHOT_COUNT("segments", false), UPSERT_PRELOAD_FAILURE("count", false), - UPSERT_NEW_KEYS_INSERTED("rows", false), - UPSERT_EXISTING_KEYS_UPDATED("rows", false), + UPSERT_KEYS_INSERTED("rows", false), + UPSERT_KEYS_UPDATED("rows", false), UPSERT_KEYS_DELETED("rows", false), ROWS_WITH_ERRORS("rows", false), LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true), diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 1381e4e2ab01..7f663a9a204b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -1229,6 +1229,16 @@ protected void removeDocId(IndexSegment segment, int docId) { trackUpdatedSegmentsSinceLastSnapshot(segment); } + protected void emitUpsertMetrics(RecordInfo recordInfo, boolean isNewKey) { + if (recordInfo.isDeleteRecord()) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); + } else if (isNewKey) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_INSERTED, 1L); + } else { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_UPDATED, 1L); + } + } + protected void trackUpdatedSegmentsSinceLastSnapshot(IndexSegment segment) { if (_enableSnapshot && segment instanceof ImmutableSegment) { _updatedSegmentsSinceLastSnapshot.add(segment); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index c69228dcc9d9..e58887943143 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -395,11 +395,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); } - if (recordInfo.isDeleteRecord()) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); - } else { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_EXISTING_KEYS_UPDATED, 1L); - } + emitUpsertMetrics(recordInfo, false); return newRecordLocation; } else { // Out-of-order record @@ -410,11 +406,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } else { // New primary key addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); - if (recordInfo.isDeleteRecord()) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); - } else { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_NEW_KEYS_INSERTED, 1L); - } + emitUpsertMetrics(recordInfo, true); return new RecordLocation(segment, newDocId, newComparisonValue); } }); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java index 01e8523a1bc8..b8827b7c9a97 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java @@ -508,11 +508,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { int currentDocId = currentRecordLocation.getDocId(); if (segment == currentSegment) { replaceDocId(segment, validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo); - if (recordInfo.isDeleteRecord()) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); - } else { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_EXISTING_KEYS_UPDATED, 1L); - } + emitUpsertMetrics(recordInfo, false); return new RecordLocation(segment, newDocId, newComparisonValue, currentRecordLocation.getDistinctSegmentCount()); } else { @@ -524,11 +520,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { _previousKeyToRecordLocationMap.put(primaryKey, currentRecordLocation); } replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); - if (recordInfo.isDeleteRecord()) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); - } else { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_EXISTING_KEYS_UPDATED, 1L); - } + emitUpsertMetrics(recordInfo, false); return new RecordLocation(segment, newDocId, newComparisonValue, RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount())); } @@ -548,11 +540,7 @@ protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { } else { // New primary key addDocId(segment, validDocIds, queryableDocIds, newDocId, recordInfo); - if (recordInfo.isDeleteRecord()) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_DELETED, 1L); - } else { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_NEW_KEYS_INSERTED, 1L); - } + emitUpsertMetrics(recordInfo, true); return new RecordLocation(segment, newDocId, newComparisonValue, 1); } });