Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.datastore.ReadOption.EventualConsistency;
import com.google.cloud.datastore.ReadOption.ReadTime;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -338,12 +339,29 @@ public Iterator<Entity> get(Iterable<Key> keys, ReadOption... options) {

private static com.google.datastore.v1.ReadOptions toReadOptionsPb(ReadOption... options) {
com.google.datastore.v1.ReadOptions readOptionsPb = null;
if (options != null
&& ReadOption.asImmutableMap(options).containsKey(EventualConsistency.class)) {
readOptionsPb =
com.google.datastore.v1.ReadOptions.newBuilder()
.setReadConsistency(ReadConsistency.EVENTUAL)
.build();
if (options != null) {
Map<Class<? extends ReadOption>, ReadOption> optionsByType =
ReadOption.asImmutableMap(options);

if (optionsByType.containsKey(EventualConsistency.class)
&& optionsByType.containsKey(ReadTime.class)) {
throw DatastoreException.throwInvalidRequest(
"Can not use eventual consistency read with read time.");
}

if (optionsByType.containsKey(EventualConsistency.class)) {
readOptionsPb =
com.google.datastore.v1.ReadOptions.newBuilder()
.setReadConsistency(ReadConsistency.EVENTUAL)
.build();
}

if (optionsByType.containsKey(ReadTime.class)) {
readOptionsPb =
com.google.datastore.v1.ReadOptions.newBuilder()
.setReadTime(((ReadTime) optionsByType.get(ReadTime.class)).time().toProto())
.build();
}
}
return readOptionsPb;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.datastore;

import com.google.api.core.BetaApi;
import com.google.cloud.Timestamp;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.util.Map;
Expand Down Expand Up @@ -47,6 +49,25 @@ public boolean isEventual() {
}
}

/**
* Reads entities as they were at the given time. This may not be older than 270 seconds. This
* value is only supported for Cloud Firestore in Datastore mode.
*/
public static final class ReadTime extends ReadOption {

private static final long serialVersionUID = -6780321449114616067L;

private final Timestamp time;

private ReadTime(Timestamp time) {
this.time = time;
}

public Timestamp time() {
return time;
}
}

private ReadOption() {}

/**
Expand All @@ -57,6 +78,16 @@ public static EventualConsistency eventualConsistency() {
return new EventualConsistency(true);
}

/**
* Returns a {@code ReadOption} that specifies read time, allowing Datastore to return results
* from lookups and queries at a particular timestamp. This feature is currently in private
* preview.
*/
@BetaApi
public static ReadTime readTime(Timestamp time) {
return new ReadTime(time);
}

static Map<Class<? extends ReadOption>, ReadOption> asImmutableMap(ReadOption... options) {
ImmutableMap.Builder<Class<? extends ReadOption>, ReadOption> builder = ImmutableMap.builder();
for (ReadOption option : options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,26 @@ public void testEventualConsistencyQuery() {
EasyMock.verify(rpcFactoryMock, rpcMock);
}

@Test
public void testReadTimeQuery() {
Timestamp timestamp = Timestamp.now();
ReadOptions readOption = ReadOptions.newBuilder().setReadTime(timestamp.toProto()).build();
com.google.datastore.v1.GqlQuery query =
com.google.datastore.v1.GqlQuery.newBuilder().setQueryString("FROM * SELECT *").build();
RunQueryRequest.Builder expectedRequest =
RunQueryRequest.newBuilder()
.setReadOptions(readOption)
.setGqlQuery(query)
.setPartitionId(PartitionId.newBuilder().setProjectId(PROJECT_ID).build());
EasyMock.expect(rpcMock.runQuery(expectedRequest.build()))
.andReturn(RunQueryResponse.newBuilder().build());
EasyMock.replay(rpcFactoryMock, rpcMock);
Datastore datastore = rpcMockOptions.getService();
datastore.run(
Query.newGqlQueryBuilder("FROM * SELECT *").build(), ReadOption.readTime(timestamp));
EasyMock.verify(rpcFactoryMock, rpcMock);
}

@Test
public void testToUrlSafe() {
byte[][] invalidUtf8 =
Expand Down Expand Up @@ -921,6 +941,34 @@ public void testLookupEventualConsistency() {
EasyMock.verify(rpcFactoryMock, rpcMock);
}

@Test
public void testLookupReadTime() {
Timestamp timestamp = Timestamp.now();
ReadOptions readOption = ReadOptions.newBuilder().setReadTime(timestamp.toProto()).build();
com.google.datastore.v1.Key key =
com.google.datastore.v1.Key.newBuilder()
.setPartitionId(PartitionId.newBuilder().setProjectId(PROJECT_ID).build())
.addPath(
com.google.datastore.v1.Key.PathElement.newBuilder()
.setKind("kind1")
.setName("name")
.build())
.build();
LookupRequest lookupRequest =
LookupRequest.newBuilder().setReadOptions(readOption).addKeys(key).build();
EasyMock.expect(rpcMock.lookup(lookupRequest))
.andReturn(LookupResponse.newBuilder().build())
.times(3);
EasyMock.replay(rpcFactoryMock, rpcMock);
com.google.cloud.datastore.Datastore datastore = rpcMockOptions.getService();
datastore.get(KEY1, com.google.cloud.datastore.ReadOption.readTime(timestamp));
datastore.get(
ImmutableList.of(KEY1), com.google.cloud.datastore.ReadOption.readTime(timestamp));
datastore.fetch(
ImmutableList.of(KEY1), com.google.cloud.datastore.ReadOption.readTime(timestamp));
EasyMock.verify(rpcFactoryMock, rpcMock);
}

@Test
public void testGetArrayNoDeferredResults() {
datastore.put(ENTITY3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.google.cloud.datastore.Query;
import com.google.cloud.datastore.Query.ResultType;
import com.google.cloud.datastore.QueryResults;
import com.google.cloud.datastore.ReadOption;
import com.google.cloud.datastore.StringValue;
import com.google.cloud.datastore.StructuredQuery;
import com.google.cloud.datastore.StructuredQuery.OrderBy;
Expand Down Expand Up @@ -647,6 +648,31 @@ public void testGet() {
assertFalse(entity.contains("bla"));
}

@Test
public void testGetWithReadTime() throws InterruptedException {
Key key = Key.newBuilder(PROJECT_ID, "new_kind", "name").setNamespace(NAMESPACE).build();

try {
DATASTORE.put(Entity.newBuilder(key).set("str", "old_str_value").build());

Thread.sleep(1000);
Timestamp now = Timestamp.now();
Thread.sleep(1000);

DATASTORE.put(Entity.newBuilder(key).set("str", "new_str_value").build());

Entity entity = DATASTORE.get(key);
StringValue value1 = entity.getValue("str");
assertEquals(StringValue.of("new_str_value"), value1);

entity = DATASTORE.get(key, ReadOption.readTime(now));
value1 = entity.getValue("str");
assertEquals(StringValue.of("old_str_value"), value1);
} finally {
DATASTORE.delete(key);
}
}

@Test
public void testGetArrayNoDeferredResults() {
DATASTORE.put(ENTITY3);
Expand Down Expand Up @@ -920,4 +946,48 @@ public void testQueryWithStartCursor() {
assertEquals(cursor2, cursor1);
DATASTORE.delete(entity1.getKey(), entity2.getKey(), entity3.getKey());
}

@Test
public void testQueryWithReadTime() throws InterruptedException {
Entity entity1 =
Entity.newBuilder(
Key.newBuilder(PROJECT_ID, "new_kind", "name-01").setNamespace(NAMESPACE).build())
.build();
Entity entity2 =
Entity.newBuilder(
Key.newBuilder(PROJECT_ID, "new_kind", "name-02").setNamespace(NAMESPACE).build())
.build();
Entity entity3 =
Entity.newBuilder(
Key.newBuilder(PROJECT_ID, "new_kind", "name-03").setNamespace(NAMESPACE).build())
.build();

DATASTORE.put(entity1, entity2);
Thread.sleep(1000);
Timestamp now = Timestamp.now();
Thread.sleep(1000);
DATASTORE.put(entity3);

try {
Query<Entity> query = Query.newEntityQueryBuilder().setKind("new_kind").build();

QueryResults<Entity> withoutReadTime = DATASTORE.run(query);
assertTrue(withoutReadTime.hasNext());
assertEquals(entity1, withoutReadTime.next());
assertTrue(withoutReadTime.hasNext());
assertEquals(entity2, withoutReadTime.next());
assertTrue(withoutReadTime.hasNext());
assertEquals(entity3, withoutReadTime.next());
assertFalse(withoutReadTime.hasNext());

QueryResults<Entity> withReadTime = DATASTORE.run(query, ReadOption.readTime(now));
assertTrue(withReadTime.hasNext());
assertEquals(entity1, withReadTime.next());
assertTrue(withReadTime.hasNext());
assertEquals(entity2, withReadTime.next());
assertFalse(withReadTime.hasNext());
} finally {
DATASTORE.delete(entity1.getKey(), entity2.getKey(), entity3.getKey());
}
}
}