Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions plugins/extractors/bigquery/auditlog/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ import (
"fmt"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/logging/logadmin"
"github.com/goto/meteor/plugins"
"github.com/goto/salt/log"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
auditpb "google.golang.org/genproto/googleapis/cloud/audit"
Expand All @@ -23,20 +28,34 @@ type Config struct {
UsageProjectIDs []string
}

const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
`resource.type="bigquery_resource" AND NOT ` +
`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
`timestamp >= "%s" AND timestamp < "%s" AND %s`
const (
	// advancedFilterTemplate is the Cloud Logging filter used to fetch
	// BigQuery "jobcompleted" audit entries. It excludes queries that
	// reference INFORMATION_SCHEMA or __TABLES__, and carries three
	// placeholders: a start timestamp, an end timestamp, and an extra
	// filter clause appended by the caller.
	advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
		`resource.type="bigquery_resource" AND NOT ` +
		`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
		`timestamp >= "%s" AND timestamp < "%s" AND %s`

	// metricTableDurn is the OpenTelemetry instrument name of the
	// histogram recording per-table audit-log collection duration (ms).
	metricTableDurn = "meteor.bq.client.table.duration"
)

// AuditLog fetches BigQuery job-completed audit log entries from Cloud
// Logging and aggregates them into per-table usage statistics.
type AuditLog struct {
	logger log.Logger
	client *logadmin.Client // Cloud Logging admin client; Collect errors if nil
	config Config

	// histogram records the duration of each Collect call in milliseconds.
	histogram metric.Int64Histogram
}

// New creates an AuditLog with the given logger and registers the
// histogram instrument used to time audit-log collection. Instrument
// creation errors are forwarded to the global otel error handler.
func New(logger log.Logger) *AuditLog {
	meter := otel.Meter("github.com/goto/meteor/plugins/extractors/bigquery")
	hist, err := meter.Int64Histogram(metricTableDurn, metric.WithUnit("ms"))
	if err != nil {
		otel.Handle(err)
	}

	return &AuditLog{
		logger:    logger,
		histogram: hist,
	}
}

Expand Down Expand Up @@ -69,20 +88,35 @@ func (l *AuditLog) createClient(ctx context.Context) (*logadmin.Client, error) {
return logadmin.NewClient(ctx, l.config.ProjectID, option.WithCredentialsJSON([]byte(l.config.ServiceAccountJSON)))
}

