Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions plugins/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ type SourcePlugin struct {
tables schema.Tables
}

const (
defaultConcurrency = 500000
)

// Add internal columns
func addInternalColumns(tables []*schema.Table) {
for _, table := range tables {
Expand Down Expand Up @@ -106,25 +102,25 @@ func (p *SourcePlugin) Version() string {

// Sync syncs data from the tables requested in spec and sends the resolved resources to the given channel.
func (p *SourcePlugin) Sync(ctx context.Context, logger zerolog.Logger, spec specs.Source, res chan<- *schema.Resource) (*schema.SyncSummary, error) {
c, err := p.newExecutionClient(ctx, logger, spec)
spec.SetDefaults()
if err := spec.Validate(); err != nil {
return nil, fmt.Errorf("invalid spec: %w", err)
}
tableNames, err := p.listAndValidateTables(spec.Tables, spec.SkipTables)
if err != nil {
return nil, fmt.Errorf("failed to create execution client for source plugin %s: %w", p.name, err)
return nil, err
}

// limiter used to limit the amount of resources fetched concurrently
concurrency := spec.Concurrency
if concurrency == 0 {
concurrency = defaultConcurrency
c, err := p.newExecutionClient(ctx, logger, spec)
if err != nil {
return nil, fmt.Errorf("failed to create execution client for source plugin %s: %w", p.name, err)
}
logger.Info().Uint64("concurrency", concurrency).Msg("starting source plugin sync")
goroutinesSem := semaphore.NewWeighted(helpers.Uint64ToInt64(concurrency))
logger.Info().Interface("spec", spec).Msg("starting sync")
tableSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.TableConcurrency))
wg := sync.WaitGroup{}
resourceSem := semaphore.NewWeighted(helpers.Uint64ToInt64(spec.ResourceConcurrency))
summary := schema.SyncSummary{}
startTime := time.Now()
tableNames, err := p.listAndValidateTables(spec.Tables, spec.SkipTables)
if err != nil {
return nil, err
}

logger.Debug().Interface("tables", tableNames).Msg("got table names")

Expand All @@ -141,16 +137,16 @@ func (p *SourcePlugin) Sync(ctx context.Context, logger zerolog.Logger, spec spe
for _, client := range clients {
client := client
wg.Add(1)
if err := goroutinesSem.Acquire(ctx, 1); err != nil {
if err := tableSem.Acquire(ctx, 1); err != nil {
// This means context was cancelled
return nil, err
}
go func() {
defer wg.Done()
defer goroutinesSem.Release(1)
defer tableSem.Release(1)
// TODO: prob introduce client.Identify() to be used in logs

tableSummary := table.Resolve(ctx, client, nil, res)
tableSummary := table.Resolve(ctx, client, nil, resourceSem, res)
atomic.AddUint64(&summary.Resources, tableSummary.Resources)
atomic.AddUint64(&summary.Errors, tableSummary.Errors)
atomic.AddUint64(&summary.Panics, tableSummary.Panics)
Expand Down
6 changes: 4 additions & 2 deletions plugins/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ func TestSync(t *testing.T) {
)

spec := specs.Source{
Name: "testSource",
Tables: []string{"*"},
Name: "testSource",
Tables: []string{"*"},
Version: "v1.0.0",
Destinations: []string{"test"},
}

resources := make(chan *schema.Resource)
Expand Down
30 changes: 24 additions & 6 deletions schema/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cloudquery/plugin-sdk/helpers"
"github.com/getsentry/sentry-go"
"github.com/thoas/go-funk"
"golang.org/x/sync/semaphore"
)

// TableResolver is the main entry point that is called when a table is synced.
Expand Down Expand Up @@ -64,9 +65,6 @@ type Table struct {
// Parent is the parent table in case this table is called via parent table (i.e. relation)
Parent *Table `json:"-"`

// Serial is used to force a signature change, which forces new table creation and cascading removal of old table and relations
Serial string `json:"-"`

columnsMap map[string]int
}

Expand Down Expand Up @@ -165,13 +163,16 @@ func (t Table) TableNames() []string {
}

// Resolve calls the table resolver, together with all of its relations, for every resolved resource.
func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resolvedResources chan<- *Resource) (summary SyncSummary) {
func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, resourcesSem *semaphore.Weighted, resolvedResources chan<- *Resource) (summary SyncSummary) {
tableStartTime := time.Now()
meta.Logger().Info().Str("table", t.Name).Msg("table resolver started")

res := make(chan interface{})
startTime := time.Now()
wg := sync.WaitGroup{}
wg.Add(1)
go func(sum *SyncSummary) {
defer wg.Done()
defer func() {
if err := recover(); err != nil {
stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack()))
Expand Down Expand Up @@ -199,9 +200,26 @@ func (t Table) Resolve(ctx context.Context, meta ClientMeta, parent *Resource, r
continue
}
for i := range objects {
summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources))
i := i

// right now we support concurrency only for objects/resources of parent tables
if resourcesSem == nil {
summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources))
} else {
if err := resourcesSem.Acquire(ctx, 1); err != nil {
meta.Logger().Error().Err(err).Msg("failed to acquire semaphore")
return summary
}
wg.Add(1)
go func() {
defer wg.Done()
//nolint:all
summary.Merge(t.resolveObject(ctx, meta, parent, objects[i], resolvedResources))
}()
}
}
}
wg.Wait()
meta.Logger().Info().Str("table", t.Name).Int("total_resources", tableResources).TimeDiff("duration", time.Now(), tableStartTime).Msg("fetch table finished")

return summary
Expand Down Expand Up @@ -270,7 +288,7 @@ func (t Table) resolveObject(ctx context.Context, meta ClientMeta, parent *Resou
resolvedResources <- resource

for _, rel := range t.Relations {
summary.Merge(rel.Resolve(ctx, meta, resource, resolvedResources))
summary.Merge(rel.Resolve(ctx, meta, resource, nil, resolvedResources))
}

return summary
Expand Down
2 changes: 1 addition & 1 deletion schema/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestTableExecution(t *testing.T) {
var summary SyncSummary
go func() {
defer close(resources)
summary = tc.Table.Resolve(ctx, m, nil, resources)
summary = tc.Table.Resolve(ctx, m, nil, nil, resources)
}()
var i = uint64(0)
for resource := range resources {
Expand Down
11 changes: 6 additions & 5 deletions serve/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,12 @@ func TestServeSource(t *testing.T) {
resources := make(chan []byte, 1)
if err := c.Sync(ctx,
specs.Source{
Name: "testSourcePlugin",
Version: "v1.0.0",
Registry: specs.RegistryGithub,
Tables: []string{"*"},
Spec: TestSourcePluginSpec{Accounts: []string{"cloudquery/plugin-sdk"}},
Name: "testSourcePlugin",
Version: "v1.0.0",
Registry: specs.RegistryGithub,
Tables: []string{"*"},
Spec: TestSourcePluginSpec{Accounts: []string{"cloudquery/plugin-sdk"}},
Destinations: []string{"test"},
},
resources); err != nil {
t.Fatal(err)
Expand Down
17 changes: 15 additions & 2 deletions specs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"strings"
)

// Default concurrency limits. SetDefaults applies them when the matching
// Source field is left at its zero value.
const (
// defaultTableConcurrency is the fallback value for Source.TableConcurrency.
defaultTableConcurrency = 500000
// defaultResourceConcurrency is the fallback value for Source.ResourceConcurrency.
defaultResourceConcurrency = 500000
)

// Source is the spec for a source plugin
type Source struct {
// Name of the source plugin to use
Expand All @@ -20,8 +25,9 @@ type Source struct {
// For the gRPC registry the path will be the address of the gRPC server: host:port
Path string `json:"path,omitempty"`
// Registry can be github,local,grpc.
Registry Registry `json:"registry,omitempty"`
Concurrency uint64 `json:"concurrency,omitempty"`
Registry Registry `json:"registry,omitempty"`
TableConcurrency uint64 `json:"table_concurrency,omitempty"`
ResourceConcurrency uint64 `json:"resource_concurrency,omitempty"`
// Tables to sync from the source plugin
Tables []string `json:"tables,omitempty"`
// SkipTables defines tables to skip when syncing data. Useful if a glob pattern is used in Tables
Expand All @@ -46,6 +52,13 @@ func (s *Source) SetDefaults() {
if s.Tables == nil {
s.Tables = []string{"*"}
}

if s.TableConcurrency == 0 {
s.TableConcurrency = defaultTableConcurrency
}
if s.ResourceConcurrency == 0 {
s.ResourceConcurrency = defaultResourceConcurrency
}
}

// UnmarshalSpec unmarshals the internal spec into the given interface
Expand Down
14 changes: 8 additions & 6 deletions specs/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,14 @@ spec:
`,
"",
&Source{
Name: "test",
Registry: RegistryGithub,
Path: "cloudquery/test",
Version: "v1.1.0",
Tables: []string{"*"},
Destinations: []string{"test"},
Name: "test",
Registry: RegistryGithub,
Path: "cloudquery/test",
TableConcurrency: defaultTableConcurrency,
ResourceConcurrency: defaultResourceConcurrency,
Version: "v1.1.0",
Tables: []string{"*"},
Destinations: []string{"test"},
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion specs/testdata/dir/aws.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ kind: source
spec:
name: aws
version: v1.0.0
concurrency: 10
table_concurrency: 10
registry: local
destinations: [postgresql]
2 changes: 1 addition & 1 deletion specs/testdata/gcp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ kind: source
spec:
name: gcp
version: v1.0.0
concurrency: 10
table_concurrency: 10
registry: local
destinations: [postgresqlv2]
---
Expand Down