Skip to content

Commit

Permalink
feat: Compute cursors for publish and event timestamps (#618)
Browse files Browse the repository at this point in the history
Adds methods to the internal TopicStatsClient for computing cursors for publish or event times.
  • Loading branch information
tmdiep committed Apr 28, 2021
1 parent d7dd200 commit ded756e
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.protobuf.Timestamp;
import java.util.Optional;

public interface TopicStatsClient extends ApiBackgroundResource {
static TopicStatsClient create(TopicStatsClientSettings settings) throws ApiException {
Expand Down Expand Up @@ -57,4 +59,37 @@ ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
* success.
*/
ApiFuture<Cursor> computeHeadCursor(TopicPath path, Partition partition);

/**
* Compute the cursor of the first message with publish time greater than or equal to the
* specified publish time, for a topic partition. All messages thereafter are guaranteed to have
* publish times greater than or equal to the specified publish time.
*
* <p>If such a message cannot be found, the returned Optional will be empty.
*
* @param path The topic to compute cursor for
* @param partition The partition to compute cursor for
* @param publishTime The target publish time
* @return A future that will have either an error {@link ApiException}, an empty Optional or a
* non-null {@link Cursor}.
*/
ApiFuture<Optional<Cursor>> computeCursorForPublishTime(
TopicPath path, Partition partition, Timestamp publishTime);

/**
* Compute the cursor of the first message with event time greater than or equal to the specified
* event time, for a topic partition. If messages are missing an event time, the publish time is
* used as a fallback. As event times are user supplied, subsequent messages may have event times
* less than the specified event time and should be filtered by the client, if necessary.
*
* <p>If such a message cannot be found, the returned Optional will be empty.
*
* @param path The topic to compute cursor for
* @param partition The partition to compute cursor for
* @param eventTime The target event time
* @return A future that will have either an error {@link ApiException}, an empty Optional or a
* non-null {@link Cursor}.
*/
ApiFuture<Optional<Cursor>> computeCursorForEventTime(
TopicPath path, Partition partition, Timestamp eventTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.TimeTarget;
import com.google.cloud.pubsublite.v1.TopicStatsServiceClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Timestamp;
import java.util.Optional;

public class TopicStatsClientImpl extends ApiResourceAggregation implements TopicStatsClient {
private final CloudRegion region;
Expand Down Expand Up @@ -71,4 +75,38 @@ public ApiFuture<Cursor> computeHeadCursor(TopicPath path, Partition partition)
ComputeHeadCursorResponse::getHeadCursor,
MoreExecutors.directExecutor());
}

@Override
public ApiFuture<Optional<Cursor>> computeCursorForPublishTime(
TopicPath path, Partition partition, Timestamp publishTime) {
return computeTimeCursor(
path, partition, TimeTarget.newBuilder().setPublishTime(publishTime).build());
}

@Override
public ApiFuture<Optional<Cursor>> computeCursorForEventTime(
TopicPath path, Partition partition, Timestamp eventTime) {
return computeTimeCursor(
path, partition, TimeTarget.newBuilder().setEventTime(eventTime).build());
}

private ApiFuture<Optional<Cursor>> computeTimeCursor(
TopicPath path, Partition partition, TimeTarget target) {
return ApiFutures.transform(
serviceClient
.computeTimeCursorCallable()
.futureCall(
ComputeTimeCursorRequest.newBuilder()
.setTopic(path.toString())
.setPartition(partition.value())
.setTarget(target)
.build()),
response -> {
if (response.hasCursor()) {
return Optional.of(response.getCursor());
}
return Optional.empty();
},
MoreExecutors.directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,19 @@
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.TimeTarget;
import com.google.cloud.pubsublite.v1.TopicStatsServiceClient;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStub;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.Optional;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -70,7 +77,15 @@ private static Offset end() {
return Offset.of(2);
}

private static ComputeMessageStatsRequest request() {
private static Timestamp timestamp() {
return Timestamp.newBuilder().setSeconds(1).setNanos(2).build();
}

private static Cursor cursor() {
return Cursor.newBuilder().setOffset(45).build();
}

private static ComputeMessageStatsRequest messageStatsRequest() {
return ComputeMessageStatsRequest.newBuilder()
.setTopic(path().toString())
.setPartition(partition().value())
Expand All @@ -79,19 +94,62 @@ private static ComputeMessageStatsRequest request() {
.build();
}

private static ComputeMessageStatsResponse response() {
private static ComputeMessageStatsResponse messageStatsResponse() {
return ComputeMessageStatsResponse.newBuilder().setMessageBytes(1).setMessageCount(2).build();
}

private static ComputeHeadCursorRequest headCursorRequest() {
return ComputeHeadCursorRequest.newBuilder()
.setTopic(path().toString())
.setPartition(partition().value())
.build();
}

private static ComputeHeadCursorResponse headCursorResponse() {
return ComputeHeadCursorResponse.newBuilder().setHeadCursor(cursor()).build();
}

private static ComputeTimeCursorRequest publishTimeCursorRequest() {
return ComputeTimeCursorRequest.newBuilder()
.setTopic(path().toString())
.setPartition(partition().value())
.setTarget(TimeTarget.newBuilder().setPublishTime(timestamp()))
.build();
}

private static ComputeTimeCursorRequest eventTimeCursorRequest() {
return ComputeTimeCursorRequest.newBuilder()
.setTopic(path().toString())
.setPartition(partition().value())
.setTarget(TimeTarget.newBuilder().setEventTime(timestamp()))
.build();
}

private static ComputeTimeCursorResponse unsetTimeCursorResponse() {
return ComputeTimeCursorResponse.getDefaultInstance();
}

private static ComputeTimeCursorResponse timeCursorResponse() {
return ComputeTimeCursorResponse.newBuilder().setCursor(cursor()).build();
}

@Mock TopicStatsServiceStub stub;
@Mock UnaryCallable<ComputeMessageStatsRequest, ComputeMessageStatsResponse> computeCallable;
@Mock UnaryCallable<ComputeMessageStatsRequest, ComputeMessageStatsResponse> computeStatsCallable;

@Mock
UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse> computeHeadCursorCallable;

@Mock
UnaryCallable<ComputeTimeCursorRequest, ComputeTimeCursorResponse> computeTimeCursorCallable;

private TopicStatsClientImpl client;

@Before
public void setUp() throws IOException {
initMocks(this);
when(stub.computeMessageStatsCallable()).thenReturn(computeCallable);
when(stub.computeMessageStatsCallable()).thenReturn(computeStatsCallable);
when(stub.computeHeadCursorCallable()).thenReturn(computeHeadCursorCallable);
when(stub.computeTimeCursorCallable()).thenReturn(computeTimeCursorCallable);
client = new TopicStatsClientImpl(REGION, TopicStatsServiceClient.create(stub));
}

Expand All @@ -108,18 +166,85 @@ public void region_isConstructedRegion() {

@Test
public void computeMessageStats_Ok() throws Exception {
when(computeCallable.futureCall(request())).thenReturn(ApiFutures.immediateFuture(response()));
when(computeStatsCallable.futureCall(messageStatsRequest()))
.thenReturn(ApiFutures.immediateFuture(messageStatsResponse()));
assertThat(client.computeMessageStats(path(), partition(), start(), end()).get())
.isEqualTo(response());
.isEqualTo(messageStatsResponse());
}

@Test
public void computeMessageStats_Error() {
when(computeCallable.futureCall(request()))
when(computeStatsCallable.futureCall(messageStatsRequest()))
.thenReturn(
ApiFutures.immediateFailedFuture(
new CheckedApiException(Code.FAILED_PRECONDITION).underlying));
assertFutureThrowsCode(
client.computeMessageStats(path(), partition(), start(), end()), Code.FAILED_PRECONDITION);
}

@Test
public void computeHeadCursor_Ok() throws Exception {
when(computeHeadCursorCallable.futureCall(headCursorRequest()))
.thenReturn(ApiFutures.immediateFuture(headCursorResponse()));
assertThat(client.computeHeadCursor(path(), partition()).get()).isEqualTo(cursor());
}

@Test
public void computeHeadCursor_Error() {
when(computeHeadCursorCallable.futureCall(headCursorRequest()))
.thenReturn(
ApiFutures.immediateFailedFuture(
new CheckedApiException(Code.FAILED_PRECONDITION).underlying));
assertFutureThrowsCode(client.computeHeadCursor(path(), partition()), Code.FAILED_PRECONDITION);
}

@Test
public void computeCursorForPublishTime_OkPresent() throws Exception {
when(computeTimeCursorCallable.futureCall(publishTimeCursorRequest()))
.thenReturn(ApiFutures.immediateFuture(timeCursorResponse()));
assertThat(client.computeCursorForPublishTime(path(), partition(), timestamp()).get())
.isEqualTo(Optional.of(cursor()));
}

@Test
public void computeCursorForPublishTime_OkUnset() throws Exception {
when(computeTimeCursorCallable.futureCall(publishTimeCursorRequest()))
.thenReturn(ApiFutures.immediateFuture(unsetTimeCursorResponse()));
assertThat(client.computeCursorForPublishTime(path(), partition(), timestamp()).get())
.isEqualTo(Optional.empty());
}

@Test
public void computeCursorForPublishTime_Error() throws Exception {
when(computeTimeCursorCallable.futureCall(publishTimeCursorRequest()))
.thenReturn(
ApiFutures.immediateFailedFuture(new CheckedApiException(Code.UNAVAILABLE).underlying));
assertFutureThrowsCode(
client.computeCursorForPublishTime(path(), partition(), timestamp()), Code.UNAVAILABLE);
}

@Test
public void computeCursorForEventTime_OkPresent() throws Exception {
when(computeTimeCursorCallable.futureCall(eventTimeCursorRequest()))
.thenReturn(ApiFutures.immediateFuture(timeCursorResponse()));
assertThat(client.computeCursorForEventTime(path(), partition(), timestamp()).get())
.isEqualTo(Optional.of(cursor()));
}

@Test
public void computeCursorForEventTime_OkUnset() throws Exception {
when(computeTimeCursorCallable.futureCall(eventTimeCursorRequest()))
.thenReturn(ApiFutures.immediateFuture(unsetTimeCursorResponse()));
assertThat(client.computeCursorForEventTime(path(), partition(), timestamp()).get())
.isEqualTo(Optional.empty());
}

@Test
public void computeCursorForEventTime_Error() throws Exception {
when(computeTimeCursorCallable.futureCall(eventTimeCursorRequest()))
.thenReturn(
ApiFutures.immediateFailedFuture(new CheckedApiException(Code.ABORTED).underlying));
assertFutureThrowsCode(
client.computeCursorForEventTime(path(), partition(), timestamp()), Code.ABORTED);
}
}

0 comments on commit ded756e

Please sign in to comment.