Skip to content

Commit

Permalink
feat(Spanner): Enable Leader Aware Routing (LAR) (#6981)
Browse files Browse the repository at this point in the history
Changes required for LAR in PHP Spanner Library.
  • Loading branch information
ajupazhamayil committed Feb 16, 2024
1 parent 1e35432 commit de55bd1
Show file tree
Hide file tree
Showing 11 changed files with 293 additions and 71 deletions.
37 changes: 21 additions & 16 deletions Spanner/src/Connection/Grpc.php
Expand Up @@ -45,6 +45,7 @@
use Google\Cloud\Spanner\Admin\Instance\V1\UpdateInstanceMetadata;
use Google\Cloud\Spanner\Operation;
use Google\Cloud\Spanner\SpannerClient as ManualSpannerClient;
use Google\Cloud\Spanner\RequestHeaderTrait;
use Google\Cloud\Spanner\V1\CreateSessionRequest;
use Google\Cloud\Spanner\V1\DeleteSessionRequest;
use Google\Cloud\Spanner\V1\DirectedReadOptions;
Expand Down Expand Up @@ -83,6 +84,7 @@ class Grpc implements ConnectionInterface
use EmulatorTrait;
use GrpcTrait;
use OperationResponseTrait;
use RequestHeaderTrait;

/**
* @var InstanceAdminClient|null
Expand Down Expand Up @@ -209,6 +211,11 @@ class Grpc implements ConnectionInterface
*/
private $credentialsWrapper;

/**
* @var bool
*/
private $larEnabled;

/**
* @param array $config [optional]
*/
Expand Down Expand Up @@ -270,6 +277,7 @@ public function __construct(array $config = [])
//@codeCoverageIgnoreEnd

$this->grpcConfig = $grpcConfig;
$this->larEnabled = $this->pluck('routeToLeader', $config, false) ?? true;
}

