Skip to content

Commit

Permalink
test: Replace deprecated method with non-deprecated one
Browse files Browse the repository at this point in the history
Since we have several deprecated method in the ITPubSubTest,
I think we can begin to migrate it to non-deprecated method one.
  • Loading branch information
irvifa committed Jul 4, 2020
1 parent 270bd94 commit 591e0b3
Showing 1 changed file with 72 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,26 @@
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.iam.v1.Binding;
import com.google.iam.v1.GetIamPolicyRequest;
import com.google.iam.v1.Policy;
import com.google.iam.v1.SetIamPolicyRequest;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.DeleteTopicRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;

import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand All @@ -56,8 +64,8 @@ public class ITPubSubTest {
private static SubscriptionAdminClient subscriptionAdminClient;
private static String projectId;
private static final boolean IS_VPC_TEST =
System.getenv("GOOGLE_CLOUD_TESTS_IN_VPCSC") != null
&& System.getenv("GOOGLE_CLOUD_TESTS_IN_VPCSC").equalsIgnoreCase("true");
System.getenv("GOOGLE_CLOUD_TESTS_IN_VPCSC") != null
&& System.getenv("GOOGLE_CLOUD_TESTS_IN_VPCSC").equalsIgnoreCase("true");

@Rule public Timeout globalTimeout = Timeout.seconds(300);

Expand Down Expand Up @@ -92,91 +100,106 @@ private String formatForTest(String resourceName) {
@Test
public void testTopicPolicy() {
ProjectTopicName topicName =
ProjectTopicName.of(projectId, formatForTest("testing-topic-policy"));
topicAdminClient.createTopic(topicName);
Policy policy = topicAdminClient.getIamPolicy(topicName.toString());
ProjectTopicName.of(projectId, formatForTest("testing-topic-policy"));
topicAdminClient.createTopic(Topic.newBuilder().setName(topicName.toString()).build());
Policy policy =
topicAdminClient.getIamPolicy(GetIamPolicyRequest.newBuilder().setResource(topicName.toString()).build());
Binding binding =
Binding.newBuilder().setRole("roles/viewer").addMembers("allAuthenticatedUsers").build();
Binding.newBuilder().setRole("roles/viewer").addMembers("allAuthenticatedUsers").build();

Policy newPolicy =
topicAdminClient.setIamPolicy(
topicName.toString(), policy.toBuilder().addBindings(binding).build());
topicAdminClient.setIamPolicy(SetIamPolicyRequest
.newBuilder().setResource(topicName.toString()).setPolicy(policy).build());
assertThat(newPolicy.getBindingsList()).contains(binding);

String permissionName = "pubsub.topics.get";
List<String> permissions =
topicAdminClient
.testIamPermissions(topicName.toString(), Collections.singletonList(permissionName))
.getPermissionsList();
topicAdminClient
.testIamPermissions(TestIamPermissionsRequest.newBuilder()
.setResource(topicName.toString())
.addAllPermissions(Collections.singletonList(permissionName))
.build())
.getPermissionsList();
assertThat(permissions).contains(permissionName);

topicAdminClient.deleteTopic(topicName);
topicAdminClient.deleteTopic(
DeleteTopicRequest.newBuilder().setTopic(topicName.toString()).build());
}

@Test
public void testVPCPushSubscriber() {
assumeTrue(IS_VPC_TEST);
ProjectTopicName topicName =
ProjectTopicName.of(projectId, formatForTest("testing-vpc-push-subscriber-topic"));
ProjectTopicName.of(projectId, formatForTest("testing-vpc-push-subscriber-topic"));
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(
projectId, formatForTest("testing-vpc-push-subscriber-subscription"));
topicAdminClient.createTopic(topicName);
ProjectSubscriptionName.of(
projectId, formatForTest("testing-vpc-push-subscriber-subscription"));
topicAdminClient.createTopic(Topic.newBuilder().setName(topicName.toString()).build());

try {
subscriptionAdminClient.createSubscription(
subscriptionName,
topicName,
PushConfig.newBuilder().setPushEndpoint("https://random_point").build(),
10);
Subscription.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString())
.setPushConfig(PushConfig.newBuilder().setPushEndpoint("https://random_point").build())
.setAckDeadlineSeconds(10)
.build());
subscriptionAdminClient.deleteSubscription(subscriptionName);
Assert.fail("No exception raised");
} catch (PermissionDeniedException e) {
// expected
}

topicAdminClient.deleteTopic(topicName);
topicAdminClient.deleteTopic(
DeleteTopicRequest.newBuilder().setTopic(topicName.toString()).build());
}

@Test
public void testPublishSubscribe() throws Exception {
ProjectTopicName topicName =
ProjectTopicName.of(projectId, formatForTest("testing-publish-subscribe-topic"));
ProjectTopicName.of(projectId, formatForTest("testing-publish-subscribe-topic"));
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(
projectId, formatForTest("testing-publish-subscribe-subscription"));
ProjectSubscriptionName.of(
projectId, formatForTest("testing-publish-subscribe-subscription"));

topicAdminClient.createTopic(Topic.newBuilder().setName(topicName.toString()).build());

topicAdminClient.createTopic(topicName);
subscriptionAdminClient.createSubscription(
subscriptionName, topicName, PushConfig.newBuilder().build(), 10);
Subscription.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString())
.setPushConfig(PushConfig.newBuilder().build())
.setAckDeadlineSeconds(10)
.build());

final BlockingQueue<Object> receiveQueue = new LinkedBlockingQueue<>();
Subscriber subscriber =
Subscriber.newBuilder(
subscriptionName,
new MessageReceiver() {
@Override
public void receiveMessage(
final PubsubMessage message, final AckReplyConsumer consumer) {
receiveQueue.offer(MessageAndConsumer.create(message, consumer));
}
})
.build();
Subscriber.newBuilder(
subscriptionName,
new MessageReceiver() {
@Override
public void receiveMessage(
final PubsubMessage message, final AckReplyConsumer consumer) {
receiveQueue.offer(MessageAndConsumer.create(message, consumer));
}
})
.build();
subscriber.addListener(
new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
receiveQueue.offer(failure);
}
},
MoreExecutors.directExecutor());
new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
receiveQueue.offer(failure);
}
},
MoreExecutors.directExecutor());
subscriber.startAsync();

Publisher publisher = Publisher.newBuilder(topicName).build();
publisher
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg1")).build())
.get();
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg1")).build())
.get();
publisher
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg2")).build())
.get();
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg2")).build())
.get();
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);

Expand All @@ -196,7 +219,8 @@ public void failed(Subscriber.State from, Throwable failure) {

subscriber.stopAsync().awaitTerminated();
subscriptionAdminClient.deleteSubscription(subscriptionName);
topicAdminClient.deleteTopic(topicName);
topicAdminClient.deleteTopic(
DeleteTopicRequest.newBuilder().setTopic(topicName.toString()).build());
}

private MessageAndConsumer pollQueue(BlockingQueue<Object> queue) throws InterruptedException {
Expand All @@ -211,6 +235,6 @@ private MessageAndConsumer pollQueue(BlockingQueue<Object> queue) throws Interru
return (MessageAndConsumer) obj;
}
throw new IllegalStateException(
"expected either MessageAndConsumer or Throwable, found: " + obj);
"expected either MessageAndConsumer or Throwable, found: " + obj);
}
}

0 comments on commit 591e0b3

Please sign in to comment.