fix: clear profile buffer at flush #3179

Closed · wants to merge 7 commits
10 changes: 4 additions & 6 deletions pkg/parquet/row_reader.go
@@ -11,10 +11,6 @@ import (
"github.com/grafana/pyroscope/pkg/util/loser"
)

-const (
-defaultRowBufferSize = 64
-)

var (
_ parquet.RowReader = (*emptyRowReader)(nil)
_ parquet.RowReader = (*ErrRowReader)(nil)
@@ -45,7 +41,7 @@ func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less f
}
its := make([]iter.Iterator[parquet.Row], len(readers))
for i := range readers {
-its[i] = NewBufferedRowReaderIterator(readers[i], defaultRowBufferSize)
+its[i] = NewBufferedRowReaderIterator(readers[i], 1)
Collaborator (Author) commented:
Experiments show that buffering here is not helpful: it keeps many column readers referenced that could otherwise be released earlier, letting the GC free unused objects. Rows in our case are also too large (approximately 10 KB each) for batching to pay off.

We may want to refactor this piece: a buffer of size 1 does not noticeably hurt performance, but it provides no benefit either.
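For illustration, a minimal sketch of the buffering pattern in question — hypothetical types, not the actual pkg/parquet API. With bufferSize > 1, every row in a batch (and every column reader it references) stays reachable until the whole batch is drained; with bufferSize == 1, each row becomes collectable as soon as the iterator advances.

// Row and rowSource stand in for parquet.Row and parquet.RowReader.
type Row []byte

type rowSource interface {
	ReadRows(rows []Row) (int, error)
}

// bufferedIterator drains a rowSource through a fixed-size buffer.
type bufferedIterator struct {
	src    rowSource
	buf    []Row
	cur, n int
	err    error
}

func newBufferedIterator(src rowSource, bufferSize int) *bufferedIterator {
	return &bufferedIterator{src: src, buf: make([]Row, bufferSize)}
}

func (it *bufferedIterator) Next() bool {
	if it.cur+1 < it.n {
		it.cur++
		return true
	}
	// Refill: every row read here stays referenced by buf until the whole
	// batch is consumed — the GC pressure described in the comment above.
	it.n, it.err = it.src.ReadRows(it.buf)
	it.cur = 0
	return it.n > 0
}

func (it *bufferedIterator) At() Row { return it.buf[it.cur] }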

}

return NewIteratorRowReader(
@@ -143,8 +139,10 @@ func (r *BufferedRowReaderIterator) Close() error {
return r.err
}

+// TODO: refactor: the ReadAll and ReadAllWithBufferSize functions are only used in tests.
+
func ReadAll(r parquet.RowReader) ([]parquet.Row, error) {
-return ReadAllWithBufferSize(r, defaultRowBufferSize)
+return ReadAllWithBufferSize(r, 1)
}

func ReadAllWithBufferSize(r parquet.RowReader, bufferSize int) ([]parquet.Row, error) {
2 changes: 1 addition & 1 deletion pkg/parquet/row_writer.go
@@ -18,7 +18,7 @@ func CopyAsRowGroups(dst RowWriterFlusher, src parquet.RowReader, rowGroupNumCou
if rowGroupNumCount <= 0 {
panic("rowGroupNumCount must be positive")
}
-bufferSize := defaultRowBufferSize
+bufferSize := 1
// We clamp the buffer to the rowGroupNumCount to avoid allocating a buffer that is too large.
if rowGroupNumCount < bufferSize {
bufferSize = rowGroupNumCount
2 changes: 1 addition & 1 deletion pkg/phlaredb/delta.go
@@ -39,7 +39,7 @@ func newSampleDict(samples schemav1.Samples) map[uint32]uint64 {
return dict
}

-func (d *deltaProfiles) computeDelta(ps schemav1.InMemoryProfile) schemav1.Samples {
+func (d *deltaProfiles) computeDelta(ps *schemav1.InMemoryProfile) schemav1.Samples {
d.mtx.Lock()
defer d.mtx.Unlock()

4 changes: 2 additions & 2 deletions pkg/phlaredb/downsample/downsample_test.go
@@ -44,7 +44,7 @@ func TestDownsampler_ProfileCounts(t *testing.T) {
}

func TestDownsampler_Aggregation(t *testing.T) {
-profiles := make([]schemav1.InMemoryProfile, 0)
+profiles := make([]*schemav1.InMemoryProfile, 0)
builder := testhelper.NewProfileBuilder(1703853310000000000).CPUProfile() // 2023-12-29T12:35:10Z
builder.ForStacktraceString("a", "b", "c").AddSamples(30)
builder.ForStacktraceString("a", "b", "c", "d").AddSamples(20)
@@ -106,7 +106,7 @@ }
}

func TestDownsampler_VaryingFingerprints(t *testing.T) {
-profiles := make([]schemav1.InMemoryProfile, 0)
+profiles := make([]*schemav1.InMemoryProfile, 0)
for i := 0; i < 5; i++ {
builder := testhelper.NewProfileBuilder(1703853310000000000).CPUProfile() // 2023-12-29T12:35:10Z
builder.ForStacktraceString("a", "b", "c").AddSamples(30)
2 changes: 1 addition & 1 deletion pkg/phlaredb/head.go
@@ -217,7 +217,7 @@ func (h *Head) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, e
continue
}

-if err := h.profiles.ingest(ctx, []schemav1.InMemoryProfile{profile}, lbls[idxType], metricName); err != nil {
+if err := h.profiles.ingest(ctx, []*schemav1.InMemoryProfile{profile}, lbls[idxType], metricName); err != nil {
return err
}

58 changes: 23 additions & 35 deletions pkg/phlaredb/profile_store.go
@@ -23,6 +23,7 @@ import (
"github.com/grafana/pyroscope/pkg/phlaredb/block"
"github.com/grafana/pyroscope/pkg/phlaredb/query"
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/slices"
"github.com/grafana/pyroscope/pkg/util/build"
)

@@ -40,14 +41,13 @@ type profileStore struct

path string
persister schemav1.Persister[*schemav1.Profile]
-writer *parquet.GenericWriter[*schemav1.Profile]

// lock serializes appends to the slice. Every new profile is appended
// to the slice and to the index (which has its own lock). In practice, its
// only purpose is to accommodate the parquet writer: the slice is never
// accessed for reads.
profilesLock sync.Mutex
-slice []schemav1.InMemoryProfile
+slice []*schemav1.InMemoryProfile

// Rows lock synchronises access to the on-disk row groups.
// When the in-memory index (profiles) is being flushed on disk,
@@ -62,7 +62,7 @@ type profileStore struct {
flushQueue chan int // channel to signal that a flush is needed for slice[:n]
closeOnce sync.Once
flushWg sync.WaitGroup
-flushBuffer []schemav1.InMemoryProfile
+flushBuffer []*schemav1.InMemoryProfile
flushBufferLbs []phlaremodel.Labels
onFlush func()
}
@@ -86,10 +86,6 @@ func newProfileStore(phlarectx context.Context) *profileStore {
}
s.flushWg.Add(1)
go s.cutRowGroupLoop()
-// Initialize writer on /dev/null
-// TODO: Reuse parquet.Writer beyond life time of the head.
-s.writer = newParquetProfileWriter(io.Discard)

return s
}

@@ -204,13 +200,7 @@ func (s *profileStore) DeleteRowGroups() error {
}

func (s *profileStore) prepareFile(path string) (f *os.File, err error) {
-file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
-if err != nil {
-return nil, err
-}
-s.writer.Reset(file)
-
-return file, err
+return os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
}

// cutRowGroups gets called when a particular row group has been finished
@@ -254,12 +244,13 @@ func (s *profileStore) cutRowGroup(count int) (err error) {
return err
}

-n, err := parquet.CopyRows(s.writer, schemav1.NewInMemoryProfilesRowReader(s.flushBuffer))
+w := newParquetProfileWriter(f)
+n, err := parquet.CopyRows(w, schemav1.NewInMemoryProfilesRowReader(s.flushBuffer))
if err != nil {
return errors.Wrap(err, "write row group segments to disk")
}

-if err := s.writer.Close(); err != nil {
+if err = w.Close(); err != nil {
return errors.Wrap(err, "close row group segment writer")
}
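As a standalone sketch of the lifecycle this hunk adopts — written against the upstream github.com/parquet-go/parquet-go API with a stand-in row schema; the real newParquetProfileWriter wraps the same generic writer with Pyroscope's schema and options:

import (
	"os"

	"github.com/parquet-go/parquet-go"
)

// row is a stand-in schema; the real code writes *schemav1.Profile rows.
type row struct {
	SeriesIndex uint32 `parquet:"series_index"`
	TimeNanos   int64  `parquet:"time_nanos"`
}

// flushSegment opens the segment file, binds a fresh writer to it, copies
// the buffered rows, and closes the writer (emitting row groups and the
// parquet footer) before the file handle is closed.
func flushSegment(path string, src parquet.RowReader) (err error) {
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()

	w := parquet.NewGenericWriter[row](f)
	if _, err = parquet.CopyRows(w, src); err != nil {
		return err
	}
	return w.Close()
}

A short-lived writer per segment means no writer state (buffers, dictionaries, column-chunk builders) survives between flushes, which is what this buys over the previous long-lived writer that was Reset() onto each new file.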

@@ -288,14 +279,19 @@
s.rowGroups = append(s.rowGroups, rowGroup)
// Cutting the index is relatively quick op (no I/O).
err = s.index.cutRowGroup(s.flushBuffer)

+// The buffer references profiles, so we should reset it in order for
+// the GC to reclaim the memory.
+slices.Clear(s.flushBuffer)
+s.flushBuffer = s.flushBuffer[:0]
+slices.Clear(s.flushBufferLbs)
+s.flushBufferLbs = s.flushBufferLbs[:0]
s.profilesLock.Lock()
defer s.profilesLock.Unlock()
for i := range s.slice[:count] {
s.metrics.samples.Sub(float64(len(s.slice[i].Samples.StacktraceIDs)))
}
// reset slice and metrics
-s.slice = copySlice(s.slice[count:])
+s.slice = s.slice[:copy(s.slice, s.slice[count:])]
currentSize := s.size.Sub(size)
if err != nil {
return err
@@ -309,7 +305,7 @@
}

type byLabels struct {
-p []schemav1.InMemoryProfile
+p []*schemav1.InMemoryProfile
lbs []phlaremodel.Labels
}

@@ -345,7 +341,7 @@ func (by byLabels) Less(i, j int) bool {
// loadProfilesToFlush loads and sorts the profiles to flush into flushBuffer and returns their total size.
func (s *profileStore) loadProfilesToFlush(count int) uint64 {
if cap(s.flushBuffer) < count {
-s.flushBuffer = make([]schemav1.InMemoryProfile, 0, count)
+s.flushBuffer = make([]*schemav1.InMemoryProfile, 0, count)
}
if cap(s.flushBufferLbs) < count {
s.flushBufferLbs = make([]phlaremodel.Labels, 0, count)
@@ -371,27 +367,25 @@
}

func (s *profileStore) writeRowGroups(path string, rowGroups []parquet.RowGroup) (n uint64, numRowGroups uint64, err error) {
-fileCloser, err := s.prepareFile(path)
+f, err := s.prepareFile(path)
if err != nil {
return 0, 0, err
}
-defer runutil.CloseWithErrCapture(&err, fileCloser, "closing parquet file")
+defer runutil.CloseWithErrCapture(&err, f, "closing parquet file")
readers := make([]parquet.RowReader, len(rowGroups))
for i, rg := range rowGroups {
readers[i] = rg.Rows()
}
-n, numRowGroups, err = phlareparquet.CopyAsRowGroups(s.writer, schemav1.NewMergeProfilesRowReader(readers), s.cfg.MaxBufferRowCount)
-
-if err := s.writer.Close(); err != nil {
+w := newParquetProfileWriter(f)
+n, numRowGroups, err = phlareparquet.CopyAsRowGroups(w, schemav1.NewMergeProfilesRowReader(readers), s.cfg.MaxBufferRowCount)
+if err = w.Close(); err != nil {
return 0, 0, err
}

s.rowsFlushed += n

return n, numRowGroups, nil
}

-func (s *profileStore) ingest(_ context.Context, profiles []schemav1.InMemoryProfile, lbs phlaremodel.Labels, profileName string) error {
+func (s *profileStore) ingest(_ context.Context, profiles []*schemav1.InMemoryProfile, lbs phlaremodel.Labels, profileName string) error {
s.profilesLock.Lock()
defer s.profilesLock.Unlock()

@@ -406,7 +400,7 @@ func (s *profileStore) ingest(_ context.Context, profiles []schemav1.InMemoryPro
}

// add profile to the index
-s.index.Add(&p, lbs, profileName)
+s.index.Add(p, lbs, profileName)

// increase size of stored data
addedBytes := profiles[pos].Size()
@@ -548,9 +542,3 @@ func (r *seriesIDRowsRewriter) ReadRows(rows []parquet.Row) (int, error) {

return n, nil
}

-func copySlice[T any](in []T) []T {
-out := make([]T, len(in))
-copy(out, in)
-return out
-}
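The slice reset above, and the matching profilesIndex change in pkg/phlaredb/profiles.go further down, rely on the same compact-then-clear idiom; a standalone sketch with a hypothetical helper:

// dropHead removes the first count elements of a pointer slice in place.
// The copy compacts the survivors to the front; nil-ing the stale tail is
// what lets the GC reclaim the dropped objects, since the backing array
// keeps its capacity and would otherwise pin them.
func dropHead[T any](s []*T, count int) []*T {
	n := copy(s, s[count:])
	for i := n; i < len(s); i++ {
		s[i] = nil
	}
	return s[:n]
}

Since Go 1.21 the loop can be written as the built-in clear(s[n:]); the pkg/slices helper used in the diff plays that role.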
8 changes: 4 additions & 4 deletions pkg/phlaredb/profile_store_test.go
@@ -236,7 +236,7 @@ func TestProfileStore_RowGroupSplitting(t *testing.T) {

for i := 0; i < 100; i++ {
p := tc.values(i)
-require.NoError(t, store.ingest(ctx, []schemav1.InMemoryProfile{p.p}, p.lbls, p.profileName))
+require.NoError(t, store.ingest(ctx, []*schemav1.InMemoryProfile{&p.p}, p.lbls, p.profileName))
for store.flushing.Load() {
time.Sleep(time.Millisecond)
}
@@ -302,7 +302,7 @@ func TestProfileStore_Ingestion_SeriesIndexes(t *testing.T) {

for i := 0; i < 9; i++ {
p := threeProfileStreams(i)
-require.NoError(t, store.ingest(ctx, []schemav1.InMemoryProfile{p.p}, p.lbls, p.profileName))
+require.NoError(t, store.ingest(ctx, []*schemav1.InMemoryProfile{&p.p}, p.lbls, p.profileName))
}

// flush profiles and ensure the correct number of files are created
@@ -345,7 +345,7 @@ func BenchmarkFlush(b *testing.B) {
for i := 0; i < 10^6; i++ {
p := threeProfileStreams(i)
p.p.Samples = samples
-require.NoError(b, store.ingest(ctx, []schemav1.InMemoryProfile{p.p}, p.lbls, p.profileName))
+require.NoError(b, store.ingest(ctx, []*schemav1.InMemoryProfile{&p.p}, p.lbls, p.profileName))
}
require.NoError(b, store.cutRowGroup(len(store.slice)))
}
@@ -602,7 +602,7 @@ func TestRemoveFailedSegment(t *testing.T) {
require.NoError(t, store.Init(dir, defaultParquetConfig, contextHeadMetrics(context.Background())))
// fake a failed segment
_, err := os.Create(dir + "/profiles.0.parquet")
-require.NoError(t, store.ingest(context.Background(), []schemav1.InMemoryProfile{{}}, phlaremodel.LabelsFromStrings(), "memory"))
+require.NoError(t, store.ingest(context.Background(), []*schemav1.InMemoryProfile{{}}, phlaremodel.LabelsFromStrings(), "memory"))
require.NoError(t, err)
err = store.cutRowGroup(1)
require.NoError(t, err)
19 changes: 6 additions & 13 deletions pkg/phlaredb/profiles.go
@@ -24,6 +24,7 @@ import (
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/phlaredb/tsdb"
"github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index"
"github.com/grafana/pyroscope/pkg/slices"
)

// delta encoding for ranges
@@ -461,7 +462,7 @@ func (pi *profilesIndex) writeTo(ctx context.Context, path string) ([][]rowRange
return rangesPerRG, writer.Close()
}

-func (pi *profilesIndex) cutRowGroup(rgProfiles []schemav1.InMemoryProfile) error {
+func (pi *profilesIndex) cutRowGroup(rgProfiles []*schemav1.InMemoryProfile) error {
pi.mutex.Lock()
defer pi.mutex.Unlock()

@@ -489,21 +490,13 @@ func (pi *profilesIndex) cutRowGroup(rgProfiles []schemav1.InMemoryProfile) erro

for fp, ps := range pi.profilesPerFP {
count := countPerFP[fp]
-// empty all in memory profiles
-for i := range ps.profiles[:count] {
-// Allow GC to evict the object.
-ps.profiles[i] = nil
-}
-ps.profiles = ps.profiles[count:]
-
-// attach rowGroup and rowNum information
-rowRange := rowRangePerFP[ps.fp]
-
+n := copy(ps.profiles, ps.profiles[count:])
+slices.Clear(ps.profiles[n:])
+ps.profiles = ps.profiles[:n]
ps.profilesOnDisk = append(
ps.profilesOnDisk,
-rowRange,
+rowRangePerFP[ps.fp],
)

}

return nil
10 changes: 5 additions & 5 deletions pkg/phlaredb/schemas/v1/profiles.go
@@ -81,7 +81,7 @@ var (
)

func init() {
-maxProfileRow = deconstructMemoryProfile(InMemoryProfile{
+maxProfileRow = deconstructMemoryProfile(&InMemoryProfile{
SeriesIndex: math.MaxUint32,
TimeNanos: math.MaxInt64,
}, maxProfileRow)
@@ -490,7 +490,7 @@ const profileSize = uint64(unsafe.Sizeof(InMemoryProfile{}))
func (p InMemoryProfile) Size() uint64 {
size := profileSize + uint64(cap(p.Comments)*8)
// 4 bytes for stacktrace id and 8 bytes for each stacktrace value
-return size + uint64(cap(p.Samples.StacktraceIDs)*(4+8))
+return size + uint64(cap(p.Samples.StacktraceIDs)*(4+8)+cap(p.Samples.Spans)*8)
}

func (p InMemoryProfile) Timestamp() model.Time {
@@ -514,14 +514,14 @@ func copySlice[T any](in []T) []T {
return out
}

-func NewInMemoryProfilesRowReader(slice []InMemoryProfile) *SliceRowReader[InMemoryProfile] {
-return &SliceRowReader[InMemoryProfile]{
+func NewInMemoryProfilesRowReader(slice []*InMemoryProfile) *SliceRowReader[*InMemoryProfile] {
+return &SliceRowReader[*InMemoryProfile]{
slice: slice,
serialize: deconstructMemoryProfile,
}
}

-func deconstructMemoryProfile(imp InMemoryProfile, row parquet.Row) parquet.Row {
+func deconstructMemoryProfile(imp *InMemoryProfile, row parquet.Row) parquet.Row {
var (
col = -1
newCol = func() int {
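A side note on the Size change above: the accounting is capacity-based because the backing arrays are allocated at cap, not len. A sketch of the arithmetic with a hypothetical helper (sizes follow the source comment: 4-byte stacktrace IDs, 8-byte values, and now 8-byte spans):

// sampleBytes mirrors the accounting in InMemoryProfile.Size after this
// change: IDs and values are both sized off the StacktraceIDs capacity,
// and the spans slice is now counted too.
func sampleBytes(stacktraceIDs []uint32, spans []uint64) uint64 {
	return uint64(cap(stacktraceIDs)*(4+8) + cap(spans)*8)
}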