Skip to content
Permalink
Browse files
docs(samples): Add minimal sample to show Write API in pending mode (#…
…322)

This sample is a stripped down version of the bigquerystorage_append_rows_raw_proto2 sample, for embedding in the Write API documentation. The docs would then link to the longer sample which shows how to format all of the datatypes including STRUCT types.

btw I registered a new region tag for this snippet
  • Loading branch information
VeronicaWasson committed Oct 5, 2021
1 parent b0465d1 commit db5146980bd1a358413c56f6e090c07277bfac26
@@ -0,0 +1,134 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START bigquerystorage_append_rows_pending]
"""
This code sample demonstrates how to write records in pending mode
using the low-level generated client for Python.
"""

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2

# If you update the customer_record.proto protocol buffer definition, run:
#
# protoc --python_out=. customer_record.proto
#
# from the samples/snippets directory to generate the customer_record_pb2.py module.
from . import customer_record_pb2


def create_row_data(row_num: int, name: str):
row = customer_record_pb2.CustomerRecord()
row.row_num = row_num
row.customer_name = name
return row.SerializeToString()


def append_rows_pending(project_id: str, dataset_id: str, table_id: str):

"""Create a write stream, write some sample data, and commit the stream."""
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
write_stream = types.WriteStream()

# When creating the stream, choose the type. Use the PENDING type to wait
# until the stream is committed before it is visible. See:
# https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
write_stream.type_ = types.WriteStream.Type.PENDING
write_stream = write_client.create_write_stream(
parent=parent, write_stream=write_stream
)
stream_name = write_stream.name

# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()

# The initial request must contain the stream name.
request_template.write_stream = stream_name

# So that BigQuery knows how to parse the serialized_rows, generate a
# protocol buffer representation of your message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
customer_record_pb2.CustomerRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data

# Some stream types support an unbounded number of requests. Construct an
# AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)

# Create a batch of row data by appending proto2 serialized bytes to the
# serialized_rows repeated field.
proto_rows = types.ProtoRows()
proto_rows.serialized_rows.append(create_row_data(1, "Alice"))
proto_rows.serialized_rows.append(create_row_data(2, "Bob"))

# Set an offset to allow resuming this stream if the connection breaks.
# Keep track of which requests the server has acknowledged and resume the
# stream at the first non-acknowledged message. If the server has already
# processed a message with that offset, it will return an ALREADY_EXISTS
# error, which can be safely ignored.
#
# The first request must always have an offset of 0.
request = types.AppendRowsRequest()
request.offset = 0
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data

response_future_1 = append_rows_stream.send(request)

# Send another batch.
proto_rows = types.ProtoRows()
proto_rows.serialized_rows.append(create_row_data(3, "Charles"))

# Since this is the second request, you only need to include the row data.
# The name of the stream and protocol buffers DESCRIPTOR is only needed in
# the first request.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data

# Offset must equal the number of rows that were previously sent.
request.offset = 2

response_future_2 = append_rows_stream.send(request)

print(response_future_1.result())
print(response_future_2.result())

# Shutdown background threads and close the streaming connection.
append_rows_stream.close()

# A PENDING type stream must be "finalized" before being committed. No new
# records can be written to the stream after this method has been called.
write_client.finalize_write_stream(name=write_stream.name)

# Commit the stream you created earlier.
batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
batch_commit_write_streams_request.parent = parent
batch_commit_write_streams_request.write_streams = [write_stream.name]
write_client.batch_commit_write_streams(batch_commit_write_streams_request)

print(f"Writes to stream: '{write_stream.name}' have been committed.")


# [END bigquerystorage_append_rows_pending]
@@ -0,0 +1,73 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pathlib
import random

from google.cloud import bigquery
import pytest

from . import append_rows_pending


DIR = pathlib.Path(__file__).parent


regions = ["US", "non-US"]


@pytest.fixture(params=regions)
def sample_data_table(
request: pytest.FixtureRequest,
bigquery_client: bigquery.Client,
project_id: str,
dataset_id: str,
dataset_id_non_us: str,
) -> str:
dataset = dataset_id
if request.param != "US":
dataset = dataset_id_non_us
schema = bigquery_client.schema_from_json(str(DIR / "customer_record_schema.json"))
table_id = f"append_rows_proto2_{random.randrange(10000)}"
full_table_id = f"{project_id}.{dataset}.{table_id}"
table = bigquery.Table(full_table_id, schema=schema)
table = bigquery_client.create_table(table, exists_ok=True)
yield full_table_id
bigquery_client.delete_table(table, not_found_ok=True)


def test_append_rows_pending(
capsys: pytest.CaptureFixture,
bigquery_client: bigquery.Client,
sample_data_table: str,
):
project_id, dataset_id, table_id = sample_data_table.split(".")
append_rows_pending.append_rows_pending(
project_id=project_id, dataset_id=dataset_id, table_id=table_id
)
out, _ = capsys.readouterr()
assert "have been committed" in out

rows = bigquery_client.query(
f"SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"
).result()
row_items = [
# Convert to sorted tuple of items to more easily search for expected rows.
tuple(sorted(row.items()))
for row in rows
]

assert (("customer_name", "Alice"), ("row_num", 1)) in row_items
assert (("customer_name", "Bob"), ("row_num", 2)) in row_items
assert (("customer_name", "Charles"), ("row_num", 3)) in row_items
@@ -0,0 +1,28 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// The BigQuery Storage API expects protocol buffer data to be encoded in the
// proto2 wire format. This allows it to disambiguate missing optional fields
// from default values without the need for wrapper types.
syntax = "proto2";

// Define a message type representing the rows in your table. The message
// cannot contain fields which are not present in the table.
message CustomerRecord {

optional string customer_name = 1;

// Use the required keyword for client-side validation of required fields.
required int64 row_num = 2;
}

Some generated files are not rendered by default. Learn more.

@@ -0,0 +1,11 @@
[
{
"name": "customer_name",
"type": "STRING"
},
{
"name": "row_num",
"type": "INTEGER",
"mode": "REQUIRED"
}
]

0 comments on commit db51469

Please sign in to comment.