93 changes: 60 additions & 33 deletions plugins/extractors/bigquery/bigquery.go
@@ -39,6 +39,8 @@ type Config struct {
     ServiceAccountBase64 string  `mapstructure:"service_account_base64"`
     ServiceAccountJSON   string  `mapstructure:"service_account_json"`
     MaxPageSize          int     `mapstructure:"max_page_size"`
+    DatasetPageSize      int     `mapstructure:"dataset_page_size"`
+    TablePageSize        int     `mapstructure:"table_page_size"`
     TablePattern         string  `mapstructure:"table_pattern"`
     Exclude              Exclude `mapstructure:"exclude"`
     IncludeColumnProfile bool    `mapstructure:"include_column_profile"`
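
The two new page-size fields ride on the same mapstructure tags as the rest of the config. As a rough, hypothetical illustration (the trimmed-down struct, the raw map, and the values below are not from this PR), decoding a recipe fragment with github.com/mitchellh/mapstructure leaves unset keys at zero, which is what lets the pickFirstNonZero fallback further down pick max_page_size or a built-in default instead:

package main

import (
    "fmt"

    "github.com/mitchellh/mapstructure"
)

// pageSizeConfig mirrors only the paging-related fields of the extractor Config.
type pageSizeConfig struct {
    MaxPageSize     int `mapstructure:"max_page_size"`
    DatasetPageSize int `mapstructure:"dataset_page_size"`
    TablePageSize   int `mapstructure:"table_page_size"`
}

func main() {
    // Hypothetical recipe values: only dataset paging is tuned explicitly.
    raw := map[string]interface{}{
        "max_page_size":     100,
        "dataset_page_size": 20,
    }

    var cfg pageSizeConfig
    if err := mapstructure.Decode(raw, &cfg); err != nil {
        panic(err)
    }

    // table_page_size was not set, so it stays 0 and the extractor would
    // fall back to max_page_size (100) when paging tables.
    fmt.Printf("%+v\n", cfg)
}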
@@ -164,22 +166,28 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {

 // Extract checks if the table is valid and extracts the table schema
 func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
+    pageSize := pickFirstNonZero(e.config.DatasetPageSize, e.config.MaxPageSize, 10)
+
     // Fetch and iterate over datasets
-    it := e.client.Datasets(ctx)
-    it.PageInfo().MaxSize = e.getMaxPageSize()
+    pager := iterator.NewPager(e.client.Datasets(ctx), pageSize, "")
     for {
-        ds, err := it.Next()
-        if errors.Is(err, iterator.Done) {
-            break
-        }
+        var datasets []*bigquery.Dataset
+        nextToken, err := pager.NextPage(&datasets)
         if err != nil {
             return fmt.Errorf("fetch dataset: %w", err)
         }
-        if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
-            e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
-            continue
-        }
-        e.extractTable(ctx, ds, emit)
+
+        for _, ds := range datasets {
+            if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
+                e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
+                continue
+            }
+            e.extractTable(ctx, ds, emit)
+        }
+
+        if nextToken == "" {
+            break
+        }
     }

     return nil
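
For reference, the paging pattern the new Extract loop relies on comes from google.golang.org/api/iterator: NewPager wraps any pageable iterator, NextPage fills a slice with one page of results and returns the token for the following page, and an empty token means the last page has been consumed. A minimal standalone sketch of the same idea (project ID and page size are placeholders, error handling kept simple), listing dataset IDs page by page:

package main

import (
    "context"
    "fmt"
    "log"

    "cloud.google.com/go/bigquery"
    "google.golang.org/api/iterator"
)

func main() {
    ctx := context.Background()

    // Placeholder project ID; replace it to actually run this.
    client, err := bigquery.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatalf("create client: %v", err)
    }
    defer client.Close()

    // Fetch datasets 10 at a time instead of relying on the iterator's
    // internal buffering, mirroring the loop in Extract above.
    pager := iterator.NewPager(client.Datasets(ctx), 10, "")
    for {
        var datasets []*bigquery.Dataset
        nextToken, err := pager.NextPage(&datasets)
        if err != nil {
            log.Fatalf("fetch dataset page: %v", err)
        }

        for _, ds := range datasets {
            fmt.Println(ds.DatasetID)
        }

        if nextToken == "" {
            break
        }
    }
}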
@@ -215,38 +223,48 @@ func (e *Extractor) createPolicyTagClient(ctx context.Context) (*datacatalog.Pol

 // Create big query client
 func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit plugins.Emit) {
-    tb := ds.Tables(ctx)
-    tb.PageInfo().MaxSize = e.getMaxPageSize()
+    pageSize := pickFirstNonZero(e.config.TablePageSize, e.config.MaxPageSize, 50)
+
+    pager := iterator.NewPager(ds.Tables(ctx), pageSize, "")
     for {
-        table, err := tb.Next()
-        if errors.Is(err, iterator.Done) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-            break
-        } else if err != nil {
-            e.logger.Error("failed to get table, skipping table", "err", err)
-            continue
-        }
-
-        if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
-            e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
-            continue
-        }
-
-        tableFQN := table.FullyQualifiedName()
-
-        e.logger.Debug("extracting table", "table", tableFQN)
-        tmd, err := table.Metadata(ctx)
-        if err != nil {
-            e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
-            continue
-        }
-
-        asset, err := e.buildAsset(ctx, table, tmd)
-        if err != nil {
-            e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
-            continue
-        }
-
-        emit(models.NewRecord(asset))
+        var tables []*bigquery.Table
+        nextToken, err := pager.NextPage(&tables)
+        if err != nil {
+            if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+                break
+            }
+
+            e.logger.Error("failed to get page of tables, skipping page", "err", err)
+            continue
+        }
+
+        for _, table := range tables {
+            if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
+                e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
+                continue
+            }
+
+            tableFQN := table.FullyQualifiedName()
+
+            e.logger.Debug("extracting table", "table", tableFQN)
+            tmd, err := table.Metadata(ctx)
+            if err != nil {
+                e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
+                continue
+            }
+
+            asset, err := e.buildAsset(ctx, table, tmd)
+            if err != nil {
+                e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
+                continue
+            }
+
+            emit(models.NewRecord(asset))
+        }
+
+        if nextToken == "" {
+            break
+        }
     }
 }

@@ -634,3 +652,12 @@ func seededRandom(seed int64) func(max int64) int64 {
         return rnd.Int63n(max)
     }
 }
+
+func pickFirstNonZero(ints ...int) int {
+    for _, intItem := range ints {
+        if intItem != 0 {
+            return intItem
+        }
+    }
+    return 0
+}
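
The helper's precedence (the specific dataset or table page size first, then max_page_size, then the hard-coded default of 10 or 50) could be pinned down with a small table-driven test. A hypothetical sketch, not part of this PR, assuming it sits in the same package:

package bigquery

import "testing"

func TestPickFirstNonZero(t *testing.T) {
    cases := []struct {
        name string
        in   []int
        want int
    }{
        {"nothing configured, default wins", []int{0, 0, 10}, 10},
        {"only max_page_size set", []int{0, 100, 10}, 100},
        {"specific page size overrides max_page_size", []int{25, 100, 10}, 25},
    }

    for _, tc := range cases {
        if got := pickFirstNonZero(tc.in...); got != tc.want {
            t.Errorf("%s: pickFirstNonZero(%v) = %d, want %d", tc.name, tc.in, got, tc.want)
        }
    }
}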