func (l *AuditLog) Collect(ctx context.Context, tableID string) (*TableStats, error) {
func (l *AuditLog) Collect(ctx context.Context, tbl *bigquery.Table) (stats *TableStats, err error) {
defer func(start time.Time) {
attrs := []attribute.KeyValue{
attribute.String("bq.operation", "table.audit_logs"),
attribute.String("bq.project_id", tbl.ProjectID),
attribute.String("bq.dataset_id", tbl.DatasetID),
}
if err != nil {
attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
}

l.histogram.Record(
ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
)
}(time.Now())

if l.client == nil {
return nil, errors.New("auditlog client is nil")
}

filter := l.buildFilter(tableID)
filter := l.buildFilter(tbl.TableID)
it := l.client.Entries(ctx,
logadmin.ProjectIDs(l.config.UsageProjectIDs),
logadmin.Filter(filter))

l.logger.Info("getting logs in these projects", "projects", l.config.UsageProjectIDs)
l.logger.Info("getting logs with the filter", "filter", filter)

tableStats := NewTableStats()
stats = NewTableStats()
for {
entry, err := it.Next()
if errors.Is(err, iterator.Done) {
Expand All @@ -98,12 +132,12 @@ func (l *AuditLog) Collect(ctx context.Context, tableID string) (*TableStats, er
continue
}

if errF := tableStats.Populate(logData); errF != nil {
if errF := stats.Populate(logData); errF != nil {
l.logger.Warn("error populating logdata", "err", errF)
continue
}
}
return tableStats, nil
return stats, nil
}

func (l *AuditLog) buildFilter(tableID string) string {
Expand Down
131 changes: 122 additions & 9 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/rand"
"strings"
"sync"
"time"

"cloud.google.com/go/bigquery"
datacatalog "cloud.google.com/go/datacatalog/apiv1"
Expand All @@ -22,6 +23,9 @@ import (
"github.com/goto/meteor/utils"
"github.com/goto/salt/log"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -60,6 +64,12 @@ type Exclude struct {

const (
	// maxPageSizeDefault is the fallback page size used when paging
	// through datasets and tables.
	maxPageSizeDefault = 100

	// OpenTelemetry instrument names for BigQuery client operations.
	metricDatasetsDurn = "meteor.bq.client.datasets.duration" // dataset-list page fetch duration (ms)
	metricTablesDurn   = "meteor.bq.client.tables.duration"   // table-list page fetch duration (ms)
	metricTableDurn    = "meteor.bq.client.table.duration"    // table metadata fetch duration (ms)

	metricExcludedDatasetCtr = "meteor.bq.dataset.excluded" // count of datasets skipped via exclude config
	metricExcludedTableCtr   = "meteor.bq.table.excluded"   // count of tables skipped via exclude config
)

var sampleConfig = `
Expand Down Expand Up @@ -108,20 +118,49 @@ type Extractor struct {
policyTagClient *datacatalog.PolicyTagManagerClient
newClient NewClientFunc
randFn randFn

datasetsDurn metric.Int64Histogram
tablesDurn metric.Int64Histogram
tableDurn metric.Int64Histogram
excludedDatasetCtr metric.Int64Counter
excludedTableCtr metric.Int64Counter
}

type randFn func(rndSeed int64) func(int64) int64

type NewClientFunc func(ctx context.Context, logger log.Logger, config *Config) (*bigquery.Client, error)

func New(logger log.Logger, newClient NewClientFunc, randFn randFn) *Extractor {
meter := otel.Meter("github.com/goto/meteor/plugins/extractors/bigquery")

datasetsDurn, err := meter.Int64Histogram(metricDatasetsDurn, metric.WithUnit("ms"))
handleOtelErr(err)

tablesDurn, err := meter.Int64Histogram(metricTablesDurn, metric.WithUnit("ms"))
handleOtelErr(err)

tableDurn, err := meter.Int64Histogram(metricTableDurn, metric.WithUnit("ms"))
handleOtelErr(err)

excludedDatasetCtr, err := meter.Int64Counter(metricExcludedDatasetCtr)
handleOtelErr(err)

excludedTableCtr, err := meter.Int64Counter(metricExcludedTableCtr)
handleOtelErr(err)

galc := auditlog.New(logger)

e := &Extractor{
logger: logger,
galClient: galc,
newClient: newClient,
randFn: randFn,

datasetsDurn: datasetsDurn,
tablesDurn: tablesDurn,
tableDurn: tableDurn,
excludedDatasetCtr: excludedDatasetCtr,
excludedTableCtr: excludedTableCtr,
}
e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
e.ScopeNotRequired = true
Expand Down Expand Up @@ -171,28 +210,49 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
// Fetch and iterate over datasets
pager := iterator.NewPager(e.client.Datasets(ctx), pageSize, "")
for {
var datasets []*bigquery.Dataset
nextToken, err := pager.NextPage(&datasets)
datasets, hasNext, err := e.fetchDatasetsNextPage(ctx, pager)
if err != nil {
return fmt.Errorf("fetch dataset: %w", err)
return err
}

for _, ds := range datasets {
if IsExcludedDataset(ds.DatasetID, e.config.Exclude.Datasets) {
e.excludedDatasetCtr.Add(
ctx, 1, metric.WithAttributes(attribute.String("bq.project_id", e.config.ProjectID)),
)
e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
continue
}
e.extractTable(ctx, ds, emit)
}

if nextToken == "" {
if !hasNext {
break
}
}

return nil
}

// fetchDatasetsNextPage retrieves the next page of datasets from the
// pager and reports whether another page follows. The call duration is
// recorded on the datasets histogram, tagged with the project ID and,
// on failure, the BigQuery error reason.
func (e *Extractor) fetchDatasetsNextPage(ctx context.Context, pager *iterator.Pager) (datasets []*bigquery.Dataset, hasNext bool, err error) {
	start := time.Now()
	defer func() {
		attrs := []attribute.KeyValue{attribute.String("bq.project_id", e.config.ProjectID)}
		if err != nil {
			attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
		}
		e.datasetsDurn.Record(
			ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
		)
	}()

	var token string
	if token, err = pager.NextPage(&datasets); err != nil {
		err = fmt.Errorf("fetch dataset: %w", err)
		return nil, false, err
	}

	hasNext = token != ""
	return datasets, hasNext, nil
}

// CreateClient creates a bigquery client
func CreateClient(ctx context.Context, logger log.Logger, config *Config) (*bigquery.Client, error) {
if config.ServiceAccountBase64 == "" && config.ServiceAccountJSON == "" {
Expand Down Expand Up @@ -227,8 +287,7 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit

pager := iterator.NewPager(ds.Tables(ctx), pageSize, "")
for {
var tables []*bigquery.Table
nextToken, err := pager.NextPage(&tables)
tables, hasNext, err := e.fetchTablesNextPage(ctx, ds.DatasetID, pager)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
break
Expand All @@ -240,14 +299,18 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit

for _, table := range tables {
if IsExcludedTable(ds.DatasetID, table.TableID, e.config.Exclude.Tables) {
e.excludedTableCtr.Add(ctx, 1, metric.WithAttributes(
attribute.String("bq.project_id", e.config.ProjectID),
attribute.String("bq.dataset_id", ds.DatasetID),
))
e.logger.Debug("excluding table from bigquery extract", "dataset_id", ds.DatasetID, "table_id", table.TableID)
continue
}

tableFQN := table.FullyQualifiedName()

e.logger.Debug("extracting table", "table", tableFQN)
tmd, err := table.Metadata(ctx)
tmd, err := e.fetchTableMetadata(ctx, table)
if err != nil {
e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
continue
Expand All @@ -262,19 +325,44 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
emit(models.NewRecord(asset))
}

if nextToken == "" {
if !hasNext {
break
}
}
}

// fetchTablesNextPage retrieves the next page of tables in a dataset
// and reports whether another page follows. The call duration is
// recorded on the tables histogram, tagged with the project and dataset
// IDs and, on failure, the BigQuery error reason.
func (e *Extractor) fetchTablesNextPage(
	ctx context.Context, datasetID string, pager *iterator.Pager,
) (tables []*bigquery.Table, hasNext bool, err error) {
	start := time.Now()
	defer func() {
		attrs := []attribute.KeyValue{
			attribute.String("bq.project_id", e.config.ProjectID),
			attribute.String("bq.dataset_id", datasetID),
		}
		if err != nil {
			attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
		}
		e.tablesDurn.Record(
			ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
		)
	}()

	var token string
	if token, err = pager.NextPage(&tables); err != nil {
		return nil, false, err
	}

	hasNext = token != ""
	return tables, hasNext, nil
}

// Build the bigquery table metadata
func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigquery.TableMetadata) (*v1beta2.Asset, error) {
var tableStats *auditlog.TableStats
if e.config.IsCollectTableUsage {
// Fetch and extract logs first to build a map
var errL error
tableStats, errL = e.galClient.Collect(ctx, t.TableID)
tableStats, errL = e.galClient.Collect(ctx, t)
if errL != nil {
e.logger.Warn("error populating table stats usage", "error", errL)
}
Expand Down Expand Up @@ -637,6 +725,25 @@ func (e *Extractor) getMaxPageSize() int {
return maxPageSizeDefault
}

// fetchTableMetadata fetches a table's metadata, recording the call
// duration on the table histogram tagged with the operation name,
// project, and dataset IDs — plus the BigQuery error reason on failure.
func (e *Extractor) fetchTableMetadata(ctx context.Context, tbl *bigquery.Table) (md *bigquery.TableMetadata, err error) {
	start := time.Now()
	defer func() {
		attrs := []attribute.KeyValue{
			attribute.String("bq.operation", "table.metadata"),
			attribute.String("bq.project_id", tbl.ProjectID),
			attribute.String("bq.dataset_id", tbl.DatasetID),
		}
		if err != nil {
			attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
		}
		e.tableDurn.Record(
			ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
		)
	}()

	md, err = tbl.Metadata(ctx)
	return md, err
}

// Register the extractor to catalog
func init() {
if err := registry.Extractors.Register("bigquery", func() plugins.Extractor {
Expand All @@ -661,3 +768,9 @@ func pickFirstNonZero(ints ...int) int {
}
return 0
}

// handleOtelErr forwards a non-nil OpenTelemetry instrument-creation
// error to the globally registered otel error handler; nil is a no-op.
func handleOtelErr(err error) {
	if err == nil {
		return
	}
	otel.Handle(err)
}
11 changes: 11 additions & 0 deletions plugins/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"

"github.com/go-playground/validator/v10"
"github.com/googleapis/gax-go/v2/apierror"
"github.com/goto/meteor/models"
"github.com/mcuadros/go-defaults"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -124,6 +125,16 @@ func DrainBody(resp *http.Response) {
_ = resp.Body.Close()
}

// BQErrReason extracts the machine-readable reason string from a Google
// API error in err's chain. It returns "UNKNOWN" when err does not wrap
// an *apierror.APIError.
func BQErrReason(err error) string {
	var apiErr *apierror.APIError
	if !errors.As(err, &apiErr) {
		return "UNKNOWN"
	}

	return apiErr.Reason()
}

func parseBQTableFQN(fqn string) (projectID, datasetID, tableID string, err error) {
// fqn is the ID of the table in projectID:datasetID.tableID format.
if !strings.ContainsRune(fqn, ':') || strings.IndexRune(fqn, '.') < strings.IndexRune(fqn, ':') {
Expand Down