/**
Expand Down Expand Up @@ -806,6 +814,7 @@ function ($value) {
);
}

$args = $this->addLarHeader($args, $this->larEnabled);
return $this->send([$this->spannerClient, 'createSession'], [
$databaseName,
$this->addResourcePrefixHeader($args, $databaseName)
Expand All @@ -825,6 +834,7 @@ public function createSessionAsync(array $args)
{
$databaseName = $this->pluck('database', $args);
$opts = $this->addResourcePrefixHeader([], $databaseName);
$opts = $this->addLarHeader($opts, $this->larEnabled);
$opts['credentialsWrapper'] = $this->credentialsWrapper;
$transport = $this->spannerClient->getTransport();

Expand Down Expand Up @@ -860,6 +870,7 @@ public function batchCreateSessions(array $args)
);

$databaseName = $this->pluck('database', $args);
$args = $this->addLarHeader($args, $this->larEnabled);
return $this->send([$this->spannerClient, 'batchCreateSessions'], [
$databaseName,
$this->pluck('sessionCount', $args),
Expand All @@ -873,6 +884,7 @@ public function batchCreateSessions(array $args)
public function getSession(array $args)
{
$databaseName = $this->pluck('database', $args);
$args = $this->addLarHeader($args, $this->larEnabled);
return $this->send([$this->spannerClient, 'getSession'], [
$this->pluck('name', $args),
$this->addResourcePrefixHeader($args, $databaseName)
Expand Down Expand Up @@ -959,6 +971,7 @@ public function executeStreamingSql(array $args)
);
}

$args = $this->conditionallyUnsetLarHeader($args, $this->larEnabled);
return $this->send([$this->spannerClient, 'executeStreamingSql'], [
$this->pluck('session', $args),
$this->pluck('sql', $args),
Expand Down Expand Up @@ -986,6 +999,7 @@ public function streamingRead(array $args)
$args['transaction'] = $this->createTransactionSelector($args);

$databaseName = $this->pluck('database', $args);
$args = $this->conditionallyUnsetLarHeader($args, $this->larEnabled);
return $this->send([$this->spannerClient, 'streamingRead'], [
$this->pluck('session', $args),
$this->pluck('table', $args),
Expand All @@ -1002,6 +1016,7 @@ public function executeBatchDml(array $args)
{
$databaseName = $this->pluck('database', $args);
$args['transaction'] = $this->createTransactionSelector($args);
$args = $this->addLarHeader($args, $this->larEnabled);

$statements = [];
foreach ($this->pluck('statements', $args) as $statement) {
Expand Down Expand Up @@ -1045,9 +1060,11 @@ public function beginTransaction(array $args)
} elseif (isset($transactionOptions['readWrite'])) {
$readWrite = new ReadWrite();
$options->setReadWrite($readWrite);
$args = $this->addLarHeader($args, $this->larEnabled);
} elseif (isset($transactionOptions['partitionedDml'])) {
$pdml = new PartitionedDml();
$options->setPartitionedDml($pdml);
$args = $this->addLarHeader($args, $this->larEnabled);
}

$requestOptions = $this->pluck('requestOptions', $args, false) ?: [];
Expand Down Expand Up @@ -1135,6 +1152,7 @@ public function commit(array $args)
}

$databaseName = $this->pluck('database', $args);
$args = $this->addLarHeader($args, $this->larEnabled);
return $this->send([$this->spannerClient, 'commit'], [
$this->pluck('session', $args),
$mutations,
Expand All @@ -1148,6 +1166,7 @@ public function commit(array $args)
public function rollback(array $args)
{
$databaseName = $this->pluck('database', $args);
$args = $this->addLarHeader($args, $this->larEnabled);
return $this->send([$this->spannerClient, 'rollback'], [
$this->pluck('session', $args),
$this->pluck('transactionId', $args),
Expand All @@ -1162,6 +1181,7 @@ public function partitionQuery(array $args)
{
$args = $this->formatSqlParams($args);
$args['transaction'] = $this->createTransactionSelector($args);
$args = $this->addLarHeader($args, $this->larEnabled);

$args['partitionOptions'] = $this->serializer->decodeMessage(
new PartitionOptions,
Expand All @@ -1185,6 +1205,7 @@ public function partitionRead(array $args)
$keySet = $this->serializer->decodeMessage(new KeySet, $this->formatKeySet($keySet));

$args['transaction'] = $this->createTransactionSelector($args);
$args = $this->addLarHeader($args, $this->larEnabled);

$args['partitionOptions'] = $this->serializer->decodeMessage(
new PartitionOptions,
Expand Down Expand Up @@ -1511,22 +1532,6 @@ private function formatTransactionOptions(array $transactionOptions)
return $transactionOptions;
}

/**
* Add the `google-cloud-resource-prefix` header value to the request.
*
* @param array $args
* @param string $value
* @return array
*/
private function addResourcePrefixHeader(array $args, $value)
{
$args['headers'] = [
'google-cloud-resource-prefix' => [$value]
];

return $args;
}

