diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 8aadccc0dd67..cedd4fafbbdd 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -30,6 +30,7 @@ import ( "time" "cloud.google.com/go/civil" + datacatalog "cloud.google.com/go/datacatalog/apiv1" "cloud.google.com/go/httpreplay" "cloud.google.com/go/iam" "cloud.google.com/go/internal" @@ -43,6 +44,7 @@ import ( "google.golang.org/api/googleapi" "google.golang.org/api/iterator" "google.golang.org/api/option" + datacatalogpb "google.golang.org/genproto/googleapis/cloud/datacatalog/v1" ) const replayFilename = "bigquery.replay" @@ -50,10 +52,11 @@ const replayFilename = "bigquery.replay" var record = flag.Bool("record", false, "record RPCs") var ( - client *Client - storageClient *storage.Client - dataset *Dataset - schema = Schema{ + client *Client + storageClient *storage.Client + policyTagManagerClient *datacatalog.PolicyTagManagerClient + dataset *Dataset + schema = Schema{ {Name: "name", Type: StringFieldType}, {Name: "nums", Type: IntegerFieldType, Repeated: true}, {Name: "rec", Type: RecordFieldType, Schema: Schema{ @@ -119,6 +122,10 @@ func initIntegrationTest() func() { if err != nil { log.Fatal(err) } + policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx) + if err != nil { + log.Fatal(err) + } cleanup := initTestState(client, t) return func() { cleanup() @@ -142,6 +149,7 @@ func initIntegrationTest() func() { } bqOpts := []option.ClientOption{option.WithTokenSource(ts)} sOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl))} + ptmOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, "https://www.googleapis.com/auth/cloud-platform"))} cleanup := func() {} now := time.Now().UTC() if *record { @@ -179,6 +187,7 @@ func initIntegrationTest() func() { // incompatible with gRPC options. bqOpts = append(bqOpts, grpcHeadersChecker.CallOptions()...) sOpts = append(sOpts, grpcHeadersChecker.CallOptions()...) + ptmOpts = append(ptmOpts, grpcHeadersChecker.CallOptions()...) } var err error client, err = NewClient(ctx, projID, bqOpts...) @@ -189,6 +198,7 @@ func initIntegrationTest() func() { if err != nil { log.Fatalf("storage.NewClient: %v", err) } + policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx, ptmOpts...) c := initTestState(client, now) return func() { c(); cleanup() } } @@ -889,6 +899,88 @@ func TestIntegration_Tables(t *testing.T) { } } +// setupPolicyTag is a helper for setting up policy tags in the datacatalog service. +// +// It returns a string for a policy tag identifier and a cleanup function, or an error. +func setupPolicyTag(ctx context.Context) (string, func(), error) { + location := "us" + req := &datacatalogpb.CreateTaxonomyRequest{ + Parent: fmt.Sprintf("projects/%s/locations/%s", testutil.ProjID(), location), + Taxonomy: &datacatalogpb.Taxonomy{ + DisplayName: "google-cloud-go bigquery testing taxonomy", + Description: "Taxonomy created for google-cloud-go integration tests", + ActivatedPolicyTypes: []datacatalogpb.Taxonomy_PolicyType{ + datacatalogpb.Taxonomy_FINE_GRAINED_ACCESS_CONTROL, + }, + }, + } + resp, err := policyTagManagerClient.CreateTaxonomy(ctx, req) + if err != nil { + return "", nil, fmt.Errorf("datacatalog.CreateTaxonomy: %v", err) + } + taxonomyID := resp.GetName() + cleanupFunc := func() { + policyTagManagerClient.DeleteTaxonomy(ctx, &datacatalogpb.DeleteTaxonomyRequest{ + Name: taxonomyID, + }) + } + + tagReq := &datacatalogpb.CreatePolicyTagRequest{ + Parent: resp.GetName(), + PolicyTag: &datacatalogpb.PolicyTag{ + DisplayName: "ExamplePolicyTag", + }, + } + tagResp, err := policyTagManagerClient.CreatePolicyTag(ctx, tagReq) + if err != nil { + // we're failed to create tags, but we did create taxonomy. clean it up and signal error. + cleanupFunc() + return "", nil, fmt.Errorf("datacatalog.CreatePolicyTag: %v", err) + } + return tagResp.GetName(), cleanupFunc, nil +} + +func TestIntegration_ColumnACLs(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + testSchema := Schema{ + {Name: "name", Type: StringFieldType}, + {Name: "ssn", Type: StringFieldType}, + {Name: "acct_balance", Type: NumericFieldType}, + } + table := newTable(t, testSchema) + defer table.Delete(ctx) + + tagID, cleanupFunc, err := setupPolicyTag(ctx) + if err != nil { + t.Fatalf("failed to setup policy tag resources: %v", err) + } + defer cleanupFunc() + // amend the test schema to add a policy tag + testSchema[1].PolicyTags = &PolicyTagList{ + Names: []string{tagID}, + } + + // Test: Amend an existing schema with a policy tag. + _, err = table.Update(ctx, TableMetadataToUpdate{ + Schema: testSchema, + }, "") + if err != nil { + t.Errorf("update with policyTag failed: %v", err) + } + + // Test: Create a new table with a policy tag defined. + newTable := dataset.Table(tableIDs.New()) + if err = newTable.Create(ctx, &TableMetadata{ + Schema: schema, + Description: "foo", + }); err != nil { + t.Errorf("failed to create new table with policy tag: %v", err) + } +} + func TestIntegration_TableIAM(t *testing.T) { if client == nil { t.Skip("Integration tests skipped")