Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b8a45b4
created HeadConvertingQueue
cherep58 Sep 1, 2025
8356af0
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Sep 1, 2025
57ac6f4
added timeInterval cache for HeadDataStorage
cherep58 Sep 1, 2025
0a5919d
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Sep 2, 2025
56d3e48
rewrite Keeper
cherep58 Sep 5, 2025
b19f67f
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Sep 8, 2025
b037698
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Sep 10, 2025
0922eb0
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Sep 11, 2025
50d3818
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Sep 11, 2025
e169cf0
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Sep 16, 2025
afd5990
changes in cppbridge/relabeler
cherep58 Sep 18, 2025
4f161a1
added data loading/unloading
cherep58 Sep 18, 2025
af3cdcf
created unit tests for block/writer
cherep58 Sep 18, 2025
44de84e
Merge branch 'rebuild_heads_container' of https://github.com/deckhous…
cherep58 Sep 19, 2025
36b3895
created persistener
cherep58 Sep 23, 2025
2a5b5a1
fixed data race in LoadAndQuerySeriesDataTask
cherep58 Sep 24, 2025
da08883
fixed compilation error
cherep58 Sep 24, 2025
d8d2770
added mock
cherep58 Sep 24, 2025
dae2a80
fixed compilation error
cherep58 Sep 24, 2025
e09eed7
fix adapter
u-veles-a Sep 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pp-pkg/storage/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
ahead := ar.proxy.Get()
queriers = append(
queriers,
querier.NewQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.activeQuerierMetrics),
querier.NewChunkQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil),
)

for head := range ar.proxy.RangeQueriableHeads(mint, maxt) {
Expand All @@ -179,7 +179,7 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)

queriers = append(
queriers,
querier.NewQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.storageQuerierMetrics),
querier.NewChunkQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil),
)
}

Expand All @@ -198,7 +198,7 @@ func (ar *Adapter) Close() error {
}