/**
* Allow lazy instantiation of the instance admin client.
*
Expand Down
6 changes: 6 additions & 0 deletions Spanner/src/Database.php
Expand Up @@ -101,6 +101,7 @@ class Database
{
use LROTrait;
use TransactionConfigurationTrait;
use RequestHeaderTrait;

const STATE_CREATING = State::CREATING;
const STATE_READY = State::READY;
Expand Down Expand Up @@ -1670,6 +1671,7 @@ public function execute($sql, array $options = [])
$options['transaction'],
$options['transactionContext']
) = $this->transactionSelector($options);
$options = $this->addLarHeader($options, true, $options['transactionContext']);

$options['directedReadOptions'] = $this->configureDirectedReadOptions(
$options,
Expand Down Expand Up @@ -1811,6 +1813,8 @@ public function executePartitionedUpdate($statement, array $options = [])
]
]);

$options = $this->addLarHeader($options);

try {
return $this->operation->executeUpdate($session, $transaction, $statement, [
'statsItem' => 'rowCountLowerBound'
Expand Down Expand Up @@ -1954,6 +1958,8 @@ public function read($table, KeySet $keySet, array $columns, array $options = []
$this->directedReadOptions ?? []
);

$options = $this->addLarHeader($options, true, $context);

try {
return $this->operation->read($session, $table, $keySet, $columns, $options);
} finally {
Expand Down
84 changes: 84 additions & 0 deletions Spanner/src/RequestHeaderTrait.php
@@ -0,0 +1,84 @@
<?php
/**
* Copyright 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Google\Cloud\Spanner;

use Google\Cloud\Spanner\Session\SessionPoolInterface;

/**
* Shared functionality for request headers.
*
* @internal
*/
trait RequestHeaderTrait
{
private static $larHeader = 'x-goog-spanner-route-to-leader';
private static $resourcePrefixHeader = 'google-cloud-resource-prefix';

/**
* Add the `x-goog-spanner-route-to-leader` header value to the request.
*
* @param array $args Request arguments.
* @param bool $value LAR header value.
* @param string $context Transaction context.
* @return array
*/
private function addLarHeader(
array $args,
bool $value = true,
string $context = SessionPoolInterface::CONTEXT_READWRITE
) {
if (!$value) {
return $args;
}
// If value is true and context is READWRITE, set LAR header.
if ($context === SessionPoolInterface::CONTEXT_READWRITE) {
$args['headers'][self::$larHeader] = ['true'];
}
return $args;
}

/**
* Conditionally unset the LAR header.
*
* @param array $args Request arguments.
* @param bool $value Whether to set or unset the LAR header.
* @return array
*/
private function conditionallyUnsetLarHeader(
array $args,
bool $value = true
) {
if (!$value) {
unset($args['headers'][self::$larHeader]);
}
return $args;
}

/**
* Add the `google-cloud-resource-prefix` header value to the request.
*
* @param array $args Request arguments.
* @param string $value Resource prefix header value.
* @return array
*/
private function addResourcePrefixHeader(array $args, string $value)
{
$args['headers'][self::$resourcePrefixHeader] = [$value];
return $args;
}
}
9 changes: 9 additions & 0 deletions Spanner/src/SpannerClient.php
Expand Up @@ -93,6 +93,13 @@
* $spanner = new SpannerClient($directedOptions);
* ```
*
* ```
* use Google\Cloud\Spanner\SpannerClient;
*
* $config = ['routeToLeader' => false];
* $spanner = new SpannerClient($config);
* ```
*
* @method resumeOperation() {
* Resume a Long Running Operation
*
Expand Down Expand Up @@ -193,6 +200,8 @@ class SpannerClient
* {@see \Google\Cloud\Spanner\V1\DirectedReadOptions}
* If using the `replicaSelection::type` setting, utilize the constants available in
* {@see \Google\Cloud\Spanner\V1\DirectedReadOptions\ReplicaSelection\Type} to set a value.
* @type bool $routeToLeader Enable/disable Leader Aware Routing.
* **Defaults to** `true` (enabled).
* }
* @throws GoogleException If the gRPC extension is not enabled.
*/
Expand Down
2 changes: 1 addition & 1 deletion Spanner/src/Transaction.php
Expand Up @@ -742,6 +742,6 @@ private function buildUpdateOptions(array $options): array
$selector = $this->transactionSelector($options);
$options['transaction'] = $selector[0];

return $options;
return $this->addLarHeader($options);
}
}
5 changes: 5 additions & 0 deletions Spanner/src/TransactionalReadTrait.php
Expand Up @@ -26,6 +26,7 @@
trait TransactionalReadTrait
{
use TransactionConfigurationTrait;
use RequestHeaderTrait;

/**
* @var Operation
Expand Down Expand Up @@ -302,6 +303,8 @@ public function execute($sql, array $options = [])
$this->directedReadOptions ?? []
);

$options = $this->addLarHeader($options, true, $this->context);

$result = $this->operation->execute($this->session, $sql, $options);
if (empty($this->id()) && $result->transaction()) {
$this->setId($result->transaction()->id());
Expand Down Expand Up @@ -380,6 +383,8 @@ public function read($table, KeySet $keySet, array $columns, array $options = []
$this->directedReadOptions ?? []
);

$options = $this->addLarHeader($options, true, $this->context);

$result = $this->operation->read($this->session, $table, $keySet, $columns, $options);
if (empty($this->id()) && $result->transaction()) {
$this->setId($result->transaction()->id());
Expand Down
7 changes: 7 additions & 0 deletions Spanner/tests/Snippet/SpannerClientTest.php
Expand Up @@ -305,4 +305,11 @@ public function testClassWithDirectedRead()
$res = $snippet->invoke('spanner');
$this->assertInstanceOf(SpannerClient::class, $res->returnVal());
}

public function testClassWithLar()
{
$snippet = $this->snippetFromClass(SpannerClient::class, 3);
$res = $snippet->invoke('spanner');
$this->assertInstanceOf(SpannerClient::class, $res->returnVal());
}
}

0 comments on commit de55bd1

Please sign in to comment.