feat: add BatchWrite API (#1011)
* feat: add BatchWrite API

PiperOrigin-RevId: 567412157

Source-Link: googleapis/googleapis@64fd42c

Source-Link: googleapis/googleapis-gen@9e53103
Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiOWU1MzEwM2ZmM2MwNmFmOTRlNTgzYWY3YmFhM2M3ZmNhZmU3ODMyMiJ9

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Anthonios Partheniou <partheniou@google.com>
3 people committed Oct 9, 2023
1 parent a9566ed commit d0e4ffc
Showing 14 changed files with 1,491 additions and 0 deletions.
15 changes: 15 additions & 0 deletions google/cloud/spanner_v1/gapic_metadata.json
@@ -15,6 +15,11 @@
"batch_create_sessions"
]
},
"BatchWrite": {
"methods": [
"batch_write"
]
},
"BeginTransaction": {
"methods": [
"begin_transaction"
@@ -95,6 +100,11 @@
"batch_create_sessions"
]
},
"BatchWrite": {
"methods": [
"batch_write"
]
},
"BeginTransaction": {
"methods": [
"begin_transaction"
@@ -175,6 +185,11 @@
"batch_create_sessions"
]
},
"BatchWrite": {
"methods": [
"batch_write"
]
},
"BeginTransaction": {
"methods": [
"begin_transaction"
137 changes: 137 additions & 0 deletions google/cloud/spanner_v1/services/spanner/async_client.py
@@ -1973,6 +1973,143 @@ async def sample_partition_read():
        # Done; return the response.
        return response

    def batch_write(
        self,
        request: Optional[Union[spanner.BatchWriteRequest, dict]] = None,
        *,
        session: Optional[str] = None,
        mutation_groups: Optional[
            MutableSequence[spanner.BatchWriteRequest.MutationGroup]
        ] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Awaitable[AsyncIterable[spanner.BatchWriteResponse]]:
        r"""Batches the supplied mutation groups in a collection
        of efficient transactions. All mutations in a group are
        committed atomically. However, mutations across groups
        can be committed non-atomically in an unspecified order
        and thus, they must be independent of each other.
        Partial failure is possible, i.e., some groups may have
        been committed successfully, while some may have failed.
        The results of individual batches are streamed into the
        response as the batches are applied.

        BatchWrite requests are not replay protected, meaning
        that each mutation group may be applied more than once.
        Replays of non-idempotent mutations may have undesirable
        effects. For example, replays of an insert mutation may
        produce an already exists error or if you use generated
        or commit timestamp-based keys, it may result in
        additional rows being added to the mutation's table. We
        recommend structuring your mutation groups to be
        idempotent to avoid this issue.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import spanner_v1

            async def sample_batch_write():
                # Create a client
                client = spanner_v1.SpannerAsyncClient()

                # Initialize request argument(s)
                mutation_groups = spanner_v1.MutationGroup()
                mutation_groups.mutations.insert.table = "table_value"

                request = spanner_v1.BatchWriteRequest(
                    session="session_value",
                    mutation_groups=mutation_groups,
                )

                # Make the request
                stream = await client.batch_write(request=request)

                # Handle the response
                async for response in stream:
                    print(response)

        Args:
            request (Optional[Union[google.cloud.spanner_v1.types.BatchWriteRequest, dict]]):
                The request object. The request for
                [BatchWrite][google.spanner.v1.Spanner.BatchWrite].
            session (:class:`str`):
                Required. The session in which the
                batch request is to be run.

                This corresponds to the ``session`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            mutation_groups (:class:`MutableSequence[google.cloud.spanner_v1.types.BatchWriteRequest.MutationGroup]`):
                Required. The groups of mutations to
                be applied.

                This corresponds to the ``mutation_groups`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            AsyncIterable[google.cloud.spanner_v1.types.BatchWriteResponse]:
                The result of applying a batch of
                mutations.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([session, mutation_groups])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = spanner.BatchWriteRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if session is not None:
            request.session = session
        if mutation_groups:
            request.mutation_groups.extend(mutation_groups)

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.batch_write,
            default_timeout=3600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("session", request.session),)),
        )

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    async def __aenter__(self) -> "SpannerAsyncClient":
        return self

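For context, a minimal sketch of calling the new async method with the flattened session and mutation_groups arguments instead of a prebuilt request. The project, instance, database, session, and table names below are placeholders (a real session must be created first, for example with create_session); this is illustrative only and not part of the generated change.

import asyncio

from google.cloud import spanner_v1


async def demo_batch_write() -> None:
    # Placeholder session resource name; real code would create it first.
    session = (
        "projects/my-project/instances/my-instance"
        "/databases/my-database/sessions/my-session"
    )

    client = spanner_v1.SpannerAsyncClient()

    # Each group commits atomically; groups are independent of each other.
    mutation = spanner_v1.Mutation()
    mutation.insert.table = "Singers"
    group = spanner_v1.BatchWriteRequest.MutationGroup(mutations=[mutation])

    # Flattened arguments: the client builds the BatchWriteRequest and adds
    # the "session" routing header shown in the diff above.
    stream = await client.batch_write(session=session, mutation_groups=[group])
    async for response in stream:
        print(response)


asyncio.run(demo_batch_write())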
137 changes: 137 additions & 0 deletions google/cloud/spanner_v1/services/spanner/client.py
@@ -2119,6 +2119,143 @@ def sample_partition_read():
        # Done; return the response.
        return response

    def batch_write(
        self,
        request: Optional[Union[spanner.BatchWriteRequest, dict]] = None,
        *,
        session: Optional[str] = None,
        mutation_groups: Optional[
            MutableSequence[spanner.BatchWriteRequest.MutationGroup]
        ] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Iterable[spanner.BatchWriteResponse]:
        r"""Batches the supplied mutation groups in a collection
        of efficient transactions. All mutations in a group are
        committed atomically. However, mutations across groups
        can be committed non-atomically in an unspecified order
        and thus, they must be independent of each other.
        Partial failure is possible, i.e., some groups may have
        been committed successfully, while some may have failed.
        The results of individual batches are streamed into the
        response as the batches are applied.

        BatchWrite requests are not replay protected, meaning
        that each mutation group may be applied more than once.
        Replays of non-idempotent mutations may have undesirable
        effects. For example, replays of an insert mutation may
        produce an already exists error or if you use generated
        or commit timestamp-based keys, it may result in
        additional rows being added to the mutation's table. We
        recommend structuring your mutation groups to be
        idempotent to avoid this issue.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import spanner_v1

            def sample_batch_write():
                # Create a client
                client = spanner_v1.SpannerClient()

                # Initialize request argument(s)
                mutation_groups = spanner_v1.MutationGroup()
                mutation_groups.mutations.insert.table = "table_value"

                request = spanner_v1.BatchWriteRequest(
                    session="session_value",
                    mutation_groups=mutation_groups,
                )

                # Make the request
                stream = client.batch_write(request=request)

                # Handle the response
                for response in stream:
                    print(response)

        Args:
            request (Union[google.cloud.spanner_v1.types.BatchWriteRequest, dict]):
                The request object. The request for
                [BatchWrite][google.spanner.v1.Spanner.BatchWrite].
            session (str):
                Required. The session in which the
                batch request is to be run.

                This corresponds to the ``session`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            mutation_groups (MutableSequence[google.cloud.spanner_v1.types.BatchWriteRequest.MutationGroup]):
                Required. The groups of mutations to
                be applied.

                This corresponds to the ``mutation_groups`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            Iterable[google.cloud.spanner_v1.types.BatchWriteResponse]:
                The result of applying a batch of
                mutations.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([session, mutation_groups])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        # Minor optimization to avoid making a copy if the user passes
        # in a spanner.BatchWriteRequest.
        # There's no risk of modifying the input as we've already verified
        # there are no flattened fields.
        if not isinstance(request, spanner.BatchWriteRequest):
            request = spanner.BatchWriteRequest(request)
            # If we have keyword arguments corresponding to fields on the
            # request, apply these.
            if session is not None:
                request.session = session
            if mutation_groups is not None:
                request.mutation_groups = mutation_groups

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = self._transport._wrapped_methods[self._transport.batch_write]

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("session", request.session),)),
        )

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response

    def __enter__(self) -> "SpannerClient":
        return self

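Because groups can succeed or fail independently, callers typically inspect every streamed BatchWriteResponse rather than assume the whole batch committed. A sketch of that pattern, assuming BatchWriteResponse exposes the indexes, status, and commit_timestamp fields defined in the BatchWrite proto; the function and variable names are placeholders, not part of this change.

from google.cloud import spanner_v1


def apply_groups(client: spanner_v1.SpannerClient, session: str, groups) -> list:
    """Apply mutation groups and return the indexes of groups that failed."""
    request = spanner_v1.BatchWriteRequest(
        session=session,
        mutation_groups=groups,
    )

    failed = []
    for response in client.batch_write(request=request):
        # `indexes` identifies which entries of `mutation_groups` this
        # response covers; status code 0 (OK) means they were committed.
        if response.status.code == 0:
            print(f"groups {list(response.indexes)} committed at "
                  f"{response.commit_timestamp}")
        else:
            failed.extend(response.indexes)
            print(f"groups {list(response.indexes)} failed: "
                  f"{response.status.message}")

    # BatchWrite is not replay protected, so only idempotent groups
    # should be resubmitted.
    return sorted(failed)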
14 changes: 14 additions & 0 deletions google/cloud/spanner_v1/services/spanner/transports/base.py
@@ -322,6 +322,11 @@ def _prep_wrapped_messages(self, client_info):
                default_timeout=30.0,
                client_info=client_info,
            ),
            self.batch_write: gapic_v1.method.wrap_method(
                self.batch_write,
                default_timeout=3600.0,
                client_info=client_info,
            ),
        }

    def close(self):
@@ -473,6 +478,15 @@ def partition_read(
    ]:
        raise NotImplementedError()

    @property
    def batch_write(
        self,
    ) -> Callable[
        [spanner.BatchWriteRequest],
        Union[spanner.BatchWriteResponse, Awaitable[spanner.BatchWriteResponse]],
    ]:
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        raise NotImplementedError()
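The base transport only declares batch_write; the concrete transports (grpc.py, grpc_asyncio.py, and rest.py are among the 14 changed files but are not reproduced in this excerpt) override it. Roughly how the gRPC transport is expected to bind the server-streaming stub, following the stub-caching pattern used for the other Spanner RPCs; since those files are not shown here, treat the exact wiring as an assumption.

from google.cloud.spanner_v1.types import spanner


class GrpcBatchWriteSketch:
    """Illustrative stand-in for the generated gRPC transport property."""

    def __init__(self, grpc_channel):
        self.grpc_channel = grpc_channel
        self._stubs = {}

    @property
    def batch_write(self):
        # BatchWrite is a server-streaming RPC, so the stub is created with
        # unary_stream and cached on first use.
        if "batch_write" not in self._stubs:
            self._stubs["batch_write"] = self.grpc_channel.unary_stream(
                "/google.spanner.v1.Spanner/BatchWrite",
                request_serializer=spanner.BatchWriteRequest.serialize,
                response_deserializer=spanner.BatchWriteResponse.deserialize,
            )
        return self._stubs["batch_write"]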