feat: support partitioned queries + data boost in Connection API #2540

Merged
merged 22 commits into from Aug 4, 2023
Commits
052688a
feat: support partitioned queries + data boost in Connection API
olavloite Jul 24, 2023
9817586
fix: match the correct group in regex
olavloite Jul 24, 2023
dc91c9a
Merge branch 'main' into batch-read-connection-api
olavloite Jul 28, 2023
bca9b12
feat: add more SQL statements for partitioned queries
olavloite Jul 28, 2023
bc5b691
chore: refactor client side statements to accept the entire parsed st…
olavloite Jul 28, 2023
56dfb75
Merge branch 'parsed-statement-to-client-executors' into batch-read-c…
olavloite Jul 28, 2023
f0e9f5b
chore: simplify test
olavloite Jul 28, 2023
af211d3
Merge branch 'parsed-statement-to-client-executors' into batch-read-c…
olavloite Jul 28, 2023
090868c
chore: cleanup differences
olavloite Jul 28, 2023
31ba134
chore: cleanup unrelated changes
olavloite Jul 28, 2023
6960e11
fix: update converter name
olavloite Jul 28, 2023
549e870
test: add more tests
olavloite Jul 30, 2023
0b5269e
chore: add missing license header
olavloite Jul 30, 2023
d5248cc
fix: handle empty partitioned queries correctly
olavloite Jul 31, 2023
a461c39
fix: do not use any random staleness for partitioned queries
olavloite Jul 31, 2023
13541b1
fix: only return false for next() if all have finished
olavloite Aug 1, 2023
3ae95e3
chore: rename to autoPartitionMode
olavloite Aug 1, 2023
ed6a34e
chore: rename sql statements + add tests for empty results
olavloite Aug 1, 2023
1b39e8c
Merge branch 'main' into batch-read-connection-api
olavloite Aug 2, 2023
9e6dc6e
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 2, 2023
89889ee
chore: address review comments
olavloite Aug 4, 2023
1658a51
Batch read connection api native adjustments (#2569)
burkedavison Aug 4, 2023
2 changes: 1 addition & 1 deletion README.md
@@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
-implementation platform('com.google.cloud:libraries-bom:26.20.0')
+implementation platform('com.google.cloud:libraries-bom:26.21.0')

implementation 'com.google.cloud:google-cloud-spanner'
```
56 changes: 56 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
@@ -360,6 +360,62 @@
<method>boolean isDelayTransactionStartUntilFirstWrite()</method>
</difference>

<!-- Partitioned queries in Connection API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>int getMaxPartitionedParallelism()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>int getMaxPartitions()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isAutoPartitionMode()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isDataBoostEnabled()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.ResultSet partitionQuery(com.google.cloud.spanner.Statement, com.google.cloud.spanner.PartitionOptions, com.google.cloud.spanner.Options$QueryOption[])</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.ResultSet runPartition(java.lang.String)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.connection.PartitionedQueryResultSet runPartitionedQuery(com.google.cloud.spanner.Statement, com.google.cloud.spanner.PartitionOptions, com.google.cloud.spanner.Options$QueryOption[])</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setAutoPartitionMode(boolean)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setDataBoostEnabled(boolean)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setMaxPartitionedParallelism(int)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setMaxPartitions(int)</method>
</difference>
<!-- (Internal change, use stream timeout) -->
<difference>
<differenceType>7012</differenceType>
@@ -86,6 +86,12 @@ private Builder(Statement statement) {
statement.queryOptions == null ? null : statement.queryOptions.toBuilder().build();
}

/** Replaces the current SQL of this builder with the given string. */
public Builder replace(String sql) {
sqlBuffer.replace(0, sqlBuffer.length(), sql);
return this;
}

/** Appends {@code sqlFragment} to the statement. */
public Builder append(String sqlFragment) {
sqlBuffer.append(checkNotNull(sqlFragment));
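The new `replace` method swaps the whole SQL buffer in place via `StringBuilder.replace(0, length, sql)`. A minimal JDK-only sketch of that call follows; the class `SqlBufferSketch` and its methods are hypothetical stand-ins that only mirror the shape of the builder in the hunk above, not the library's `Statement.Builder`.

```java
import java.util.Objects;

/** Hypothetical stand-in for the SQL buffer held by a statement builder. */
class SqlBufferSketch {
  private final StringBuilder sqlBuffer = new StringBuilder();

  /** Appends a fragment to the current SQL text. */
  SqlBufferSketch append(String sqlFragment) {
    sqlBuffer.append(Objects.requireNonNull(sqlFragment));
    return this;
  }

  /** Replaces the entire SQL text: the range [0, length) is overwritten with sql. */
  SqlBufferSketch replace(String sql) {
    sqlBuffer.replace(0, sqlBuffer.length(), sql);
    return this;
  }

  String sql() {
    return sqlBuffer.toString();
  }

  public static void main(String[] args) {
    SqlBufferSketch builder = new SqlBufferSketch().append("PARTITION SELECT * FROM t");
    // Drop the client-side prefix while keeping the same builder instance.
    builder.replace("SELECT * FROM t");
    System.out.println(builder.sql()); // prints "SELECT * FROM t"
  }
}
```

Because only the buffer changes, any other state held by the builder is left untouched, which is presumably why the `PARTITION` executor later in this diff rewrites the SQL through `toBuilder().replace(sql).build()` rather than building a fresh statement.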
@@ -21,13 +21,22 @@
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.BatchTransactionId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.ResultSets;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type.StructField;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
import com.google.common.base.Preconditions;
@@ -39,13 +48,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@@ -157,6 +168,39 @@ public void rollbackToSavepoint(
"Rollback to savepoint is not supported for " + getUnitOfWorkName());
}

@Override
public ApiFuture<ResultSet> partitionQueryAsync(
CallType callType,
ParsedStatement query,
PartitionOptions partitionOptions,
QueryOption... options) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"Partition query is not supported for " + getUnitOfWorkName());
}

ResultSet partitionQuery(
BatchReadOnlyTransaction transaction,
PartitionOptions partitionOptions,
ParsedStatement query,
QueryOption... options) {
final String partitionColumnName = "PARTITION";
BatchTransactionId transactionId = transaction.getBatchTransactionId();
List<Partition> partitions =
transaction.partitionQuery(partitionOptions, query.getStatement(), options);
return ResultSets.forRows(
com.google.cloud.spanner.Type.struct(
StructField.of(partitionColumnName, com.google.cloud.spanner.Type.string())),
partitions.stream()
.map(
partition ->
Struct.newBuilder()
.set(partitionColumnName)
.to(PartitionId.encodeToString(transactionId, partition))
.build())
.collect(Collectors.toList()));
}

StatementExecutor getStatementExecutor() {
return statementExecutor;
}
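The `partitionQuery` helper above returns one row per partition, with a single string column named `PARTITION` whose value combines the batch transaction id and the partition. The sketch below illustrates that shape with JDK types only. The encoding used here (Base64 of two newline-separated strings) is an assumption made for illustration; the actual format produced by `PartitionId.encodeToString` is not shown in this diff, and `PartitionRowsSketch` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionRowsSketch {
  static final String PARTITION_COLUMN = "PARTITION";

  /** Hypothetical encoding: Base64 of "<transactionId>\n<partitionToken>". */
  static String encode(String transactionId, String partitionToken) {
    String raw = transactionId + "\n" + partitionToken;
    return Base64.getUrlEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
  }

  /** Reverses encode(): returns {transactionId, partitionToken}. */
  static String[] decode(String encoded) {
    String raw = new String(Base64.getUrlDecoder().decode(encoded), StandardCharsets.UTF_8);
    return raw.split("\n", 2);
  }

  /** Builds one single-column row per partition token. */
  static List<Map<String, String>> toRows(String transactionId, List<String> partitionTokens) {
    return partitionTokens.stream()
        .map(token -> Map.of(PARTITION_COLUMN, encode(transactionId, token)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Map<String, String>> rows = toRows("tx-1", List.of("token-a", "token-b"));
    for (Map<String, String> row : rows) {
      String[] parts = decode(row.get(PARTITION_COLUMN));
      System.out.println(parts[0] + " / " + parts[1]);
    }
  }
}
```

Carrying the transaction id inside every row makes each value self-describing: a caller that holds only the string has what it needs to identify both the transaction and the partition.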
@@ -34,6 +34,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Internal class for the Spanner Connection API.
@@ -91,8 +93,7 @@ public static AbstractStatementParser getInstance(Dialect dialect) {
*/

/** Begins a transaction. */
-static final ParsedStatement BEGIN_STATEMENT =
-AbstractStatementParser.getInstance(Dialect.GOOGLE_STANDARD_SQL).parse(Statement.of("BEGIN"));
+static final ParsedStatement BEGIN_STATEMENT;

/**
* Create a COMMIT statement to use with the {@link #commit()} method to allow it to be cancelled,
@@ -104,14 +105,10 @@ public static AbstractStatementParser getInstance(Dialect dialect) {
* #commit()} method is called directly, we do not have a {@link ParsedStatement}, and the method
* uses this statement instead in order to use the same logic as the other statements.
*/
-static final ParsedStatement COMMIT_STATEMENT =
-AbstractStatementParser.getInstance(Dialect.GOOGLE_STANDARD_SQL)
-.parse(Statement.of("COMMIT"));
+static final ParsedStatement COMMIT_STATEMENT;

/** The {@link Statement} and {@link Callable} for rollbacks */
-static final ParsedStatement ROLLBACK_STATEMENT =
-AbstractStatementParser.getInstance(Dialect.GOOGLE_STANDARD_SQL)
-.parse(Statement.of("ROLLBACK"));
+static final ParsedStatement ROLLBACK_STATEMENT;

/**
* Create a RUN BATCH statement to use with the {@link #executeBatchUpdate(Iterable)} method to
@@ -124,9 +121,22 @@ public static AbstractStatementParser getInstance(Dialect dialect) {
* and the method uses this statement instead in order to use the same logic as the other
* statements.
*/
-static final ParsedStatement RUN_BATCH_STATEMENT =
-AbstractStatementParser.getInstance(Dialect.GOOGLE_STANDARD_SQL)
-.parse(Statement.of("RUN BATCH"));
+static final ParsedStatement RUN_BATCH_STATEMENT;

static {
try {
BEGIN_STATEMENT = getInstance(Dialect.GOOGLE_STANDARD_SQL).parse(Statement.of("BEGIN"));
COMMIT_STATEMENT = getInstance(Dialect.GOOGLE_STANDARD_SQL).parse(Statement.of("COMMIT"));
ROLLBACK_STATEMENT = getInstance(Dialect.GOOGLE_STANDARD_SQL).parse(Statement.of("ROLLBACK"));
RUN_BATCH_STATEMENT =
getInstance(Dialect.GOOGLE_STANDARD_SQL).parse(Statement.of("RUN BATCH"));

} catch (Throwable ex) {
Logger logger = Logger.getLogger(AbstractStatementParser.class.getName());
logger.log(Level.SEVERE, "Static initialization failure.", ex);
throw ex;
}
}

/** The type of statement that has been recognized by the parser. */
@InternalApi
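The hunks above move the constant initialization into a single static block that logs any failure before rethrowing it. A failure in a static initializer surfaces as an `ExceptionInInitializerError` on first use of the class and as a `NoClassDefFoundError` on later uses, so logging the original cause makes it easier to find. The following is a minimal sketch of the same pattern with JDK types only; `StaticInitSketch` and its `parse` method are hypothetical placeholders for the real parser.

```java
import java.util.Locale;
import java.util.logging.Level;
import java.util.logging.Logger;

class StaticInitSketch {
  static final String BEGIN_STATEMENT;
  static final String COMMIT_STATEMENT;

  static {
    try {
      // Blank finals may be assigned inside the try block because the
      // catch block always rethrows and so never completes normally.
      BEGIN_STATEMENT = parse("BEGIN");
      COMMIT_STATEMENT = parse("COMMIT");
    } catch (Throwable ex) {
      Logger.getLogger(StaticInitSketch.class.getName())
          .log(Level.SEVERE, "Static initialization failure.", ex);
      // Precise rethrow: only unchecked exceptions can reach this point.
      throw ex;
    }
  }

  /** Hypothetical parser: normalizes the keyword and rejects empty input. */
  static String parse(String sql) {
    String trimmed = sql.trim();
    if (trimmed.isEmpty()) {
      throw new IllegalArgumentException("Empty statement");
    }
    return trimmed.toUpperCase(Locale.ROOT);
  }

  public static void main(String[] args) {
    System.out.println(BEGIN_STATEMENT + ", " + COMMIT_STATEMENT);
  }
}
```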
@@ -0,0 +1,63 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException;
import java.lang.reflect.Method;
import java.util.regex.Matcher;

/** Executor for <code>PARTITION &lt;sql&gt;</code> statements. */
class ClientSideStatementPartitionExecutor implements ClientSideStatementExecutor {
private final ClientSideStatementImpl statement;
private final Method method;

ClientSideStatementPartitionExecutor(ClientSideStatementImpl statement) throws CompileException {
try {
this.statement = statement;
this.method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), Statement.class);
} catch (Exception e) {
throw new CompileException(e, statement);
}
}

@Override
public StatementResult execute(
ConnectionStatementExecutor connection, ParsedStatement parsedStatement) throws Exception {
String sql = getParameterValue(parsedStatement);
return (StatementResult)
method.invoke(connection, parsedStatement.getStatement().toBuilder().replace(sql).build());
}

String getParameterValue(ParsedStatement parsedStatement) {
Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments());
if (matcher.find() && matcher.groupCount() >= 2) {
String space = matcher.group(1);
String value = matcher.group(2);
return (space + value).trim();
Collaborator

Suggestion: a StringBuilder would be slightly more efficient than the + operator here.

Collaborator Author

The Java compiler already optimizes this automatically, because it is a simple, non-dynamic concatenation (not one in a loop or another hard-to-understand construct), so simple cases like this should be kept as is. If you change this to use a StringBuilder, IntelliJ will even give you a warning.

}
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
String.format(
"Invalid argument for PARTITION: %s", parsedStatement.getStatement().getSql()));
}
}
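The executor above strips the `PARTITION` keyword and passes the remaining SQL on, joining capturing groups 1 and 2 before trimming. The sketch below shows how that extraction could behave. The regular expression is an assumption (the real pattern comes from the client-side statement definition and is not part of this diff), and `PartitionStatementSketch` is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PartitionStatementSketch {
  // Hypothetical pattern; the real one is not shown in this diff.
  // Group 1: the separator after the keyword (whitespace or an opening parenthesis).
  // Group 2: the remainder of the statement.
  private static final Pattern PARTITION =
      Pattern.compile("(?is)\\A\\s*partition(\\s+|\\()(.*)\\z");

  /** Returns the SQL that follows the PARTITION keyword. */
  static String extractSql(String statement) {
    Matcher matcher = PARTITION.matcher(statement);
    if (matcher.find() && matcher.groupCount() >= 2) {
      // Group 1 is kept so that a leading parenthesis is not lost.
      return (matcher.group(1) + matcher.group(2)).trim();
    }
    throw new IllegalArgumentException("Invalid argument for PARTITION: " + statement);
  }

  public static void main(String[] args) {
    System.out.println(extractSql("PARTITION SELECT * FROM singers")); // SELECT * FROM singers
    System.out.println(extractSql("partition(SELECT 1)")); // (SELECT 1)
  }
}
```

Under this assumed pattern, concatenating group 1 with group 2 matters only when the separator is a parenthesis; when it is whitespace, the final `trim()` removes it again.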
@@ -0,0 +1,81 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException;
import com.google.common.base.Strings;
import java.lang.reflect.Method;
import java.util.regex.Matcher;

/** Executor for <code>RUN PARTITION &lt;partition_id&gt;</code> statements. */
class ClientSideStatementRunPartitionExecutor implements ClientSideStatementExecutor {
private final ClientSideStatementImpl statement;
private final Method method;

ClientSideStatementRunPartitionExecutor(ClientSideStatementImpl statement)
throws CompileException {
try {
this.statement = statement;
this.method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), String.class);
} catch (Exception e) {
throw new CompileException(e, statement);
}
}

@Override
public StatementResult execute(
ConnectionStatementExecutor connection, ParsedStatement parsedStatement) throws Exception {
String partitionId = getParameterValue(parsedStatement);
if (partitionId == null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
"No valid partition id found in statement: " + parsedStatement.getStatement().getSql());
}
return (StatementResult) method.invoke(connection, partitionId);
}

String getParameterValue(ParsedStatement parsedStatement) {
// The statement has the form `RUN PARTITION ['partition-id']`.
// The regex that is defined for this statement is (simplified) `run\s+partition(?:\s*'(.*)')?`
// This regex has one capturing group, which captures the partition-id inside the single quotes.
// That capturing group is however inside a non-capturing optional group.
// That means that:
// 1. If the matcher matches and group 1 is non-empty, we know that we have a partition-id
// in the SQL statement itself, as that is the only thing that can be in a capturing group.
// 2. If the matcher matches and group 1 is null or empty, we know that the statement is valid,
// but that it does not contain a partition-id in the SQL statement. The partition-id must
// then be included in the statement as a query parameter.
// Note: Matcher#groupCount() counts the capturing groups in the pattern (always 1 here), so
// the null/empty check on group(1) below is what distinguishes the two cases.
Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments());
if (matcher.find() && matcher.groupCount() >= 1) {
Collaborator

  • It could be just me, but a first-time reader may have some difficulty understanding what we are doing here. My understanding is that we are parsing the statement and obtaining the partition ID from it. In what cases will a statement have a groupCount >= 1?
  • Should we beef up the documentation a bit by adding examples for future readers?

Collaborator Author

Added some commentary to explain what is going on with this regex.

String value = matcher.group(1);
if (!Strings.isNullOrEmpty(value)) {
return value;
}
}
if (parsedStatement.getStatement().getParameters().size() == 1) {
Value value = parsedStatement.getStatement().getParameters().values().iterator().next();
return value.getAsString();
}
return null;
}
}
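To make the regex discussion in the review thread concrete, the sketch below runs the simplified pattern from the code comment against statements with and without a literal partition id. The anchors and flags around the pattern are assumptions, and `RunPartitionSketch` is a hypothetical name. It illustrates that `Matcher.groupCount()` reports the number of capturing groups in the pattern, while `group(1)` is `null` when the optional group did not take part in the match.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RunPartitionSketch {
  // Simplified pattern from the code comment, with assumed anchors and flags.
  private static final Pattern RUN_PARTITION =
      Pattern.compile("(?is)\\A\\s*run\\s+partition(?:\\s*'(.*)')?\\s*\\z");

  /** Returns the literal partition id, or null if the statement carries none. */
  static String literalPartitionId(String sql) {
    Matcher matcher = RUN_PARTITION.matcher(sql);
    if (!matcher.find()) {
      throw new IllegalArgumentException("Not a RUN PARTITION statement: " + sql);
    }
    String value = matcher.group(1);
    // A null or empty value means the id must come from a query parameter instead.
    return (value == null || value.isEmpty()) ? null : value;
  }

  public static void main(String[] args) {
    Matcher matcher = RUN_PARTITION.matcher("RUN PARTITION");
    System.out.println(matcher.find()); // true
    System.out.println(matcher.groupCount()); // 1, even though no id was captured
    System.out.println(matcher.group(1)); // null
    System.out.println(literalPartitionId("RUN PARTITION 'abc'")); // abc
  }
}
```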