Skip to content

Commit

Permalink
feat: Adding support for databoost enabled in PartitionedRead and PartitionedQuery (#2316)

Browse files Browse the repository at this point in the history

* feat: Adding support for spanner serverless analytics

* feat: Adding BetaApi annotation to flag that the changes are under development

* test: Adding Integration test

* style: formatting

* feat: Using databoost field instead of serverless analytics

* feat: Integration test and sample.

* refactor: method name change

* refactor: minor refactoring

* refactor: Adding more junit and java docs
  • Loading branch information
gauravpurohit06 committed Mar 20, 2023
1 parent 6159d7e commit f39e4a3
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 8 deletions.
Expand Up @@ -595,6 +595,9 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
builder.setTransaction(selector);
}
}
if (options.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(options.dataBoostEnabled());
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
Expand Down Expand Up @@ -773,6 +776,9 @@ ResultSet readInternalWithOptions(
if (partitionToken != null) {
builder.setPartitionToken(partitionToken);
}
if (readOptions.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(readOptions.dataBoostEnabled());
}
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import com.google.api.core.BetaApi;
import com.google.common.base.Preconditions;
import com.google.spanner.v1.RequestOptions.Priority;
import java.io.Serializable;
Expand Down Expand Up @@ -154,6 +155,16 @@ public static ListOption pageSize(int pageSize) {
return new PageSizeOption(pageSize);
}

/**
 * Returns an option that controls Data Boost for a partitioned read or query. If this option is
 * set to {@code true}, the partitioned request will be executed via Spanner independent compute
 * resources. This method is in Beta and is not yet generally available.
 */
@BetaApi
public static DataBoostQueryOption dataBoostEnabled(Boolean dataBoostEnabled) {
return new DataBoostQueryOption(dataBoostEnabled);
}

/**
* Specifying this will cause the list operation to start fetching the record from this onwards.
*/
Expand Down Expand Up @@ -329,6 +340,7 @@ void appendToOptions(Options options) {
private String etag;
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean dataBoostEnabled;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -421,6 +433,14 @@ Boolean withOptimisticLock() {
return withOptimisticLock;
}

/** Returns {@code true} if a value has been set for the dataBoostEnabled option. */
boolean hasDataBoostEnabled() {
return dataBoostEnabled != null;
}

/** Returns the value of the dataBoostEnabled option, or {@code null} if it was never set. */
Boolean dataBoostEnabled() {
return dataBoostEnabled;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand Down Expand Up @@ -457,6 +477,9 @@ public String toString() {
if (withOptimisticLock != null) {
b.append("withOptimisticLock: ").append(withOptimisticLock).append(' ');
}
if (dataBoostEnabled != null) {
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -491,7 +514,8 @@ public boolean equals(Object o) {
&& Objects.equals(tag(), that.tag())
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock());
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled());
}

@Override
Expand Down Expand Up @@ -533,6 +557,9 @@ public int hashCode() {
if (withOptimisticLock != null) {
result = 31 * result + withOptimisticLock.hashCode();
}
if (dataBoostEnabled != null) {
result = 31 * result + dataBoostEnabled.hashCode();
}
return result;
}

Expand Down Expand Up @@ -613,6 +640,20 @@ void appendToOptions(Options options) {
}
}

/**
 * Internal option that carries the dataBoostEnabled flag into an {@link Options} instance for
 * partitioned reads and queries.
 */
static final class DataBoostQueryOption extends InternalOption implements ReadAndQueryOption {

  // Value copied verbatim into Options.dataBoostEnabled; may be null.
  private final Boolean enabled;

  DataBoostQueryOption(Boolean dataBoostEnabled) {
    this.enabled = dataBoostEnabled;
  }

  @Override
  void appendToOptions(Options options) {
    options.dataBoostEnabled = enabled;
  }
}

static class PageSizeOption extends InternalOption implements ListOption {
private final int pageSize;

Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -188,6 +189,17 @@ public void testGetExecuteSqlRequestBuilderWithPriority() {
assertEquals(Priority.PRIORITY_MEDIUM, request.getRequestOptions().getPriority());
}

/** Verifies that the dataBoostEnabled query option is copied into the ExecuteSqlRequest proto. */
@Test
public void testGetExecuteSqlRequestBuilderWithDataBoost() {
ExecuteSqlRequest.Builder request =
context.getExecuteSqlRequestBuilder(
Statement.of("SELECT * FROM FOO"),
QueryMode.NORMAL,
Options.fromQueryOptions(Options.dataBoostEnabled(true)),
false);
assertTrue(request.getDataBoostEnabled());
}

@Test
public void testGetExecuteBatchDmlRequestBuilderWithPriority() {
ExecuteBatchDmlRequest.Builder request =
Expand Down
Expand Up @@ -63,11 +63,15 @@ public void zeroPrefetchChunksNotAllowed() {

/** Verifies that every read option set on an Options instance is reported back by its getters. */
@Test
public void allOptionsPresent() {
  Options options =
      Options.fromReadOptions(
          Options.limit(10), Options.prefetchChunks(1), Options.dataBoostEnabled(true));
  assertThat(options.hasLimit()).isTrue();
  assertThat(options.limit()).isEqualTo(10);
  assertThat(options.hasPrefetchChunks()).isTrue();
  assertThat(options.prefetchChunks()).isEqualTo(1);
  assertThat(options.hasDataBoostEnabled()).isTrue();
  // Use Truth consistently with the rest of this test instead of mixing in JUnit's assertTrue.
  assertThat(options.dataBoostEnabled()).isTrue();
}

@Test
Expand All @@ -79,6 +83,7 @@ public void allOptionsAbsent() {
assertThat(options.hasPageToken()).isFalse();
assertThat(options.hasPriority()).isFalse();
assertThat(options.hasTag()).isFalse();
assertThat(options.hasDataBoostEnabled()).isFalse();
assertThat(options.toString()).isEqualTo("");
assertThat(options.equals(options)).isTrue();
assertThat(options.equals(null)).isFalse();
Expand Down Expand Up @@ -153,11 +158,17 @@ public void listEquality() {
public void readOptionsTest() {
  int limit = 3;
  String tag = "app=spanner,env=test,action=read";
  boolean dataBoost = true;
  Options options =
      Options.fromReadOptions(
          // Pass the declared dataBoost variable so the assertions below test the same value.
          Options.limit(limit), Options.tag(tag), Options.dataBoostEnabled(dataBoost));

  // toString() appends each set option followed by a trailing space.
  assertThat(options.toString())
      .isEqualTo(
          "limit: " + limit + " " + "tag: " + tag + " " + "dataBoostEnabled: " + dataBoost + " ");
  assertThat(options.tag()).isEqualTo(tag);
  assertEquals(dataBoost, options.dataBoostEnabled());
  assertThat(options.hashCode()).isEqualTo(-96091607);
}

@Test
Expand Down Expand Up @@ -185,12 +196,41 @@ public void readEquality() {
public void queryOptionsTest() {
int chunks = 3;
String tag = "app=spanner,env=test,action=query";
Options options = Options.fromQueryOptions(Options.prefetchChunks(chunks), Options.tag(tag));
boolean dataBoost = true;
Options options =
Options.fromQueryOptions(
Options.prefetchChunks(chunks), Options.tag(tag), Options.dataBoostEnabled(true));
assertThat(options.toString())
.isEqualTo("prefetchChunks: " + chunks + " " + "tag: " + tag + " ");
.isEqualTo(
"prefetchChunks: "
+ chunks
+ " "
+ "tag: "
+ tag
+ " "
+ "dataBoostEnabled: "
+ dataBoost
+ " ");
assertThat(options.prefetchChunks()).isEqualTo(chunks);
assertThat(options.tag()).isEqualTo(tag);
assertThat(options.hashCode()).isEqualTo(-97431824);
assertEquals(dataBoost, options.dataBoostEnabled());
assertThat(options.hashCode()).isEqualTo(1274581983);
}

/** Verifies that a read-options instance reports and prints the dataBoostEnabled option. */
@Test
public void testReadOptionsDataBoost() {
  boolean dataBoost = true;
  // Pass the declared variable (previously a hard-coded true) so setter and assertion agree.
  Options options = Options.fromReadOptions(Options.dataBoostEnabled(dataBoost));
  assertTrue(options.hasDataBoostEnabled());
  assertEquals("dataBoostEnabled: " + dataBoost + " ", options.toString());
}

/** Verifies that a query-options instance reports and prints the dataBoostEnabled option. */
@Test
public void testQueryOptionsDataBoost() {
  boolean dataBoost = true;
  // Pass the declared variable (previously a hard-coded true) so setter and assertion agree.
  Options options = Options.fromQueryOptions(Options.dataBoostEnabled(dataBoost));
  assertTrue(options.hasDataBoostEnabled());
  assertEquals("dataBoostEnabled: " + dataBoost + " ", options.toString());
}

@Test
Expand Down
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.spanner.IntegrationTestEnv;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ParallelIntegrationTest;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
Expand Down Expand Up @@ -238,6 +239,25 @@ public void readUsingIndex() {
assertThat(numRowsRead).isEqualTo(numRows);
}

/**
 * Integration test: runs a partitioned read with Options.dataBoostEnabled(true) and checks that
 * all rows are returned. Skipped on the emulator, which does not support data boost.
 */
@Test
public void dataBoostRead() {
assumeFalse("Emulator does not support data boost read", isUsingEmulator());

BitSet seenRows = new BitSet(numRows);
TimestampBound bound = getRandomBound();
PartitionOptions partitionParams = getRandomPartitionOptions();
batchTxn = getBatchClient().batchReadOnlyTransaction(bound);
List<Partition> partitions =
batchTxn.partitionRead(
partitionParams,
TABLE_NAME,
KeySet.all(),
Arrays.asList("Key", "Data", "Fingerprint", "Size"),
Options.dataBoostEnabled(true));
BatchTransactionId txnID = batchTxn.getBatchTransactionId();
fetchAndValidateRows(partitions, txnID, seenRows);
}

@After
public void tearDown() {
if (batchTxn != null) {
Expand Down Expand Up @@ -273,6 +293,22 @@ private PartitionOptions getRandomPartitionOptions() {
return parameters;
}

/**
 * Integration test: runs a partitioned query with Options.dataBoostEnabled(true) and checks that
 * all rows are returned. Skipped on the emulator, which does not support data boost.
 */
@Test
public void dataBoostQuery() {
assumeFalse("Emulator does not support data boost query", isUsingEmulator());
BitSet seenRows = new BitSet(numRows);
TimestampBound bound = getRandomBound();
PartitionOptions partitionParams = getRandomPartitionOptions();
batchTxn = getBatchClient().batchReadOnlyTransaction(bound);
List<Partition> partitions =
batchTxn.partitionQuery(
partitionParams,
Statement.of("SELECT Key, Data, Fingerprint, Size FROM " + TABLE_NAME),
Options.dataBoostEnabled(true));
BatchTransactionId txnID = batchTxn.getBatchTransactionId();
fetchAndValidateRows(partitions, txnID, seenRows);
}

private TimestampBound getRandomBound() {
Date date = new Date();
switch (RANDOM.nextInt(3)) {
Expand Down

0 comments on commit f39e4a3

Please sign in to comment.