Skip to content

Commit

Permalink
Add config option for snapshot time (#1221)
Browse files Browse the repository at this point in the history
Co-authored-by: Thomas Powell <thomas.samuel.powell@gmail.com>
  • Loading branch information
tom-s-powell and Thomas Powell committed May 6, 2024
1 parent c5a18ab commit 5691e24
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 3 deletions.
10 changes: 9 additions & 1 deletion README-template.md
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,15 @@ word-break:break-word
</td>
<td>Read/Write</td>
</tr>

<tr>
<td><code>snapshotTimeMillis</code>
</td>
<td>A timestamp specified in milliseconds to use to read a table snapshot.
By default this is not set and the latest version of a table is read.
<br/> (Optional)
</td>
<td>Read</td>
</tr>
</table>

Options can also be set outside of the code, using the `--conf` parameter of `spark-submit` or `--properties` parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.protobuf.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
Expand Down Expand Up @@ -162,6 +164,19 @@ public ReadSessionResponse create(
}
Instant sessionPrepEndTime = Instant.now();

TableModifiers.Builder modifiers = TableModifiers.newBuilder();
config
.getSnapshotTimeMillis()
.ifPresent(
millis -> {
Instant snapshotTime = Instant.ofEpochMilli(millis);
modifiers.setSnapshotTime(
Timestamp.newBuilder()
.setSeconds(snapshotTime.getEpochSecond())
.setNanos(snapshotTime.getNano())
.build());
});

CreateReadSessionRequest createReadSessionRequest =
request
.newBuilder()
Expand All @@ -170,6 +185,7 @@ public ReadSessionResponse create(
requestedSession
.setDataFormat(config.getReadDataFormat())
.setReadOptions(readOptions)
.setTableModifiers(modifiers)
.setTable(tablePath)
.build())
.setMaxStreamCount(maxStreamCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.bigquery.storage.v1.DataFormat;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

public class ReadSessionCreatorConfig {
private final boolean viewsEnabled;
Expand All @@ -42,6 +43,7 @@ public class ReadSessionCreatorConfig {
private final Optional<String> traceId;
private final boolean enableReadSessionCaching;
private final long readSessionCacheDurationMins;
private final OptionalLong snapshotTimeMillis;

ReadSessionCreatorConfig(
boolean viewsEnabled,
Expand All @@ -64,7 +66,8 @@ public class ReadSessionCreatorConfig {
CompressionCodec arrowCompressionCodec,
Optional<String> traceId,
boolean enableReadSessionCaching,
long readSessionCacheDurationMins) {
long readSessionCacheDurationMins,
OptionalLong snapshotTimeMillis) {
this.viewsEnabled = viewsEnabled;
this.materializationProject = materializationProject;
this.materializationDataset = materializationDataset;
Expand All @@ -86,6 +89,7 @@ public class ReadSessionCreatorConfig {
this.traceId = traceId;
this.enableReadSessionCaching = enableReadSessionCaching;
this.readSessionCacheDurationMins = readSessionCacheDurationMins;
this.snapshotTimeMillis = snapshotTimeMillis;
}

public boolean isViewsEnabled() {
Expand Down Expand Up @@ -179,4 +183,8 @@ public boolean isReadSessionCachingEnabled() {
public long getReadSessionCacheDurationMins() {
return readSessionCacheDurationMins;
}

public OptionalLong getSnapshotTimeMillis() {
return snapshotTimeMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

public class ReadSessionCreatorConfigBuilder {

Expand All @@ -44,6 +45,7 @@ public class ReadSessionCreatorConfigBuilder {
private Optional<String> traceId = Optional.empty();
private boolean enableReadSessionCaching = true;
private long readSessionCacheDurationMins = 5L;
private OptionalLong snapshotTimeMillis = OptionalLong.empty();

@CanIgnoreReturnValue
public ReadSessionCreatorConfigBuilder setViewsEnabled(boolean viewsEnabled) {
Expand Down Expand Up @@ -179,6 +181,12 @@ public ReadSessionCreatorConfigBuilder setReadSessionCacheDurationMins(
return this;
}

@CanIgnoreReturnValue
public ReadSessionCreatorConfigBuilder setSnapshotTimeMillis(OptionalLong snapshotTimeMillis) {
this.snapshotTimeMillis = snapshotTimeMillis;
return this;
}

public ReadSessionCreatorConfig build() {
return new ReadSessionCreatorConfig(
viewsEnabled,
Expand All @@ -201,6 +209,7 @@ public ReadSessionCreatorConfig build() {
arrowCompressionCodec,
traceId,
enableReadSessionCaching,
readSessionCacheDurationMins);
readSessionCacheDurationMins,
snapshotTimeMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.MockBigQueryRead;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStub;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.UUID;
import org.junit.After;
import org.junit.AfterClass;
Expand Down Expand Up @@ -286,6 +289,32 @@ public void testMaxStreamCountWithoutMinStreamCount() throws Exception {
assertThat(createReadSessionRequest.getPreferredMinStreamCount()).isEqualTo(10);
}

@Test
public void testSnapshotTimeMillis() throws Exception {
// setting up
when(bigQueryClient.getTable(any())).thenReturn(table);
mockBigQueryRead.reset();
mockBigQueryRead.addResponse(
ReadSession.newBuilder().addStreams(ReadStream.newBuilder().setName("0")).build());
BigQueryClientFactory mockBigQueryClientFactory = mock(BigQueryClientFactory.class);
when(mockBigQueryClientFactory.getBigQueryReadClient()).thenReturn(client);

ReadSessionCreatorConfig config =
new ReadSessionCreatorConfigBuilder()
.setEnableReadSessionCaching(false)
.setSnapshotTimeMillis(OptionalLong.of(1234567890L))
.build();
ReadSessionCreator creator =
new ReadSessionCreator(config, bigQueryClient, mockBigQueryClientFactory);
ReadSessionResponse readSessionResponse =
creator.create(table.getTableId(), ImmutableList.of(), Optional.empty());
assertThat(readSessionResponse).isNotNull();
CreateReadSessionRequest createReadSessionRequest =
(CreateReadSessionRequest) mockBigQueryRead.getRequests().get(0);
assertThat(createReadSessionRequest.getReadSession().getTableModifiers().getSnapshotTime())
.isEqualTo(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build());
}

private void testCacheMissScenario(
ReadSessionCreator creator,
String readSessionName,
Expand Down Expand Up @@ -354,6 +383,7 @@ private ReadSession addCacheEntry(
.setDataFormat(config.getReadDataFormat())
.setReadOptions(readOptions)
.setTable(ReadSessionCreator.toTablePath(table.getTableId()))
.setTableModifiers(TableModifiers.newBuilder())
.build())
.setMaxStreamCount(config.getMaxParallelism().getAsInt())
.setPreferredMinStreamCount(3 * config.getDefaultParallelism())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
com.google.common.base.Optional.absent();
private boolean enableReadSessionCaching = true;
private long readSessionCacheDurationMins = 5L;
private Long snapshotTimeMillis = null;
private SparkBigQueryProxyAndHttpConfig sparkBigQueryProxyAndHttpConfig;
private CompressionCodec arrowCompressionCodec = DEFAULT_ARROW_COMPRESSION_CODEC;
private WriteMethod writeMethod = DEFAULT_WRITE_METHOD;
Expand Down Expand Up @@ -584,6 +585,9 @@ public static SparkBigQueryConfig from(

config.gpn = getAnyOption(globalOptions, options, GPN_ATTRIBUTION);

config.snapshotTimeMillis =
getOption(options, "snapshotTimeMillis").transform(Long::valueOf).orNull();

return config;
}

Expand Down Expand Up @@ -1038,6 +1042,10 @@ public Optional<String> getGpn() {
return gpn.toJavaUtil();
}

public OptionalLong getSnapshotTimeMillis() {
return snapshotTimeMillis == null ? OptionalLong.empty() : OptionalLong.of(snapshotTimeMillis);
}

public ReadSessionCreatorConfig toReadSessionCreatorConfig() {
return new ReadSessionCreatorConfigBuilder()
.setViewsEnabled(viewsEnabled)
Expand All @@ -1061,6 +1069,7 @@ public ReadSessionCreatorConfig toReadSessionCreatorConfig() {
.setTraceId(traceId.toJavaUtil())
.setEnableReadSessionCaching(enableReadSessionCaching)
.setReadSessionCacheDurationMins(readSessionCacheDurationMins)
.setSnapshotTimeMillis(getSnapshotTimeMillis())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public void testDefaults() {
assertThat(config.getAllowMapTypeConversion()).isTrue();
assertThat(config.getBigQueryJobTimeoutInMinutes()).isEqualTo(6 * 60);
assertThat(config.getGpn()).isEmpty();
assertThat(config.getSnapshotTimeMillis()).isEmpty();
}

@Test
Expand Down Expand Up @@ -180,6 +181,7 @@ public void testConfigFromOptions() {
.put("allowMapTypeConversion", "false")
.put("bigQueryJobTimeoutInMinutes", "30")
.put("GPN", "testUser")
.put("snapshotTimeMillis", "123456789")
.build());
SparkBigQueryConfig config =
SparkBigQueryConfig.from(
Expand Down Expand Up @@ -235,6 +237,7 @@ public void testConfigFromOptions() {
assertThat(config.getAllowMapTypeConversion()).isFalse();
assertThat(config.getBigQueryJobTimeoutInMinutes()).isEqualTo(30);
assertThat(config.getGpn().get()).isEqualTo("testUser");
assertThat(config.getSnapshotTimeMillis()).hasValue(123456789L);
}

@Test
Expand Down

0 comments on commit 5691e24

Please sign in to comment.