Skip to content

Commit

Permalink
Merge pull request #15 from 0x-buidl/chore/cleanups
Browse files Browse the repository at this point in the history
chore: cleanups
  • Loading branch information
lxnre-codes committed Sep 5, 2023
2 parents 459c5f0 + bcdf5c8 commit c2220b2
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 139 deletions.
146 changes: 144 additions & 2 deletions internal/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,56 @@ import (

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type UnionFindOpts interface {
*mopt.FindOptions | *mopt.FindOneOptions
}

func BuildPopulatePipeline[P UnionFindOpts](
d any, q bson.M, opt P,
) (mongo.Pipeline, *options.AggregateOptions, error) {
pipelineOpts, aggrOpts, queryOpts := mergeFindOptsWithAggregatOpts(opt)
pipeline := append(mongo.Pipeline{bson.D{{Key: "$match", Value: q}}}, pipelineOpts...)
var wg sync.WaitGroup
var mu sync.Mutex
var err error
for _, pop := range *queryOpts.PopulateOption {
wg.Add(1)
func(pop *mopt.PopulateOptions) {
defer wg.Done()
mu.Lock()
if err != nil {
mu.Unlock()
return
}
mu.Unlock()

pipe, pErr := getPopulateStages(d, pop)

mu.Lock()
if pErr != nil {
err = pErr
mu.Unlock()
return
}
pipeline = append(pipeline, pipe...)
mu.Unlock()
}(pop)
}
wg.Wait()

if err != nil {
return nil, nil, err
}

return pipeline, aggrOpts, nil
}

// TODO: custom populate errors
func GetPopulateStages(doc any, opt *mopt.PopulateOptions) (mongo.Pipeline, error) {

func getPopulateStages(doc any, opt *mopt.PopulateOptions) (mongo.Pipeline, error) {
lookupPipeline := mongo.Pipeline{
bson.D{
{
Expand Down Expand Up @@ -42,7 +88,7 @@ func GetPopulateStages(doc any, opt *mopt.PopulateOptions) (mongo.Pipeline, erro
}
mu.Unlock()

pipe, pErr := GetPopulateStages(opt.Schema, p)
pipe, pErr := getPopulateStages(opt.Schema, p)

mu.Lock()
if pErr != nil {
Expand Down Expand Up @@ -184,6 +230,102 @@ func GetPopulateStages(doc any, opt *mopt.PopulateOptions) (mongo.Pipeline, erro
return populatePipeline, nil
}

// TODO: merge these options with aggregate options if possible
// if opt.AllowPartialResults != nil {
// }
// if opt.CursorType != nil {
// }
// if opt.Max != nil {
// }
// if opt.Min != nil {
// }
// if opt.NoCursorTimeout != nil {
// }
// if opt.OplogReplay != nil {
// }
// if opt.ReturnKey != nil {
// }
// if opt.ShowRecordID != nil {
// }
// if opt.Snapshot != nil {
// }

func mergeFindOptsWithAggregatOpts[T UnionFindOpts](
opt T,
) (mongo.Pipeline, *options.AggregateOptions, *mopt.QueryOptions) {
aggOpts, pipelineOpts, queryOpts := options.Aggregate(), mongo.Pipeline{}, mopt.Query()
switch opt := any(opt).(type) {
case *mopt.FindOptions:
if opt.AllowDiskUse != nil {
aggOpts.SetAllowDiskUse(*opt.AllowDiskUse)
}
if opt.BatchSize != nil {
aggOpts.SetBatchSize(*opt.BatchSize)
}
if opt.Collation != nil {
aggOpts.SetCollation(opt.Collation)
}
if opt.Comment != nil {
aggOpts.SetComment(*opt.Comment)
}
if opt.Hint != nil {
aggOpts.SetHint(opt.Hint)
}
if opt.MaxAwaitTime != nil {
aggOpts.SetMaxAwaitTime(*opt.MaxAwaitTime)
}
if opt.MaxTime != nil {
aggOpts.SetMaxTime(*opt.MaxTime)
}
if opt.Let != nil {
aggOpts.SetLet(opt.Let)
}
if opt.Sort != nil {
pipelineOpts = append(pipelineOpts, bson.D{{Key: "$sort", Value: opt.Sort}})
}
if opt.Limit != nil {
pipelineOpts = append(pipelineOpts, bson.D{{Key: "$limit", Value: *opt.Limit}})
}
if opt.Skip != nil {
pipelineOpts = append(pipelineOpts, bson.D{{Key: "$skip", Value: *opt.Skip}})
}
if opt.Projection != nil {
pipelineOpts = append(pipelineOpts, bson.D{{Key: "$project", Value: opt.Projection}})
}
queryOpts = opt.QueryOptions
case *mopt.FindOneOptions:
if opt.BatchSize != nil {
aggOpts.SetBatchSize(*opt.BatchSize)
}
if opt.Collation != nil {
aggOpts.SetCollation(opt.Collation)
}
if opt.Comment != nil {
aggOpts.SetComment(*opt.Comment)
}
if opt.Hint != nil {
aggOpts.SetHint(opt.Hint)
}
if opt.MaxAwaitTime != nil {
aggOpts.SetMaxAwaitTime(*opt.MaxAwaitTime)
}
if opt.MaxTime != nil {
aggOpts.SetMaxTime(*opt.MaxTime)
}
if opt.Sort != nil {
pipelineOpts = append(pipelineOpts, bson.D{{Key: "$sort", Value: opt.Sort}})
}
if opt.Skip != nil {
pipelineOpts = append(pipelineOpts, bson.D{{Key: "$skip", Value: *opt.Skip}})
}
if opt.Projection != nil {
pipelineOpts = append(pipelineOpts, bson.D{{Key: "$project", Value: opt.Projection}})
}
queryOpts = opt.QueryOptions
}
return pipelineOpts, aggOpts, queryOpts
}

func getStructFields(t reflect.Type) map[string]reflect.StructField {
fields := make(map[string]reflect.StructField)
for i := 0; i < t.NumField(); i++ {
Expand Down
108 changes: 0 additions & 108 deletions internal/options.go

This file was deleted.

32 changes: 3 additions & 29 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package mgs
import (
"context"
"reflect"
"sync"
"time"

int "github.com/0x-buidl/mgs/internal"
Expand Down Expand Up @@ -502,35 +501,10 @@ func findWithPopulate[U int.UnionFindOpts, T Schema, P IDefaultSchema](
ctx context.Context, c *mongo.Collection,
q bson.M, d T, opt U,
) ([]*Document[T, P], error) {
pipelineOpts, aggrOpts, queryOpts := int.MergeFindOptsWithAggregatOpts(opt)
pipeline := append(mongo.Pipeline{bson.D{{Key: "$match", Value: q}}}, pipelineOpts...)
var wg sync.WaitGroup
var mu sync.Mutex
var err error
for _, pop := range *queryOpts.PopulateOption {
wg.Add(1)
func(pop *mopt.PopulateOptions) {
defer wg.Done()
mu.Lock()
if err != nil {
mu.Unlock()
return
}
mu.Unlock()

pipe, pErr := int.GetPopulateStages(d, pop)

mu.Lock()
if pErr != nil {
err = pErr
mu.Unlock()
return
}
pipeline = append(pipeline, pipe...)
mu.Unlock()
}(pop)
pipeline, aggrOpts, err := int.BuildPopulatePipeline(d, q, opt)
if err != nil {
return nil, err
}
wg.Wait()

docs := make([]*Document[T, P], 0)
cursor, err := c.Aggregate(ctx, pipeline, aggrOpts)
Expand Down

0 comments on commit c2220b2

Please sign in to comment.