fix: clear profile buffer at flush #3179

Status: Closed · wants to merge 7 commits (changes shown from 5 commits)
10 changes: 4 additions & 6 deletions pkg/parquet/row_reader.go
@@ -11,10 +11,6 @@ import (
"github.com/grafana/pyroscope/pkg/util/loser"
)

const (
defaultRowBufferSize = 64
)

var (
_ parquet.RowReader = (*emptyRowReader)(nil)
_ parquet.RowReader = (*ErrRowReader)(nil)
@@ -45,7 +41,7 @@ func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less f
}
its := make([]iter.Iterator[parquet.Row], len(readers))
for i := range readers {
its[i] = NewBufferedRowReaderIterator(readers[i], defaultRowBufferSize)
its[i] = NewBufferedRowReaderIterator(readers[i], 1)
Review comment (author):
Experiments show that buffering here is not helpful; we reference many column readers but could release them earlier, allowing the GC to free unused objects. Rows in our case are too large (approximately 10KB) for batching.

We may want to refactor this piece: a buffer of size 1 does not significantly harm performance, but it also does not provide any benefits.
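For context, a minimal sketch of what a buffer of size 1 means for the read loop, assuming the parquet-go RowReader contract (ReadRows([]parquet.Row) (int, error)); readUnbuffered and process are illustrative names, not code from this PR:

// Sketch only: with a one-row buffer, each ReadRows call hands a single
// ~10KB row to the caller, so nothing is retained between iterations and
// the underlying column readers become unreachable sooner.
func readUnbuffered(r parquet.RowReader, process func(parquet.Row)) error {
	buf := make([]parquet.Row, 1)
	for {
		n, err := r.ReadRows(buf)
		if n > 0 {
			process(buf[0])
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}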

}

return NewIteratorRowReader(
@@ -143,8 +139,10 @@ func (r *BufferedRowReaderIterator) Close() error {
return r.err
}

// TODO: refactor: the ReadAll and ReadAllWithBufferSize functions are only used in tests.

func ReadAll(r parquet.RowReader) ([]parquet.Row, error) {
return ReadAllWithBufferSize(r, defaultRowBufferSize)
return ReadAllWithBufferSize(r, 1)
}

func ReadAllWithBufferSize(r parquet.RowReader, bufferSize int) ([]parquet.Row, error) {
2 changes: 1 addition & 1 deletion pkg/parquet/row_writer.go
@@ -18,7 +18,7 @@ func CopyAsRowGroups(dst RowWriterFlusher, src parquet.RowReader, rowGroupNumCou
if rowGroupNumCount <= 0 {
panic("rowGroupNumCount must be positive")
}
bufferSize := defaultRowBufferSize
bufferSize := 1
// We clamp the buffer to the rowGroupNumCount to avoid allocating a buffer that is too large.
if rowGroupNumCount < bufferSize {
bufferSize = rowGroupNumCount
46 changes: 17 additions & 29 deletions pkg/phlaredb/profile_store.go
@@ -23,6 +23,7 @@ import (
"github.com/grafana/pyroscope/pkg/phlaredb/block"
"github.com/grafana/pyroscope/pkg/phlaredb/query"
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/slices"
"github.com/grafana/pyroscope/pkg/util/build"
)

@@ -40,7 +41,6 @@ type profileStore struct {

path string
persister schemav1.Persister[*schemav1.Profile]
writer *parquet.GenericWriter[*schemav1.Profile]

// lock serializes appends to the slice. Every new profile is appended
// to the slice and to the index (has its own lock). In practice, it's
@@ -86,10 +86,6 @@ func newProfileStore(phlarectx context.Context) *profileStore {
}
s.flushWg.Add(1)
go s.cutRowGroupLoop()
// Initialize writer on /dev/null
// TODO: Reuse parquet.Writer beyond life time of the head.
s.writer = newParquetProfileWriter(io.Discard)

return s
}

@@ -204,13 +200,7 @@ func (s *profileStore) DeleteRowGroups() error {
}

func (s *profileStore) prepareFile(path string) (f *os.File, err error) {
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
if err != nil {
return nil, err
}
s.writer.Reset(file)

return file, err
return os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
}

// cutRowGroups gets called when a particular row group has been finished
@@ -254,12 +244,13 @@ func (s *profileStore) cutRowGroup(count int) (err error) {
return err
}

n, err := parquet.CopyRows(s.writer, schemav1.NewInMemoryProfilesRowReader(s.flushBuffer))
w := newParquetProfileWriter(f)
n, err := parquet.CopyRows(w, schemav1.NewInMemoryProfilesRowReader(s.flushBuffer))
if err != nil {
return errors.Wrap(err, "write row group segments to disk")
}

if err := s.writer.Close(); err != nil {
if err = w.Close(); err != nil {
return errors.Wrap(err, "close row group segment writer")
}

@@ -288,14 +279,19 @@
s.rowGroups = append(s.rowGroups, rowGroup)
// Cutting the index is a relatively quick op (no I/O).
err = s.index.cutRowGroup(s.flushBuffer)

// The buffer references profiles, so we should reset it in order for
// the GC to reclaim the memory.
slices.Clear(s.flushBuffer)
s.flushBuffer = s.flushBuffer[:0]
slices.Clear(s.flushBufferLbs)
s.flushBufferLbs = s.flushBufferLbs[:0]
s.profilesLock.Lock()
defer s.profilesLock.Unlock()
for i := range s.slice[:count] {
s.metrics.samples.Sub(float64(len(s.slice[i].Samples.StacktraceIDs)))
}
// reset slice and metrics
s.slice = copySlice(s.slice[count:])
s.slice = s.slice[:copy(s.slice, s.slice[count:])]
currentSize := s.size.Sub(size)
if err != nil {
return err
@@ -371,23 +367,21 @@ func (s *profileStore) loadProfilesToFlush(count int) uint64 {
}

func (s *profileStore) writeRowGroups(path string, rowGroups []parquet.RowGroup) (n uint64, numRowGroups uint64, err error) {
fileCloser, err := s.prepareFile(path)
f, err := s.prepareFile(path)
if err != nil {
return 0, 0, err
}
defer runutil.CloseWithErrCapture(&err, fileCloser, "closing parquet file")
defer runutil.CloseWithErrCapture(&err, f, "closing parquet file")
readers := make([]parquet.RowReader, len(rowGroups))
for i, rg := range rowGroups {
readers[i] = rg.Rows()
}
n, numRowGroups, err = phlareparquet.CopyAsRowGroups(s.writer, schemav1.NewMergeProfilesRowReader(readers), s.cfg.MaxBufferRowCount)

if err := s.writer.Close(); err != nil {
w := newParquetProfileWriter(f)
n, numRowGroups, err = phlareparquet.CopyAsRowGroups(w, schemav1.NewMergeProfilesRowReader(readers), s.cfg.MaxBufferRowCount)
if err = w.Close(); err != nil {
return 0, 0, err
}

s.rowsFlushed += n

return n, numRowGroups, nil
}
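Taken together with the prepareFile change above, the PR replaces the long-lived, reset-on-every-file writer with one created per flush. A sketch of the resulting lifecycle, assuming newParquetProfileWriter wraps a parquet generic writer as before (writeSegment is a hypothetical helper, not code from this PR):

// Sketch of the per-flush writer lifecycle: open the segment file, create
// a fresh writer, copy the rows, and close the writer so its internal
// buffers are discarded along with it.
func writeSegment(path string, src parquet.RowReader) (n int64, err error) {
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
	if err != nil {
		return 0, err
	}
	defer f.Close() // the real code captures the close error via runutil
	w := newParquetProfileWriter(f)
	if n, err = parquet.CopyRows(w, src); err != nil {
		return 0, errors.Wrap(err, "write row group segments to disk")
	}
	// Close flushes the remaining pages and writes the parquet footer.
	return n, w.Close()
}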

@@ -548,9 +542,3 @@ func (r *seriesIDRowsRewriter) ReadRows(rows []parquet.Row) (int, error) {

return n, nil
}

func copySlice[T any](in []T) []T {
out := make([]T, len(in))
copy(out, in)
return out
}
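The removed copySlice allocated a fresh backing array on every cut, while the old array, still referencing flushed profiles, lingered until collected. Its replacement compacts in place and zeroes the stale tail. A sketch of the idiom, assuming pkg/slices.Clear zeroes every element of its argument, as the diff suggests (compactAndClear is a hypothetical helper; the same pattern appears in profilesIndex.cutRowGroup below):

// Sketch of the in-place compact-and-clear idiom: shift the retained tail
// over the flushed prefix, zero the now-stale suffix so the GC can reclaim
// the profiles it pointed at, and reslice to the new length.
func compactAndClear[T any](s []T, flushed int) []T {
	n := copy(s, s[flushed:])
	slices.Clear(s[n:])
	return s[:n]
}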
16 changes: 5 additions & 11 deletions pkg/phlaredb/profiles.go
@@ -24,6 +24,7 @@ import (
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/phlaredb/tsdb"
"github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index"
"github.com/grafana/pyroscope/pkg/slices"
)

// delta encoding for ranges
@@ -489,21 +490,14 @@ func (pi *profilesIndex) cutRowGroup(rgProfiles []schemav1.InMemoryProfile) erro

for fp, ps := range pi.profilesPerFP {
count := countPerFP[fp]
// empty all in memory profiles
for i := range ps.profiles[:count] {
// Allow GC to evict the object.
ps.profiles[i] = nil
}
ps.profiles = ps.profiles[count:]

n := copy(ps.profiles, ps.profiles[count:])
slices.Clear(ps.profiles[n:])
ps.profiles = ps.profiles[:n]
// attach rowGroup and rowNum information
rowRange := rowRangePerFP[ps.fp]

ps.profilesOnDisk = append(
ps.profilesOnDisk,
rowRange,
rowRangePerFP[ps.fp],
)

}

return nil
2 changes: 1 addition & 1 deletion pkg/phlaredb/schemas/v1/profiles.go
@@ -490,7 +490,7 @@ const profileSize = uint64(unsafe.Sizeof(InMemoryProfile{}))
func (p InMemoryProfile) Size() uint64 {
size := profileSize + uint64(cap(p.Comments)*8)
// 4 bytes for each stacktrace ID, 8 bytes for each stacktrace value, and 8 bytes for each span
return size + uint64(cap(p.Samples.StacktraceIDs)*(4+8))
return size + uint64(cap(p.Samples.StacktraceIDs)*(4+8)+cap(p.Samples.Spans)*8)
}
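Illustrative arithmetic only, with made-up capacities: for a profile with cap(Samples.StacktraceIDs) == 1000 and cap(Samples.Spans) == 1000, Size now reports profileSize + 1000*(4+8) + 1000*8 = profileSize + 20000 bytes. Before this change, the 8000 bytes of span capacity went uncounted, understating the head's in-memory footprint.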

func (p InMemoryProfile) Timestamp() model.Time {