[PEP] Add compression ratio calculation and per-column compression stats #18184

@johnsolomonj

Description

Problem

Apache Pinot lacks built-in visibility into compression effectiveness and storage costs. Operators today must:

  1. Manually compute compression ratios by comparing externally-estimated raw data sizes against Pinot API/UI numbers
  2. Guess at codec effectiveness when tuning compression settings (LZ4 vs ZSTD vs SNAPPY), since no per-column breakdown exists
  3. Go without programmatic access to compression metrics for capacity planning and COGS estimation

These gaps become acute as data volumes grow.

Real-World Impact

In a typical deployment with a large table:

  • After codec tuning, the reported per-replica size was ~30 GB with raw data estimated at ~550 GB
  • The compression ratio was manually estimated at ~18x — but there was no way to verify this from Pinot itself
  • Operators had no visibility into which columns contributed most to the compressed size, making it impossible to identify codec optimization opportunities
  • Without per-column stats, tuning decisions (e.g., switching a high-cardinality string column from LZ4 to ZSTANDARD) were based on guesswork rather than data
  • With the proposed API, operators would get the compression ratio and per-column breakdown directly from Pinot — no manual estimation needed

Industry Context: Compression Visibility in Other Systems

Tracking compression effectiveness is a well-established pattern across columnar and analytical databases. Several open-source systems expose this natively:

  • ClickHouse (Apache 2.0): system.parts, system.columns, and system.parts_columns tables expose per-part and per-column data_compressed_bytes, data_uncompressed_bytes, and compression_codec. Computed at write/merge time by storing both sizes in part checksum metadata. Overhead: ~24 bytes per column per part — negligible.
  • PostgreSQL: pg_relation_size() and pg_total_relation_size() provide table/index on-disk size. The TOAST system compresses large values transparently. pg_column_size() returns the stored (possibly compressed) size of an individual datum — useful for per-row introspection but not a table-wide compressed-vs-uncompressed accounting mechanism. PostgreSQL does not provide native persisted uncompressed-size tracking or compression-ratio rollups; operators estimate ratios externally.
  • Apache Cassandra: nodetool tablestats reports Space used (live) (compressed on-disk) and SSTable Compression Ratio per table. Per-SSTable compression metadata is tracked in CompressionInfo.db. The compression ratio is surfaced via nodetool CLI tooling, not via a CQL-queryable system table.
  • Apache Druid: Segment metadata queries return per-column size stats (e.g., column size within a segment). Druid does not natively persist pre-compression uncompressed sizes or compute compression ratios — these must be estimated externally from ingestion metrics vs stored segment sizes.

The common design pattern across all these systems is: track uncompressed size at write time (not query time), persist it alongside the data, and compute ratios at reporting time. This proposal follows the same pattern for Pinot.

What Pinot Has Today

Metric | Status | Notes
Per-segment total disk size | Available | GET /tables/{table}/size — sums sizes from servers for segments currently loaded on those servers
Per-replica segment size | Available | reportedSizePerReplicaInBytes field in API response (only includes segments loaded on servers, not remote-only tiered segments)
Per-column per-index compressed size | Available | ColumnMetadata.getIndexSizeFor(type) — stored in V3 index_map.txt
Column cardinality, min/max | Available | ColumnMetadata
Per-column uncompressed size | NOT TRACKED | Forward index writer discards this information
Compression ratio | NOT AVAILABLE | Cannot be computed without uncompressed size
Compression codec per segment | NOT IN METADATA | Configured in ForwardIndexConfig but not persisted in segment metadata

Size Reporting Nuances

  • reportedSizeInBytes = sum from ALL servers (includes all replicas)
  • reportedSizePerReplicaInBytes = approximates single-replica footprint
  • Segment count in UI = unique segments (NOT multiplied by replicas)
  • Tiered storage (OSS): OSS Pinot tiered storage is server-tag based — segments are moved between local data directories associated with different server tags via TierBasedSegmentDirectoryLoader. This is a local directory move, not an upload to S3. Remote object-store tiers (S3) are a managed/vendor extension, not part of OSS.
  • reportedSizePerReplicaInBytes only covers segments currently loaded on servers. Segments that have been moved off-server (remote tier, deep store only) are NOT included in this number.
  • Limitation: Upsert tables cannot configure multiple tiers (TableConfigUtils enforces this).

REALTIME, OFFLINE, and hybrid tables can use tiered storage (except upsert tables). Once segments age past the TierConfig.segmentAge threshold, they are moved to the configured tier's data directory.
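
For illustration, a minimal tier configuration of the kind referenced above (tier name, age threshold, and server tag are hypothetical values):

{
  "tableName": "events_OFFLINE",
  "tierConfigs": [
    {
      "name": "coldTier",
      "segmentSelectorType": "time",
      "segmentAge": "7d",
      "storageType": "pinot_server",
      "serverTag": "coldTier_OFFLINE"
    }
  ]
}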


Proposed Solution

Scope

This proposal focuses on OSS Pinot backend API changes — extending existing controller/server API responses with compression and storage analytics data. No UI changes are proposed in this phase. UI integration can be wired up in a follow-up once the API is available and validated by operators.

UI integration readiness (follow-up phase): The API changes are designed to be directly consumable by the Pinot Console UI:

  • Table size display: The UI currently shows "Reported Size" and "Estimated Size" on the table detail page (TenantDetails.tsx) and table listing page (AsyncPinotTables.tsx) by calling GET /tables/{tableName}/size, which returns TableSizeReader.TableSizeDetails. This PEP adds compressionStats and storageBreakdown to that same response class — the UI follow-up just needs to read the new fields. No new API calls needed for the overview.
  • Per-column breakdown: columnCompressionStats is available on BOTH endpoints (GET /tables/{tableName}/size and GET /tables/{tableName}/metadata). The table detail page does not currently call the metadata endpoint for size data, so the UI follow-up would use the existing size endpoint or add a call to the metadata API. This could be rendered as an expandable table below the existing size summary. Important limitation: The current metadata endpoint (getTableAggregateMetadata in PinotTableRestletResource.java) explicitly rejects REALTIME tables (returns 501). Since TenantDetails.tsx renders both OFFLINE and REALTIME tables in the same view, the UI follow-up must either: (a) extend the metadata endpoint to support REALTIME tables, or (b) only show per-column stats for OFFLINE tables with a "not available for realtime" indicator.
  • Backward compatibility: New JSON fields are additive. The existing UI TypeScript type (TableSize in types.d.ts) ignores unknown fields — the current UI continues to work unchanged until updated. No breaking changes.
  • Where to display: The natural placement is on the table detail page summary section (lines 800-878 in TenantDetails.tsx), adjacent to the existing "Reported Size" / "Estimated Size" display. Compression ratio, forward index sizes, storage breakdown, and segmentsWithStats / totalSegments coverage can all be shown with Utils.formatBytes() for human-readable formatting.
  • Note on tooltip: The current UI tooltip for "Reported Size" says "Uncompressed size of all data segments with replication" — this is misleading (it's actually the on-disk compressed size). The UI follow-up should correct this tooltip and add the actual uncompressed forward index size from the new API fields.

Note on pinned indexes: Some managed Pinot deployments (e.g., StarTree) support index-level pinning for tiered storage — where specific indexes (inverted, bloom, etc.) are cached locally while the full segment remains on remote storage (S3). OSS Pinot does not have this capability; its TierBasedSegmentDirectoryLoader moves entire segments between tiers. Pinned index size reporting and breakdown by index type is therefore out of scope for this OSS proposal and can be addressed separately by managed Pinot vendors.

Feature Flag

The feature is gated at the table level via TableConfig, disabled by default:

{
  "tableName": "events_OFFLINE",
  "indexingConfig": {
    "compressionStatsEnabled": true
  }
}

Why table-level, not cluster-level? Offline batch ingestion segments are built externally via SegmentIndexCreationDriverImpl running in Hadoop/Spark/standalone jobs. These processes receive their configuration through SegmentGeneratorConfig (which is derived from TableConfig), and do not read controller/server cluster configs. A cluster-level flag would have no effect on offline batch-created segments. Table-level config propagates correctly to all ingestion paths:

  • Offline batch ingestion: SegmentGeneratorConfig reads from TableConfig → flag available in external jobs
  • Realtime streaming: RealtimeSegmentDataManager passes _tableConfig into RealtimeSegmentConverter → SegmentGeneratorConfig → flag available at segment commit time
  • Minion tasks: SegmentProcessorConfig carries TableConfig into SegmentGeneratorConfig → flag available during segment processing

New plumbing required: Today, SegmentGeneratorConfig only copies a fixed list of IndexingConfig properties and does not expose arbitrary config fields to forward index writers. IndexCreationContext (used by BaseSegmentCreator to construct index creators) also has no hook for this flag. The implementation must:

  1. Add compressionStatsEnabled field to IndexingConfig (no @JsonProperty needed — consistent with all other fields in the class; Jackson discovers via getter/setter names, defaults to false)
  2. Plumb the flag through SegmentGeneratorConfig → IndexCreationContext (via IndexCreationContext.Builder in SegmentIndexCreationDriverImpl) so that BaseSegmentCreator and forward index writers can read it
  3. Forward index writers check the flag before tracking uncompressed bytes — when false, skip all tracking (zero overhead); see the sketch after this list
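
A minimal sketch of step 1 (the field name follows this proposal; the getter/setter pattern mirrors the other IndexingConfig fields so Jackson binds it without annotations):

// In IndexingConfig -- sketch only; defaults to false, i.e. feature off.
private boolean _compressionStatsEnabled;

public boolean isCompressionStatsEnabled() {
  return _compressionStatsEnabled;
}

public void setCompressionStatsEnabled(boolean compressionStatsEnabled) {
  _compressionStatsEnabled = compressionStatsEnabled;
}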

When disabled (default):

  • No uncompressed sizes are captured during segment creation — zero overhead on ingestion
  • Compression ratio and per-column stats fields are absent from API responses
  • Existing behavior is unchanged

When enabled:

  • Uncompressed sizes and codec are tracked during segment creation and persisted to metadata.properties
  • API responses include compressionStats, storageBreakdown, and columnCompressionStats
  • Only newly created segments will have uncompressed size data. Existing segments return INDEX_NOT_FOUND (-1) until they are rebuilt (e.g., via segment refresh, minion task, or re-ingestion)

Toggle OFF after ON: If the flag is disabled after being enabled, the behavior is:

  • Ingestion stops tracking — new segments created after the flag is turned off will NOT have uncompressed sizes (zero overhead)
  • Existing metadata preserved — segments already created with uncompressed sizes retain those fields in metadata.properties (no retroactive deletion)
  • API suppresses stats — when the flag is OFF, the controller does NOT include compressionStats or columnCompressionStats in API responses, even if some segments have the metadata. This avoids confusion where stats appear for a table whose operator has explicitly opted out. The storageBreakdown (tier info) is always reported regardless of the flag since it doesn't depend on ingestion-time tracking.

1. Track Uncompressed Sizes at Segment Creation

Today, raw (uncompressed) data size is not tracked anywhere in Pinot. Operators must estimate it externally (e.g., from source system metrics or ingestion row counts × avg row size). This proposal makes it computable directly from segment metadata.

Ingestion paths covered:

Pinot has multiple ingestion paths that produce immutable segments. The design must cover all of them:

Ingestion Path | How Segments Are Created | Forward Index Writer
Offline batch ingestion | SegmentIndexCreationDriverImpl | BaseChunkForwardIndexWriter (raw columns), FixedBitSVForwardIndexWriter / FixedBitMVForwardIndexWriter (dict-encoded)
Realtime streaming (Kafka, Kinesis) | MutableSegmentImpl (in-memory) → RealtimeSegmentConverter on commit | In-memory MutableForwardIndex while consuming; on commit, goes through SegmentIndexCreationDriverImpl — same as offline
Minion tasks (MergeRollup, RealtimeToOffline) | SegmentProcessorFramework → SegmentIndexCreationDriverImpl | Same as offline
CLP compressed columns | CLPForwardIndexCreatorV2 | Implements ForwardIndexCreator — does NOT extend any chunk writer. Internally owns multiple sub-writers: FixedByteChunkForwardIndexWriter (logtype IDs), VarByteChunkForwardIndexWriterV5 (dict var IDs, encoded vars, raw messages). Each sub-writer must be instrumented individually; summing their uncompressed sizes at close() gives the total for the CLP column.
Iceberg ingestion (StarTree) | Pseudo segments with Parquet as source of truth, indexes built separately | Needs investigation — may use a different segment creation path

Key insight: All paths that produce persisted immutable segments converge at SegmentIndexCreationDriverImpl → BaseSegmentCreator. This is the right place to capture uncompressed sizes. Realtime mutable segments don't need tracking — they're in-memory and temporary; uncompressed sizes are captured when they commit to immutable form.

Writers that need instrumentation:

Uncompressed size tracking is limited to raw (non-dictionary) columns in this initial implementation:

  1. Raw columns: BaseChunkForwardIndexWriter and subclasses, plus VarByteChunkForwardIndexWriterV4/V5 — track the cumulative raw ingested data size (sum of actual value byte lengths as received, with no Pinot-internal formatting overhead such as chunk headers, length prefixes, or alignment padding). For fixed-width types this equals rows × dataTypeBytes. For variable-width types (STRING, BYTES) this equals the sum of UTF-8 byte lengths of the values themselves.
  2. Dictionary-encoded columns: Out of scope for uncompressed size tracking in this phase. The forward index writers (FixedBitSVForwardIndexWriter, FixedBitMVForwardIndexWriter) only see dictionary IDs, not original raw values. Capturing true uncompressed sizes for dict-encoded columns would require instrumenting the stats collection phase (before dictionary encoding), which is a larger change. These columns will report INDEX_NOT_FOUND for uncompressed size.
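
A minimal sketch of the per-value tracking for raw columns described in item 1 (field and method names as proposed in this document; illustrative only):

// Inside VarByteChunkForwardIndexWriter (raw var-byte columns).
// Track raw value bytes only; chunk headers, length prefixes, and padding
// are deliberately excluded from the uncompressed total.
public void putBytes(byte[] value) {
  if (_trackUncompressedSize) {
    _uncompressedSize += value.length;
  }
  // ... existing chunk-buffer write logic unchanged ...
}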

Changes:

  • Extend ColumnMetadata interface in pinot-segment-spi with:
    • getUncompressedForwardIndexSizeBytes() — raw data size before compression
    • getCompressionCodec() — codec used for this column's forward index
  • Persist to metadata.properties during segment creation:
    column.userId.forwardIndex.uncompressedSizeBytes = 419430400
    column.userId.forwardIndex.compressionCodec = LZ4
  • Track cumulative raw ingested data size (sum of actual value byte lengths, no chunk-format overhead) in forward index writers (BaseChunkForwardIndexWriter and subclasses for raw columns)
  • Store as dedicated fields in ColumnMetadataImpl (new _uncompressedForwardIndexSizeBytes long field and _compressionCodec String field) — NOT in _indexTypeSizeList, which is a packed structure (2-byte index type ID + 6-byte size) only for real index entries from index_map.txt and has no room for a codec string or a synthetic "uncompressed" index type
  • BaseSegmentCreator.writeMetadata() persists the new fields — covers all ingestion paths that go through the standard driver
  • All tracking is gated by the table-level config flag (compressionStatsEnabled in TableConfig.indexingConfig) — when disabled, writers skip uncompressed byte tracking and writeMetadata() does not persist the new fields. The flag propagates to offline jobs via SegmentGeneratorConfig.
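
A sketch of the persistence step in BaseSegmentCreator.writeMetadata() described above (property keys as shown; variable names are illustrative):

// Codec is persisted for every raw column; the uncompressed size only when
// the writer actually tracked it (dict-encoded columns report no size).
if (_compressionStatsEnabled) {
  properties.setProperty("column." + columnName + ".forwardIndex.compressionCodec",
      ForwardIndexType.resolveCompressionType(forwardIndexConfig, fieldType).name());
  if (uncompressedSizeBytes > 0) {
    properties.setProperty("column." + columnName + ".forwardIndex.uncompressedSizeBytes",
        uncompressedSizeBytes);
  }
}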

Backward compatibility: New fields are additive. Old segments without these fields return INDEX_NOT_FOUND. No changes to existing behavior. The feature flag ensures zero impact on clusters that don't opt in.

2. Compute Compression Ratio

The compression ratio is a pure measure of codec effectiveness — how well the compressor shrinks forward index data:

Compression Ratio = Sum of uncompressed forward index sizes / Sum of compressed forward index sizes (per replica)

Where:

  • Numerator = sum of uncompressedForwardIndexSizeBytes (raw ingested data size — actual value byte lengths, no chunk-format overhead) across all columns per segment, aggregated across segments for one replica (newly tracked in Section 1)
  • Denominator = sum of compressed forward index sizes per column per segment, using ColumnMetadata.getIndexSizeFor(StandardIndexes.forward()), aggregated across segments for one replica

Important: The denominator uses per-column forward index compressed sizes (from index_map.txt), NOT reportedSizePerReplicaInBytes. Using reportedSizePerReplicaInBytes would be incorrect because it includes dictionaries, inverted indexes, bloom filters, star-tree indexes, and metadata — components not represented in the raw size numerator. This would artificially deflate the ratio.

Why per-replica, not all replicas? Compression ratio measures codec effectiveness on a single copy of the data. Using all-replica totals would inflate both numerator and denominator equally, producing the same ratio. Per-replica is also consistent with how other OLAP systems compute this.

Handling segments without stats: Segments that lack uncompressed size metadata (old segments created before the flag was enabled, or dict-encoded-only segments) are excluded from both numerator and denominator. They are NOT treated as zero — treating them as zero would drastically under-report the ratio. The API response includes segmentsWithStats / totalSegments so operators know what fraction of the table's data the ratio covers. When all segments have stats, the ratio represents the full table.
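
Worked example, using the figures from the sample response in Section 3: 639,468,544,000 raw bytes / 32,395,724,800 compressed bytes ≈ 19.74x, with segmentsWithStats = totalSegments = 801, i.e. the ratio covers the full table.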

This gives operators a clear measure of how effectively their forward index data is being compressed.

3. Expose Storage Breakdown in Existing API Responses

Extend the existing table size and table metadata API responses (e.g., GET /tables/{table}/size, GET /tables/{table}/metadata) with additional fields. No new endpoints — existing responses gain new optional fields.

Tiered storage breakdown:

  • Number and size of segments on default tier (local disk) vs configured named tiers
  • Helps operators understand storage distribution across tiers and estimate cost by storage class
  • Useful for capacity planning and deciding when to tier

How tier metadata is collected: TableSizeReader currently consumes SegmentSizeInfo (segment name + size per server) via ServerTableSizeReader, but SegmentSizeInfo has no tier field. Two approaches:

  1. Extend SegmentSizeInfo with a tier field — each server already knows which tier a segment resides on (tracked via .tier marker files written by TierBasedSegmentDirectoryLoader). The server's TableSizeResource reads segments from the data manager, which has access to the segment's tier placement. This requires changes to both server-side reporting and controller-side aggregation.
  2. Use TableTierReader (already exists in pinot-controller) — queries servers via ServerTableTierReader for per-segment tier info (uses Helix only to discover which servers to query, not for tier data itself). This avoids modifying SegmentSizeInfo but requires a separate server fan-out. Can be combined with size data at the controller level.

The recommended approach is option 1 (extend SegmentSizeInfo) since it co-locates size and tier data in a single server response, avoiding an extra round-trip.
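
A sketch of option 1 (the field name and null convention are assumptions of this proposal, not existing Pinot code):

// SegmentSizeInfo (pinot-common): existing server→controller DTO, extended.
public class SegmentSizeInfo {
  private final String _segmentName;
  private final long _diskSizeInBytes;
  private final String _tier;  // new: tier name; "default" for non-tiered segments
  // The @JsonCreator constructor gains the extra parameter. Servers that have
  // not been upgraded omit the field, so the controller must tolerate null
  // during rolling upgrades.
}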

Example API response fields (added to existing table metadata response):

{
  "tableName": "events_OFFLINE",
  "reportedSizePerReplicaInBytes": 34778071450,
  "compressionStats": {
    "rawForwardIndexSizePerReplicaInBytes": 639468544000,
    "compressedForwardIndexSizePerReplicaInBytes": 32395724800,
    "compressionRatio": 19.74,
    "segmentsWithStats": 801,
    "totalSegments": 801,
    "isPartialCoverage": false
  },
  "columnCompressionStats": [ "..." ],
  "storageBreakdown": {
    "tiers": {
      "default": { "count": 0, "sizePerReplicaInBytes": 0 },
      "coldTier": { "count": 801, "sizePerReplicaInBytes": 34778071450 }
    }
  }
}

Design notes on the response structure:

  • compressionStats, columnCompressionStats, and storageBreakdown are separate sibling fields on TableSubTypeSizeDetails (nested under offlineSegments / realtimeSegments in the size API response) and on TableMetadataInfo (metadata API response). columnCompressionStats is NOT nested inside compressionStats — they are siblings.
  • columnCompressionStats is a sorted List (by column name) in external API responses (controller → client). The server→controller transport uses a Map<String, ColumnCompressionStatsInfo> (keyed by column name) on the size endpoint path (SegmentSizeInfo) for efficient aggregation, and a List<ColumnCompressionStatsInfo> on the metadata endpoint path (TableMetadataInfo). The controller converts to sorted list before returning to clients on both paths.
  • storageBreakdown.tiers is keyed by tier name (matching TierConfig.name from TableConfig). The "default" key represents segments on the default local disk tier. Tables can declare multiple named tiers via List<TierConfig>, so the response must support arbitrary tier names rather than hardcoding "local" vs "tiered" buckets. This allows the UI to render a per-tier table dynamically.
  • compressionStats.isPartialCoverage is a boolean flag. On the size endpoint path: true when segmentsWithStats < (totalSegments - missingSegments) — missing segments (not loaded on any server) are excluded from the coverage denominator. On the metadata endpoint path: true when segmentsWithStats < totalSegments (no missing-segment adjustment, since the metadata aggregation doesn't track missing segments). The UI can format its own message based on this structured flag rather than displaying a backend-authored free-form note string.
  • compressedForwardIndexSizePerReplicaInBytes (sum of ColumnMetadata.getIndexSizeFor(forward) across all columns and segments) is different from reportedSizePerReplicaInBytes (total on-disk size including all index types and metadata). The compression ratio uses only the forward index sizes.
  • Immutable DTOs: REST resource DTOs (CompressionStatsSummary, ColumnCompressionStatsInfo, StorageBreakdownInfo) follow Pinot's immutable DTO convention: private final fields, @JsonCreator constructor, no setters. Controller-internal aggregation classes (CompressionStats in TableSizeReader, TableSubTypeSizeDetails) remain mutable (pre-existing Pinot pattern for internal state). Feature flag suppression is handled by constructing DTOs with null at creation time and letting @JsonInclude(NON_NULL) suppress them from JSON — never by casting to ObjectNode and calling .remove() after construction.
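
A condensed sketch of that DTO convention for CompressionStatsSummary (field names match the example response; constructor parameter names are illustrative):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class CompressionStatsSummary {
  private final long _rawForwardIndexSizePerReplicaInBytes;
  private final long _compressedForwardIndexSizePerReplicaInBytes;
  private final double _compressionRatio;
  private final int _segmentsWithStats;
  private final int _totalSegments;
  private final boolean _partialCoverage;

  @JsonCreator
  public CompressionStatsSummary(
      @JsonProperty("rawForwardIndexSizePerReplicaInBytes") long rawSize,
      @JsonProperty("compressedForwardIndexSizePerReplicaInBytes") long compressedSize,
      @JsonProperty("compressionRatio") double compressionRatio,
      @JsonProperty("segmentsWithStats") int segmentsWithStats,
      @JsonProperty("totalSegments") int totalSegments,
      @JsonProperty("isPartialCoverage") boolean partialCoverage) {
    _rawForwardIndexSizePerReplicaInBytes = rawSize;
    _compressedForwardIndexSizePerReplicaInBytes = compressedSize;
    _compressionRatio = compressionRatio;
    _segmentsWithStats = segmentsWithStats;
    _totalSegments = totalSegments;
    _partialCoverage = partialCoverage;
  }

  // Getters only -- no setters. The parent field holding this object is
  // annotated @JsonInclude(NON_NULL), so a null summary disappears from JSON.
}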

Human-readable interpretation:

Compression (per replica, forward index only)
  Raw forward index size:            595.49 GB
  Compressed forward index size:      30.17 GB
  Compression ratio:                  19.74x

Total on-disk size (per replica):     32.39 GB  (includes dicts, inverted, bloom, metadata)

Storage Breakdown (per replica)
  default:       0 segments  (  0.00 GB)
  coldTier:    801 segments  ( 32.39 GB)

Note: compressionStats is gated by the compressionStatsEnabled feature flag — when the flag is OFF or segmentsWithStats == 0, this field is null (suppressed by @JsonInclude(NON_NULL)). columnCompressionStats is preserved even when segmentsWithStats == 0 to provide dict-only column coverage (compressed size, hasDictionary, indexes). storageBreakdown is independent of the compression flag. Non-tiered segments are bucketed into "default", so the breakdown is populated for any table with loaded segments (it is null only when the tier accumulator is empty, e.g., no segments loaded at all).

4. Per-Column Compression Stats API

Expose per-column compression breakdown via BOTH existing endpoints (GET /tables/{table}/size and GET /tables/{table}/metadata). This helps operators identify:

  • Columns with poor compression ratios (candidates for codec change)
  • Columns where dictionary encoding helps vs hurts
  • Which indexes consume the most space per column (for cost-vs-performance decisions)

Example API response (available on both size and metadata endpoints):

{
  "tableName": "events_OFFLINE",
  "columnCompressionStats": [
    {
      "column": "message",
      "compressedSizeInBytes": 16106127360,
      "uncompressedSizeInBytes": 48318382080,
      "compressionRatio": 3.0,
      "codec": "LZ4",
      "hasDictionary": false,
      "indexes": ["forward_index", "inverted_index"]
    },
    {
      "column": "request_id",
      "compressedSizeInBytes": 2147483648,
      "uncompressedSizeInBytes": 34359738368,
      "compressionRatio": 16.0,
      "codec": "ZSTANDARD",
      "hasDictionary": false,
      "indexes": ["forward_index"]
    }
  ]
}

This tells operators: "the message column has only 3x compression with LZ4 — switching to ZSTANDARD could significantly reduce the 15 GB footprint."

5. System Table Integration (Future, depends on #17291)

Once system tables infrastructure lands (tracking in #17291), expose the above as SQL-queryable tables:

  • system.segments_storage — per-segment storage breakdown including tier info
  • system.columns_compression — per-column per-segment compression stats with codec and index info

This is a future enhancement dependent on system tables being available and is not part of the initial implementation.

6. Controller Metrics

Prometheus-compatible gauges for monitoring and alerting:

  • TABLE_COMPRESSION_RATIO_PERCENT ("TableCompressionRatioPercent", global = false) — ratio × 100 (long)
  • TABLE_RAW_FORWARD_INDEX_SIZE_PER_REPLICA — sum of uncompressed forward index sizes per replica
  • TABLE_COMPRESSED_FORWARD_INDEX_SIZE_PER_REPLICA — sum of compressed forward index sizes per replica
  • TABLE_TIERED_STORAGE_SIZE — size of data per tier (including "default" local-disk tier), emitted with tier-suffixed key (e.g., "table_OFFLINE.coldTier", "table_OFFLINE.default")

Emitted via ControllerMetrics.setValueOfTableGauge(). Only emitted when segmentsWithStats > 0 AND compressedSize > 0. Cleared via removeTableGauge() when: flag OFF, OR segmentsWithStats drops to 0, OR table deleted. Tier gauges are independent of compression flag — _emittedTierKeys tracks stale keys for cleanup via clearTierMetrics(), called from SegmentStatusChecker.removeMetricsForTable().
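
A sketch of the emission and cleanup calls (gauge enum names from this proposal; setValueOfTableGauge and removeTableGauge are the ControllerMetrics methods named above):

// Emit only when real data exists; otherwise clear so no stale gauge lingers.
if (compressionStatsEnabled && segmentsWithStats > 0 && compressedSize > 0) {
  _controllerMetrics.setValueOfTableGauge(tableNameWithType,
      ControllerGauge.TABLE_COMPRESSION_RATIO_PERCENT, (long) (compressionRatio * 100));
} else {
  _controllerMetrics.removeTableGauge(tableNameWithType,
      ControllerGauge.TABLE_COMPRESSION_RATIO_PERCENT);
}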


Implementation Breakdown

Data Flow Overview

┌─────────────────────────────────────────────────────────────────────┐
│  SEGMENT CREATION (Ingestion)                                       │
│  Forward index writers → track uncompressed bytes per column         │
│  BaseSegmentCreator.writeMetadata() → persist to metadata.properties│
└──────────────────────────────┬──────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│  METADATA STORAGE (Per-Segment, on disk)                            │
│  metadata.properties: column.<col>.forwardIndex.uncompressedSizeBytes│
│  metadata.properties: column.<col>.forwardIndex.compressionCodec    │
│  ColumnMetadataImpl: loaded at segment load time                     │
└──────────────────────────────┬──────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│  SERVER (Per-Server Reporting)                                       │
│  TableSizeResource → report segment sizes + uncompressed sizes       │
└──────────────────────────────┬──────────────────────────────────────┘
                               │  HTTP fan-out
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│  CONTROLLER (Aggregation + Computation)                              │
│  ServerTableSizeReader → collect from all servers                     │
│  TableSizeReader → compute compression ratio (raw/compressed)        │
│                  → aggregate tiered storage breakdown                 │
│  Existing REST endpoints → return enriched response                  │
└─────────────────────────────────────────────────────────────────────┘

Phase 1: Track Uncompressed Sizes at Ingestion

Goal: Capture raw (uncompressed) byte count per column during segment creation.

Where data is captured:

Component | File | What to Change
Forward index writer (V1-V3) | pinot-segment-local/.../io/writer/impl/BaseChunkForwardIndexWriter.java | Provides _uncompressedSize, _trackUncompressedSize, getUncompressedSize(), and setTrackUncompressedSize(boolean). Does NOT track in writeChunk() (which sees _chunkBuffer.remaining() including chunk headers) — subclasses track per-value at their put*() callsites.
Fixed-byte writer | pinot-segment-local/.../io/writer/impl/FixedByteChunkForwardIndexWriter.java | Tracks per-value: putInt += Integer.BYTES, putLong += Long.BYTES, putFloat += Float.BYTES, putDouble += Double.BYTES. Each guarded by _trackUncompressedSize.
Var-byte writer (V1-V3) | pinot-segment-local/.../io/writer/impl/VarByteChunkForwardIndexWriter.java | Tracks in putBytes(byte[] value): _uncompressedSize += value.length. Guarded by _trackUncompressedSize.
Var-byte writer V4 | pinot-segment-local/.../io/writer/impl/VarByteChunkForwardIndexWriterV4.java | Does NOT extend BaseChunkForwardIndexWriter — has its own _uncompressedSize field. Tracks in putBytes(byte[] bytes): _uncompressedSize += bytes.length. NOT in write() (which sees the rearranged chunk buffer including doc count + offset table). writeHugeChunk() also covered via putBytes().
Var-byte writer V5 | pinot-segment-local/.../io/writer/impl/VarByteChunkForwardIndexWriterV5.java | Extends V4 — covered by V4's putBytes() tracking.
Var-byte writer V6 | pinot-segment-local/.../io/writer/impl/VarByteChunkForwardIndexWriterV6.java | Extends V5. Delta-encodes chunk headers when compression is enabled (cumulative offsets → per-entry sizes for better ZSTD/LZ4 compressibility). Inherits V4's putBytes() uncompressed size tracking via V5.
Dict-encoded SV | pinot-segment-local/.../io/writer/impl/FixedBitSVForwardIndexWriter.java | Out of scope — writes dictionary IDs only, not raw values. Reports INDEX_NOT_FOUND for uncompressed size.
Dict-encoded MV | pinot-segment-local/.../io/writer/impl/FixedBitMVForwardIndexWriter.java | Out of scope — same as above, writes dictionary IDs only.
Sorted forward index | pinot-segment-local/.../segment/creator/impl/fwd/SingleValueSortedForwardIndexCreator.java | Out of scope — dict-encoded sorted column, no raw values available at writer level.
CLP V1 | pinot-segment-local/.../segment/creator/impl/fwd/CLPForwardIndexCreatorV1.java | Not instrumented for uncompressed size — uses SPI default no-op (getUncompressedSize() returns 0). However, codec IS still persisted to metadata.properties (the codec resolution runs for all raw columns regardless of uncompressed size). CLP V1 columns will have compressionCodec = PASS_THROUGH but uncompressedSizeBytes absent.
CLP V2 | pinot-segment-local/.../segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java | Implements ForwardIndexCreator with sub-writers: FixedByteChunkForwardIndexWriter (logtype IDs), VarByteChunkForwardIndexWriterV5 (dict var IDs, encoded vars, raw messages). getUncompressedSize() sums sub-writer sizes. setTrackUncompressedSize() propagates to all sub-writers. Note: In CLP-encoded mode, the reported "uncompressed size" is the pre-compression sub-stream byte total (encoded logtype IDs, dictVar IDs, encodedVars), NOT the original UTF-8 message size. Only the raw fallback path (_rawMsgFwdIndex.putBytes(message)) tracks true message bytes. This is documented and intentional.
Segment creator | pinot-segment-local/.../segment/creator/impl/BaseSegmentCreator.java | In writeMetadata(): persist uncompressed size and codec to metadata.properties. Codec resolution uses ForwardIndexType.resolveCompressionType() shared utility (handles CLP codec variants).
Codec resolution | pinot-segment-local/.../segment/index/forward/ForwardIndexType.java | New shared utility resolveCompressionType(ForwardIndexConfig, @Nullable FieldType): checks CompressionCodec enum first (CLP→PASS_THROUGH, CLPV2/CLPV2_ZSTD→ZSTANDARD, CLPV2_LZ4→LZ4), then getChunkCompressionType(), then field-type default. Used by both BaseSegmentCreator and ForwardIndexHandler.
ForwardIndexConfig | pinot-segment-spi/.../index/ForwardIndexConfig.java | _chunkCompressionType is @JsonIgnore — the codec is known at creation time but not persisted. Resolved at persist time via ForwardIndexType.resolveCompressionType().
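
A sketch of the shared resolution utility described in the table above (the CLP mappings follow this proposal; exact accessor names are assumptions):

// Resolution order: CLP codec variants first, then the explicitly configured
// chunk compression type, then the field-type default (PASS_THROUGH for
// metrics, LZ4 otherwise).
static ChunkCompressionType resolveCompressionType(ForwardIndexConfig config,
    @Nullable FieldSpec.FieldType fieldType) {
  FieldConfig.CompressionCodec codec = config.getCompressionCodec();
  if (codec != null) {
    switch (codec) {
      case CLP:        return ChunkCompressionType.PASS_THROUGH;
      case CLPV2:
      case CLPV2_ZSTD: return ChunkCompressionType.ZSTANDARD;
      case CLPV2_LZ4:  return ChunkCompressionType.LZ4;
      default:         break;  // non-CLP codec: fall through
    }
  }
  ChunkCompressionType chunkCompressionType = config.getChunkCompressionType();
  if (chunkCompressionType != null) {
    return chunkCompressionType;
  }
  return fieldType == FieldSpec.FieldType.METRIC
      ? ChunkCompressionType.PASS_THROUGH : ChunkCompressionType.LZ4;
}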

Storage: metadata.properties file inside each segment directory. This is an existing file — we add new key-value pairs:

column.userId.forwardIndex.uncompressedSizeBytes = 419430400
column.userId.forwardIndex.compressionCodec = LZ4

Tech stack: Pure Java, no new dependencies. Uses existing CommonsConfigurationUtils.saveToFile() for persistence.

Phase 2: Load Uncompressed Metadata at Segment Load

Goal: Make uncompressed sizes and codec available at runtime via ColumnMetadata.

Component | File | What to Change
ColumnMetadata SPI | pinot-segment-spi/.../spi/ColumnMetadata.java | Add getUncompressedForwardIndexSizeBytes() and getCompressionCodec() to interface.
ColumnMetadataImpl | pinot-segment-spi/.../spi/index/metadata/ColumnMetadataImpl.java | Read new properties in fromPropertiesConfiguration() (line ~264). Store in new fields. Builder gets corresponding setters.
SegmentMetadataImpl | pinot-segment-spi/.../spi/index/metadata/SegmentMetadataImpl.java | No direct changes — delegates to ColumnMetadataImpl per column.

Backward compatibility: fromPropertiesConfiguration() must be updated to read the new properties with explicit defaults. Currently it reads each key without a fallback, so missing keys would throw. The implementation must use config.getLong("column.<col>.forwardIndex.uncompressedSizeBytes", ColumnMetadata.INDEX_NOT_FOUND) (where INDEX_NOT_FOUND = -1 is the existing constant in ColumnMetadata) to gracefully handle old segments that lack these fields. Similarly, compressionCodec defaults to null when absent.
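
A sketch of those defaulted reads (Commons Configuration's two-argument getters handle the missing-key case):

// Old segments lack these keys; fall back to the existing sentinel / null.
long uncompressedSizeBytes = config.getLong(
    "column." + column + ".forwardIndex.uncompressedSizeBytes",
    ColumnMetadata.INDEX_NOT_FOUND);
String compressionCodec = config.getString(
    "column." + column + ".forwardIndex.compressionCodec", null);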

Phase 3: Controller Aggregation and Compression Ratio Computation

Goal: Controller collects uncompressed sizes from segment metadata, computes compression ratio, exposes via existing APIs.

Component | File | What to Change
ServerTableSizeReader | pinot-controller/.../api/resources/ServerTableSizeReader.java | getSegmentSizeInfoFromServers() (line ~52): already fans out HTTP GETs to servers. Deserialize new uncompressed size fields from responses.
TableSizeReader | pinot-controller/.../util/TableSizeReader.java | In getTableSubtypeSize() (line ~246): compute compression ratio from aggregated raw vs compressed sizes. Add new fields to TableSubTypeSizeDetails and TableSizeDetails. Tracks _emittedTierKeys (ConcurrentHashMap) for stale tier gauge cleanup. emitTierMetrics() checks leadership on the base tableNameWithType (not tier-suffixed key), then emits with the suffixed key. clearTierMetrics() removes all previously emitted tier keys for a table.
TableSubTypeSizeDetails | pinot-controller/.../util/TableSizeReader.java (inner class) | Add compressionStats object (with rawForwardIndexSizePerReplicaInBytes, compressedForwardIndexSizePerReplicaInBytes, compressionRatio, segmentsWithStats, totalSegments, isPartialCoverage), columnCompressionStats (sorted List), and storageBreakdown — all as sibling fields on TableSubTypeSizeDetails (nested under offlineSegments / realtimeSegments). compressionStats is null when flag OFF or segmentsWithStats == 0. columnCompressionStats is preserved even when segmentsWithStats == 0 (for dict-only column coverage). storageBreakdown populated for any table with loaded segments (non-tiered segments bucketed into "default").
ServerSegmentMetadataReader | pinot-controller/.../util/ServerSegmentMetadataReader.java | Aggregates per-column stats across replicas. Dict sentinel preservation: accumulation uses >= 0 guard before summing — INDEX_NOT_FOUND (-1) sentinels are never summed or divided. Output reconstructs -1 for dict columns with no accumulated data. Negative ratio guard: compression ratio only computed when uncompressed > 0 && compressed > 0. compressionStatsSummary only constructed when segmentsWithStats > 0 after replica division. Old raw segment exclusion: columns where codec == null && !hasDictionary are skipped (pre-feature raw segments).
TableSize endpoint | pinot-controller/.../api/resources/TableSize.java | getTableSize() (line ~84): no code changes needed — it already returns TableSizeDetails which will now have the new fields.
TableMetadataInfo | pinot-common/.../restlet/resources/TableMetadataInfo.java | Add compressionStats, columnCompressionStats, and storageBreakdown fields for the metadata endpoint response. Immutable DTO: private final fields, @JsonCreator constructor, @JsonInclude(NON_NULL) on nullable fields.
PinotTableRestletResource | pinot-controller/.../api/resources/PinotTableRestletResource.java | getTableAggregateMetadata() (line ~1465): wire in compression stats, per-column stats, and storage breakdown via TableMetadataReader → ServerSegmentMetadataReader (not TableSizeReader). Passes compressionStatsEnabled flag. Safety-net flag check: construct DTOs with null when flag OFF, let @JsonInclude(NON_NULL) suppress them.
SegmentStatusChecker | pinot-controller/.../helix/SegmentStatusChecker.java | removeMetricsForTable() now calls clearTierMetrics() on the TableSizeReader to remove stale tier gauge keys when a table is deleted.

Compression ratio computation (in ServerSegmentMetadataReader):

// Per-column accumulation across replicas with sentinel preservation
if (uncompressedSize >= 0) {
  accumulatedUncompressed += uncompressedSize;  // only accumulate non-sentinel values
}
// Skip old raw segments (no codec, not dict-encoded)
if (codec == null && !hasDictionary) {
  continue;  // pre-feature raw segment column — exclude from stats
}
// Ratio computation with negative guard
if (accumulatedUncompressed > 0 && accumulatedCompressed > 0) {
  ratio = (double) accumulatedUncompressed / accumulatedCompressed;
}
// compressionStatsSummary only when data exists
if (segmentsWithStats > 0) {
  compressionStatsSummary = new CompressionStatsSummary(...);
}

Tech stack: Jackson JSON serialization for response DTOs. Existing Executor thread pool for parallel server fan-out. No new dependencies.

Phase 4: Per-Column Compression Stats

Goal: Expose per-column compression breakdown via BOTH existing endpoints (GET /tables/{table}/size and GET /tables/{table}/metadata).

Component | File | What to Change
Server metadata endpoint | pinot-server/.../api/resources/TablesResource.java | Extend the existing endpoint to return per-column metadata including uncompressed sizes, codec, and index list per column. Iterate SegmentMetadata.getColumnMetadataMap(). Old raw segments without persisted codec are excluded. Mixed codecs across segments are merged as "MIXED".
ServerSegmentMetadataReader | pinot-controller/.../util/ServerSegmentMetadataReader.java | Aggregate per-column stats across replicas: sum compressed/uncompressed sizes (with >= 0 sentinel guard), merge codecs (with "MIXED" detection), collect indexes. Dict sentinel -1 preserved through accumulation.
TableMetadataInfo | pinot-common/.../restlet/resources/TableMetadataInfo.java | Add columnCompressionStats as List<ColumnCompressionStatsInfo> sorted by column name, with per-column compression ratio, codec, hasDictionary, indexes. Includes raw columns with persisted codec and dict-encoded columns with forward index size. Old raw segments without persisted codec (codec == null && !hasDictionary) are excluded. When a column has different codecs across segments/replicas, the codec field is reported as "MIXED".
TableMetadataReader | pinot-controller/.../util/TableMetadataReader.java | Delegates to ServerSegmentMetadataReader for per-column aggregation. Passes compressionStatsEnabled flag through to control stats collection.
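
A sketch of the codec merge rule used during that aggregation (helper name is illustrative):

// Differing codecs across segments/replicas collapse to the "MIXED" marker.
private static String mergeCodec(@Nullable String current, String incoming) {
  if (current == null) {
    return incoming;
  }
  return current.equals(incoming) ? current : "MIXED";
}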

Phase 5: Controller Metrics (Prometheus)

Component | File | What to Change
ControllerGauge | pinot-common/.../metrics/ControllerGauge.java | Add new gauge enums: TABLE_COMPRESSION_RATIO_PERCENT (ratio × 100, long), TABLE_RAW_FORWARD_INDEX_SIZE_PER_REPLICA, TABLE_COMPRESSED_FORWARD_INDEX_SIZE_PER_REPLICA, TABLE_TIERED_STORAGE_SIZE. Gauge name is "TableCompressionRatioPercent" with global = false (per-table).
TableSizeReader | pinot-controller/.../util/TableSizeReader.java | Emit metrics via _controllerMetrics.setValueOfTableGauge() (already done for existing size metrics at line ~113). Tier gauge lifecycle: _emittedTierKeys (ConcurrentHashMap<String, Set<String>>) tracks emitted tier gauge keys per table. emitTierMetrics() checks leader status on the base tableNameWithType (not the tier-suffixed key — isLeaderForTable("table.tier") would never match), then emits with the suffixed key and records it in _emittedTierKeys. clearTierMetrics() iterates all recorded keys for a table and calls removeTableGauge() on each, preventing stale gauges when tiers are removed or tables deleted. Called from SegmentStatusChecker.removeMetricsForTable().

Known Corner Cases

Corner Case | Impact | Mitigation
VarByteChunkForwardIndexWriterV4/V5 does not extend BaseChunkForwardIndexWriter | Uncompressed bytes not captured if only base class is instrumented | V4 has its own _uncompressedSize field. Tracks in putBytes() — NOT in write() (which sees rearranged chunk buffer with doc count + offset table). V5 extends V4 and inherits its tracking.
PASS_THROUGH compression (ChunkCompressionType.PASS_THROUGH) | compressed == uncompressed; ratio is 1.0x | Detect and report correctly — not an error, just means no compression applied (e.g., CLP pre-compressed data)
CLP V2 sub-stream semantics | getUncompressedSize() returns sum of 4 sub-writer sizes (logtypeId, dictVarId, encodedVar, rawMsg). In CLP-encoded mode, this is the pre-compression sub-stream byte total, NOT original UTF-8 message size. | Documented and intentional. Only the raw fallback path (_rawMsgFwdIndex.putBytes(message)) tracks true message bytes. Ratio reflects codec effectiveness on the CLP-encoded representation.
Segment reload (POST /segments/{table}/{segment}/reload) | Uses SegmentPreProcessor → ForwardIndexHandler, which can rewrite forward indexes in-place (e.g., raw→dict or dict→raw transitions) | ForwardIndexHandler uses shared ForwardIndexType.resolveCompressionType() for codec resolution when rewriting. If a column transitions from raw→dict, the stored uncompressedSizeBytes is set to INDEX_NOT_FOUND (-1). If dict→raw, the new writer captures and persists uncompressed size.
Old raw segments (created before feature flag enabled) | No uncompressed size or codec in metadata.properties | Server-side: computeIfAbsent for column accumulators moved inside conditional branches — old raw segments (codec == null && !hasDictionary) excluded. Controller-side: ServerSegmentMetadataReader skips columns where codec == null && !hasDictionary. ColumnMetadataImpl.fromPropertiesConfiguration() returns INDEX_NOT_FOUND (-1) for missing fields.
Dict column sentinel preservation | -1 (INDEX_NOT_FOUND) sentinel must not be summed/divided across replicas during aggregation | Controller accumulation uses >= 0 guard before summing. Output reconstructs -1 for dict columns with no accumulated data. Prevents sentinel corruption through arithmetic.
Negative compression ratio | Could occur if numerator or denominator is corrupted or zero | Ratio only computed when uncompressed > 0 && compressed > 0. Otherwise left as 0 or not reported.
Transition window after enabling flag | When compressionStatsEnabled is turned on for an existing table, only newly created segments have uncompressed sizes. Old segments lack this metadata. | API response includes segmentsWithStats / totalSegments. When segmentsWithStats == 0, compressionStats is null (not an empty object). compressionStatsSummary only constructed when segmentsWithStats > 0 after replica division.
Tier gauge stale key cleanup | When tiers are removed from table config or tables are deleted, stale tier gauges would persist in Prometheus | _emittedTierKeys (ConcurrentHashMap<String, Set<String>>) tracks all emitted tier keys per table. clearTierMetrics() iterates and removes all. Called from SegmentStatusChecker.removeMetricsForTable(). Leader check uses base tableNameWithType (not tier-suffixed key — isLeaderForTable("table.tier") would never match).
storageBreakdown always-on | Tier info must be reported regardless of compression flag | Server passes immutableSegment.getTier() in both flag-ON and flag-OFF paths. Non-tiered segments are bucketed into "default", so the breakdown is populated for any table with loaded segments. Flag-OFF path constructs SegmentSizeInfo with explicit tier parameter (no compression stats fields). storageBreakdown is null only when the tier accumulator is empty (no segments loaded at all).
V1/V2 segment format | No index_map.txt — compressed index sizes not tracked per index type | Uncompressed sizes would still be in metadata.properties. Compressed sizes unavailable per index type.
CLP V1 legacy | Not instrumented for uncompressed size (getUncompressedSize() returns 0), but codec IS still persisted (PASS_THROUGH) | CLP V1 columns will have codec in metadata but no uncompressed size. On the size endpoint path, they appear in columnCompressionStats with compressed size, codec, and ratio set to 0. On the metadata endpoint path, they are excluded (raw columns require uncompressedSize > 0 to be included).
Metadata caching | metadata.properties is read once at segment load, cached in-memory | New fields only take effect for newly created segments. A segment reload re-runs SegmentPreProcessor and refreshes cached metadata without server restart.

Summary: Modules Touched

Module | Changes | Purpose
pinot-segment-spi | ColumnMetadata, ColumnMetadataImpl, ForwardIndexConfig, IndexingConfig | SPI interfaces + metadata loading + feature flag
pinot-segment-local | BaseChunkForwardIndexWriter, FixedByteChunkForwardIndexWriter, VarByteChunkForwardIndexWriter, VarByteChunkForwardIndexWriterV4, VarByteChunkForwardIndexWriterV6, CLPForwardIndexCreatorV2, BaseSegmentCreator, ForwardIndexHandler, ForwardIndexType | Ingestion capture (per-value tracking in each writer subclass, V5/V6 inherit from V4) + CLP sub-stream tracking + codec resolution shared utility + reload-time metadata updates
pinot-common | TableMetadataInfo, ControllerGauge, CompressionStatsSummary, ColumnCompressionStatsInfo, StorageBreakdownInfo | DTOs (immutable, @JsonCreator) + metrics gauges
pinot-server | TableSizeResource, TablesResource | Server-side reporting (size endpoint + metadata endpoint). Both endpoints report tier info regardless of flag. Old raw segments excluded from stats.
pinot-controller | TableSizeReader, ServerTableSizeReader, ServerSegmentMetadataReader, TableMetadataReader, TableSize, PinotTableRestletResource, SegmentStatusChecker | Aggregation + API + dict sentinel preservation + negative ratio guards + tier gauge lifecycle + stale key cleanup + mixed codec detection ("MIXED")

Storage & Tech Stack Summary

What | Where | Format
Uncompressed size + codec | metadata.properties inside each segment dir | Key-value properties file (existing)
Compressed index sizes | index_map.txt inside V3 segment dir | Already tracked (existing)
API responses | Controller REST endpoints (JSON over HTTP) | Jackson-serialized POJOs
Metrics | Prometheus gauges via ControllerMetrics | pinot_table_* gauge names
Server → Controller transport | HTTP GET fan-out via ServerTableSizeReader | JSON SegmentSizeInfo DTOs

No new storage systems, databases, or external dependencies. Everything uses Pinot's existing infrastructure: properties files for segment metadata, HTTP for server-to-controller communication, Jackson for serialization, and ControllerMetrics for Prometheus gauges.


Related Issues and PRs

Use Cases

  1. Accurate COGS estimation: Compression ratio and per-column breakdown for informed storage cost projections
  2. Codec optimization: Identify columns with poor compression ratios and switch codecs (e.g., LZ4 -> ZSTANDARD for cold data)
  3. Capacity planning: Right-size clusters by understanding true storage footprint with local vs tiered breakdown
  4. Schema optimization: Identify columns that benefit from dictionary encoding vs raw encoding
  5. Index cost analysis: Per-column index size visibility to evaluate cost-vs-performance trade-offs when adding or removing indexes
  6. Monitoring/alerting: Alert when compression ratio degrades after schema changes or data pattern shifts
