diff --git a/plugins/source.go b/plugins/source.go index e5b20994d3..3069b8a73b 100644 --- a/plugins/source.go +++ b/plugins/source.go @@ -31,10 +31,6 @@ type SourcePlugin struct { tables schema.Tables } -const ( - defaultConcurrency = 500000 -) - // Add internal columns func addInternalColumns(tables []*schema.Table) { for _, table := range tables { @@ -106,25 +102,25 @@ func (p *SourcePlugin) Version() string { // Sync is syncing data from the requested tables in spec to the given channel func (p *SourcePlugin) Sync(ctx context.Context, logger zerolog.Logger, spec specs.Source, res chan<- *schema.Resource) (*schema.SyncSummary, error) { - c, err := p.newExecutionClient(ctx, logger, spec) + spec.SetDefaults() + if err := spec.Validate(); err != nil { + return nil, fmt.Errorf("invalid spec: %w", err) + } + tableNames, err := p.listAndValidateTables(spec.Tables, spec.SkipTables) if err != nil { - return nil, fmt.Errorf("failed to create execution client for source plugin %s: %w", p.name, err) + return nil, err } - // limiter used to limit the amount of resources fetched concurrently - concurrency := spec.Concurrency - if concurrency == 0 { - concurrency = defaultConcurrency + c, err := p.newExecutionClient(ctx, logger, spec) + if err != nil { + return nil, fmt.Errorf("failed to create execution client for source plugin %s: %w", p.name, err) } - logger.Info().Uint64("concurrency", concurrency).Msg("starting source plugin sync") - goroutinesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(concurrency)) + logger.Info().Interface("spec", spec).Msg("starting sync") + tableSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.TableConcurrency)) wg := sync.WaitGroup{} + resourceSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.ResourceConcurrency)) summary := schema.SyncSummary{} startTime := time.Now() - tableNames, err := p.listAndValidateTables(spec.Tables, spec.SkipTables) - if err != nil { - return nil, err - } logger.Debug().Interface("tables", tableNames).Msg("got table names") @@ -141,16 +137,16 @@ func (p *SourcePlugin) Sync(ctx context.Context, logger zerolog.Logger, spec spe for _, client := range clients { client := client wg.Add(1) - if err := goroutinesSem.Acquire(ctx, 1); err != nil { + if err := tableSem.Acquire(ctx, 1); err != nil { // This means context was cancelled return nil, err } go func() { defer wg.Done() - defer goroutinesSem.Release(1) + defer tableSem.Release(1) // TODO: prob introduce client.Identify() to be used in logs - tableSummary := table.Resolve(ctx, client, nil, res) + tableSummary := table.Resolve(ctx, client, nil, resourceSem, res) atomic.AddUint64(&summary.Resources, tableSummary.Resources) atomic.AddUint64(&summary.Errors, tableSummary.Errors) atomic.AddUint64(&summary.Panics, tableSummary.Panics) diff --git a/plugins/source_test.go b/plugins/source_test.go index c7df50417d..3a5b649c20 100644 --- a/plugins/source_test.go +++ b/plugins/source_test.go @@ -53,8 +53,10 @@ func TestSync(t *testing.T) { ) spec := specs.Source{ - Name: "testSource", - Tables: []string{"*"}, + Name: "testSource", + Tables: []string{"*"}, + Version: "v1.0.0", + Destinations: []string{"test"}, } resources := make(chan *schema.Resource) diff --git a/schema/table.go b/schema/table.go index b13559629f..fa45a7301d 100644 --- a/schema/table.go +++ b/schema/table.go @@ -12,6 +12,7 @@ import ( "github.com/cloudquery/plugin-sdk/helpers" "github.com/getsentry/sentry-go" "github.com/thoas/go-funk" + "golang.org/x/sync/semaphore" ) // TableResolver is the main entry point when a table is sync is called. @@ -64,9 +65,6 @@ type Table struct { // Parent is the parent table in case this table is called via parent table (i.e. relation) Parent *Table `json:"-"` - // Serial is used to force a signature change, which forces new table creation and cascading removal of old table and relations - Serial string `json:"-"` - columnsMap map[string]int } @@ -165,13 +163,16 @@ func (t Table) TableNames() []string { } // Call the table resolver with with all of it's relation for every reolved resource -func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resolvedResources chan<- *Resource) (summary SyncSummary) { +func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resourcesSem *semaphore.Weighted, resolvedResources chan<- *Resource) (summary SyncSummary) { tableStartTime := time.Now() meta.Logger().Info().Str("table", t.Name).Msg("table resolver started") res := make(chan interface{}) startTime := time.Now() + wg := sync.WaitGroup{} + wg.Add(1) go func(sum *SyncSummary) { + defer wg.Done() defer func() { if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) @@ -199,9 +200,26 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r continue } for i := range objects { - summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources)) + i := i + + // right now we support concurrency only for objects/resources of parent tables + if resourcesSem == nil { + summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources)) + } else { + if err := resourcesSem.Acquire(ctx, 1); err != nil { + meta.Logger().Error().Err(err).Msg("failed to acquire semaphore") + return summary + } + wg.Add(1) + go func() { + defer wg.Done() + //nolint:all + summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources)) + }() + } } } + wg.Wait() meta.Logger().Info().Str("table", t.Name).Int("total_resources", tableResources).TimeDiff("duration", time.Now(), tableStartTime).Msg("fetch table finished") return summary @@ -270,7 +288,7 @@ func (t Table) resolveObject(ctx context.Context, meta ClientMeta, parent *Resou resolvedResources <- resource for _, rel := range t.Relations { - summary.Merge(rel.Resolve(ctx, meta, resource, resolvedResources)) + summary.Merge(rel.Resolve(ctx, meta, resource, nil, resolvedResources)) } return summary diff --git a/schema/table_test.go b/schema/table_test.go index 7ddaa17341..c750c69f47 100644 --- a/schema/table_test.go +++ b/schema/table_test.go @@ -217,7 +217,7 @@ func TestTableExecution(t *testing.T) { var summary SyncSummary go func() { defer close(resources) - summary = tc.Table.Resolve(ctx, m, nil, resources) + summary = tc.Table.Resolve(ctx, m, nil, nil, resources) }() var i = uint64(0) for resource := range resources { diff --git a/serve/source_test.go b/serve/source_test.go index d30a068afc..45b6b2515c 100644 --- a/serve/source_test.go +++ b/serve/source_test.go @@ -142,11 +142,12 @@ func TestServeSource(t *testing.T) { resources := make(chan []byte, 1) if err := c.Sync(ctx, specs.Source{ - Name: "testSourcePlugin", - Version: "v1.0.0", - Registry: specs.RegistryGithub, - Tables: []string{"*"}, - Spec: TestSourcePluginSpec{Accounts: []string{"cloudquery/plugin-sdk"}}, + Name: "testSourcePlugin", + Version: "v1.0.0", + Registry: specs.RegistryGithub, + Tables: []string{"*"}, + Spec: TestSourcePluginSpec{Accounts: []string{"cloudquery/plugin-sdk"}}, + Destinations: []string{"test"}, }, resources); err != nil { t.Fatal(err) diff --git a/specs/source.go b/specs/source.go index 6e8d99ed45..0aaf93f1b2 100644 --- a/specs/source.go +++ b/specs/source.go @@ -7,6 +7,11 @@ import ( "strings" ) +const ( + defaultTableConcurrency = 500000 + defaultResourceConcurrency = 500000 +) + // Source is the spec for a source plugin type Source struct { // Name of the source plugin to use @@ -20,8 +25,9 @@ type Source struct { // For the gRPC registry the path will be the address of the gRPC server: host:port Path string `json:"path,omitempty"` // Registry can be github,local,grpc. - Registry Registry `json:"registry,omitempty"` - Concurrency uint64 `json:"concurrency,omitempty"` + Registry Registry `json:"registry,omitempty"` + TableConcurrency uint64 `json:"table_concurrency,omitempty"` + ResourceConcurrency uint64 `json:"resource_concurrency,omitempty"` // Tables to sync from the source plugin Tables []string `json:"tables,omitempty"` // SkipTables defines tables to skip when syncing data. Useful if a glob pattern is used in Tables @@ -46,6 +52,13 @@ func (s *Source) SetDefaults() { if s.Tables == nil { s.Tables = []string{"*"} } + + if s.TableConcurrency == 0 { + s.TableConcurrency = defaultTableConcurrency + } + if s.ResourceConcurrency == 0 { + s.ResourceConcurrency = defaultResourceConcurrency + } } // UnmarshalSpec unmarshals the internal spec into the given interface diff --git a/specs/source_test.go b/specs/source_test.go index d065068d76..2d47836b6d 100644 --- a/specs/source_test.go +++ b/specs/source_test.go @@ -117,12 +117,14 @@ spec: `, "", &Source{ - Name: "test", - Registry: RegistryGithub, - Path: "cloudquery/test", - Version: "v1.1.0", - Tables: []string{"*"}, - Destinations: []string{"test"}, + Name: "test", + Registry: RegistryGithub, + Path: "cloudquery/test", + TableConcurrency: defaultTableConcurrency, + ResourceConcurrency: defaultResourceConcurrency, + Version: "v1.1.0", + Tables: []string{"*"}, + Destinations: []string{"test"}, }, }, } diff --git a/specs/testdata/dir/aws.yml b/specs/testdata/dir/aws.yml index f10fb8f674..55f71013f4 100644 --- a/specs/testdata/dir/aws.yml +++ b/specs/testdata/dir/aws.yml @@ -2,6 +2,6 @@ kind: source spec: name: aws version: v1.0.0 - concurrency: 10 + table_concurrency: 10 registry: local destinations: [postgresql] diff --git a/specs/testdata/gcp.yml b/specs/testdata/gcp.yml index 9d27a4431e..377907e25b 100644 --- a/specs/testdata/gcp.yml +++ b/specs/testdata/gcp.yml @@ -2,7 +2,7 @@ kind: source spec: name: gcp version: v1.0.0 - concurrency: 10 + table_concurrency: 10 registry: local destinations: [postgresqlv2] ---