Add support for creating bigquery jobs in a different project
This is an initial commit to show one of the ways of implementing it.
luca.rovinetti committed Apr 16, 2024
1 parent 71ec64b commit 12da668
Showing 2 changed files with 15 additions and 8 deletions.
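For context before the diff itself: in the Go BigQuery client, load jobs are created in the client's own project, while DatasetInProject lets the destination dataset be addressed in a (possibly different) project. The snippet below is a minimal sketch of that separation, not code from this commit; the project IDs and dataset/table names are placeholders.

package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx := context.Background()

	// Placeholder IDs: load jobs run (and are billed) in jobProject,
	// while the destination dataset lives in dataProject.
	jobProject := "my-job-project"
	dataProject := "my-data-project"

	// The client's project is the one that jobs are created in.
	client, err := bigquery.NewClient(ctx, jobProject)
	if err != nil {
		log.Fatalf("creating BigQuery client: %v", err)
	}
	defer client.Close()

	// The destination table is addressed explicitly in the data project,
	// so it is unaffected by which project the client (and its jobs) uses.
	table := client.DatasetInProject(dataProject, "my_dataset").Table("my_table")
	log.Println("destination table:", table.FullyQualifiedName())
}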
3 changes: 1 addition & 2 deletions internal/impl/gcp/bigquery.go
@@ -1,10 +1,9 @@
package gcp

import (
"cloud.google.com/go/bigquery"
"context"
"fmt"

"cloud.google.com/go/bigquery"
"github.com/Masterminds/squirrel"
"go.uber.org/multierr"

20 changes: 14 additions & 6 deletions internal/impl/gcp/output_bigquery.go
@@ -49,6 +49,7 @@ func gcpBigQueryCSVConfigFromParsed(conf *service.ParsedConfig) (csvconf gcpBigQ
}

type gcpBigQueryOutputConfig struct {
JobProjectID string
ProjectID string
DatasetID string
TableID string
@@ -71,6 +72,12 @@ func gcpBigQueryOutputConfigFromParsed(conf *service.ParsedConfig) (gconf gcpBig
if gconf.ProjectID == "" {
gconf.ProjectID = bigquery.DetectProjectID
}
if gconf.JobProjectID, err = conf.FieldString("job_project"); err != nil {
return
}
if gconf.JobProjectID == "" {
gconf.JobProjectID = gconf.ProjectID
}
if gconf.DatasetID, err = conf.FieldString("dataset"); err != nil {
return
}
@@ -106,11 +113,11 @@ func gcpBigQueryOutputConfigFromParsed(conf *service.ParsedConfig) (gconf gcpBig

type gcpBQClientURL string

func (g gcpBQClientURL) NewClient(ctx context.Context, projectID string) (*bigquery.Client, error) {
func (g gcpBQClientURL) NewClient(ctx context.Context, jobProjectID string) (*bigquery.Client, error) {
if g == "" {
return bigquery.NewClient(ctx, projectID)
return bigquery.NewClient(ctx, jobProjectID)
}
return bigquery.NewClient(ctx, projectID, option.WithoutAuthentication(), option.WithEndpoint(string(g)))
return bigquery.NewClient(ctx, jobProjectID, option.WithoutAuthentication(), option.WithEndpoint(string(g)))
}

func gcpBigQueryConfig() *service.ConfigSpec {
@@ -155,6 +162,7 @@ The same is true for the CSV format.
For the CSV format when the field `+"`csv.header`"+` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.`)).
Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.").Default("")).
Field(service.NewStringField("job_project").Description("The project ID in which jobs will be exectuted. If not set, project will be used.").Default("")).
Field(service.NewStringField("dataset").Description("The BigQuery Dataset ID.")).
Field(service.NewStringField("table").Description("The table to insert messages to.")).
Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV)).
@@ -302,7 +310,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
defer g.connMut.Unlock()

var client *bigquery.Client
if client, err = g.clientURL.NewClient(context.Background(), g.conf.ProjectID); err != nil {
if client, err = g.clientURL.NewClient(context.Background(), g.conf.JobProjectID); err != nil {
err = fmt.Errorf("error creating big query client: %w", err)
return
}
@@ -312,7 +320,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
}
}()

dataset := client.DatasetInProject(client.Project(), g.conf.DatasetID)
dataset := client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID)
if _, err = dataset.Metadata(ctx); err != nil {
if hasStatusCode(err, http.StatusNotFound) {
err = fmt.Errorf("dataset does not exist: %v", g.conf.DatasetID)
@@ -385,7 +393,7 @@ func (g *gcpBigQueryOutput) WriteBatch(ctx context.Context, batch service.Messag
}

func (g *gcpBigQueryOutput) createTableLoader(data *[]byte) *bigquery.Loader {
table := g.client.DatasetInProject(g.client.Project(), g.conf.DatasetID).Table(g.conf.TableID)
table := g.client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID).Table(g.conf.TableID)

source := bigquery.NewReaderSource(bytes.NewReader(*data))
source.SourceFormat = bigquery.DataFormat(g.conf.Format)
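As a rough usage sketch (not part of this commit), a loader like the one built here would be run as below: the load job is created under the client's project, i.e. the configured job_project, while the data lands in the destination table resolved via DatasetInProject. The runLoader helper name is made up for illustration.

package gcp

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// runLoader is a hypothetical helper showing how a loader such as the one
// returned by createTableLoader would be executed and checked for errors.
func runLoader(ctx context.Context, loader *bigquery.Loader) error {
	// Run creates the load job under the client's project (job_project).
	job, err := loader.Run(ctx)
	if err != nil {
		return fmt.Errorf("error starting load job: %w", err)
	}
	// Wait blocks until the job completes; job-level failures are reported
	// separately via the returned status.
	status, err := job.Wait(ctx)
	if err != nil {
		return fmt.Errorf("error waiting for load job: %w", err)
	}
	if err := status.Err(); err != nil {
		return fmt.Errorf("load job failed: %w", err)
	}
	return nil
}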
