Skip to content
Browse files
feat: add context manager capability to subscriber (#32)
* feat: add context manager capability to subscriber

* Do not use shared subscriber in socket leak test
  • Loading branch information
plamut committed Feb 20, 2020
1 parent 1feaa24 commit b58d0d8e404c0a085b89d3407e6640651e81568c
Showing with 86 additions and 2 deletions.
  1. +16 −0 google/cloud/pubsub_v1/subscriber/
  2. +1 −1
  3. +50 −1 tests/
  4. +19 −0 tests/unit/pubsub_v1/subscriber/
@@ -228,3 +228,19 @@ def callback(message):, on_callback_error=future.set_exception)

return future

def close(self):
"""Close the underlying channel to release socket resources.
After a channel has been closed, the client instance cannot be used
This method is idempotent.

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
@@ -110,7 +110,7 @@ def system(session):

# Install all test dependencies, then install this package into the
# virtualenv's dist-packages.
session.install("mock", "pytest")
session.install("mock", "pytest", "psutil")

session.install("-e", "test_utils")
session.install("-e", ".")
@@ -18,6 +18,7 @@
import itertools
import operator as op
import os
import psutil
import threading
import time

@@ -46,7 +47,7 @@ def publisher():
yield pubsub_v1.PublisherClient()

def subscriber():
yield pubsub_v1.SubscriberClient()

@@ -383,6 +384,54 @@ def test_managing_subscription_iam_policy(
assert bindings[1].members == [""]

def test_subscriber_not_leaking_open_sockets(
publisher, topic_path, subscription_path, cleanup
# Make sure the topic and the supscription get deleted.
# NOTE: Since subscriber client will be closed in the test, we should not
# use the shared `subscriber` fixture, but instead construct a new client
# in this test.
# Also, since the client will get closed, we need another subscriber client
# to clean up the subscription. We also need to make sure that auxiliary
# subscriber releases the sockets, too.
subscriber = pubsub_v1.SubscriberClient()
subscriber_2 = pubsub_v1.SubscriberClient()
cleanup.append((subscriber_2.delete_subscription, subscription_path))

def one_arg_close(subscriber): # the cleanup helper expects exactly one argument

cleanup.append((one_arg_close, subscriber_2))
cleanup.append((publisher.delete_topic, topic_path))

# Create topic before starting to track connection count (any sockets opened
# by the publisher client are not counted by this test).

current_process = psutil.Process()
conn_count_start = len(current_process.connections())

# Publish a few messages, then synchronously pull them and check that
# no sockets are leaked.
with subscriber:
subscriber.create_subscription(name=subscription_path, topic=topic_path)

# Publish a few messages, wait for the publish to succeed.
publish_futures = [
publisher.publish(topic_path, u"message {}".format(i).encode())
for i in range(1, 4)
for future in publish_futures:

# Synchronously pull messages.
response = subscriber.pull(subscription_path, max_messages=3)
assert len(response.received_messages) == 3

conn_count_end = len(current_process.connections())
assert conn_count_end == conn_count_start

class TestStreamingPull(object):
def test_streaming_pull_callback_error_propagation(
self, publisher, topic_path, subscriber, subscription_path, cleanup
@@ -106,3 +106,22 @@ def test_subscribe_options(manager_open):

def test_close():
mock_transport = mock.NonCallableMock()
client = subscriber.Client(transport=mock_transport)


def test_closes_channel_as_context_manager():
mock_transport = mock.NonCallableMock()
client = subscriber.Client(transport=mock_transport)

with client:

0 comments on commit b58d0d8

Please sign in to comment.