diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go
index b7e462df9..d9083a3a6 100644
--- a/plugins/extractors/bigquery/bigquery.go
+++ b/plugins/extractors/bigquery/bigquery.go
@@ -39,6 +39,8 @@ type Config struct {
 	ServiceAccountBase64 string  `mapstructure:"service_account_base64"`
 	ServiceAccountJSON   string  `mapstructure:"service_account_json"`
 	MaxPageSize          int     `mapstructure:"max_page_size"`
+	DatasetPageSize      int     `mapstructure:"dataset_page_size"`
+	TablePageSize        int     `mapstructure:"table_page_size"`
 	TablePattern         string  `mapstructure:"table_pattern"`
 	Exclude              Exclude `mapstructure:"exclude"`
 	IncludeColumnProfile bool    `mapstructure:"include_column_profile"`
@@ -164,22 +166,28 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
 
 // Extract checks if the table is valid and extracts the table schema
 func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
+	pageSize := pickFirstNonZero(e.config.DatasetPageSize, e.config.MaxPageSize, 10)
+
 	// Fetch and iterate over datasets
-	it := e.client.Datasets(ctx)
-	it.PageInfo().MaxSize = e.getMaxPageSize()
+	pager := iterator.NewPager(e.client.Datasets(ctx), pageSize, "")
 	for {
-		ds, err := it.Next()
-		if errors.Is(err, iterator.Done) {
-			break
-		}
+		var datasets []*bigquery.Dataset
+		nextToken, err := pager.NextPage(&datasets)
 		if err != nil {
 			return fmt.Errorf("fetch dataset: %w", err)
 		}
-		if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
-			e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
-			continue
+
+		for _, ds := range datasets {
+			if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
+				e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
+				continue
+			}
+			e.extractTable(ctx, ds, emit)
+		}
+
+		if nextToken == "" {
+			break
 		}
-		e.extractTable(ctx, ds, emit)
 	}
 
 	return nil
@@ -215,38 +223,48 @@ func (e *Extractor) createPolicyTagClient(ctx context.Context) (*datacatalog.Pol
 
 // Create big query client
 func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit plugins.Emit) {
-	tb := ds.Tables(ctx)
-	tb.PageInfo().MaxSize = e.getMaxPageSize()
+	pageSize := pickFirstNonZero(e.config.TablePageSize, e.config.MaxPageSize, 50)
+
+	pager := iterator.NewPager(ds.Tables(ctx), pageSize, "")
 	for {
-		table, err := tb.Next()
-		if errors.Is(err, iterator.Done) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-			break
-		} else if err != nil {
-			e.logger.Error("failed to get table, skipping table", "err", err)
-			continue
-		}
+		var tables []*bigquery.Table
+		nextToken, err := pager.NextPage(&tables)
+		if err != nil {
+			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+				break
+			}
 
-		if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
-			e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
+			e.logger.Error("failed to get page of tables, skipping page", "err", err)
 			continue
 		}
 
-		tableFQN := table.FullyQualifiedName()
+		for _, table := range tables {
+			if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
+				e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
+				continue
+			}
 
-		e.logger.Debug("extracting table", "table", tableFQN)
-		tmd, err := table.Metadata(ctx)
-		if err != nil {
-			e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
-			continue
-		}
+			tableFQN := table.FullyQualifiedName()
 
-		asset, err := e.buildAsset(ctx, table, tmd)
-		if err != nil {
-			e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
-			continue
+			e.logger.Debug("extracting table", "table", tableFQN)
+			tmd, err := table.Metadata(ctx)
+			if err != nil {
+				e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
+				continue
+			}
+
+			asset, err := e.buildAsset(ctx, table, tmd)
+			if err != nil {
+				e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
+				continue
+			}
+
+			emit(models.NewRecord(asset))
 		}
 
-		emit(models.NewRecord(asset))
+		if nextToken == "" {
+			break
+		}
 	}
 }
 
@@ -634,3 +652,12 @@ func seededRandom(seed int64) func(max int64) int64 {
 		return rnd.Int63n(max)
 	}
 }
+
+func pickFirstNonZero(ints ...int) int {
+	for _, intItem := range ints {
+		if intItem != 0 {
+			return intItem
+		}
+	}
+	return 0
+}
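
For context, a minimal self-contained sketch of the paging pattern the patch switches to: iterator.NewPager fills a slice per page and returns an empty token on the last page, and the effective page size falls back from the dedicated setting to max_page_size to a hard-coded default. The project ID and the zero config values below are placeholders for illustration, not part of the patch.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
)

// pickFirstNonZero mirrors the helper added in the patch: the first
// non-zero value wins, so dataset_page_size overrides max_page_size,
// which overrides the hard-coded default.
func pickFirstNonZero(ints ...int) int {
	for _, v := range ints {
		if v != 0 {
			return v
		}
	}
	return 0
}

func main() {
	ctx := context.Background()

	// "my-project" is a placeholder; use any project the caller can read.
	client, err := bigquery.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatalf("create client: %v", err)
	}
	defer client.Close()

	// The zeros stand in for unset config values, so the default of 10 applies.
	pageSize := pickFirstNonZero(0, 0, 10)

	// iterator.NewPager wraps the dataset iterator; NextPage fills one page
	// of results and an empty next-page token signals the last page.
	pager := iterator.NewPager(client.Datasets(ctx), pageSize, "")
	for {
		var datasets []*bigquery.Dataset
		nextToken, err := pager.NextPage(&datasets)
		if err != nil {
			log.Fatalf("fetch dataset page: %v", err)
		}
		for _, ds := range datasets {
			fmt.Println(ds.DatasetID)
		}
		if nextToken == "" {
			break
		}
	}
}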