From 706f4785224be2ec5bece9967233a13e2ef2242b Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Sun, 9 Oct 2022 14:48:00 +0300 Subject: [PATCH 1/4] feat: Support table_concurrency and resource_concurrency --- plugins/source.go | 30 ++++++++++++++---------------- plugins/source_test.go | 2 ++ schema/table.go | 29 +++++++++++++++++++++++------ schema/table_test.go | 2 +- serve/source_test.go | 1 + specs/source.go | 15 ++++++++++++++- specs/source_test.go | 2 ++ specs/testdata/dir/aws.yml | 2 +- specs/testdata/gcp.yml | 2 +- 9 files changed, 59 insertions(+), 26 deletions(-) diff --git a/plugins/source.go b/plugins/source.go index e5b20994d3..78d2e472bc 100644 --- a/plugins/source.go +++ b/plugins/source.go @@ -31,9 +31,7 @@ type SourcePlugin struct { tables schema.Tables } -const ( - defaultConcurrency = 500000 -) + // Add internal columns func addInternalColumns(tables []*schema.Table) { @@ -106,25 +104,25 @@ func (p *SourcePlugin) Version() string { // Sync is syncing data from the requested tables in spec to the given channel func (p *SourcePlugin) Sync(ctx context.Context, logger zerolog.Logger, spec specs.Source, res chan<- *schema.Resource) (*schema.SyncSummary, error) { - c, err := p.newExecutionClient(ctx, logger, spec) + spec.SetDefaults() + if err := spec.Validate(); err != nil { + return nil, fmt.Errorf("invalid spec: %w", err) + } + tableNames, err := p.listAndValidateTables(spec.Tables, spec.SkipTables) if err != nil { - return nil, fmt.Errorf("failed to create execution client for source plugin %s: %w", p.name, err) + return nil, err } - // limiter used to limit the amount of resources fetched concurrently - concurrency := spec.Concurrency - if concurrency == 0 { - concurrency = defaultConcurrency + c, err := p.newExecutionClient(ctx, logger, spec) + if err != nil { + return nil, fmt.Errorf("failed to create execution client for source plugin %s: %w", p.name, err) } - logger.Info().Uint64("concurrency", concurrency).Msg("starting source plugin sync") - goroutinesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(concurrency)) + logger.Info().Interface("spec", spec).Msg("starting sync") + goroutinesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.TableConcurrency)) wg := sync.WaitGroup{} + resourcesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.ResourceConcurrency)) summary := schema.SyncSummary{} startTime := time.Now() - tableNames, err := p.listAndValidateTables(spec.Tables, spec.SkipTables) - if err != nil { - return nil, err - } logger.Debug().Interface("tables", tableNames).Msg("got table names") @@ -150,7 +148,7 @@ func (p *SourcePlugin) Sync(ctx context.Context, logger zerolog.Logger, spec spe defer goroutinesSem.Release(1) // TODO: prob introduce client.Identify() to be used in logs - tableSummary := table.Resolve(ctx, client, nil, res) + tableSummary := table.Resolve(ctx, client, nil, resourcesSem, res) atomic.AddUint64(&summary.Resources, tableSummary.Resources) atomic.AddUint64(&summary.Errors, tableSummary.Errors) atomic.AddUint64(&summary.Panics, tableSummary.Panics) diff --git a/plugins/source_test.go b/plugins/source_test.go index c7df50417d..9fd5b0bb1c 100644 --- a/plugins/source_test.go +++ b/plugins/source_test.go @@ -55,6 +55,8 @@ func TestSync(t *testing.T) { spec := specs.Source{ Name: "testSource", Tables: []string{"*"}, + Version: "v1.0.0", + Destinations: []string{"test"}, } resources := make(chan *schema.Resource) diff --git a/schema/table.go b/schema/table.go index b13559629f..fec84f82c3 100644 --- a/schema/table.go +++ b/schema/table.go @@ -12,6 +12,7 @@ import ( "github.com/cloudquery/plugin-sdk/helpers" "github.com/getsentry/sentry-go" "github.com/thoas/go-funk" + "golang.org/x/sync/semaphore" ) // TableResolver is the main entry point when a table is sync is called. @@ -64,9 +65,6 @@ type Table struct { // Parent is the parent table in case this table is called via parent table (i.e. relation) Parent *Table `json:"-"` - // Serial is used to force a signature change, which forces new table creation and cascading removal of old table and relations - Serial string `json:"-"` - columnsMap map[string]int } @@ -165,13 +163,16 @@ func (t Table) TableNames() []string { } // Call the table resolver with with all of it's relation for every reolved resource -func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resolvedResources chan<- *Resource) (summary SyncSummary) { +func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resourcesSem *semaphore.Weighted, resolvedResources chan<- *Resource) (summary SyncSummary) { tableStartTime := time.Now() meta.Logger().Info().Str("table", t.Name).Msg("table resolver started") res := make(chan interface{}) startTime := time.Now() + wg := sync.WaitGroup{} + wg.Add(1) go func(sum *SyncSummary) { + defer wg.Done() defer func() { if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) @@ -199,9 +200,25 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r continue } for i := range objects { - summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources)) + i := i + + // write now we support only concurrency + if resourcesSem == nil { + summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources)) + } else { + if err := resourcesSem.Acquire(ctx, 1); err != nil { + meta.Logger().Error().Err(err).Msg("failed to acquire semaphore") + return summary + } + wg.Add(1) + go func() { + defer wg.Done() + summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources)) + }() + } } } + wg.Wait() meta.Logger().Info().Str("table", t.Name).Int("total_resources", tableResources).TimeDiff("duration", time.Now(), tableStartTime).Msg("fetch table finished") return summary @@ -270,7 +287,7 @@ func (t Table) resolveObject(ctx context.Context, meta ClientMeta, parent *Resou resolvedResources <- resource for _, rel := range t.Relations { - summary.Merge(rel.Resolve(ctx, meta, resource, resolvedResources)) + summary.Merge(rel.Resolve(ctx, meta, resource, nil, resolvedResources)) } return summary diff --git a/schema/table_test.go b/schema/table_test.go index 7ddaa17341..c750c69f47 100644 --- a/schema/table_test.go +++ b/schema/table_test.go @@ -217,7 +217,7 @@ func TestTableExecution(t *testing.T) { var summary SyncSummary go func() { defer close(resources) - summary = tc.Table.Resolve(ctx, m, nil, resources) + summary = tc.Table.Resolve(ctx, m, nil, nil, resources) }() var i = uint64(0) for resource := range resources { diff --git a/serve/source_test.go b/serve/source_test.go index d30a068afc..c4a4cd1075 100644 --- a/serve/source_test.go +++ b/serve/source_test.go @@ -147,6 +147,7 @@ func TestServeSource(t *testing.T) { Registry: specs.RegistryGithub, Tables: []string{"*"}, Spec: TestSourcePluginSpec{Accounts: []string{"cloudquery/plugin-sdk"}}, + Destinations: []string{"test"}, }, resources); err != nil { t.Fatal(err) diff --git a/specs/source.go b/specs/source.go index 6e8d99ed45..6ff308ee2f 100644 --- a/specs/source.go +++ b/specs/source.go @@ -7,6 +7,11 @@ import ( "strings" ) +const ( + defaultTableConcurrency = 500000 + defaultResourceConcurrency = 500000 +) + // Source is the spec for a source plugin type Source struct { // Name of the source plugin to use @@ -21,7 +26,8 @@ type Source struct { Path string `json:"path,omitempty"` // Registry can be github,local,grpc. Registry Registry `json:"registry,omitempty"` - Concurrency uint64 `json:"concurrency,omitempty"` + TableConcurrency uint64 `json:"table_concurrency,omitempty"` + ResourceConcurrency uint64 `json:"resource_concurrency,omitempty"` // Tables to sync from the source plugin Tables []string `json:"tables,omitempty"` // SkipTables defines tables to skip when syncing data. Useful if a glob pattern is used in Tables @@ -46,6 +52,13 @@ func (s *Source) SetDefaults() { if s.Tables == nil { s.Tables = []string{"*"} } + + if s.TableConcurrency == 0 { + s.TableConcurrency = defaultTableConcurrency + } + if s.ResourceConcurrency == 0 { + s.ResourceConcurrency = defaultResourceConcurrency + } } // UnmarshalSpec unmarshals the internal spec into the given interface diff --git a/specs/source_test.go b/specs/source_test.go index d065068d76..17c686ec5c 100644 --- a/specs/source_test.go +++ b/specs/source_test.go @@ -120,6 +120,8 @@ spec: Name: "test", Registry: RegistryGithub, Path: "cloudquery/test", + TableConcurrency: defaultTableConcurrency, + ResourceConcurrency: defaultResourceConcurrency, Version: "v1.1.0", Tables: []string{"*"}, Destinations: []string{"test"}, diff --git a/specs/testdata/dir/aws.yml b/specs/testdata/dir/aws.yml index f10fb8f674..55f71013f4 100644 --- a/specs/testdata/dir/aws.yml +++ b/specs/testdata/dir/aws.yml @@ -2,6 +2,6 @@ kind: source spec: name: aws version: v1.0.0 - concurrency: 10 + table_concurrency: 10 registry: local destinations: [postgresql] diff --git a/specs/testdata/gcp.yml b/specs/testdata/gcp.yml index 9d27a4431e..377907e25b 100644 --- a/specs/testdata/gcp.yml +++ b/specs/testdata/gcp.yml @@ -2,7 +2,7 @@ kind: source spec: name: gcp version: v1.0.0 - concurrency: 10 + table_concurrency: 10 registry: local destinations: [postgresqlv2] --- From fab2ab73bf3a39ee348713b6d9f2b794ea12b508 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Sun, 9 Oct 2022 15:20:49 +0300 Subject: [PATCH 2/4] fix lint --- plugins/source.go | 2 -- plugins/source_test.go | 6 +++--- schema/table.go | 5 +++-- serve/source_test.go | 10 +++++----- specs/source.go | 8 ++++---- specs/source_test.go | 14 +++++++------- 6 files changed, 22 insertions(+), 23 deletions(-) diff --git a/plugins/source.go b/plugins/source.go index 78d2e472bc..080a912cf0 100644 --- a/plugins/source.go +++ b/plugins/source.go @@ -31,8 +31,6 @@ type SourcePlugin struct { tables schema.Tables } - - // Add internal columns func addInternalColumns(tables []*schema.Table) { for _, table := range tables { diff --git a/plugins/source_test.go b/plugins/source_test.go index 9fd5b0bb1c..3a5b649c20 100644 --- a/plugins/source_test.go +++ b/plugins/source_test.go @@ -53,9 +53,9 @@ func TestSync(t *testing.T) { ) spec := specs.Source{ - Name: "testSource", - Tables: []string{"*"}, - Version: "v1.0.0", + Name: "testSource", + Tables: []string{"*"}, + Version: "v1.0.0", Destinations: []string{"test"}, } diff --git a/schema/table.go b/schema/table.go index fec84f82c3..90c45b94bf 100644 --- a/schema/table.go +++ b/schema/table.go @@ -201,8 +201,8 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r } for i := range objects { i := i - - // write now we support only concurrency + + // write now we support only concurrency if resourcesSem == nil { summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources)) } else { @@ -213,6 +213,7 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r wg.Add(1) go func() { defer wg.Done() + //nolint:all summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources)) }() } diff --git a/serve/source_test.go b/serve/source_test.go index c4a4cd1075..45b6b2515c 100644 --- a/serve/source_test.go +++ b/serve/source_test.go @@ -142,11 +142,11 @@ func TestServeSource(t *testing.T) { resources := make(chan []byte, 1) if err := c.Sync(ctx, specs.Source{ - Name: "testSourcePlugin", - Version: "v1.0.0", - Registry: specs.RegistryGithub, - Tables: []string{"*"}, - Spec: TestSourcePluginSpec{Accounts: []string{"cloudquery/plugin-sdk"}}, + Name: "testSourcePlugin", + Version: "v1.0.0", + Registry: specs.RegistryGithub, + Tables: []string{"*"}, + Spec: TestSourcePluginSpec{Accounts: []string{"cloudquery/plugin-sdk"}}, Destinations: []string{"test"}, }, resources); err != nil { diff --git a/specs/source.go b/specs/source.go index 6ff308ee2f..0aaf93f1b2 100644 --- a/specs/source.go +++ b/specs/source.go @@ -8,7 +8,7 @@ import ( ) const ( - defaultTableConcurrency = 500000 + defaultTableConcurrency = 500000 defaultResourceConcurrency = 500000 ) @@ -25,9 +25,9 @@ type Source struct { // For the gRPC registry the path will be the address of the gRPC server: host:port Path string `json:"path,omitempty"` // Registry can be github,local,grpc. - Registry Registry `json:"registry,omitempty"` - TableConcurrency uint64 `json:"table_concurrency,omitempty"` - ResourceConcurrency uint64 `json:"resource_concurrency,omitempty"` + Registry Registry `json:"registry,omitempty"` + TableConcurrency uint64 `json:"table_concurrency,omitempty"` + ResourceConcurrency uint64 `json:"resource_concurrency,omitempty"` // Tables to sync from the source plugin Tables []string `json:"tables,omitempty"` // SkipTables defines tables to skip when syncing data. Useful if a glob pattern is used in Tables diff --git a/specs/source_test.go b/specs/source_test.go index 17c686ec5c..2d47836b6d 100644 --- a/specs/source_test.go +++ b/specs/source_test.go @@ -117,14 +117,14 @@ spec: `, "", &Source{ - Name: "test", - Registry: RegistryGithub, - Path: "cloudquery/test", - TableConcurrency: defaultTableConcurrency, + Name: "test", + Registry: RegistryGithub, + Path: "cloudquery/test", + TableConcurrency: defaultTableConcurrency, ResourceConcurrency: defaultResourceConcurrency, - Version: "v1.1.0", - Tables: []string{"*"}, - Destinations: []string{"test"}, + Version: "v1.1.0", + Tables: []string{"*"}, + Destinations: []string{"test"}, }, }, } From fc52e63b2566a653daf6e7d38af9894e31d94629 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Mon, 10 Oct 2022 11:14:00 +0300 Subject: [PATCH 3/4] Update schema/table.go Co-authored-by: Kemal <223029+disq@users.noreply.github.com> --- schema/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema/table.go b/schema/table.go index 90c45b94bf..f27e5ffb55 100644 --- a/schema/table.go +++ b/schema/table.go @@ -202,7 +202,7 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r for i := range objects { i := i - // write now we support only concurrency + // right now we support only concurrency if resourcesSem == nil { summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources)) } else { From 59e41fbcb8470d2a88dda8edaa53a718f5619cae Mon Sep 17 00:00:00 2001 From: Yevgeny Pats <16490766+yevgenypats@users.noreply.github.com> Date: Mon, 10 Oct 2022 13:57:56 +0300 Subject: [PATCH 4/4] fix review --- plugins/source.go | 10 +++++----- schema/table.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/source.go b/plugins/source.go index 080a912cf0..3069b8a73b 100644 --- a/plugins/source.go +++ b/plugins/source.go @@ -116,9 +116,9 @@ func (p *SourcePlugin) Sync(ctx context.Context, logger zerolog.Logger, spec spe return nil, fmt.Errorf("failed to create execution client for source plugin %s: %w", p.name, err) } logger.Info().Interface("spec", spec).Msg("starting sync") - goroutinesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.TableConcurrency)) + tableSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.TableConcurrency)) wg := sync.WaitGroup{} - resourcesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.ResourceConcurrency)) + resourceSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.ResourceConcurrency)) summary := schema.SyncSummary{} startTime := time.Now() @@ -137,16 +137,16 @@ func (p *SourcePlugin) Sync(ctx context.Context, logger zerolog.Logger, spec spe for _, client := range clients { client := client wg.Add(1) - if err := goroutinesSem.Acquire(ctx, 1); err != nil { + if err := tableSem.Acquire(ctx, 1); err != nil { // This means context was cancelled return nil, err } go func() { defer wg.Done() - defer goroutinesSem.Release(1) + defer tableSem.Release(1) // TODO: prob introduce client.Identify() to be used in logs - tableSummary := table.Resolve(ctx, client, nil, resourcesSem, res) + tableSummary := table.Resolve(ctx, client, nil, resourceSem, res) atomic.AddUint64(&summary.Resources, tableSummary.Resources) atomic.AddUint64(&summary.Errors, tableSummary.Errors) atomic.AddUint64(&summary.Panics, tableSummary.Panics) diff --git a/schema/table.go b/schema/table.go index f27e5ffb55..fa45a7301d 100644 --- a/schema/table.go +++ b/schema/table.go @@ -202,7 +202,7 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r for i := range objects { i := i - // right now we support only concurrency + // right now we support concurrency only for objects/resources of parent tables if resourcesSem == nil { summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources)) } else {