Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config option for snapshot time #1221

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README-template.md
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,15 @@ word-break:break-word
</td>
<td>Read/Write</td>
</tr>

<tr>
<td><code>snapshotTimeMillis</code>
</td>
<td>A timestamp, specified in milliseconds since the Unix epoch, identifying the table snapshot to read.
By default this is not set and the latest version of the table is read.
<br/> (Optional)
</td>
<td>Read</td>
</tr>
</table>

Options can also be set outside of the code, using the `--conf` parameter of `spark-submit` or `--properties` parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.protobuf.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
Expand Down Expand Up @@ -162,6 +164,19 @@ public ReadSessionResponse create(
}
Instant sessionPrepEndTime = Instant.now();

TableModifiers.Builder modifiers = TableModifiers.newBuilder();
config
.getSnapshotTimeMillis()
.ifPresent(
millis -> {
Instant snapshotTime = Instant.ofEpochMilli(millis);
modifiers.setSnapshotTime(
Timestamp.newBuilder()
.setSeconds(snapshotTime.getEpochSecond())
.setNanos(snapshotTime.getNano())
.build());
});

CreateReadSessionRequest createReadSessionRequest =
request
.newBuilder()
Expand All @@ -170,6 +185,7 @@ public ReadSessionResponse create(
requestedSession
.setDataFormat(config.getReadDataFormat())
.setReadOptions(readOptions)
.setTableModifiers(modifiers)
.setTable(tablePath)
.build())
.setMaxStreamCount(maxStreamCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.bigquery.storage.v1.DataFormat;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

public class ReadSessionCreatorConfig {
private final boolean viewsEnabled;
Expand All @@ -42,6 +43,7 @@ public class ReadSessionCreatorConfig {
private final Optional<String> traceId;
private final boolean enableReadSessionCaching;
private final long readSessionCacheDurationMins;
private final OptionalLong snapshotTimeMillis;

ReadSessionCreatorConfig(
boolean viewsEnabled,
Expand All @@ -64,7 +66,8 @@ public class ReadSessionCreatorConfig {
CompressionCodec arrowCompressionCodec,
Optional<String> traceId,
boolean enableReadSessionCaching,
long readSessionCacheDurationMins) {
long readSessionCacheDurationMins,
OptionalLong snapshotTimeMillis) {
this.viewsEnabled = viewsEnabled;
this.materializationProject = materializationProject;
this.materializationDataset = materializationDataset;
Expand All @@ -86,6 +89,7 @@ public class ReadSessionCreatorConfig {
this.traceId = traceId;
this.enableReadSessionCaching = enableReadSessionCaching;
this.readSessionCacheDurationMins = readSessionCacheDurationMins;
this.snapshotTimeMillis = snapshotTimeMillis;
}

public boolean isViewsEnabled() {
Expand Down Expand Up @@ -179,4 +183,8 @@ public boolean isReadSessionCachingEnabled() {
public long getReadSessionCacheDurationMins() {
return readSessionCacheDurationMins;
}

public OptionalLong getSnapshotTimeMillis() {
return snapshotTimeMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

public class ReadSessionCreatorConfigBuilder {

Expand All @@ -44,6 +45,7 @@ public class ReadSessionCreatorConfigBuilder {
private Optional<String> traceId = Optional.empty();
private boolean enableReadSessionCaching = true;
private long readSessionCacheDurationMins = 5L;
private OptionalLong snapshotTimeMillis = OptionalLong.empty();

@CanIgnoreReturnValue
public ReadSessionCreatorConfigBuilder setViewsEnabled(boolean viewsEnabled) {
Expand Down Expand Up @@ -179,6 +181,12 @@ public ReadSessionCreatorConfigBuilder setReadSessionCacheDurationMins(
return this;
}

@CanIgnoreReturnValue
public ReadSessionCreatorConfigBuilder setSnapshotTimeMillis(OptionalLong snapshotTimeMillis) {
this.snapshotTimeMillis = snapshotTimeMillis;
return this;
}

public ReadSessionCreatorConfig build() {
return new ReadSessionCreatorConfig(
viewsEnabled,
Expand All @@ -201,6 +209,7 @@ public ReadSessionCreatorConfig build() {
arrowCompressionCodec,
traceId,
enableReadSessionCaching,
readSessionCacheDurationMins);
readSessionCacheDurationMins,
snapshotTimeMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.MockBigQueryRead;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStub;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.UUID;
import org.junit.After;
import org.junit.AfterClass;
Expand Down Expand Up @@ -286,6 +289,32 @@ public void testMaxStreamCountWithoutMinStreamCount() throws Exception {
assertThat(createReadSessionRequest.getPreferredMinStreamCount()).isEqualTo(10);
}

@Test
public void testSnapshotTimeMillis() throws Exception {
// setting up
when(bigQueryClient.getTable(any())).thenReturn(table);
mockBigQueryRead.reset();
mockBigQueryRead.addResponse(
ReadSession.newBuilder().addStreams(ReadStream.newBuilder().setName("0")).build());
BigQueryClientFactory mockBigQueryClientFactory = mock(BigQueryClientFactory.class);
when(mockBigQueryClientFactory.getBigQueryReadClient()).thenReturn(client);

ReadSessionCreatorConfig config =
new ReadSessionCreatorConfigBuilder()
.setEnableReadSessionCaching(false)
.setSnapshotTimeMillis(OptionalLong.of(1234567890L))
.build();
ReadSessionCreator creator =
new ReadSessionCreator(config, bigQueryClient, mockBigQueryClientFactory);
ReadSessionResponse readSessionResponse =
creator.create(table.getTableId(), ImmutableList.of(), Optional.empty());
assertThat(readSessionResponse).isNotNull();
CreateReadSessionRequest createReadSessionRequest =
(CreateReadSessionRequest) mockBigQueryRead.getRequests().get(0);
assertThat(createReadSessionRequest.getReadSession().getTableModifiers().getSnapshotTime())
.isEqualTo(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build());
}

private void testCacheMissScenario(
ReadSessionCreator creator,
String readSessionName,
Expand Down Expand Up @@ -354,6 +383,7 @@ private ReadSession addCacheEntry(
.setDataFormat(config.getReadDataFormat())
.setReadOptions(readOptions)
.setTable(ReadSessionCreator.toTablePath(table.getTableId()))
.setTableModifiers(TableModifiers.newBuilder())
.build())
.setMaxStreamCount(config.getMaxParallelism().getAsInt())
.setPreferredMinStreamCount(3 * config.getDefaultParallelism())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
com.google.common.base.Optional.absent();
private boolean enableReadSessionCaching = true;
private long readSessionCacheDurationMins = 5L;
private OptionalLong snapshotTimeMillis = OptionalLong.empty();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SparkBigQueryConfig is a Serializable class, and therefore we need to keep all of its members Serializable as well. This is why all Optionals are the guava version and not Java's. Could you replace this OptionalLong with com.google.common.base.Optional&lt;Long&gt;? The conversion to OptionalLong should be done in getSnapshotTimeMillis().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have switched to behaviour similar to partitionExpirationMs.

private SparkBigQueryProxyAndHttpConfig sparkBigQueryProxyAndHttpConfig;
private CompressionCodec arrowCompressionCodec = DEFAULT_ARROW_COMPRESSION_CODEC;
private WriteMethod writeMethod = DEFAULT_WRITE_METHOD;
Expand Down Expand Up @@ -584,6 +585,12 @@ public static SparkBigQueryConfig from(

config.gpn = getAnyOption(globalOptions, options, GPN_ATTRIBUTION);

config.snapshotTimeMillis =
getOption(options, "snapshotTimeMillis")
.transform(Long::valueOf)
.transform(OptionalLong::of)
.or(OptionalLong::empty);

return config;
}

Expand Down Expand Up @@ -1038,6 +1045,10 @@ public Optional<String> getGpn() {
return gpn.toJavaUtil();
}

public OptionalLong getSnapshotTimeMillis() {
return snapshotTimeMillis;
}

public ReadSessionCreatorConfig toReadSessionCreatorConfig() {
return new ReadSessionCreatorConfigBuilder()
.setViewsEnabled(viewsEnabled)
Expand All @@ -1061,6 +1072,7 @@ public ReadSessionCreatorConfig toReadSessionCreatorConfig() {
.setTraceId(traceId.toJavaUtil())
.setEnableReadSessionCaching(enableReadSessionCaching)
.setReadSessionCacheDurationMins(readSessionCacheDurationMins)
.setSnapshotTimeMillis(snapshotTimeMillis)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public void testDefaults() {
assertThat(config.getAllowMapTypeConversion()).isTrue();
assertThat(config.getBigQueryJobTimeoutInMinutes()).isEqualTo(6 * 60);
assertThat(config.getGpn()).isEmpty();
assertThat(config.getSnapshotTimeMillis()).isEmpty();
}

@Test
Expand Down Expand Up @@ -180,6 +181,7 @@ public void testConfigFromOptions() {
.put("allowMapTypeConversion", "false")
.put("bigQueryJobTimeoutInMinutes", "30")
.put("GPN", "testUser")
.put("snapshotTimeMillis", "123456789")
.build());
SparkBigQueryConfig config =
SparkBigQueryConfig.from(
Expand Down Expand Up @@ -235,6 +237,7 @@ public void testConfigFromOptions() {
assertThat(config.getAllowMapTypeConversion()).isFalse();
assertThat(config.getBigQueryJobTimeoutInMinutes()).isEqualTo(30);
assertThat(config.getGpn().get()).isEqualTo("testUser");
assertThat(config.getSnapshotTimeMillis()).hasValue(123456789L);
}

@Test
Expand Down
Loading