// HeadQuerier returns [storage.Querier] from active head.
func (ar *Adapter) HeadQuerier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
func (ar *Adapter) HeadQuerier(mint, maxt int64) (storage.Querier, error) {
return querier.NewQuerier(
ar.proxy.Get(),
querier.NewNoOpShardedDeduplicator,
Expand All @@ -210,7 +210,7 @@ func (ar *Adapter) HeadQuerier(ctx context.Context, mint, maxt int64) (storage.Q
}

// HeadStatus returns stats of Head.
func (ar *Adapter) HeadStatus(ctx context.Context, limit int) (querier.HeadStatus, error) {
func (ar *Adapter) HeadStatus(ctx context.Context, limit int) (*querier.HeadStatus, error) {
return querier.QueryHeadStatus(ctx, ar.proxy.Get(), limit)
}

Expand Down
32 changes: 27 additions & 5 deletions pp/go/cppbridge/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ type TimeInterval struct {
MaxT int64
}

// NewInvalidTimeInterval returns the sentinel "unset" interval: MinT holds
// the maximum int64 and MaxT the minimum, so any real sample range extends
// it and IsInvalid can detect that no data has been accounted yet.
func NewInvalidTimeInterval() TimeInterval {
return TimeInterval{
MinT: math.MaxInt64,
MaxT: math.MinInt64,
}
}

// IsInvalid reports whether t still holds the sentinel values produced by
// NewInvalidTimeInterval, i.e. the interval has never been set.
func (t *TimeInterval) IsInvalid() bool {
return t.MinT == math.MaxInt64 && t.MaxT == math.MinInt64
}

type Sample struct {
Timestamp int64
Value float64
Expand All @@ -39,13 +50,15 @@ type Sample struct {
type HeadDataStorage struct {
dataStorage uintptr
gcDestroyDetector *uint64
timeInterval TimeInterval
}

// NewHeadDataStorage - constructor.
func NewHeadDataStorage() *HeadDataStorage {
ds := &HeadDataStorage{
dataStorage: seriesDataDataStorageCtor(),
gcDestroyDetector: &gcDestroyDetector,
timeInterval: NewInvalidTimeInterval(),
}

runtime.SetFinalizer(ds, func(ds *HeadDataStorage) {
Expand All @@ -58,12 +71,16 @@ func NewHeadDataStorage() *HeadDataStorage {
// Reset - resets data storage.
func (ds *HeadDataStorage) Reset() {
seriesDataDataStorageReset(ds.dataStorage)
ds.timeInterval = NewInvalidTimeInterval()
}

func (ds *HeadDataStorage) TimeInterval() TimeInterval {
res := seriesDataDataStorageTimeInterval(ds.dataStorage)
runtime.KeepAlive(ds)
return res
func (ds *HeadDataStorage) TimeInterval(invalidateCache bool) TimeInterval {
if invalidateCache || ds.timeInterval.IsInvalid() {
ds.timeInterval = seriesDataDataStorageTimeInterval(ds.dataStorage)
runtime.KeepAlive(ds)
}

return ds.timeInterval
}

func (ds *HeadDataStorage) GetQueriedSeriesBitset() []byte {
Expand Down Expand Up @@ -237,7 +254,7 @@ type HeadDataStorageSerializedChunks struct {

type HeadDataStorageSerializedChunkMetadata [SerializedChunkMetadataSize]byte

func (cm *HeadDataStorageSerializedChunkMetadata) SeriesID() uint32 {
func (cm HeadDataStorageSerializedChunkMetadata) SeriesID() uint32 {
return *(*uint32)(unsafe.Pointer(&cm[0]))
}

Expand All @@ -257,6 +274,11 @@ func (r *HeadDataStorageSerializedChunks) Data() []byte {
return r.data
}

// Metadata returns the metadata record for the chunk at chunkIndex.
// Records are laid out back-to-back after a leading uint32 header field,
// each SerializedChunkMetadataSize bytes long; the slice-to-array
// conversion copies the bytes out of r.data.
func (r *HeadDataStorageSerializedChunks) Metadata(chunkIndex int) HeadDataStorageSerializedChunkMetadata {
offset := Uint32Size + chunkIndex*SerializedChunkMetadataSize
return HeadDataStorageSerializedChunkMetadata(r.data[offset : offset+SerializedChunkMetadataSize])
}

type HeadDataStorageSerializedChunkIndex struct {
m map[uint32][]int
}
Expand Down
7 changes: 6 additions & 1 deletion pp/go/cppbridge/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,15 @@ func (s *HeadSuite) TestTimeInterval() {
encoder.Encode(1, 3, 1.0)

// Act
timeInterval := dataStorage.TimeInterval()
timeInterval := dataStorage.TimeInterval(false)
encoder.Encode(1, 4, 1.0)
cachedTimeInterval := dataStorage.TimeInterval(false)
actualTimeInterval := dataStorage.TimeInterval(true)

// Assert
s.Equal(cppbridge.TimeInterval{MinT: 1, MaxT: 3}, timeInterval)
s.Equal(cppbridge.TimeInterval{MinT: 1, MaxT: 3}, cachedTimeInterval)
s.Equal(cppbridge.TimeInterval{MinT: 1, MaxT: 4}, actualTimeInterval)
}

func (s *HeadSuite) TestInstantQuery() {
Expand Down
2 changes: 1 addition & 1 deletion pp/go/cppbridge/head_wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ func TestHeadWalDecoder_DecodeToDataStorage(t *testing.T) {

// Assert
require.NoError(t, err)
require.Equal(t, cppbridge.TimeInterval{MinT: 1660828401000, MaxT: 1660828410000}, dataStorage.TimeInterval())
require.Equal(t, cppbridge.TimeInterval{MinT: 1660828401000, MaxT: 1660828410000}, dataStorage.TimeInterval(false))
}
2 changes: 1 addition & 1 deletion pp/go/relabeler/block/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (w *Writer) createWriters(shard relabeler.Shard) (blockWriters, error) {
var writers blockWriters

shard.DataStorageRLock()
timeInterval := shard.DataStorage().TimeInterval()
timeInterval := shard.DataStorage().TimeInterval(false)
shard.DataStorageRUnlock()

quantStart := (timeInterval.MinT / w.blockDurationMs) * w.blockDurationMs
Expand Down
4 changes: 2 additions & 2 deletions pp/go/relabeler/head/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func (ds *DataStorage) CreateRevertableLoader(lss *cppbridge.LabelSetStorage, ls
return ds.dataStorage.CreateRevertableLoader(lss, lsIdBatchSize)
}

// TimeInterval returns the time interval covered by the underlying data
// storage. When invalidateCache is true the value is recomputed from the
// storage; otherwise a previously cached interval may be returned.
func (ds *DataStorage) TimeInterval(invalidateCache bool) cppbridge.TimeInterval {
return ds.dataStorage.TimeInterval(invalidateCache)
}

func (ds *DataStorage) GetQueriedSeriesBitset() []byte {
Expand Down
2 changes: 1 addition & 1 deletion pp/go/relabeler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type DataStorage interface {
CreateUnusedSeriesDataUnloader() *cppbridge.UnusedSeriesDataUnloader
CreateLoader(queriers []uintptr) *cppbridge.UnloadedDataLoader
CreateRevertableLoader(lss *cppbridge.LabelSetStorage, lsIdBatchSize uint32) *cppbridge.UnloadedDataRevertableLoader
TimeInterval() cppbridge.TimeInterval
TimeInterval(invalidateCache bool) cppbridge.TimeInterval
GetQueriedSeriesBitset() []byte
}

Expand Down
2 changes: 1 addition & 1 deletion pp/go/storage/appender/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestXxx(t *testing.T) {
lss := &shard.LSS{}
ds := shard.NewDataStorage()
wl := &testWal{}
sd := shard.NewShard(lss, ds, wl, 0)
sd := shard.NewShard(lss, ds, nil, nil, wl, 0)
id := "test-head-id"
generation := uint64(0)

Expand Down
142 changes: 142 additions & 0 deletions pp/go/storage/block/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package block

import (
"fmt"
"io"
"math"
"unsafe"

"github.com/prometheus/prometheus/pp/go/cppbridge"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// Chunk is a read-only view over a single chunk produced by the cppbridge
// chunk recoder.
type Chunk struct {
rc *cppbridge.RecodedChunk
}

// MinT returns the smallest sample timestamp contained in the chunk.
func (c *Chunk) MinT() int64 {
return c.rc.MinT
}

// MaxT returns the largest sample timestamp contained in the chunk.
func (c *Chunk) MaxT() int64 {
return c.rc.MaxT
}

// SeriesID returns the identifier of the series this chunk belongs to.
func (c *Chunk) SeriesID() uint32 {
return c.rc.SeriesId
}

// Encoding returns the chunk encoding; recoded chunks are always XOR-encoded.
func (c *Chunk) Encoding() chunkenc.Encoding {
return chunkenc.EncXOR
}

// SampleCount returns the number of samples stored in the chunk.
func (c *Chunk) SampleCount() uint8 {
return c.rc.SamplesCount
}

// Bytes returns the raw encoded chunk payload.
func (c *Chunk) Bytes() []byte {
return c.rc.ChunkData
}

// ChunkIterator walks the chunks produced by a cppbridge chunk recoder.
// r drives the recoding; rc holds the chunk most recently produced by Next
// (nil until Next is called for the first time).
type ChunkIterator struct {
r *cppbridge.ChunkRecoder
rc *cppbridge.RecodedChunk
}

// NewChunkIterator builds a ChunkIterator that recodes chunks from ds for
// series stored in lss, restricted to samples within [minT, maxT].
func NewChunkIterator(lss *cppbridge.LabelSetStorage, lsIdBatchSize uint32, ds *cppbridge.HeadDataStorage, minT, maxT int64) ChunkIterator {
	interval := cppbridge.TimeInterval{MinT: minT, MaxT: maxT}
	recoder := cppbridge.NewChunkRecoder(lss, lsIdBatchSize, ds, interval)
	return ChunkIterator{r: recoder}
}

// Next advances the iterator to the next recoded chunk. It reports false
// once the previously produced chunk was flagged as the last one, or when
// the recoder yields the end-of-data sentinel series id.
func (i *ChunkIterator) Next() bool {
	if i.rc != nil && !i.rc.HasMoreData {
		return false
	}

	recoded := i.r.RecodeNextChunk()
	i.rc = &recoded

	return recoded.SeriesId != math.MaxUint32
}

// NextBatch advances the recoder to its next batch of series ids and
// reports whether more data is available. The result is mirrored into the
// current chunk's HasMoreData flag so that a subsequent Next call proceeds.
func (i *ChunkIterator) NextBatch() bool {
	hasMoreData := i.r.NextBatch()
	// Guard against being called before the first Next: i.rc is nil then,
	// and the previous implementation dereferenced it and panicked.
	if i.rc != nil {
		i.rc.HasMoreData = hasMoreData
	}
	return hasMoreData
}

// At returns the chunk the iterator currently points at. It is valid only
// after a successful Next; the returned Chunk aliases the iterator's
// current *cppbridge.RecodedChunk.
func (i *ChunkIterator) At() Chunk {
return Chunk{rc: i.rc}
}

// IndexWriter streams a block index out of the cppbridge index writer.
// isPrefixWritten tracks whether the index header and symbol table have
// already been emitted, so they are written at most once, before the
// first series.
type IndexWriter struct {
cppIndexWriter *cppbridge.IndexWriter
isPrefixWritten bool
}

// WriteSeriesTo writes the index entry for series id with the given chunk
// metadata to w, lazily emitting the index header and symbol table before
// the first series. It returns the number of bytes written to w and the
// first error encountered.
func (iw *IndexWriter) WriteSeriesTo(id uint32, chunks []ChunkMetadata, w io.Writer) (n int64, err error) {
if !iw.isPrefixWritten {
var bytesWritten int
bytesWritten, err = w.Write(iw.cppIndexWriter.WriteHeader())
n += int64(bytesWritten)
if err != nil {
return n, fmt.Errorf("failed to write header: %w", err)
}

bytesWritten, err = w.Write(iw.cppIndexWriter.WriteSymbols())
n += int64(bytesWritten)
if err != nil {
return n, fmt.Errorf("failed to write symbols: %w", err)
}
iw.isPrefixWritten = true
}

// Reinterprets []ChunkMetadata as []cppbridge.ChunkMetadata without copying.
// NOTE(review): this assumes both types have identical memory layout —
// re-verify whenever either type changes.
bytesWritten, err := w.Write(iw.cppIndexWriter.WriteSeries(id, *(*[]cppbridge.ChunkMetadata)(unsafe.Pointer(&chunks))))
n += int64(bytesWritten)
if err != nil {
return n, fmt.Errorf("failed to write series: %w", err)
}

return n, nil
}

// WriteRestTo writes the remaining index sections (label indices, postings
// batches, label-indices table, postings table offsets, and the table of
// contents) to w. It returns the number of bytes written to w and the first
// error encountered.
//
// Fixes over the previous version: n now always includes the bytes of a
// partial write before an error is returned (io.Writer reports the number
// of bytes written even when it fails), and the error-message typos
// ("indicies", "table of content") are corrected.
func (iw *IndexWriter) WriteRestTo(w io.Writer) (n int64, err error) {
	bytesWritten, err := w.Write(iw.cppIndexWriter.WriteLabelIndices())
	n += int64(bytesWritten)
	if err != nil {
		return n, fmt.Errorf("failed to write label indices: %w", err)
	}

	for {
		// Postings are produced in bounded batches (1 MiB) to cap memory use.
		data, hasMoreData := iw.cppIndexWriter.WriteNextPostingsBatch(1 << 20)
		bytesWritten, err = w.Write(data)
		n += int64(bytesWritten)
		if err != nil {
			return n, fmt.Errorf("failed to write postings: %w", err)
		}
		if !hasMoreData {
			break
		}
	}

	bytesWritten, err = w.Write(iw.cppIndexWriter.WriteLabelIndicesTable())
	n += int64(bytesWritten)
	if err != nil {
		return n, fmt.Errorf("failed to write label indices table: %w", err)
	}

	bytesWritten, err = w.Write(iw.cppIndexWriter.WritePostingsTableOffsets())
	n += int64(bytesWritten)
	if err != nil {
		return n, fmt.Errorf("failed to write posting table offsets: %w", err)
	}

	bytesWritten, err = w.Write(iw.cppIndexWriter.WriteTableOfContents())
	n += int64(bytesWritten)
	if err != nil {
		return n, fmt.Errorf("failed to write table of contents: %w", err)
	}

	return n, nil
}

// NewIndexWriter constructs an IndexWriter backed by a fresh cppbridge
// index writer built over the given label set storage.
func NewIndexWriter(lss *cppbridge.LabelSetStorage) IndexWriter {
	cppWriter := cppbridge.NewIndexWriter(lss)
	return IndexWriter{cppIndexWriter: cppWriter}
}
Loading
Loading