@@ -31,18 +31,22 @@
import com .google .api .gax .core .FixedExecutorProvider ;
import com .google .api .gax .core .InstantiatingExecutorProvider ;
import com .google .api .gax .core .NoCredentialsProvider ;
import com .google .api .gax .grpc .GrpcTransportChannel ;
import com .google .api .gax .grpc .testing .LocalChannelProvider ;
import com .google .api .gax .rpc .DataLossException ;
import com .google .api .gax .rpc .FixedTransportChannelProvider ;
import com .google .api .gax .rpc .TransportChannelProvider ;
import com .google .cloud .pubsub .v1 .Publisher .Builder ;
import com .google .protobuf .ByteString ;
import com .google .pubsub .v1 .ProjectTopicName ;
import com .google .pubsub .v1 .PublishRequest ;
import com .google .pubsub .v1 .PublishResponse ;
import com .google .pubsub .v1 .PubsubMessage ;
import io .grpc .ManagedChannel ;
import io .grpc .Server ;
import io .grpc .Status ;
import io .grpc .StatusException ;
import io .grpc .inprocess .InProcessChannelBuilder ;
import io .grpc .inprocess .InProcessServerBuilder ;
import java .util .List ;
import java .util .concurrent .CountDownLatch ;
@@ -75,6 +79,8 @@ public class PublisherImplTest {
private FakePublisherServiceImpl testPublisherServiceImpl ;
private ManagedChannel testChannel ;
private Server testServer ;
@ Before
@@ -84,6 +90,7 @@ public void setUp() throws Exception {
InProcessServerBuilder serverBuilder = InProcessServerBuilder .forName ("test-server" );
serverBuilder .addService (testPublisherServiceImpl );
testServer = serverBuilder .build ();
testChannel = InProcessChannelBuilder .forName ("test-server" ).build ();
testServer .start ();
fakeExecutor = new FakeScheduledExecutorService ();
@@ -92,6 +99,7 @@ public void setUp() throws Exception {
@ After
public void tearDown () throws Exception {
testServer .shutdownNow ().awaitTermination ();
testChannel .shutdown ();
}
@ Test
@@ -122,8 +130,7 @@ public void testPublishByDuration() throws Exception {
assertEquals ("2" , publishFuture2 .get ());
assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().get (0 ).getMessagesCount ());
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
shutdownTestPublisher (publisher );
}
@ Test
@@ -160,8 +167,9 @@ public void testPublishByNumBatchedMessages() throws Exception {
assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().get (0 ).getMessagesCount ());
assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().get (1 ).getMessagesCount ());
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
fakeExecutor .advanceTime (Duration .ofSeconds (100 ));
shutdownTestPublisher (publisher );
}
@ Test
@@ -195,8 +203,9 @@ public void testSinglePublishByNumBytes() throws Exception {
assertEquals ("4" , publishFuture4 .get ());
assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().size ());
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
fakeExecutor .advanceTime (Duration .ofSeconds (100 ));
shutdownTestPublisher (publisher );
}
@ Test
@@ -219,15 +228,16 @@ public void testPublishByShutdown() throws Exception {
// Note we are not advancing time or reaching the count threshold but messages should
// still get published by call to shutdown
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
// Verify the publishes completed
assertTrue (publishFuture1 .isDone ());
assertTrue (publishFuture2 .isDone ());
assertEquals ("1" , publishFuture1 .get ());
assertEquals ("2" , publishFuture2 .get ());
fakeExecutor .advanceTime (Duration .ofSeconds (100 ));
publisher .awaitTermination (1 , TimeUnit .MINUTES );
}
@ Test
@@ -269,8 +279,7 @@ public void testPublishMixedSizeAndDuration() throws Exception {
assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().get (0 ).getMessagesCount ());
assertEquals (1 , testPublisherServiceImpl .getCapturedRequests ().get (1 ).getMessagesCount ());
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
shutdownTestPublisher (publisher );
}
private ApiFuture <String > sendTestMessage (Publisher publisher , String data ) {
@@ -326,7 +335,9 @@ public void testBatchedMessagesWithOrderingKeyByNum() throws Exception {
}
}
}
publisher .shutdown ();
fakeExecutor .advanceTime (Duration .ofSeconds (100 ));
shutdownTestPublisher (publisher );
}
@ Test
@@ -389,7 +400,7 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception {
}
}
}
publisher . shutdown ( );
shutdownTestPublisher ( publisher );
}
@ Test
@@ -418,7 +429,8 @@ public void testLargeMessagesDoNotReorderBatches() throws Exception {
// Verify that messages with "OrderB" were delivered in order.
assertTrue (Integer .parseInt (publishFuture2 .get ()) < Integer .parseInt (publishFuture3 .get ()));
publisher .shutdown ();
fakeExecutor .advanceTime (Duration .ofSeconds (100 ));
shutdownTestPublisher (publisher );
}
@ Test
@@ -431,7 +443,7 @@ public void testOrderingKeyWhenDisabled_throwsException() throws Exception {
} catch (IllegalStateException expected ) {
// expected
}
publisher . shutdown ( );
shutdownTestPublisher ( publisher );
}
@ Test
@@ -461,6 +473,7 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception {
assertEquals (4 , testPublisherServiceImpl .getCapturedRequests ().size ());
publisher .shutdown ();
assertTrue (publisher .awaitTermination (1 , TimeUnit .MINUTES ));
}
@ Test
@@ -550,7 +563,7 @@ public void testResumePublish() throws Exception {
Assert .assertEquals ("7" , future7 .get ());
Assert .assertEquals ("8" , future8 .get ());
publisher . shutdown ( );
shutdownTestPublisher ( publisher );
}
private ApiFuture <String > sendTestMessageWithOrderingKey (
@@ -604,8 +617,7 @@ public void testPublishFailureRetries() throws Exception {
assertEquals ("1" , publishFuture1 .get ());
assertEquals (2 , testPublisherServiceImpl .getCapturedRequests ().size ());
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
shutdownTestPublisher (publisher );
}
@ Test (expected = ExecutionException .class )
@@ -629,8 +641,7 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception {
publishFuture1 .get ();
} finally {
assertSame (testPublisherServiceImpl .getCapturedRequests ().size (), 1 );
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
shutdownTestPublisher (publisher );
}
}
@@ -656,8 +667,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception {
assertEquals ("1" , publishFuture1 .get ());
assertEquals (3 , testPublisherServiceImpl .getCapturedRequests ().size ());
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
shutdownTestPublisher (publisher );
}
@ Test
@@ -683,7 +693,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception
assertEquals (3 , testPublisherServiceImpl .getCapturedRequests ().size ());
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
assertTrue ( publisher .awaitTermination (1 , TimeUnit .MINUTES ) );
}
@ Test (expected = ExecutionException .class )
@@ -712,14 +722,15 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
} finally {
assertTrue (testPublisherServiceImpl .getCapturedRequests ().size () >= 1 );
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
assertTrue ( publisher .awaitTermination (1 , TimeUnit .MINUTES ) );
}
}
@ Test
public void testPublisherGetters () throws Exception {
Publisher .Builder builder = Publisher .newBuilder (TEST_TOPIC );
builder .setChannelProvider (TEST_CHANNEL_PROVIDER );
builder .setChannelProvider (
FixedTransportChannelProvider .create (GrpcTransportChannel .create (testChannel )));
builder .setExecutorProvider (SINGLE_THREAD_EXECUTOR );
builder .setBatchingSettings (
BatchingSettings .newBuilder ()
@@ -735,7 +746,7 @@ public void testPublisherGetters() throws Exception {
assertEquals (Duration .ofMillis (11 ), publisher .getBatchingSettings ().getDelayThreshold ());
assertEquals (12 , (long ) publisher .getBatchingSettings ().getElementCountThreshold ());
publisher .shutdown ();
publisher .awaitTermination (1 , TimeUnit .MINUTES );
assertTrue ( publisher .awaitTermination (1 , TimeUnit .MINUTES ) );
}
@ Test
@@ -1115,7 +1126,14 @@ public void run() {
private Builder getTestPublisherBuilder () {
return Publisher .newBuilder (TEST_TOPIC )
.setExecutorProvider (FixedExecutorProvider .create (fakeExecutor ))
.setChannelProvider (TEST_CHANNEL_PROVIDER )
.setChannelProvider (
FixedTransportChannelProvider .create (GrpcTransportChannel .create (testChannel )))
.setCredentialsProvider (NoCredentialsProvider .create ());
}
private void shutdownTestPublisher (Publisher publisher ) throws InterruptedException {
publisher .shutdown ();
fakeExecutor .advanceTime (Duration .ofSeconds (10 ));
assertTrue (publisher .awaitTermination (1 , TimeUnit .MINUTES ));
}
}