diff --git a/README-template.md b/README-template.md
index 40ef53aca..81acbaa32 100644
--- a/README-template.md
+++ b/README-template.md
@@ -895,7 +895,15 @@ word-break:break-word
Read/Write |
-
+
+ snapshotTimeMillis
+ |
+ A timestamp specified in milliseconds to use to read a table snapshot.
+ By default this is not set and the latest version of a table is read.
+ (Optional)
+ |
+ Read |
+
Options can also be set outside of the code, using the `--conf` parameter of `spark-submit` or `--properties` parameter
diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java
index 807d6f758..d92e143dd 100644
--- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java
+++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java
@@ -25,12 +25,14 @@
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
+import com.google.protobuf.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
@@ -162,6 +164,19 @@ public ReadSessionResponse create(
}
Instant sessionPrepEndTime = Instant.now();
+ TableModifiers.Builder modifiers = TableModifiers.newBuilder();
+ config
+ .getSnapshotTimeMillis()
+ .ifPresent(
+ millis -> {
+ Instant snapshotTime = Instant.ofEpochMilli(millis);
+ modifiers.setSnapshotTime(
+ Timestamp.newBuilder()
+ .setSeconds(snapshotTime.getEpochSecond())
+ .setNanos(snapshotTime.getNano())
+ .build());
+ });
+
CreateReadSessionRequest createReadSessionRequest =
request
.newBuilder()
@@ -170,6 +185,7 @@ public ReadSessionResponse create(
requestedSession
.setDataFormat(config.getReadDataFormat())
.setReadOptions(readOptions)
+ .setTableModifiers(modifiers)
.setTable(tablePath)
.build())
.setMaxStreamCount(maxStreamCount)
diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorConfig.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorConfig.java
index 35f1227ea..7d2e264ba 100644
--- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorConfig.java
+++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorConfig.java
@@ -19,6 +19,7 @@
import com.google.cloud.bigquery.storage.v1.DataFormat;
import java.util.Optional;
import java.util.OptionalInt;
+import java.util.OptionalLong;
public class ReadSessionCreatorConfig {
private final boolean viewsEnabled;
@@ -42,6 +43,7 @@ public class ReadSessionCreatorConfig {
private final Optional traceId;
private final boolean enableReadSessionCaching;
private final long readSessionCacheDurationMins;
+ private final OptionalLong snapshotTimeMillis;
ReadSessionCreatorConfig(
boolean viewsEnabled,
@@ -64,7 +66,8 @@ public class ReadSessionCreatorConfig {
CompressionCodec arrowCompressionCodec,
Optional traceId,
boolean enableReadSessionCaching,
- long readSessionCacheDurationMins) {
+ long readSessionCacheDurationMins,
+ OptionalLong snapshotTimeMillis) {
this.viewsEnabled = viewsEnabled;
this.materializationProject = materializationProject;
this.materializationDataset = materializationDataset;
@@ -86,6 +89,7 @@ public class ReadSessionCreatorConfig {
this.traceId = traceId;
this.enableReadSessionCaching = enableReadSessionCaching;
this.readSessionCacheDurationMins = readSessionCacheDurationMins;
+ this.snapshotTimeMillis = snapshotTimeMillis;
}
public boolean isViewsEnabled() {
@@ -179,4 +183,8 @@ public boolean isReadSessionCachingEnabled() {
public long getReadSessionCacheDurationMins() {
return readSessionCacheDurationMins;
}
+
+ public OptionalLong getSnapshotTimeMillis() {
+ return snapshotTimeMillis;
+ }
}
diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorConfigBuilder.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorConfigBuilder.java
index 9254918d2..cda3b9e2b 100644
--- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorConfigBuilder.java
+++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorConfigBuilder.java
@@ -20,6 +20,7 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.Optional;
import java.util.OptionalInt;
+import java.util.OptionalLong;
public class ReadSessionCreatorConfigBuilder {
@@ -44,6 +45,7 @@ public class ReadSessionCreatorConfigBuilder {
private Optional traceId = Optional.empty();
private boolean enableReadSessionCaching = true;
private long readSessionCacheDurationMins = 5L;
+ private OptionalLong snapshotTimeMillis = OptionalLong.empty();
@CanIgnoreReturnValue
public ReadSessionCreatorConfigBuilder setViewsEnabled(boolean viewsEnabled) {
@@ -179,6 +181,12 @@ public ReadSessionCreatorConfigBuilder setReadSessionCacheDurationMins(
return this;
}
+ @CanIgnoreReturnValue
+ public ReadSessionCreatorConfigBuilder setSnapshotTimeMillis(OptionalLong snapshotTimeMillis) {
+ this.snapshotTimeMillis = snapshotTimeMillis;
+ return this;
+ }
+
public ReadSessionCreatorConfig build() {
return new ReadSessionCreatorConfig(
viewsEnabled,
@@ -201,6 +209,7 @@ public ReadSessionCreatorConfig build() {
arrowCompressionCodec,
traceId,
enableReadSessionCaching,
- readSessionCacheDurationMins);
+ readSessionCacheDurationMins,
+ snapshotTimeMillis);
}
}
diff --git a/bigquery-connector-common/src/test/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorTest.java b/bigquery-connector-common/src/test/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorTest.java
index c576340ce..dfae6804a 100644
--- a/bigquery-connector-common/src/test/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorTest.java
+++ b/bigquery-connector-common/src/test/java/com/google/cloud/bigquery/connector/common/ReadSessionCreatorTest.java
@@ -41,16 +41,19 @@
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.MockBigQueryRead;
import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStub;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.OptionalInt;
+import java.util.OptionalLong;
import java.util.UUID;
import org.junit.After;
import org.junit.AfterClass;
@@ -286,6 +289,32 @@ public void testMaxStreamCountWithoutMinStreamCount() throws Exception {
assertThat(createReadSessionRequest.getPreferredMinStreamCount()).isEqualTo(10);
}
+ @Test
+ public void testSnapshotTimeMillis() throws Exception {
+ // setting up
+ when(bigQueryClient.getTable(any())).thenReturn(table);
+ mockBigQueryRead.reset();
+ mockBigQueryRead.addResponse(
+ ReadSession.newBuilder().addStreams(ReadStream.newBuilder().setName("0")).build());
+ BigQueryClientFactory mockBigQueryClientFactory = mock(BigQueryClientFactory.class);
+ when(mockBigQueryClientFactory.getBigQueryReadClient()).thenReturn(client);
+
+ ReadSessionCreatorConfig config =
+ new ReadSessionCreatorConfigBuilder()
+ .setEnableReadSessionCaching(false)
+ .setSnapshotTimeMillis(OptionalLong.of(1234567890L))
+ .build();
+ ReadSessionCreator creator =
+ new ReadSessionCreator(config, bigQueryClient, mockBigQueryClientFactory);
+ ReadSessionResponse readSessionResponse =
+ creator.create(table.getTableId(), ImmutableList.of(), Optional.empty());
+ assertThat(readSessionResponse).isNotNull();
+ CreateReadSessionRequest createReadSessionRequest =
+ (CreateReadSessionRequest) mockBigQueryRead.getRequests().get(0);
+ assertThat(createReadSessionRequest.getReadSession().getTableModifiers().getSnapshotTime())
+ .isEqualTo(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build());
+ }
+
private void testCacheMissScenario(
ReadSessionCreator creator,
String readSessionName,
@@ -354,6 +383,7 @@ private ReadSession addCacheEntry(
.setDataFormat(config.getReadDataFormat())
.setReadOptions(readOptions)
.setTable(ReadSessionCreator.toTablePath(table.getTableId()))
+ .setTableModifiers(TableModifiers.newBuilder())
.build())
.setMaxStreamCount(config.getMaxParallelism().getAsInt())
.setPreferredMinStreamCount(3 * config.getDefaultParallelism())
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
index 47cc261b9..e052a5d43 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
@@ -218,6 +218,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
com.google.common.base.Optional.absent();
private boolean enableReadSessionCaching = true;
private long readSessionCacheDurationMins = 5L;
+ private Long snapshotTimeMillis = null;
private SparkBigQueryProxyAndHttpConfig sparkBigQueryProxyAndHttpConfig;
private CompressionCodec arrowCompressionCodec = DEFAULT_ARROW_COMPRESSION_CODEC;
private WriteMethod writeMethod = DEFAULT_WRITE_METHOD;
@@ -584,6 +585,9 @@ public static SparkBigQueryConfig from(
config.gpn = getAnyOption(globalOptions, options, GPN_ATTRIBUTION);
+ config.snapshotTimeMillis =
+ getOption(options, "snapshotTimeMillis").transform(Long::valueOf).orNull();
+
return config;
}
@@ -1038,6 +1042,10 @@ public Optional getGpn() {
return gpn.toJavaUtil();
}
+ public OptionalLong getSnapshotTimeMillis() {
+ return snapshotTimeMillis == null ? OptionalLong.empty() : OptionalLong.of(snapshotTimeMillis);
+ }
+
public ReadSessionCreatorConfig toReadSessionCreatorConfig() {
return new ReadSessionCreatorConfigBuilder()
.setViewsEnabled(viewsEnabled)
@@ -1061,6 +1069,7 @@ public ReadSessionCreatorConfig toReadSessionCreatorConfig() {
.setTraceId(traceId.toJavaUtil())
.setEnableReadSessionCaching(enableReadSessionCaching)
.setReadSessionCacheDurationMins(readSessionCacheDurationMins)
+ .setSnapshotTimeMillis(getSnapshotTimeMillis())
.build();
}
diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryConfigTest.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryConfigTest.java
index 2c6a3b44e..f4cf1fded 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryConfigTest.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SparkBigQueryConfigTest.java
@@ -132,6 +132,7 @@ public void testDefaults() {
assertThat(config.getAllowMapTypeConversion()).isTrue();
assertThat(config.getBigQueryJobTimeoutInMinutes()).isEqualTo(6 * 60);
assertThat(config.getGpn()).isEmpty();
+ assertThat(config.getSnapshotTimeMillis()).isEmpty();
}
@Test
@@ -180,6 +181,7 @@ public void testConfigFromOptions() {
.put("allowMapTypeConversion", "false")
.put("bigQueryJobTimeoutInMinutes", "30")
.put("GPN", "testUser")
+ .put("snapshotTimeMillis", "123456789")
.build());
SparkBigQueryConfig config =
SparkBigQueryConfig.from(
@@ -235,6 +237,7 @@ public void testConfigFromOptions() {
assertThat(config.getAllowMapTypeConversion()).isFalse();
assertThat(config.getBigQueryJobTimeoutInMinutes()).isEqualTo(30);
assertThat(config.getGpn().get()).isEqualTo("testUser");
+ assertThat(config.getSnapshotTimeMillis()).hasValue(123456789L);
}
@Test