Skip to content

Commit

Permalink
Add Publisher flow control span
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Jun 19, 2024
1 parent a7ed13b commit 524a5e1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
18 changes: 14 additions & 4 deletions google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import sys
from datetime import datetime
from opentelemetry import trace
from opentelemetry.trace.propagation import set_span_in_context

from google.api_core import gapic_v1
from google.auth.credentials import AnonymousCredentials # type: ignore
Expand Down Expand Up @@ -394,11 +395,10 @@ def publish( # type: ignore[override]
"messaging.message.envelope.size": sys.getsizeof(message),
},
kind=trace.SpanKind.PRODUCER,
# TODO(mk): set end_on_exit=False. So that span
# ends only when publich RPC call is made.
# TODO(mk): end_on_exit=False
# end_on_exit=False,
) as span:
span.add_event(
) as publish_create_span:
publish_create_span.add_event(
name="publish start",
attributes={
"timestamp": str(datetime.now()),
Expand All @@ -408,7 +408,17 @@ def publish( # type: ignore[override]
# Messages should go through flow control to prevent excessive
# queuing on the client side (depending on the settings).
try:
if self._open_telemetry_enabled:
with tracer.start_as_current_span(
name="publisher flow control",
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(publish_create_span),
end_on_exit=False,
) as publish_flow_control_span:
self._publish_flow_control_span = publish_flow_control_span
self._flow_controller.add(message)
if self._open_telemetry_enabled:
self._publish_flow_control_span.end()
except exceptions.FlowControlLimitError as exc:
future = futures.Future()
future.set_exception(exc)
Expand Down
12 changes: 11 additions & 1 deletion tests/unit/pubsub_v1/publisher/test_publisher_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,12 @@ def test_publish_otel(creds):
trace.set_tracer_provider(provider)

client.publish(TOPIC, b"message")

spans = memory_exporter.get_finished_spans()

assert len(spans) == 1
assert len(spans) == 2

# Verify create span
assert spans[0].name == f"{TOPIC} create"

# Verify attribute values
Expand All @@ -298,6 +300,14 @@ def test_publish_otel(creds):
assert start_event.name == "publish start"
assert "timestamp" in start_event.attributes

# Verify flow control span.
flow_control_span = spans[1]
assert flow_control_span.name == "publisher flow control"
assert flow_control_span.kind == trace.SpanKind.INTERNAL

# Verify that flow control span is a child of the publish create span.
assert flow_control_span._parent[1] == spans[0]._context[1]


def test_publish_error_exceeding_flow_control_limits(creds):
publisher_options = types.PublisherOptions(
Expand Down

0 comments on commit 524a5e1

Please sign in to comment.