Skip to content

Commit

Permalink
feat: Implement Assigner, which delivers partition assignments to a P…
Browse files Browse the repository at this point in the history
…artitionAssignmentReceiver (#133)

* [feat] Import and build new Pub/Sub Lite assignment protos.

Also fix a build issue and reformat all java files.

* [feat] Implement Assigner, which delivers partition assignments to a PartitionAssignmentReceiver.

* Add a comment to handleInitialResponse
  • Loading branch information
dpcollins-google committed Jun 15, 2020
1 parent 2d5242f commit a4485d9
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiService;

/** An Assigner is responsible for handling partition assignments for a subscribing client. */
public interface Assigner extends ApiService {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
import com.google.cloud.pubsublite.proto.PartitionAssignment;
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.StatusException;

public class AssignerImpl extends ProxyService
implements Assigner, RetryingConnectionObserver<PartitionAssignment> {
@GuardedBy("monitor.monitor")
private final RetryingConnection<ConnectedAssigner> connection;

@GuardedBy("monitor.monitor")
private final PartitionAssignmentReceiver receiver;

private final CloseableMonitor monitor = new CloseableMonitor();

@VisibleForTesting
AssignerImpl(
PartitionAssignmentServiceStub stub,
ConnectedAssignerFactory factory,
InitialPartitionAssignmentRequest initialRequest,
PartitionAssignmentReceiver receiver)
throws StatusException {
this.receiver = receiver;
this.connection =
new RetryingConnectionImpl<>(
stub::assignPartitions,
factory,
PartitionAssignmentRequest.newBuilder().setInitial(initialRequest).build(),
this);
addServices(this.connection);
}

@Override
protected void start() {}

@Override
protected void stop() {}

@Override
protected void handlePermanentError(StatusException error) {}

@Override
public void triggerReinitialize() {
try (CloseableMonitor.Hold h = monitor.enter()) {
connection.reinitialize();
}
}

@Override
public Status onClientResponse(PartitionAssignment value) {
try (CloseableMonitor.Hold h = monitor.enter()) {
receiver.DeliverAssignment(value);
connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack));
} catch (StatusException e) {
return e.getStatus();
}
return Status.OK;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

public interface ConnectedAssigner extends AutoCloseable {
// Acknowledge an outstanding assignment.
void ack();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.proto.PartitionAssignment;
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;

interface ConnectedAssignerFactory
extends SingleConnectionFactory<
PartitionAssignmentRequest, PartitionAssignment, PartitionAssignment, ConnectedAssigner> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.Preconditions.checkState;

import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.proto.PartitionAssignment;
import com.google.cloud.pubsublite.proto.PartitionAssignmentAck;
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;

public class ConnectedAssignerImpl
extends SingleConnection<PartitionAssignmentRequest, PartitionAssignment, PartitionAssignment>
implements ConnectedAssigner {
private final CloseableMonitor monitor = new CloseableMonitor();

@GuardedBy("monitor.monitor")
boolean outstanding = false;

private ConnectedAssignerImpl(
StreamFactory<PartitionAssignmentRequest, PartitionAssignment> streamFactory,
StreamObserver<PartitionAssignment> clientStream,
PartitionAssignmentRequest initialRequest) {
super(streamFactory, clientStream);
initialize(initialRequest);
}

static class Factory implements ConnectedAssignerFactory {
@Override
public ConnectedAssigner New(
StreamFactory<PartitionAssignmentRequest, PartitionAssignment> streamFactory,
StreamObserver<PartitionAssignment> clientStream,
PartitionAssignmentRequest initialRequest) {
return new ConnectedAssignerImpl(streamFactory, clientStream, initialRequest);
}
}

// SingleConnection implementation.
@Override
protected Status handleInitialResponse(PartitionAssignment response) {
// The assignment stream is server-initiated by sending a PartitionAssignment. The
// initial response from the server is handled identically to other responses.
return handleStreamResponse(response);
}

@Override
protected Status handleStreamResponse(PartitionAssignment response) {
try (CloseableMonitor.Hold h = monitor.enter()) {
checkState(
!outstanding,
"Received assignment from the server while there was an assignment outstanding.");
outstanding = true;
} catch (StatusException e) {
return e.getStatus();
}
sendToClient(response);
return Status.OK;
}

// ConnectedAssigner implementation.
@Override
public void ack() {
try (CloseableMonitor.Hold h = monitor.enter()) {
checkState(outstanding, "Client acknowledged when there was no request outstanding.");
outstanding = false;
} catch (StatusException e) {
setError(e.getStatus());
}
sendToStream(
PartitionAssignmentRequest.newBuilder()
.setAck(PartitionAssignmentAck.getDefaultInstance())
.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.proto.PartitionAssignment;

/**
* A receiver for partition assignments. All updates to reflect the assignment should be performed
* inline.
*/
public interface PartitionAssignmentReceiver {
void DeliverAssignment(PartitionAssignment assignment);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.google.api.core.ApiService.Listener;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.internal.FakeApiService;
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
import com.google.cloud.pubsublite.proto.PartitionAssignment;
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc;
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub;
import io.grpc.ManagedChannel;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

@RunWith(JUnit4.class)
public class AssignerImplTest {
@Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();

private static PartitionAssignmentRequest initialRequest() {
try {
return PartitionAssignmentRequest.newBuilder()
.setInitial(
InitialPartitionAssignmentRequest.newBuilder()
.setSubscription(
SubscriptionPaths.newBuilder()
.setProjectNumber(ProjectNumber.of(12345))
.setZone(CloudZone.of(CloudRegion.of("us-east1"), 'a'))
.setSubscriptionName(SubscriptionName.of("some_subscription"))
.build()
.value()))
.build();
} catch (StatusException e) {
throw e.getStatus().asRuntimeException();
}
}

abstract static class ConnectedAssignerFakeService extends FakeApiService
implements ConnectedAssigner {}

@Spy private ConnectedAssignerFakeService connectedAssigner;
@Mock private ConnectedAssignerFactory assignerFactory;

@Mock private PartitionAssignmentReceiver receiver;
@Mock private Listener permanentErrorHandler;

private Assigner assigner;
private StreamObserver<PartitionAssignment> leakedResponseObserver;

@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
doAnswer(
args -> {
leakedResponseObserver = args.getArgument(1);
return connectedAssigner;
})
.when(assignerFactory)
.New(any(), any(), eq(initialRequest()));
ManagedChannel channel =
grpcCleanup.register(
InProcessChannelBuilder.forName("localhost:12345").directExecutor().build());
PartitionAssignmentServiceStub unusedStub = PartitionAssignmentServiceGrpc.newStub(channel);
assigner =
new AssignerImpl(unusedStub, assignerFactory, initialRequest().getInitial(), receiver);
}

@Test
public void construct_CallsFactoryNew() {
verifyNoMoreInteractions(assignerFactory);
verifyNoMoreInteractions(connectedAssigner);
}
}

0 comments on commit a4485d9

Please sign in to comment.