diff --git a/integration/commands_administration_test.go b/integration/commands_administration_test.go
index b0ed592e1be..af80cbc76cf 100644
--- a/integration/commands_administration_test.go
+++ b/integration/commands_administration_test.go
@@ -1711,47 +1711,67 @@ func TestCommandsAdministrationCompactForce(t *testing.T) {
 func TestCommandsAdministrationCompactCapped(t *testing.T) {
 	t.Parallel()
 
-	ctx, coll := setup.Setup(t)
-
 	for name, tc := range map[string]struct { //nolint:vet // for readability
 		force             bool
+		cleanupPercentage uint8 // optional, default value is 20
 		maxDocuments      int64
 		sizeInBytes       int64
 		insertDocuments   int32
-		expectedDocuments int64 // insertDocuments - insertDocuments*0.2 (cleanup 20%) + 1 (extra insert after compact)
+		expectedDocuments int64
 
 		skipForMongoDB string // optional, skip test for MongoDB backend with a specific reason
 	}{
 		"OverflowDocs": {
-			force:             true,
-			maxDocuments:      10,
-			sizeInBytes:       100000,
-			insertDocuments:   12, // overflows capped collection max documents
+			force:           true,
+			maxDocuments:    10,
+			sizeInBytes:     100000,
+			insertDocuments: 12,
+			// cleanup will be based on max documents
+			// maxDocuments + 1 (extra insert after compact)
 			expectedDocuments: 11,
 		},
 		"OverflowSize": {
-			force:             true,
-			maxDocuments:      1000,
-			sizeInBytes:       256,
-			insertDocuments:   20, // overflows capped collection size
+			force:           true,
+			sizeInBytes:     256,
+			insertDocuments: 20,
+			// cleanup will be based on size
+			// [insertDocuments * 0.2 (cleanup 20%)] + 1 (extra insert after compact)
 			expectedDocuments: 17,
 		},
+		"Cleanup10Percent": {
+			force:             true,
+			cleanupPercentage: 10,
+			sizeInBytes:       50,
+			insertDocuments:   5,
+			// cleanup will be based on size
+			// [insertDocuments * 0.1 (cleanup 10%) ≈ 0] + 1 (extra insert after compact)
+			expectedDocuments: 6,
+			skipForMongoDB:    "MongoDB cleans up collection precisely close to sizeInBytes, not based on percentage",
+		},
 		"ForceFalse": {
 			force:             false,
 			maxDocuments:      10,
 			sizeInBytes:       100000,
 			insertDocuments:   12, // overflows capped collection max documents
 			expectedDocuments: 11,
-			skipForMongoDB:    "Only {force:true} can be run on active replica set primary",
+			skipForMongoDB:    "Compact command with {force:false} cannot be executed on active replica set primary",
 		},
 	} {
 		name, tc := name, tc
 		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+
 			if tc.skipForMongoDB != "" {
 				setup.SkipForMongoDB(t, tc.skipForMongoDB)
 			}
 
-			t.Parallel()
+			beOpts := setup.NewBackendOpts()
+			if tc.cleanupPercentage != 0 {
+				beOpts.CappedCleanupPercentage = tc.cleanupPercentage
+			}
+
+			s := setup.SetupWithOpts(t, &setup.SetupOpts{BackendOptions: beOpts})
+			ctx, coll := s.Ctx, s.Collection
 
 			collName := testutil.CollectionName(t) + name
diff --git a/integration/setup/listener.go b/integration/setup/listener.go
index 1eba1d53d55..e795d810575 100644
--- a/integration/setup/listener.go
+++ b/integration/setup/listener.go
@@ -95,7 +95,7 @@ func listenerMongoDBURI(tb testtb.TB, hostPort, unixSocketPath string, tlsAndAut
 
 // setupListener starts in-process FerretDB server that runs until ctx is canceled.
 // It returns basic MongoDB URI for that listener.
-func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger) string {
+func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger, opts *BackendOpts) string {
 	tb.Helper()
 
 	_, span := otel.Tracer("").Start(ctx, "setupListener")
@@ -168,6 +168,8 @@ func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger) string
 	sp, err := state.NewProvider("")
 	require.NoError(tb, err)
 
+	require.NotNil(tb, opts)
+
 	handlerOpts := &registry.NewHandlerOpts{
 		Logger:      logger,
 		ConnMetrics: listenerMetrics.ConnMetrics,
@@ -180,8 +182,8 @@ func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger) string
 
 		TestOpts: registry.TestOpts{
 			DisablePushdown:         *disablePushdownF,
-			CappedCleanupPercentage: 20,
-			CappedCleanupInterval:   0,
+			CappedCleanupPercentage: opts.CappedCleanupPercentage,
+			CappedCleanupInterval:   opts.CappedCleanupInterval,
 			EnableNewAuth:           true,
 		},
 	}
diff --git a/integration/setup/setup.go b/integration/setup/setup.go
index 1c76b22a4f9..6cd47492855 100644
--- a/integration/setup/setup.go
+++ b/integration/setup/setup.go
@@ -24,6 +24,7 @@ import (
 	"runtime/trace"
 	"slices"
 	"strings"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -94,6 +95,18 @@ type SetupOpts struct {
 
 	// SetupUser true creates a user and returns an authenticated client.
 	SetupUser bool
+
+	// Options to override default backend configuration.
+	BackendOptions *BackendOpts
+}
+
+// BackendOpts represents backend configuration used for test setup.
+type BackendOpts struct {
+	// Capped collections cleanup interval.
+	CappedCleanupInterval time.Duration
+
+	// Percentage of documents to clean up for capped collections. If not set, defaults to 20.
+	CappedCleanupPercentage uint8
 }
 
 // SetupResult represents setup results.
@@ -103,6 +116,14 @@ type SetupResult struct {
 	MongoDBURI string
 }
 
+// NewBackendOpts returns BackendOpts with default values set.
+func NewBackendOpts() *BackendOpts {
+	return &BackendOpts{
+		CappedCleanupInterval:   time.Duration(0),
+		CappedCleanupPercentage: uint8(20),
+	}
+}
+
 // IsUnixSocket returns true if MongoDB URI is a Unix domain socket.
 func (s *SetupResult) IsUnixSocket(tb testtb.TB) bool {
 	tb.Helper()
@@ -142,7 +163,11 @@ func SetupWithOpts(tb testtb.TB, opts *SetupOpts) *SetupResult {
 
 	uri := *targetURLF
 	if uri == "" {
-		uri = setupListener(tb, setupCtx, logger)
+		if opts.BackendOptions == nil {
+			opts.BackendOptions = NewBackendOpts()
+		}
+
+		uri = setupListener(tb, setupCtx, logger, opts.BackendOptions)
 	} else {
 		uri = toAbsolutePathURI(tb, *targetURLF)
 	}
diff --git a/integration/setup/setup_compat.go b/integration/setup/setup_compat.go
index fb86f7aa49c..d88184777dc 100644
--- a/integration/setup/setup_compat.go
+++ b/integration/setup/setup_compat.go
@@ -103,7 +103,7 @@ func SetupCompatWithOpts(tb testtb.TB, opts *SetupCompatOpts) *SetupCompatResult
 
 	var targetClient *mongo.Client
 	if *targetURLF == "" {
-		uri := setupListener(tb, setupCtx, logger)
+		uri := setupListener(tb, setupCtx, logger, NewBackendOpts())
 		targetClient = setupClient(tb, setupCtx, uri)
 	} else {
 		targetClient = setupClient(tb, setupCtx, *targetURLF)
diff --git a/internal/handler/handler.go b/internal/handler/handler.go
index afcad5878b4..37cd03ea2cb 100644
--- a/internal/handler/handler.go
+++ b/internal/handler/handler.go
@@ -246,69 +246,69 @@ func (h *Handler) cleanupAllCappedCollections(ctx context.Context) error {
 func (h *Handler) cleanupCappedCollection(ctx context.Context, db backends.Database, cInfo *backends.CollectionInfo, force bool) (int32, int64, error) { //nolint:lll // for readability
 	must.BeTrue(cInfo.Capped())
 
+	var docsDeleted int32
+	var bytesFreed int64
+	var statsBefore, statsAfter *backends.CollectionStatsResult
+
 	coll, err := db.Collection(cInfo.Name)
 	if err != nil {
 		return 0, 0, lazyerrors.Error(err)
 	}
 
-	statsBefore, err := coll.Stats(ctx, &backends.CollectionStatsParams{Refresh: true})
+	statsBefore, err = coll.Stats(ctx, &backends.CollectionStatsParams{Refresh: true})
 	if err != nil {
 		return 0, 0, lazyerrors.Error(err)
 	}
 
-	h.L.Debug("cleanupCappedCollection: stats before", zap.Any("stats", statsBefore))
 
-	if statsBefore.SizeCollection < cInfo.CappedSize && statsBefore.CountDocuments < cInfo.CappedDocuments {
-		return 0, 0, nil
-	}
+	// In order to be more precise w.r.t. the number of documents dropped, and to avoid
+	// deleting more documents than necessary:
+	//
+	// - First, drop the surplus documents if the document count exceeds the capped configuration.
+	// - Collect stats again.
+	// - If the collection size still exceeds the capped size, then drop documents based on
+	//   CappedCleanupPercentage.
-	res, err := coll.Query(ctx, &backends.QueryParams{
-		Sort:          must.NotFail(types.NewDocument("$natural", int64(1))),
-		Limit:         int64(float64(statsBefore.CountDocuments) * float64(h.CappedCleanupPercentage) / 100),
-		OnlyRecordIDs: true,
-	})
-	if err != nil {
-		return 0, 0, lazyerrors.Error(err)
-	}
-
-	defer res.Iter.Close()
-
-	var recordIDs []int64
-	for {
-		var doc *types.Document
-		if _, doc, err = res.Iter.Next(); err != nil {
-			if errors.Is(err, iterator.ErrIteratorDone) {
-				break
-			}
+	if count := getDocCleanupCount(cInfo, statsBefore); count > 0 {
+		err = deleteFirstNDocuments(ctx, coll, count)
+		if err != nil {
+			return 0, 0, lazyerrors.Error(err)
+		}
 
+		statsAfter, err = coll.Stats(ctx, &backends.CollectionStatsParams{Refresh: true})
+		if err != nil {
 			return 0, 0, lazyerrors.Error(err)
 		}
 
-		recordIDs = append(recordIDs, doc.RecordID())
-	}
+		h.L.Debug("cleanupCappedCollection: stats after document count reduction", zap.Any("stats", statsAfter))
+
+		docsDeleted += int32(count)
+		bytesFreed += (statsBefore.SizeTotal - statsAfter.SizeTotal)
 
-	if len(recordIDs) == 0 {
-		h.L.Debug("cleanupCappedCollection: no documents to delete")
-		return 0, 0, nil
+		statsBefore = statsAfter
 	}
 
-	deleteRes, err := coll.DeleteAll(ctx, &backends.DeleteAllParams{RecordIDs: recordIDs})
-	if err != nil {
-		return 0, 0, lazyerrors.Error(err)
+	if count := getSizeCleanupCount(cInfo, statsBefore, h.CappedCleanupPercentage); count > 0 {
+		err = deleteFirstNDocuments(ctx, coll, count)
+		if err != nil {
+			return 0, 0, lazyerrors.Error(err)
+		}
+
+		docsDeleted += int32(count)
 	}
 
 	if _, err = coll.Compact(ctx, &backends.CompactParams{Full: force}); err != nil {
 		return 0, 0, lazyerrors.Error(err)
 	}
 
-	statsAfter, err := coll.Stats(ctx, &backends.CollectionStatsParams{Refresh: true})
+	statsAfter, err = coll.Stats(ctx, &backends.CollectionStatsParams{Refresh: true})
 	if err != nil {
 		return 0, 0, lazyerrors.Error(err)
 	}
 
-	h.L.Debug("cleanupCappedCollection: stats after", zap.Any("stats", statsAfter))
+	h.L.Debug("cleanupCappedCollection: stats after compact", zap.Any("stats", statsAfter))
 
-	bytesFreed := statsBefore.SizeTotal - statsAfter.SizeTotal
+	bytesFreed += (statsBefore.SizeTotal - statsAfter.SizeTotal)
 
 	// There's a possibility that the size of a collection might be greater at the
 	// end of a compact operation if the collection is being actively written to at
@@ -317,5 +317,69 @@ func (h *Handler) cleanupCappedCollection(ctx context.Context, db backends.Datab
 		bytesFreed = 0
 	}
 
-	return deleteRes.Deleted, bytesFreed, nil
+	return docsDeleted, bytesFreed, nil
+}
+
+// getDocCleanupCount returns the number of documents to be deleted during capped collection cleanup,
+// based on the document count of the collection and the capped configuration.
+func getDocCleanupCount(cInfo *backends.CollectionInfo, cStats *backends.CollectionStatsResult) int64 {
+	if cInfo.CappedDocuments == 0 || cInfo.CappedDocuments >= cStats.CountDocuments {
+		return 0
+	}
+
+	return (cStats.CountDocuments - cInfo.CappedDocuments)
+}
+
+// getSizeCleanupCount returns the number of documents to be deleted during capped collection cleanup,
+// based on collection size, capped configuration, and cleanup percentage.
+func getSizeCleanupCount(cInfo *backends.CollectionInfo, cStats *backends.CollectionStatsResult, cleanupPercent uint8) int64 {
+	if cInfo.CappedSize >= cStats.SizeCollection {
+		return 0
+	}
+
+	return int64(float64(cStats.CountDocuments) * float64(cleanupPercent) / 100)
+}
+
+// deleteFirstNDocuments drops the first n documents (in insertion order) from the collection.
+func deleteFirstNDocuments(ctx context.Context, coll backends.Collection, n int64) error {
+	if n == 0 {
+		return nil
+	}
+
+	res, err := coll.Query(ctx, &backends.QueryParams{
+		Sort:          must.NotFail(types.NewDocument("$natural", int64(1))),
+		Limit:         n,
+		OnlyRecordIDs: true,
+	})
+	if err != nil {
+		return lazyerrors.Error(err)
+	}
+
+	defer res.Iter.Close()
+
+	var recordIDs []int64
+
+	for {
+		var doc *types.Document
+
+		_, doc, err = res.Iter.Next()
+		if err != nil {
+			if errors.Is(err, iterator.ErrIteratorDone) {
+				break
+			}
+
+			return lazyerrors.Error(err)
+		}
+
+		recordIDs = append(recordIDs, doc.RecordID())
+	}
+
+	if len(recordIDs) > 0 {
+		_, err := coll.DeleteAll(ctx, &backends.DeleteAllParams{RecordIDs: recordIDs})
+		if err != nil {
+			return lazyerrors.Error(err)
+		}
+	}
+
+	return nil
 }
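Note: the snippet below is a minimal, standalone sketch (not part of the patch) that mirrors the cleanup arithmetic introduced by getDocCleanupCount and getSizeCleanupCount. The helper signatures here are simplified stand-ins that take plain int64 values instead of *backends.CollectionInfo and *backends.CollectionStatsResult, and the collection sizes passed in main (300 and 100) are assumed values chosen only to exceed the configured caps, so the counts can be compared against the expectedDocuments values in the test table above (before the extra insert each test performs after compact).

package main

import "fmt"

// docCleanupCount mirrors getDocCleanupCount: delete only the surplus documents
// over the capped maximum, or nothing if no maximum is set or it is not exceeded.
func docCleanupCount(cappedDocs, countDocs int64) int64 {
	if cappedDocs == 0 || cappedDocs >= countDocs {
		return 0
	}

	return countDocs - cappedDocs
}

// sizeCleanupCount mirrors getSizeCleanupCount: delete a percentage of all documents,
// but only when the collection size exceeds the capped size.
func sizeCleanupCount(cappedSize, sizeCollection, countDocs int64, cleanupPercent uint8) int64 {
	if cappedSize >= sizeCollection {
		return 0
	}

	return int64(float64(countDocs) * float64(cleanupPercent) / 100)
}

func main() {
	fmt.Println(docCleanupCount(10, 12))            // "OverflowDocs": 2 dropped, 10 remain (11 expected after the extra insert)
	fmt.Println(sizeCleanupCount(256, 300, 20, 20)) // "OverflowSize": 20% of 20 -> 4 dropped, 16 remain (17 expected)
	fmt.Println(sizeCleanupCount(50, 100, 5, 10))   // "Cleanup10Percent": int64(0.5) -> 0 dropped, 5 remain (6 expected)
}

The percentage-based count truncates toward zero on the int64 conversion, which is why the 10% case deletes nothing and the test expects all 5 inserted documents plus the extra one to remain.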