chore(bigquery/storage/managedwriter): internal refactor (flow controller, ids) (#7104)

This PR adds a new internal mechanism to simplify duplicating flow controllers, and does some preliminary work to wire a UUID-based ID into managed stream instances. Neither is used elsewhere at this time, but both lay the foundation for further refactoring.

Towards: https://togithub.com/googleapis/google-cloud-go/issues/7103
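To see where the two additions point, here is a minimal, self-contained sketch that pairs a prefixed UUID with a per-connection copy of a stream's flow controller settings. The flowController, copyFlowController, and newUUID definitions below mirror the code in this diff; the connection type and the wiring in main are hypothetical and are not part of this PR.

package main

import (
    "fmt"

    "github.com/google/uuid"
)

// Settings-only mirrors of the helpers in this diff, for illustration; the
// real flowController also tracks in-flight requests and bytes.
type flowController struct {
    maxInsertCount int
    maxInsertBytes int
}

func newFlowController(maxInserts, maxInsertBytes int) *flowController {
    return &flowController{maxInsertCount: maxInserts, maxInsertBytes: maxInsertBytes}
}

func copyFlowController(in *flowController) *flowController {
    var maxInserts, maxBytes int
    if in != nil {
        maxInserts = in.maxInsertCount
        maxBytes = in.maxInsertBytes
    }
    return newFlowController(maxInserts, maxBytes)
}

func newUUID(prefix string) string {
    return fmt.Sprintf("%s_%s", prefix, uuid.New().String())
}

// connection is hypothetical: a possible future shape in which each connection
// owns a flow controller derived from the parent stream's settings.
type connection struct {
    parentID string
    fc       *flowController
}

func main() {
    streamID := newUUID("managedstream") // e.g. managedstream_6ba7b810-9dad-11d1-80b4-00c04fd430c8
    streamFC := newFlowController(1000, 10*1024*1024)

    conn := &connection{
        parentID: streamID,
        fc:       copyFlowController(streamFC), // same limits, independent flow state
    }
    fmt.Println(conn.parentID, conn.fc.maxInsertCount, conn.fc.maxInsertBytes)
}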
shollyman committed Dec 13, 2022
1 parent bcc9fcd commit f2b1f1b
Showing 5 changed files with 64 additions and 0 deletions.
10 changes: 10 additions & 0 deletions bigquery/storage/managedwriter/client.go
@@ -24,6 +24,7 @@ import (
    storage "cloud.google.com/go/bigquery/storage/apiv1"
    "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    "cloud.google.com/go/internal/detect"
    "github.com/google/uuid"
    "github.com/googleapis/gax-go/v2"
    "google.golang.org/api/option"
    "google.golang.org/grpc"
@@ -38,6 +39,8 @@ import (
// does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"

const managedstreamIDPrefix = "managedstream"

// Client is a managed BigQuery Storage write client scoped to a single project.
type Client struct {
    rawClient *storage.BigQueryWriteClient
@@ -106,6 +109,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
    ctx, cancel := context.WithCancel(ctx)

    ms := &ManagedStream{
        id:             newUUID(managedstreamIDPrefix),
        streamSettings: defaultStreamSettings(),
        c:              c,
        ctx:            ctx,
@@ -232,3 +236,9 @@ func TableParentFromStreamName(streamName string) string {
func TableParentFromParts(projectID, datasetID, tableID string) string {
return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)
}

// newUUID simplifies generating UUIDs for internal resources.
func newUUID(prefix string) string {
    id := uuid.New()
    return fmt.Sprintf("%s_%s", prefix, id.String())
}
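As a small aside, the IDs produced by this helper take the form "<prefix>_<uuid>", which keeps them easy to grep and to take apart later. The sketch below mirrors the helper; splitID is purely illustrative and not part of the package.

package main

import (
    "fmt"
    "strings"

    "github.com/google/uuid"
)

// Illustrative copy of the newUUID helper added to client.go.
func newUUID(prefix string) string {
    return fmt.Sprintf("%s_%s", prefix, uuid.New().String())
}

// splitID is hypothetical: it just shows the format is easy to take apart.
func splitID(id string) (prefix, raw string) {
    parts := strings.SplitN(id, "_", 2)
    if len(parts) != 2 {
        return "", id
    }
    return parts[0], parts[1]
}

func main() {
    id := newUUID("managedstream")
    prefix, raw := splitID(id)
    fmt.Println(id)     // managedstream_xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    fmt.Println(prefix) // managedstream
    fmt.Println(raw)    // xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
}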
11 changes: 11 additions & 0 deletions bigquery/storage/managedwriter/flow_controller.go
@@ -50,6 +50,17 @@ func newFlowController(maxInserts, maxInsertBytes int) *flowController {
    return fc
}

// copyFlowController is for creating a new flow controller based on
// settings from another. It does not copy flow state.
func copyFlowController(in *flowController) *flowController {
    var maxInserts, maxBytes int
    if in != nil {
        maxInserts = in.maxInsertCount
        maxBytes = in.maxInsertBytes
    }
    return newFlowController(maxInserts, maxBytes)
}

// acquire blocks until one insert of size bytes can proceed or ctx is done.
// It returns nil in the first case, or ctx.Err() in the second.
//
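The key property of the new helper is stated in its doc comment: settings are copied, flow state is not. The sketch below illustrates that with a simplified stand-in for the controller; the inFlightBytes field is an invented placeholder for the real in-flight tracking and is not the actual implementation.

package main

import "fmt"

// Simplified stand-in for the package's flowController: a single counter is
// enough to illustrate the copy semantics.
type flowController struct {
    maxInsertCount int
    maxInsertBytes int
    inFlightBytes  int // illustrative flow state
}

func newFlowController(maxInserts, maxInsertBytes int) *flowController {
    return &flowController{maxInsertCount: maxInserts, maxInsertBytes: maxInsertBytes}
}

// Same shape as the helper added above: limits are copied, state is not.
func copyFlowController(in *flowController) *flowController {
    var maxInserts, maxBytes int
    if in != nil {
        maxInserts = in.maxInsertCount
        maxBytes = in.maxInsertBytes
    }
    return newFlowController(maxInserts, maxBytes)
}

func main() {
    src := newFlowController(10, 1024)
    src.inFlightBytes = 512 // pretend some inserts are outstanding

    dst := copyFlowController(src)
    fmt.Println(dst.maxInsertCount, dst.maxInsertBytes) // 10 1024 (limits copied)
    fmt.Println(dst.inFlightBytes)                      // 0 (state not copied)
}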
37 changes: 37 additions & 0 deletions bigquery/storage/managedwriter/flow_controller_test.go
@@ -257,3 +257,40 @@ func TestFlowControllerUnboundedBytes(t *testing.T) {
        t.Error("got true, wanted false")
    }
}

func TestCopyFlowController(t *testing.T) {
    testcases := []struct {
        description     string
        in              *flowController
        wantMaxRequests int
        wantMaxBytes    int
    }{
        {
            description:     "nil source",
            wantMaxRequests: 0,
            wantMaxBytes:    0,
        },
        {
            description:     "no limit",
            in:              newFlowController(0, 0),
            wantMaxRequests: 0,
            wantMaxBytes:    0,
        },
        {
            description:     "bounded",
            in:              newFlowController(10, 1024),
            wantMaxRequests: 10,
            wantMaxBytes:    1024,
        },
    }

    for _, tc := range testcases {
        fc := copyFlowController(tc.in)
        if fc.maxInsertBytes != tc.wantMaxBytes {
            t.Errorf("%s: max bytes mismatch, got %d want %d", tc.description, fc.maxInsertBytes, tc.wantMaxBytes)
        }
        if fc.maxInsertCount != tc.wantMaxRequests {
            t.Errorf("%s: max requests mismatch, got %d want %d", tc.description, fc.maxInsertCount, tc.wantMaxRequests)
        }
    }
}
3 changes: 3 additions & 0 deletions bigquery/storage/managedwriter/integration_test.go
@@ -187,6 +187,9 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
    if err != nil {
        t.Fatalf("NewManagedStream: %v", err)
    }
    if ms.id == "" {
        t.Errorf("managed stream is missing ID")
    }
    validateTableConstraints(ctx, t, bqClient, testTable, "before send",
        withExactRowCount(0))

3 changes: 3 additions & 0 deletions bigquery/storage/managedwriter/managed_stream.go
@@ -72,6 +72,9 @@ func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {

// ManagedStream is the abstraction over a single write stream.
type ManagedStream struct {
    // Unique id for the managedstream instance.
    id string

    streamSettings   *streamSettings
    schemaDescriptor *descriptorpb.DescriptorProto
    destinationTable string
