diff --git a/package.json b/package.json index f4230960d..22a901a85 100644 --- a/package.json +++ b/package.json @@ -51,8 +51,8 @@ "@google-cloud/precise-date": "^3.0.0", "@google-cloud/projectify": "^3.0.0", "@google-cloud/promisify": "^2.0.0", - "@opentelemetry/api": "^1.0.0", - "@opentelemetry/semantic-conventions": "~1.3.0", + "@opentelemetry/api": "~1.1.0", + "@opentelemetry/semantic-conventions": "~1.3.1", "@types/duplexify": "^3.6.0", "@types/long": "^4.0.0", "arrify": "^2.0.0", @@ -66,7 +66,8 @@ }, "devDependencies": { "@grpc/proto-loader": "^0.7.0", - "@opentelemetry/tracing": "^0.24.0", + "@opentelemetry/core": "~1.3.1", + "@opentelemetry/sdk-trace-base": "~1.3.1", "@types/execa": "^0.9.0", "@types/extend": "^3.0.0", "@types/lodash.snakecase": "^4.1.6", diff --git a/samples/openTelemetryTracing.js b/samples/openTelemetryTracing.js index dae84bdcf..69107bd7f 100644 --- a/samples/openTelemetryTracing.js +++ b/samples/openTelemetryTracing.js @@ -1,18 +1,20 @@ -/*! - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +// Copyright 2020-2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is a generated sample, using the typeless sample bot. Please +// look for the source TypeScript sample (.ts) for modifications. +'use strict'; /** * This sample demonstrates how to add OpenTelemetry tracing to the @@ -22,8 +24,6 @@ * at https://cloud.google.com/pubsub/docs. */ -'use strict'; - // sample-metadata: // title: OpenTelemetry Tracing // description: Demonstrates how to enable OpenTelemetry tracing in @@ -32,103 +32,111 @@ const SUBSCRIBER_TIMEOUT = 10; +// [START opentelemetry_tracing] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const topicNameOrId = 'YOUR_TOPIC_OR_ID'; +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID'; +// const data = 'Hello, world!"; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Imports the OpenTelemetry API +const otel = require('@opentelemetry/sdk-trace-node'); +const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api'); +const {NodeTracerProvider} = otel; +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + +// To output to the console for testing, use the ConsoleSpanExporter. +const {ConsoleSpanExporter} = require('@opentelemetry/sdk-trace-base'); + +// To output to Cloud Trace, import the OpenTelemetry bridge library. +// import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter'; + +const {Resource} = require('@opentelemetry/resources'); +const { + SemanticResourceAttributes, +} = require('@opentelemetry/semantic-conventions'); + +// Enable the diagnostic logger for OpenTelemetry +diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG); + +// Log spans out to the console, for testing. +const exporter = new ConsoleSpanExporter(); + +// Log spans out to Cloud Trace, for production. +// const exporter = new TraceExporter(); + +// Build a tracer provider and a span processor to do +// something with the spans we're generating. +const provider = new NodeTracerProvider({ + resource: new Resource({ + [SemanticResourceAttributes.SERVICE_NAME]: 'otel example', + }), +}); +const processor = new SimpleSpanProcessor(exporter); +provider.addSpanProcessor(processor); +provider.register(); + +// Creates a client; cache this for further use. +const pubSubClient = new PubSub(); + +async function publishMessage(topicNameOrId, data) { + // Publishes the message as a string, e.g. "Hello, world!" + // or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + const publisher = pubSubClient.topic(topicNameOrId); + const messageId = await publisher.publishMessage({data: dataBuffer}); + console.log(`Message ${messageId} published.`); +} + +async function subscriptionListen(subscriptionNameOrId) { + const subscriber = pubSubClient.subscription(subscriptionNameOrId); + + // Message handler for subscriber + const messageHandler = async message => { + console.log(`Message ${message.id} received.`); + message.ack(); + + // Ensure that all spans got flushed by the exporter + console.log('Cleaning up OpenTelemetry exporter...'); + await processor.forceFlush(); + await subscriber.close(); + }; + + const errorHandler = async error => { + console.log('Received error:', error); + + console.log('Cleaning up OpenTelemetry exporter...'); + await processor.forceFlush(); + await subscriber.close(); + }; + + // Listens for new messages from the topic + subscriber.on('message', messageHandler); + subscriber.on('error', errorHandler); + + // Wait a bit for the subscription to receive messages. + // For the sample only. + setTimeout(() => { + subscriber.removeAllListeners(); + }, SUBSCRIBER_TIMEOUT * 1000); +} +// [END opentelemetry_tracing] + function main( topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', data = 'Hello, world!' ) { - // [START opentelemetry_tracing] - /** - * TODO(developer): Uncomment these variables before running the sample. - */ - // const topicNameOrId = 'YOUR_TOPIC_OR_ID'; - // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID'; - // const data = 'Hello, world!"; - - // Imports the Google Cloud client library - const {PubSub} = require('@google-cloud/pubsub'); - - // Imports the OpenTelemetry API - const opentelemetry = require('@opentelemetry/api'); - - // Imports the OpenTelemetry span handlers and exporter - const { - SimpleSpanProcessor, - BasicTracerProvider, - ConsoleSpanExporter, - } = require('@opentelemetry/tracing'); - - // Set up span processing and specify the console as the span exporter - const provider = new BasicTracerProvider(); - const exporter = new ConsoleSpanExporter(); - provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); - // Enable the diagnostic logger for Opentelemetry - opentelemetry.diag.setLogger( - new opentelemetry.DiagConsoleLogger(), - opentelemetry.DiagLogLevel.INFO - ); - - provider.register(); - - // OpenTelemetry tracing is an optional feature and can be enabled by setting - // enableOpenTelemetryTracing as a publisher or subscriber option - const enableOpenTelemetryTracing = { - enableOpenTelemetryTracing: true, - }; - - // Creates a client; cache this for further use - const pubSubClient = new PubSub(); - - async function publishMessage() { - // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) - const dataBuffer = Buffer.from(data); - const messageId = await pubSubClient - .topic(topicNameOrId, enableOpenTelemetryTracing) - .publish(dataBuffer); - console.log(`Message ${messageId} published.`); - } - - async function subscriptionListen() { - // Message handler for subscriber - const messageHandler = message => { - console.log(`Message ${message.id} received.`); - message.ack(); - - // Ensure that all spans got flushed by the exporter - console.log('Cleaning up Opentelemetry exporter...'); - exporter.shutdown().then(() => { - // Cleaned up exporter. - process.exit(0); - }); - }; - - const errorHandler = error => { - console.log('Received error:', error); - - console.log('Cleaning up Opentelemetry exporter...'); - exporter.shutdown().then(() => { - // Cleaned up exporter. - process.exit(0); - }); - }; - - // Listens for new messages from the topic - pubSubClient - .subscription(subscriptionNameOrId, enableOpenTelemetryTracing) - .on('message', messageHandler); - pubSubClient - .subscription(subscriptionNameOrId, enableOpenTelemetryTracing) - .on('error', errorHandler); - - setTimeout(() => { - pubSubClient - .subscription(subscriptionNameOrId, enableOpenTelemetryTracing) - .removeAllListeners(); - }, SUBSCRIBER_TIMEOUT * 1000); - } - - publishMessage().then(subscriptionListen()); - // [END opentelemetry_tracing] + publishMessage(topicNameOrId, data) + .then(() => subscriptionListen(subscriptionNameOrId)) + .catch(err => { + console.error(err.message); + process.exitCode = 1; + }); } main(...process.argv.slice(2)); diff --git a/samples/package.json b/samples/package.json index a1571ab0e..30abe4ad0 100644 --- a/samples/package.json +++ b/samples/package.json @@ -21,9 +21,13 @@ "precompile": "npm run clean" }, "dependencies": { + "@google-cloud/opentelemetry-cloud-trace-exporter": "^2.0.0", "@google-cloud/pubsub": "^3.7.4", - "@opentelemetry/api": "^1.0.0", - "@opentelemetry/tracing": "^0.24.0", + "@opentelemetry/api": "~1.1.0", + "@opentelemetry/resources": "~1.3.1", + "@opentelemetry/sdk-trace-base": "~1.3.1", + "@opentelemetry/sdk-trace-node": "~1.3.1", + "@opentelemetry/semantic-conventions": "~1.3.1", "avro-js": "^1.10.1", "p-defer": "^3.0.0", "protobufjs": "^7.0.0" diff --git a/samples/system-test/openTelemetryTracing.test.ts b/samples/system-test/openTelemetryTracing.test.ts index 90d101ab5..6713a5f07 100644 --- a/samples/system-test/openTelemetryTracing.test.ts +++ b/samples/system-test/openTelemetryTracing.test.ts @@ -49,6 +49,8 @@ describe('openTelemetry', () => { ); assert.match(stdout, /Message .* published./); assert.match(stdout, /Message .* received/); + assert.match(stdout, /send/); + assert.match(stdout, /receive/); assert.notMatch(stdout, /Received error/); }); }); diff --git a/samples/typescript/openTelemetryTracing.ts b/samples/typescript/openTelemetryTracing.ts new file mode 100644 index 000000000..c089f7492 --- /dev/null +++ b/samples/typescript/openTelemetryTracing.ts @@ -0,0 +1,136 @@ +// Copyright 2020-2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to add OpenTelemetry tracing to the + * Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: OpenTelemetry Tracing +// description: Demonstrates how to enable OpenTelemetry tracing in +// a publisher or subscriber. +// usage: node openTelemetryTracing.js + +const SUBSCRIBER_TIMEOUT = 10; + +// [START opentelemetry_tracing] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const topicNameOrId = 'YOUR_TOPIC_OR_ID'; +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID'; +// const data = 'Hello, world!"; + +// Imports the Google Cloud client library +import {Message, PubSub} from '@google-cloud/pubsub'; + +// Imports the OpenTelemetry API +import * as otel from '@opentelemetry/sdk-trace-node'; +import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api'; +const {NodeTracerProvider} = otel; +import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base'; + +// To output to the console for testing, use the ConsoleSpanExporter. +import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base'; + +// To output to Cloud Trace, import the OpenTelemetry bridge library. +// import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter'; + +import {Resource} from '@opentelemetry/resources'; +import {SemanticResourceAttributes} from '@opentelemetry/semantic-conventions'; + +// Enable the diagnostic logger for OpenTelemetry +diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG); + +// Log spans out to the console, for testing. +const exporter = new ConsoleSpanExporter(); + +// Log spans out to Cloud Trace, for production. +// const exporter = new TraceExporter(); + +// Build a tracer provider and a span processor to do +// something with the spans we're generating. +const provider = new NodeTracerProvider({ + resource: new Resource({ + [SemanticResourceAttributes.SERVICE_NAME]: 'otel example', + }), +}); +const processor = new SimpleSpanProcessor(exporter); +provider.addSpanProcessor(processor); +provider.register(); + +// Creates a client; cache this for further use. +const pubSubClient = new PubSub(); + +async function publishMessage(topicNameOrId: string, data: string) { + // Publishes the message as a string, e.g. "Hello, world!" + // or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + const publisher = pubSubClient.topic(topicNameOrId); + const messageId = await publisher.publishMessage({data: dataBuffer}); + console.log(`Message ${messageId} published.`); +} + +async function subscriptionListen(subscriptionNameOrId: string) { + const subscriber = pubSubClient.subscription(subscriptionNameOrId); + + // Message handler for subscriber + const messageHandler = async (message: Message) => { + console.log(`Message ${message.id} received.`); + message.ack(); + + // Ensure that all spans got flushed by the exporter + console.log('Cleaning up OpenTelemetry exporter...'); + await processor.forceFlush(); + await subscriber.close(); + }; + + const errorHandler = async (error: Error) => { + console.log('Received error:', error); + + console.log('Cleaning up OpenTelemetry exporter...'); + await processor.forceFlush(); + await subscriber.close(); + }; + + // Listens for new messages from the topic + subscriber.on('message', messageHandler); + subscriber.on('error', errorHandler); + + // Wait a bit for the subscription to receive messages. + // For the sample only. + setTimeout(() => { + subscriber.removeAllListeners(); + }, SUBSCRIBER_TIMEOUT * 1000); +} +// [END opentelemetry_tracing] + +function main( + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', + data = 'Hello, world!' +) { + publishMessage(topicNameOrId, data) + .then(() => subscriptionListen(subscriptionNameOrId)) + .catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/src/iam.ts b/src/iam.ts index ea6128be3..db4da053e 100644 --- a/src/iam.ts +++ b/src/iam.ts @@ -34,6 +34,15 @@ export type SetPolicyCallback = RequestCallback; export type SetPolicyResponse = [Policy]; export type GetPolicyResponse = [Policy]; +/** + * Allows us to get the most up to date full name of an object. + * + * @private + */ +export interface Nameable { + name: string; +} + /** * Shows which IAM permissions is allowed. * The key to this object are the IAM permissions (string) and the values are @@ -95,12 +104,22 @@ export type TestIamPermissionsCallback = ResourceCallback< export class IAM { pubsub: PubSub; request: typeof PubSub.prototype.request; - id: string; + private nameable_: Nameable; - constructor(pubsub: PubSub, id: string) { + constructor(pubsub: PubSub, nameOrNameable: Nameable | string) { this.pubsub = pubsub; this.request = pubsub.request.bind(pubsub); - this.id = id; + if (typeof nameOrNameable === 'string') { + this.nameable_ = { + name: nameOrNameable, + }; + } else { + this.nameable_ = nameOrNameable; + } + } + + get id(): string { + return this.nameable_.name; } /** diff --git a/src/index.ts b/src/index.ts index aa80682ba..abc8afb08 100644 --- a/src/index.ts +++ b/src/index.ts @@ -184,3 +184,8 @@ if (process.env.DEBUG_GRPC) { } import * as protos from '../protos/protos'; export {protos}; + +// Deprecated; please see the updated OpenTelemetry sample +// for an example of how to use telemetry in this library. +import {legacyExports} from './telemetry-tracing'; +export {legacyExports as openTelemetry}; diff --git a/src/lease-manager.ts b/src/lease-manager.ts index 084f4b759..73043b765 100644 --- a/src/lease-manager.ts +++ b/src/lease-manager.ts @@ -17,6 +17,7 @@ import {EventEmitter} from 'events'; import {AckError, Message, Subscriber} from './subscriber'; import {defaultOptions} from './default-options'; +import {Duration} from './temporal'; export interface FlowControlOptions { allowExcessMessages?: boolean; @@ -104,6 +105,8 @@ export class LeaseManager extends EventEmitter { this._messages.add(message); this.bytes += message.length; + message.subSpans.flowStart(); + if (allowExcessMessages! || !wasFull) { this._dispense(message); } else { @@ -127,6 +130,9 @@ export class LeaseManager extends EventEmitter { const wasFull = this.isFull(); this._pending = []; + this._messages.forEach(m => { + m.endParentSpan(); + }); this._messages.clear(); this.bytes = 0; @@ -156,6 +162,9 @@ export class LeaseManager extends EventEmitter { * @private */ remove(message: Message): void { + // The subscriber span ends when it leaves leasing. + message.endParentSpan(); + if (!this._messages.has(message)) { return; } @@ -240,7 +249,10 @@ export class LeaseManager extends EventEmitter { */ private _dispense(message: Message): void { if (this._subscriber.isOpen) { - process.nextTick(() => this._subscriber.emit('message', message)); + message.subSpans.flowEnd(); + process.nextTick(() => { + this._subscriber.emit('message', message); + }); } } /** @@ -257,15 +269,23 @@ export class LeaseManager extends EventEmitter { const lifespan = (Date.now() - message.received) / (60 * 1000); if (lifespan < this._options.maxExtensionMinutes!) { + message.subSpans.modAckStart(Duration.from({seconds: deadline}), false); + if (this._subscriber.isExactlyOnceDelivery) { - message.modAckWithResponse(deadline).catch(e => { - // In the case of a permanent failure (temporary failures are retried), - // we need to stop trying to lease-manage the message. - message.ackFailed(e as AckError); - this.remove(message); - }); + message + .modAckWithResponse(deadline) + .catch(e => { + // In the case of a permanent failure (temporary failures are retried), + // we need to stop trying to lease-manage the message. + message.ackFailed(e as AckError); + this.remove(message); + }) + .finally(() => { + message.subSpans.modAckStop(); + }); } else { message.modAck(deadline); + message.subSpans.modAckStop(); } } else { this.remove(message); diff --git a/src/opentelemetry-tracing.ts b/src/opentelemetry-tracing.ts deleted file mode 100644 index df3836f72..000000000 --- a/src/opentelemetry-tracing.ts +++ /dev/null @@ -1,61 +0,0 @@ -/*! - * Copyright 2020 Google LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { - Tracer, - SpanAttributes, - SpanContext, - Span, - context, - trace, - SpanKind, -} from '@opentelemetry/api'; - -// eslint-disable-next-line @typescript-eslint/no-var-requires -const PKG = require('../../package.json'); - -/** - * @internal - * Instantiates a Opentelemetry tracer for the library - */ -const libraryTracer: Tracer = trace.getTracer( - '@google-cloud/pubsub', - PKG.version -); - -/** - * Creates a new span with the given properties - * - * @param {string} spanName the name for the span - * @param {Attributes?} attributes an object containing the attributes to be set for the span - * @param {SpanContext?} parent the context of the parent span to link to the span - */ -export function createSpan( - spanName: string, - kind: SpanKind, - attributes?: SpanAttributes, - parent?: SpanContext -): Span { - return libraryTracer.startSpan( - spanName, - { - // set the kind of the span - kind, - // set the attributes of the span - attributes: attributes, - }, - parent ? trace.setSpanContext(context.active(), parent) : undefined - ); -} diff --git a/src/publisher/flow-publisher.ts b/src/publisher/flow-publisher.ts index ebfee5b0f..74e3b3279 100644 --- a/src/publisher/flow-publisher.ts +++ b/src/publisher/flow-publisher.ts @@ -17,6 +17,7 @@ import {Publisher} from '.'; import {FlowControl} from './flow-control'; import {PubsubMessage, calculateMessageSize} from './pubsub-message'; +import * as tracing from '../telemetry-tracing'; /** * Encapsulates a series of message publishes from a rapid loop (or similar @@ -76,7 +77,11 @@ export class FlowControlledPublisher { * ``` */ publish(message: PubsubMessage): Promise | null { + const flowSpan = message.parentSpan + ? tracing.PubsubSpans.createPublishFlowSpan(message) + : undefined; const doPublish = () => { + flowSpan?.end(); this.doPublish(message); }; diff --git a/src/publisher/index.ts b/src/publisher/index.ts index aa017a46d..c685af977 100644 --- a/src/publisher/index.ts +++ b/src/publisher/index.ts @@ -16,15 +16,14 @@ import * as extend from 'extend'; import {CallOptions} from 'google-gax'; -import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; -import {isSpanContextValid, Span, SpanKind} from '@opentelemetry/api'; +import {isSpanContextValid, Span} from '@opentelemetry/api'; import {BatchPublishOptions} from './message-batch'; import {Queue, OrderedQueue} from './message-queues'; import {Topic} from '../topic'; import {RequestCallback, EmptyCallback} from '../pubsub'; import {defaultOptions} from '../default-options'; -import {createSpan} from '../opentelemetry-tracing'; +import * as tracing from '../telemetry-tracing'; import {FlowControl, FlowControlOptions} from './flow-control'; import {promisifySome} from '../util'; @@ -39,6 +38,8 @@ export interface PublishOptions { flowControlOptions?: FlowControlOptions; gaxOpts?: CallOptions; messageOrdering?: boolean; + + /** @deprecated Unset and use context propagation. */ enableOpenTelemetryTracing?: boolean; } @@ -209,29 +210,23 @@ export class Publisher { } } - const span: Span | undefined = this.constructSpan(message); + // Ensure that there's a parent span for subsequent publishes + // to hang off of. + this.getParentSpan(message); if (!message.orderingKey) { this.queue.add(message, callback!); - if (span) { - span.end(); - } - return; - } - - const key = message.orderingKey; - - if (!this.orderedQueues.has(key)) { - const queue = new OrderedQueue(this, key); - this.orderedQueues.set(key, queue); - queue.once('drain', () => this.orderedQueues.delete(key)); - } + } else { + const key = message.orderingKey; - const queue = this.orderedQueues.get(key)!; - queue.add(message, callback!); + if (!this.orderedQueues.has(key)) { + const queue = new OrderedQueue(this, key); + this.orderedQueues.set(key, queue); + queue.once('drain', () => this.orderedQueues.delete(key)); + } - if (span) { - span.end(); + const queue = this.orderedQueues.get(key)!; + queue.add(message, callback!); } } @@ -332,54 +327,30 @@ export class Publisher { } /** - * Constructs an OpenTelemetry span + * Finds or constructs an telemetry publish/parent span for a message. * * @private * * @param {PubsubMessage} message The message to create a span for */ - constructSpan(message: PubsubMessage): Span | undefined { - if (!this.settings.enableOpenTelemetryTracing) { + getParentSpan(message: PubsubMessage): Span | undefined { + const enabled = tracing.isEnabled(this.settings); + if (!enabled) { return undefined; } - const spanAttributes = { - // Add Opentelemetry semantic convention attributes to the span, based on: - // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/trace/semantic_conventions/messaging.md - [SemanticAttributes.MESSAGING_TEMP_DESTINATION]: false, - [SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub', - [SemanticAttributes.MESSAGING_OPERATION]: 'send', - [SemanticAttributes.MESSAGING_DESTINATION]: this.topic.name, - [SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic', - [SemanticAttributes.MESSAGING_MESSAGE_ID]: message.messageId, - [SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub', - [SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: - message.data?.length, - 'messaging.pubsub.ordering_key': message.orderingKey, - } as Attributes; - - const span: Span = createSpan( - `${this.topic.name} send`, - SpanKind.PRODUCER, - spanAttributes + if (message.parentSpan) { + return message.parentSpan; + } + + const span = tracing.PubsubSpans.createPublisherSpan( + message, + this.topic.name ); - // If the span's context is valid we should pass the span context special attribute + // If the span's context is valid we should inject the propagation trace context. if (isSpanContextValid(span.spanContext())) { - if ( - message.attributes && - message.attributes['googclient_OpenTelemetrySpanContext'] - ) { - console.warn( - 'googclient_OpenTelemetrySpanContext key set as message attribute, but will be overridden.' - ); - } - if (!message.attributes) { - message.attributes = {}; - } - - message.attributes['googclient_OpenTelemetrySpanContext'] = - JSON.stringify(span.spanContext()); + tracing.injectSpan(span, message, enabled); } return span; diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index 3ff88be25..fb40a58b3 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -21,7 +21,8 @@ import {BatchPublishOptions, MessageBatch} from './message-batch'; import {PublishError} from './publish-error'; import {Publisher, PubsubMessage, PublishCallback} from './'; import {google} from '../../protos/protos'; - +import * as tracing from '../telemetry-tracing'; +import {filterMessage} from './pubsub-message'; import {promisify} from 'util'; /** @@ -94,12 +95,32 @@ export abstract class MessageQueue extends EventEmitter { const {topic, settings} = this.publisher; const reqOpts = { topic: topic.name, - messages, + messages: messages.map(filterMessage), }; if (messages.length === 0) { return; } + // Make sure we have a projectId filled in to update telemetry spans. + // The overall spans may not have the correct projectId because it wasn't + // known at the time publishMessage was called. + const spanMessages = messages.filter(m => !!m.parentSpan); + if (spanMessages.length) { + if (!topic.pubsub.isIdResolved) { + await topic.pubsub.getClientConfig(); + } + spanMessages.forEach(m => { + tracing.PubsubSpans.updatePublisherTopicName(m.parentSpan!, topic.name); + }); + } + + messages.forEach(m => { + const span = tracing.PubsubSpans.createPublishRpcSpan(m, messages.length); + if (span) { + m.rpcSpan = span; + } + }); + const requestCallback = topic.request; const request = promisify(requestCallback.bind(topic)); try { @@ -119,6 +140,13 @@ export abstract class MessageQueue extends EventEmitter { callbacks.forEach(callback => callback(err)); throw e; + } finally { + messages.forEach(m => { + // We're finished with both the RPC and the whole publish operation, + // so close out all of the related spans. + m.rpcSpan?.end(); + m.parentSpan?.end(); + }); } } } @@ -158,6 +186,8 @@ export class Queue extends MessageQueue { this.publish().catch(() => {}); } + message.batchingSpan = tracing.PubsubSpans.createPublishBatchSpan(message); + this.batch.add(message, callback); if (this.batch.isFull()) { @@ -209,6 +239,8 @@ export class Queue extends MessageQueue { delete this.pending; } + messages.forEach(m => m.batchingSpan?.end()); + await this._publish(messages, callbacks); if (this.batch.messages.length) { // We only do the indefinite go-arounds when we're trying to do a diff --git a/src/publisher/pubsub-message.ts b/src/publisher/pubsub-message.ts index a1e1283d5..d979d6eac 100644 --- a/src/publisher/pubsub-message.ts +++ b/src/publisher/pubsub-message.ts @@ -15,6 +15,7 @@ */ import {google} from '../../protos/protos'; +import * as tracing from '../telemetry-tracing'; /** * Strings are the only allowed values for keys and values in message attributes. @@ -24,7 +25,9 @@ export type Attributes = Record; /** * The basic {data, attributes} for a message to be published. */ -export interface PubsubMessage extends google.pubsub.v1.IPubsubMessage { +export interface PubsubMessage + extends google.pubsub.v1.IPubsubMessage, + tracing.MessageWithAttributes { /** * If we've calculated the size of this message, it will be cached here. * This is done to avoid having to build up the attribute size over and over. @@ -35,6 +38,50 @@ export interface PubsubMessage extends google.pubsub.v1.IPubsubMessage { * @private */ calculatedSize?: number; + + // The following are here instead of inside an object (like subs) because we + // don't get to control what these objects are. They come from grpc. + + /** + * If tracing is enabled, track the batch span. + * + * @private + */ + batchingSpan?: tracing.Span; + + /** + * If tracing is enabled, track the RPC send time span. + * + * @private + */ + rpcSpan?: tracing.Span; +} + +/** + * Since we tag a fair number of extra things into messages sent to the Pub/Sub + * server, this filters everything down to what needs to be sent. This should be + * used right before gRPC calls. + */ +export function filterMessage( + message: PubsubMessage +): google.pubsub.v1.IPubsubMessage { + const filtered = {} as PubsubMessage; + if (message.data) { + filtered.data = message.data; + } + if (message.attributes) { + filtered.attributes = message.attributes; + } + if (message.messageId) { + filtered.messageId = message.messageId; + } + if (message.publishTime) { + filtered.publishTime = message.publishTime; + } + if (message.orderingKey) { + filtered.orderingKey = message.orderingKey; + } + return filtered; } /** diff --git a/src/subscriber.ts b/src/subscriber.ts index fe5defc8e..cdebdb5c9 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -17,9 +17,6 @@ import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {replaceProjectIdToken} from '@google-cloud/projectify'; import {promisify} from '@google-cloud/promisify'; -import {EventEmitter} from 'events'; -import {SpanContext, Span, SpanKind} from '@opentelemetry/api'; -import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; import {google} from '../protos/protos'; import {Histogram} from './histogram'; @@ -29,8 +26,9 @@ import {MessageStream, MessageStreamOptions} from './message-stream'; import {Subscription} from './subscription'; import {defaultOptions} from './default-options'; import {SubscriberClient} from './v1'; -import {createSpan} from './opentelemetry-tracing'; +import * as tracing from './telemetry-tracing'; import {Duration} from './temporal'; +import {EventEmitter} from 'events'; export type PullResponse = google.pubsub.v1.IStreamingPullResponse; export type SubscriptionProperties = @@ -64,6 +62,97 @@ export class AckError extends Error { } } +/** + * Tracks the various spans related to subscriber/receive tracing. + * + * @private + */ +export class SubscriberSpans { + parent: tracing.MessageWithAttributes; + + // These are always attached to a message. + constructor(parent: tracing.MessageWithAttributes) { + this.parent = parent; + } + + // Start a flow control span if needed. + flowStart() { + if (!this.flow) { + this.flow = tracing.PubsubSpans.createReceiveFlowSpan(this.parent); + } + } + + // End any flow control span. + flowEnd() { + if (this.flow) { + this.flow.end(); + this.flow = undefined; + } + } + + // Start a leasing modAck span if needed. + modAckStart(deadline: Duration, isInitial: boolean) { + if (!this.modAck) { + this.modAck = tracing.PubsubSpans.createModAckSpan( + this.parent, + deadline, + isInitial + ); + } + } + + // End any leasing modAck span. + modAckStop() { + if (this.modAck) { + this.modAck.end(); + this.modAck = undefined; + } + } + + // Start a scheduler span if needed. + // Note: This is not currently used in Node, because there is no + // scheduler process, due to the way messages are delivered one at a time. + schedulerStart() { + if (!this.scheduler) { + this.scheduler = tracing.PubsubSpans.createReceiveSchedulerSpan( + this.parent + ); + } + } + + // End any schedular span. + schedulerEnd() { + if (this.scheduler) { + this.scheduler.end(); + this.scheduler = undefined; + } + } + + // Start a processing span if needed. + // This is for user processing, during on('message') delivery. + processingStart(subName: string) { + if (!this.processing) { + this.processing = tracing.PubsubSpans.createReceiveProcessSpan( + this.parent, + subName + ); + } + } + + // End any processing span. + processingEnd() { + if (this.processing) { + this.processing.end(); + this.processing = undefined; + } + } + + private modAck?: tracing.Span; + private flow?: tracing.Span; + private scheduler?: tracing.Span; + private processing?: tracing.Span; +} + /** * Date object with nanosecond precision. Supports all standard Date arguments * in addition to several custom types. @@ -91,7 +180,7 @@ export class AckError extends Error { * }); * ``` */ -export class Message { +export class Message implements tracing.MessageWithAttributes { ackId: string; attributes: {[key: string]: string}; data: Buffer; @@ -103,6 +192,37 @@ export class Message { private _handled: boolean; private _length: number; private _subscriber: Subscriber; + + /** + * @private + * + * Tracks a telemetry tracing parent span through the receive process. This will + * be the original publisher-side span if we have one; otherwise we'll create + * a "publisher" span to hang new subscriber spans onto. + * + * This needs to be declared explicitly here, because having a public class + * implement a private interface seems to confuse TypeScript. (And it's needed + * in unit tests.) + */ + parentSpan?: tracing.Span; + + /** + * @private + * + * Ends any open subscribe telemetry tracing span. + */ + endParentSpan() { + this.parentSpan?.end(); + delete this.parentSpan; + } + + /** + * @private + * + * Tracks subscriber-specific telemetry objects through the library. + */ + subSpans: SubscriberSpans; + private _ackFailed?: AckError; /** @@ -182,6 +302,13 @@ export class Message { */ this.received = Date.now(); + /** + * Telemetry tracing objects. + * + * @private + */ + this.subSpans = new SubscriberSpans(this); + this._handled = false; this._length = this.data.length; this._subscriber = sub; @@ -380,6 +507,9 @@ export interface SubscriberOptions { flowControl?: FlowControlOptions; useLegacyFlowControl?: boolean; streamingOptions?: MessageStreamOptions; + + /** @deprecated Unset this and instantiate a tracer; support will be + * enabled automatically. */ enableOpenTelemetryTracing?: boolean; } @@ -403,7 +533,7 @@ export class Subscriber extends EventEmitter { private _acks!: AckQueue; private _histogram: Histogram; private _inventory!: LeaseManager; - private _useOpentelemetry: boolean; + private _useLegacyOpenTelemetry: boolean; private _latencies: Histogram; private _modAcks!: ModAckQueue; private _name!: string; @@ -421,7 +551,7 @@ export class Subscriber extends EventEmitter { this.maxBytes = defaultOptions.subscription.maxOutstandingBytes; this.useLegacyFlowControl = false; this.isOpen = false; - this._useOpentelemetry = false; + this._useLegacyOpenTelemetry = false; this._histogram = new Histogram({min: 10, max: 600}); this._latencies = new Histogram(); this._subscription = subscription; @@ -565,12 +695,20 @@ export class Subscriber extends EventEmitter { const ackTimeSeconds = (Date.now() - message.received) / 1000; this.updateAckDeadline(ackTimeSeconds); + const ackSpan = tracing.PubsubSpans.createReceiveResponseSpan( + message, + true + ); + // Ignore this in this version of the method (but hook catch // to avoid unhandled exceptions). const resultPromise = this._acks.add(message); resultPromise.catch(() => {}); await this._acks.onFlush(); + + ackSpan?.end(); + this._inventory.remove(message); } @@ -586,7 +724,15 @@ export class Subscriber extends EventEmitter { const ackTimeSeconds = (Date.now() - message.received) / 1000; this.updateAckDeadline(ackTimeSeconds); + const ackSpan = tracing.PubsubSpans.createReceiveResponseSpan( + message, + true + ); + await this._acks.add(message); + + ackSpan?.end(); + this._inventory.remove(message); // No exception means Success. @@ -685,7 +831,15 @@ export class Subscriber extends EventEmitter { * @private */ async nack(message: Message): Promise { + const ackSpan = tracing.PubsubSpans.createReceiveResponseSpan( + message, + false + ); + await this.modAck(message, 0); + + ackSpan?.end(); + this._inventory.remove(message); } @@ -699,7 +853,13 @@ export class Subscriber extends EventEmitter { * @private */ async nackWithResponse(message: Message): Promise { - return await this.modAckWithResponse(message, 0); + const ackSpan = tracing.PubsubSpans.createReceiveResponseSpan( + message, + false + ); + const response = await this.modAckWithResponse(message, 0); + ackSpan?.end(); + return response; } /** @@ -741,7 +901,7 @@ export class Subscriber extends EventEmitter { setOptions(options: SubscriberOptions): void { this._options = options; - this._useOpentelemetry = options.enableOpenTelemetryTracing || false; + this._useLegacyOpenTelemetry = options.enableOpenTelemetryTracing || false; // The user-set ackDeadline value basically pegs the extension time. // We'll emulate it by overwriting min/max. @@ -779,58 +939,18 @@ export class Subscriber extends EventEmitter { } /** - * Constructs an OpenTelemetry span from the incoming message. + * Constructs a telemetry span from the incoming message. * * @param {Message} message One of the received messages * @private */ - private _constructSpan(message: Message): Span | undefined { - // Handle cases where OpenTelemetry is disabled or no span context was sent through message - if ( - !this._useOpentelemetry || - !message.attributes || - !message.attributes['googclient_OpenTelemetrySpanContext'] - ) { - return undefined; - } - - const spanValue = message.attributes['googclient_OpenTelemetrySpanContext']; - const parentSpanContext: SpanContext | undefined = spanValue - ? JSON.parse(spanValue) - : undefined; - const spanAttributes = { - // Original span attributes - ackId: message.ackId, - deliveryAttempt: message.deliveryAttempt, - // - // based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers - [SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub', - [SemanticAttributes.MESSAGING_OPERATION]: 'process', - [SemanticAttributes.MESSAGING_DESTINATION]: this.name, - [SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic', - [SemanticAttributes.MESSAGING_MESSAGE_ID]: message.id, - [SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub', - [SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: ( - message.data as Buffer - ).length, - // Not in Opentelemetry semantic convention but mimics naming - 'messaging.pubsub.received_at': message.received, - 'messaging.pubsub.acknowlege_id': message.ackId, - 'messaging.pubsub.delivery_attempt': message.deliveryAttempt, - }; - - // Subscriber spans should always have a publisher span as a parent. - // Return undefined if no parent is provided - const spanName = `${this.name} process`; - const span = parentSpanContext - ? createSpan( - spanName.trim(), - SpanKind.CONSUMER, - spanAttributes, - parentSpanContext - ) - : undefined; - return span; + private createParentSpan(message: Message): void { + const enabled = tracing.isEnabled({ + enableOpenTelemetryTracing: this._useLegacyOpenTelemetry, + }); + if (enabled) { + tracing.extractSpan(message, this.name, enabled); + } } /** @@ -860,12 +980,16 @@ export class Subscriber extends EventEmitter { for (const data of receivedMessages!) { const message = new Message(this, data); - const span: Span | undefined = this._constructSpan(message); + this.createParentSpan(message); if (this.isOpen) { if (this.isExactlyOnceDelivery) { // For exactly-once delivery, we must validate that we got a valid // lease on the message before actually leasing it. + message.subSpans.modAckStart( + Duration.from({seconds: this.ackDeadline}), + true + ); message .modAckWithResponse(this.ackDeadline) .then(() => { @@ -875,17 +999,22 @@ export class Subscriber extends EventEmitter { // Temporary failures will retry, so if an error reaches us // here, that means a permanent failure. Silently drop these. this._discardMessage(message); + }) + .finally(() => { + message.subSpans.modAckStop(); }); } else { + message.subSpans.modAckStart( + Duration.from({seconds: this.ackDeadline}), + true + ); message.modAck(this.ackDeadline); + message.subSpans.modAckStop(); this._inventory.add(message); } } else { message.nack(); } - if (span) { - span.end(); - } } } diff --git a/src/subscription.ts b/src/subscription.ts index b721a8681..c12fd4981 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -14,7 +14,6 @@ * limitations under the License. */ -import {EventEmitter} from 'events'; import * as extend from 'extend'; import {CallOptions} from 'google-gax'; import snakeCase = require('lodash.snakecase'); @@ -42,12 +41,15 @@ import { SeekResponse, Snapshot, } from './snapshot'; -import {Subscriber, SubscriberOptions} from './subscriber'; +import {Message, Subscriber, SubscriberOptions} from './subscriber'; import {Topic} from './topic'; import {promisifySome} from './util'; export {AckError, AckResponse, AckResponses} from './subscriber'; +import {EmitterCallback, WrappingEmitter} from './wrapping-emitter'; +import * as tracing from './telemetry-tracing'; + export type PushConfig = google.pubsub.v1.IPushConfig; export type OidcToken = google.pubsub.v1.PushConfig.IOidcToken; @@ -266,22 +268,27 @@ export type DetachSubscriptionResponse = EmptyResponse; * }); * ``` */ -export class Subscription extends EventEmitter { +export class Subscription extends WrappingEmitter { + // Note: WrappingEmitter is used here to wrap user processing callbacks. + // We do this to be able to build telemetry spans around them. pubsub: PubSub; iam: IAM; - name: string; topic?: Topic | string; metadata?: google.pubsub.v1.ISubscription; request: typeof PubSub.prototype.request; + private _subscriber: Subscriber; + constructor(pubsub: PubSub, name: string, options?: SubscriptionOptions) { super(); + this.setEmitterWrapper(this.listenerWrapper.bind(this)); + options = options || {}; this.pubsub = pubsub; this.request = pubsub.request.bind(pubsub); - this.name = Subscription.formatName_(this.projectId, name); + this.id_ = name; this.topic = options.topic; /** @@ -322,7 +329,7 @@ export class Subscription extends EventEmitter { * }); * ``` */ - this.iam = new IAM(pubsub, this.name); + this.iam = new IAM(pubsub, this); this._subscriber = new Subscriber(this, options); this._subscriber @@ -334,6 +341,43 @@ export class Subscription extends EventEmitter { this._listen(); } + private id_: string; + get name(): string { + return Subscription.formatName_(this.pubsub.projectId, this.id_); + } + + /** + * This wrapper will be called as part of the emit() process. This lets + * us capture the full time span of processing even if the user is using + * async callbacks. + * + * @private + */ + private listenerWrapper( + eventName: string | symbol, + listener: EmitterCallback, + args: unknown[] + ) { + if (eventName !== 'message') { + return listener(...args); + } else { + const span = tracing.PubsubSpans.createReceiveProcessSpan( + args[0] as Message, + this.name + ); + + // If the user returned a Promise, that means they used an async handler. + // In that case, we need to tag on to their Promise to end the span. + // Otherwise, the listener chain is sync, and we can close out sync. + const result = listener(...args) as unknown as Promise; + if (result && typeof result.then === 'function') { + result.then(() => span?.end()); + } else { + span?.end(); + } + } + } + /** * Indicates if the Subscription is open and receiving messages. * diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts new file mode 100644 index 000000000..1181e8dcc --- /dev/null +++ b/src/telemetry-tracing.ts @@ -0,0 +1,515 @@ +/*! + * Copyright 2020-2023 Google LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + Tracer, + SpanContext, + Span, + context, + trace, + propagation, + SpanKind, + TextMapGetter, + TextMapSetter, + ROOT_CONTEXT, + Context, +} from '@opentelemetry/api'; +import {Attributes, PubsubMessage} from './publisher/pubsub-message'; +import {PublishOptions} from './publisher/index'; +import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; +import {Duration} from './temporal'; + +export {Span}; + +// We need this to get the library version. +// eslint-disable-next-line @typescript-eslint/no-var-requires +const packageJson = require('../../package.json'); + +/** + * Instantiates a Opentelemetry tracer for the library + * + * @private + */ +let cachedTracer: Tracer | undefined; +function getTracer(): Tracer { + const tracer = + cachedTracer ?? + trace.getTracer('@google-cloud/pubsub', packageJson.version); + cachedTracer = tracer; + return cachedTracer; +} + +/** + * Determination of the level of OTel support we're providing. + * + * @private + */ +export enum OpenTelemetryLevel { + /** + * None: OTel support is not enabled because we found no trace provider. + */ + None = 0, + + /** + * Legacy: We found a trace provider, but the user also specified the old + * manual enable flag; this will trigger the legacy attribute being included. + * The modern propagation attribute will _also_ be included. + */ + Legacy = 1, + + /** + * Modern: We will only inject/extract the modern propagation attribute. + */ + Modern = 2, +} + +/** + * Tries to divine what sort of OpenTelemetry we're supporting. See the enum + * for the meaning of the values, and other notes. + * + * Legacy OTel is no longer officially supported, but we don't want to + * break anyone at a non-major. + * + * @private + */ +export function isEnabled( + publishSettings?: PublishOptions +): OpenTelemetryLevel { + // If there's no trace provider attached, do nothing in any case. + const traceProvider = trace.getTracerProvider(); + if (!traceProvider) { + return OpenTelemetryLevel.None; + } + + if (publishSettings?.enableOpenTelemetryTracing) { + return OpenTelemetryLevel.Legacy; + } + + return OpenTelemetryLevel.Modern; +} + +/** + * Our Carrier object for propagation is anything with an 'attributes' + * object, which is one of several possible Message classes. (They're + * different for publish and subscribe.) + * + * Also we add a parentSpan optional member for passing around the + * actual Span object within the client library. This can be a publish + * or subscriber span, depending on the context. + * + * @private + */ +export interface MessageWithAttributes { + attributes?: Attributes | null | undefined; + parentSpan?: Span; +} + +/** + * Implements common members for the TextMap getter and setter interfaces for Pub/Sub messages. + * + * @private + */ +export class PubsubMessageGetSet { + static keyPrefix = 'googclient_'; + + keys(carrier: MessageWithAttributes): string[] { + return Object.getOwnPropertyNames(carrier.attributes) + .filter(n => n.startsWith(PubsubMessageGetSet.keyPrefix)) + .map(n => n.substring(PubsubMessageGetSet.keyPrefix.length)); + } + + protected attributeName(key: string): string { + return `${PubsubMessageGetSet.keyPrefix}${key}`; + } +} + +/** + * Implements the TextMap getter interface for Pub/Sub messages. + * + * @private + */ +export class PubsubMessageGet + extends PubsubMessageGetSet + implements TextMapGetter +{ + get( + carrier: MessageWithAttributes, + key: string + ): string | string[] | undefined { + return carrier?.attributes?.[this.attributeName(key)]; + } +} + +/** + * Implements the TextMap setter interface for Pub/Sub messages. + * + * @private + */ +export class PubsubMessageSet + extends PubsubMessageGetSet + implements TextMapSetter +{ + set(carrier: MessageWithAttributes, key: string, value: string): void { + if (!carrier.attributes) { + carrier.attributes = {}; + } + carrier.attributes[this.attributeName(key)] = value; + } +} + +/** + * The getter to use when calling extract() on a Pub/Sub message. + * + * @private + */ +export const pubsubGetter = new PubsubMessageGet(); + +/** + * The setter to use when calling inject() on a Pub/Sub message. + * + * @private + */ +export const pubsubSetter = new PubsubMessageSet(); + +/** + * Description of the data structure passed for span attributes. + * + * @private + */ +export interface SpanAttributes { + [x: string]: string | number; +} + +/** + * Converts a SpanContext to a full Context, as needed. + * + * @private + */ +export function spanContextToContext( + parent?: SpanContext +): Context | undefined { + return parent ? trace.setSpanContext(context.active(), parent) : undefined; +} + +/** + * The modern propagation attribute name. + * + * Technically this is determined by the OpenTelemetry library, but + * in practice, it follows the W3C spec, so this should be the right + * one. The only thing we're using it for, anyway, is emptying user + * supplied attributes. + * + * @private + */ +export const modernAttributeName = 'googclient_traceparent'; + +/** + * The old legacy attribute name. + * + * @private + */ +export const legacyAttributeName = 'googclient_OpenTelemetrySpanContext'; + +export class PubsubSpans { + static createPublisherSpan(message: PubsubMessage, topicName: string): Span { + const spanAttributes = { + // Add Opentelemetry semantic convention attributes to the span, based on: + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/trace/semantic_conventions/messaging.md + [SemanticAttributes.MESSAGING_TEMP_DESTINATION]: false, + [SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub', + [SemanticAttributes.MESSAGING_OPERATION]: 'send', + [SemanticAttributes.MESSAGING_DESTINATION]: topicName, + [SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic', + [SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub', + [SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: + message.data?.length, + 'messaging.pubsub.ordering_key': message.orderingKey, + } as SpanAttributes; + + const span: Span = getTracer().startSpan(`${topicName} send`, { + kind: SpanKind.PRODUCER, + attributes: spanAttributes, + }); + + return span; + } + + static updatePublisherTopicName(span: Span, topicName: string) { + span.updateName(`${topicName} send`); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, topicName); + } + + static createReceiveSpan( + message: MessageWithAttributes, + subName: string, + parent: Context | undefined + ): Span { + const name = `${subName} receive`; + + // Mostly we want to keep the context IDs; the attributes and such + // are only something we do on the publish side. + if (context) { + return getTracer().startSpan( + name, + { + kind: SpanKind.CONSUMER, + attributes: {}, + }, + parent + ); + } else { + return getTracer().startSpan(name, { + kind: SpanKind.CONSUMER, + }); + } + } + + static createChildSpan( + message: MessageWithAttributes, + name: string + ): Span | undefined { + const parent = message.parentSpan; + if (parent) { + return getTracer().startSpan( + name, + { + kind: SpanKind.INTERNAL, + attributes: {}, + }, + spanContextToContext(parent.spanContext()) + ); + } else { + return undefined; + } + } + + static createPublishFlowSpan(message: PubsubMessage): Span | undefined { + return PubsubSpans.createChildSpan(message, 'publisher flow control'); + } + + static createPublishBatchSpan(message: PubsubMessage): Span | undefined { + return PubsubSpans.createChildSpan(message, 'publish scheduler'); + } + + static createPublishRpcSpan( + message: PubsubMessage, + messageCount: number + ): Span | undefined { + const span = PubsubSpans.createChildSpan(message, 'publish'); + span?.setAttribute('messaging.pubsub.num_messages_in_batch', messageCount); + + return span; + } + + static createModAckSpan( + message: MessageWithAttributes, + deadline: Duration, + initial: boolean + ) { + const span = PubsubSpans.createChildSpan(message, 'modify ack deadline'); + if (span) { + span.setAttributes({ + 'messaging.pubsub.modack_deadline_seconds': deadline.totalOf('second'), + 'messaging.pubsub.is_receipt_modack': initial ? 'true' : 'false', + } as unknown as Attributes); + } + return span; + } + + static createReceiveFlowSpan( + message: MessageWithAttributes + ): Span | undefined { + return PubsubSpans.createChildSpan(message, 'subscriber flow control'); + } + + static createReceiveSchedulerSpan( + message: MessageWithAttributes + ): Span | undefined { + return PubsubSpans.createChildSpan(message, 'subscribe scheduler'); + } + + static createReceiveProcessSpan( + message: MessageWithAttributes, + subName: string + ): Span | undefined { + return PubsubSpans.createChildSpan(message, `${subName} process`); + } + + static setReceiveProcessResult(span: Span, isAck: boolean) { + span.setAttribute('messaging.pubsub.result', isAck ? 'ack' : 'nack'); + } + + static createReceiveLeaseSpan( + message: MessageWithAttributes, + deadline: Duration, + isInitial: boolean + ): Span | undefined { + const span = PubsubSpans.createChildSpan(message, 'modify ack deadline'); + span?.setAttribute( + 'messaging.pubsub.modack_deadline_seconds', + deadline.totalOf('second') + ); + span?.setAttribute('messaging.pubsub.is_receipt_modack', isInitial); + return span; + } + + static createReceiveResponseSpan( + message: MessageWithAttributes, + isAck: boolean + ): Span | undefined { + const name = isAck ? 'ack' : 'nack'; + return PubsubSpans.createChildSpan(message, name); + } +} + +/** + * Injects the trace context into a Pub/Sub message (or other object with + * an 'attributes' object) for propagation. + * + * This is for the publish side. + * + * @private + */ +export function injectSpan( + span: Span, + message: MessageWithAttributes, + enabled: OpenTelemetryLevel +): void { + if (!message.attributes) { + message.attributes = {}; + } + + if (message.attributes[modernAttributeName]) { + console.warn( + `${modernAttributeName} key set as message attribute, but will be overridden.` + ); + + delete message.attributes[modernAttributeName]; + } + + // If we're in legacy mode, add that header as well. + if (enabled === OpenTelemetryLevel.Legacy) { + if (message.attributes[legacyAttributeName]) { + console.warn( + `${legacyAttributeName} key set as message attribute, but will be overridden.` + ); + } + message.attributes[legacyAttributeName] = JSON.stringify( + span.spanContext() + ); + } + + // Always do propagation injection with the trace context. + const context = trace.setSpanContext(ROOT_CONTEXT, span.spanContext()); + propagation.inject(context, message, pubsubSetter); + + // Also put the direct reference to the Span object for while we're + // passing it around in the client library. + message.parentSpan = span; +} + +/** + * Returns true if this message potentially contains a span context. + * + * @private + */ +export function containsSpanContext(message: MessageWithAttributes): boolean { + if (message.parentSpan) { + return true; + } + + if (!message.attributes) { + return false; + } + + const keys = Object.getOwnPropertyNames(message.attributes); + return !!keys.find( + n => n === legacyAttributeName || n === modernAttributeName + ); +} + +/** + * Extracts the trace context from a Pub/Sub message (or other object with + * an 'attributes' object) from a propagation, for receive processing. If no + * context was present, create a new parent span. + * + * This is for the receive side. + * + * @private + */ +export function extractSpan( + message: MessageWithAttributes, + subName: string, + enabled: OpenTelemetryLevel +): Span | undefined { + if (message.parentSpan) { + return message.parentSpan; + } + + const keys = Object.getOwnPropertyNames(message.attributes ?? {}); + + let context: Context | undefined; + + if (enabled === OpenTelemetryLevel.Legacy) { + // Only prefer the legacy attributes to no trace context attribute. + if ( + keys.includes(legacyAttributeName) && + !keys.includes(modernAttributeName) + ) { + const legacyValue = message.attributes?.[legacyAttributeName]; + if (legacyValue) { + const parentSpanContext: SpanContext | undefined = legacyValue + ? JSON.parse(legacyValue) + : undefined; + if (parentSpanContext) { + context = spanContextToContext(parentSpanContext); + } + } + } + } else { + if (keys.includes(modernAttributeName)) { + context = propagation.extract(ROOT_CONTEXT, message, pubsubGetter); + } + } + + const span = PubsubSpans.createReceiveSpan(message, subName, context); + message.parentSpan = span; + return span; +} + +// Since these were exported on the main Pub/Sub index in the previous +// version, we have to export them until the next major. +export const legacyExports = { + /** + * @deprecated + * Use the new telemetry functionality instead; see the updated OpenTelemetry + * sample for an example. + */ + createSpan: function ( + spanName: string, + kind: SpanKind, + attributes?: SpanAttributes, + parent?: SpanContext + ): Span { + return getTracer().startSpan( + spanName, + { + kind, + attributes, + }, + parent ? trace.setSpanContext(context.active(), parent) : undefined + ); + }, +}; diff --git a/src/topic.ts b/src/topic.ts index 96b33490e..5109985db 100644 --- a/src/topic.ts +++ b/src/topic.ts @@ -106,7 +106,6 @@ export type MessageOptions = PubsubMessage & {json?: any}; * ``` */ export class Topic { - name: string; parent: PubSub; pubsub: PubSub; request: typeof PubSub.prototype.request; @@ -119,11 +118,12 @@ export class Topic { constructor(pubsub: PubSub, name: string, options?: PublishOptions) { /** - * The fully qualified name of this topic. + * The fully qualified name of this topic. May have a placeholder for + * the projectId if it's not been resolved. * @name Topic#name * @type {string} */ - this.name = Topic.formatName_(pubsub.projectId, name); + this.id_ = name; this.publisher = new Publisher(this, options); /** * The parent {@link PubSub} instance of this topic instance. @@ -180,7 +180,12 @@ export class Topic { * }); * ``` */ - this.iam = new IAM(pubsub, this.name); + this.iam = new IAM(pubsub, this); + } + + private id_: string; + get name(): string { + return Topic.formatName_(this.parent.projectId, this.id_); } /** diff --git a/src/wrapping-emitter.ts b/src/wrapping-emitter.ts new file mode 100644 index 000000000..de3fbfe60 --- /dev/null +++ b/src/wrapping-emitter.ts @@ -0,0 +1,145 @@ +// Copyright 2022-2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {EventEmitter} from 'stream'; + +/** + * TypeScript alias for the built-in emitter listener callback. + * + * @private + */ +export interface EmitterCallback { + // This must match the Node built-in type. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (...args: any[]): void; +} + +/** + * Listener wrapper function type - we'll pass the event name, the original + * user callback, and any args that came with the emit. + * + * @private + */ +export interface Wrapper { + ( + eventName: string | symbol, + callback: EmitterCallback, + // Again, matching built-in types. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + args: any[] + ): Promise | void; +} + +/** + * Subclass for the standard EventEmitter that lets you wrap user listen + * handlers. In this library's particular case, this is for the purpose of + * being able to properly wrap these callbacks in OTel spans even across + * async handler callbacks. + * + * This might be overkill for this use case, but the hope is to avoid + * breaking users on a minor version change. + * + * @private + */ +export class WrappingEmitter extends EventEmitter { + private wrapper: Wrapper; + private mapper = new Map>(); + + /** + * Pass a wrapper function here, or a default pass-through will be used. + * + * @private + */ + constructor(wrapper?: Wrapper) { + super(); + this.wrapper = + wrapper ?? + ((event, cb, args) => { + cb(...args); + }); + } + + /** + * Manually set a wrapper pass-through. Because this might be exported to + * anyone using this class, the name is a bit extra verbose. + * + * @private + */ + setEmitterWrapper(wrapper: Wrapper) { + this.wrapper = wrapper; + } + + // These two are just wrappers for addListener/removeListener. + on(eventName: string | symbol, listener: EmitterCallback): this { + return this.addListener(eventName, listener); + } + + off(eventName: string | symbol, listener: EmitterCallback): this { + return this.removeListener(eventName, listener); + } + + // This addListener wrapper will create a one-off, unique wrapper function + // to pass down into super.addListener, and save a mapping of it for later removal. + addListener(eventName: string | symbol, listener: EmitterCallback): this { + const getMapper = () => { + return (...args: unknown[]) => { + return this.wrapper(eventName, listener, args); + }; + }; + const newListener = getMapper(); + const subset = + this.mapper.get(eventName) ?? new Map(); + subset.set(listener, newListener); + this.mapper.set(eventName, subset); + super.addListener(eventName, newListener); + + return this; + } + + // This removeListener wrapper translates the user-passed handler back into + // the unique wrapper function, and then passes that down to super.removeListener. + // This also tries to keep a more or less clean listener mapping list. + removeListener(eventName: string | symbol, listener: EmitterCallback): this { + let listenerToRemove = listener; + + const subset = this.mapper.get(eventName); + if (subset) { + const wrapper = subset.get(listener); + if (wrapper) { + listenerToRemove = wrapper; + } + subset.delete(listener); + if (!subset.size) { + this.mapper.delete(eventName); + } + } + + super.removeListener(eventName, listenerToRemove); + + return this; + } + + // Wrapper for removeAllListeners that also deletes any mappings we had for the event. + removeAllListeners(event?: string | symbol | undefined): this { + if (event) { + this.mapper.delete(event); + } else { + this.mapper.clear(); + } + + super.removeAllListeners(event); + + return this; + } +} diff --git a/test/iam.ts b/test/iam.ts index a2dab3ddc..d5f5763e2 100644 --- a/test/iam.ts +++ b/test/iam.ts @@ -81,10 +81,19 @@ describe('IAM', () => { assert.strictEqual(iam.request, fakeRequest); }); - it('should localize the ID', () => { + it('should localize the ID string', () => { assert.strictEqual(iam.id, ID); }); + it('should localize the ID getter', () => { + iam = new IAM(PUBSUB, { + get name() { + return 'test'; + }, + }); + assert.strictEqual(iam.id, 'test'); + }); + it('should promisify some of the things', () => { assert(promisified); }); diff --git a/test/lease-manager.ts b/test/lease-manager.ts index 2ceb17cf7..503129446 100644 --- a/test/lease-manager.ts +++ b/test/lease-manager.ts @@ -47,9 +47,22 @@ class FakeSubscriber extends EventEmitter { isExactlyOnceDelivery = false; } +class FakeSubscriberTelemetry { + flowStart() {} + flowEnd() {} + schedulerStart() {} + schedulerEnd() {} + modAckStart() {} + modAckStop() {} + processingStart() {} + processingEnd() {} +} + class FakeMessage { length = 20; received: number; + subSpans: FakeSubscriberTelemetry = new FakeSubscriberTelemetry(); + constructor() { this.received = Date.now(); } @@ -58,6 +71,7 @@ class FakeMessage { return AckResponses.Success; } ackFailed() {} + endParentSpan() {} } interface LeaseManagerInternals { @@ -132,6 +146,15 @@ describe('LeaseManager', () => { }); describe('add', () => { + it('should start a flow span', () => { + const message = new FakeMessage() as {} as Message; + const stub = sandbox.spy(message.subSpans, 'flowStart'); + + leaseManager.add(message); + + assert.strictEqual(stub.calledOnce, true); + }); + it('should update the bytes/size values', () => { const message = new FakeMessage() as {} as Message; @@ -365,6 +388,14 @@ describe('LeaseManager', () => { assert.strictEqual(leaseManager.size, 0); }); + it('should end all parent spans', () => { + const messages = [new FakeMessage(), new FakeMessage()]; + const spies = messages.map(m => sandbox.spy(m, 'endParentSpan')); + messages.forEach(m => leaseManager.add(m as {} as Message)); + leaseManager.clear(); + spies.forEach(s => assert.strictEqual(s.calledOnce, true)); + }); + it('should emit the free event if it was full', done => { leaseManager.setOptions({maxMessages: 1}); leaseManager.add(new FakeMessage() as {} as Message); @@ -421,6 +452,16 @@ describe('LeaseManager', () => { }); describe('remove', () => { + it('should end the span', () => { + const message = new FakeMessage(); + const spy = sandbox.spy(message, 'endParentSpan'); + + leaseManager.add(message as {} as Message); + leaseManager.remove(message as {} as Message); + + assert.strictEqual(spy.calledOnce, true); + }); + it('should noop for unknown messages', () => { const message = new FakeMessage(); diff --git a/test/opentelemetry-tracing.ts b/test/opentelemetry-tracing.ts deleted file mode 100644 index dc6a1423e..000000000 --- a/test/opentelemetry-tracing.ts +++ /dev/null @@ -1,60 +0,0 @@ -/*! - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import * as assert from 'assert'; -import {describe, it, beforeEach} from 'mocha'; - -import * as api from '@opentelemetry/api'; -import * as trace from '@opentelemetry/tracing'; -import {createSpan} from '../src/opentelemetry-tracing'; -import {exporter} from './tracing'; -import {SpanKind} from '@opentelemetry/api'; - -describe('OpenTelemetryTracer', () => { - let span: trace.Span; - const spanName = 'test-span'; - const spanContext: api.SpanContext = { - traceId: 'd4cda95b652f4a1592b449d5929fda1b', - spanId: '6e0c63257de34c92', - traceFlags: api.TraceFlags.SAMPLED, - }; - const spanAttributes: api.SpanAttributes = { - foo: 'bar', - }; - - beforeEach(() => { - exporter.reset(); - }); - - it('creates a span', () => { - span = createSpan( - spanName, - SpanKind.PRODUCER, - spanAttributes, - spanContext - ) as trace.Span; - span.end(); - - const spans = exporter.getFinishedSpans(); - assert.notStrictEqual(spans.length, 0); - const exportedSpan = spans.concat().pop()!; - - assert.strictEqual(exportedSpan.name, spanName); - assert.deepStrictEqual(exportedSpan.attributes, spanAttributes); - assert.strictEqual(exportedSpan.parentSpanId, spanContext.spanId); - assert.strictEqual(exportedSpan.kind, SpanKind.PRODUCER); - }); -}); diff --git a/test/publisher/flow-publisher.ts b/test/publisher/flow-publisher.ts index cdeebdb37..55e600917 100644 --- a/test/publisher/flow-publisher.ts +++ b/test/publisher/flow-publisher.ts @@ -27,10 +27,11 @@ import { } from '../../src/publisher'; import {FlowControl} from '../../src/publisher/flow-control'; import * as fp from '../../src/publisher/flow-publisher'; +import * as tracing from '../../src/telemetry-tracing'; class FakePublisher { flowControl!: FlowControl; - publishMessage() {} + async publishMessage() {} setOptions(options: PublishOptions) { this.flowControl.setOptions(options.flowControlOptions!); } @@ -49,6 +50,23 @@ describe('Flow control publisher', () => { sandbox.restore(); }); + it('should create a flow span if a parent exists', async () => { + const fcp = new fp.FlowControlledPublisher(publisher); + const message = { + data: Buffer.from('foo'), + parentSpan: tracing.PubsubSpans.createPublisherSpan({}, 'topic'), + }; + fcp.publish(message as unknown as PubsubMessage); + assert.strictEqual(!!message.parentSpan, true); + }); + + it('should not create a flow span if no parent exists', async () => { + const fcp = new fp.FlowControlledPublisher(publisher); + const message = {data: Buffer.from('foo'), parentSpan: undefined}; + fcp.publish(message as unknown as PubsubMessage); + assert.strictEqual(!message.parentSpan, true); + }); + it('should get no promise if there is flow control space left', async () => { publisher.setOptions({ flowControlOptions: { diff --git a/test/publisher/index.ts b/test/publisher/index.ts index 451bb82f2..7dc97e3d1 100644 --- a/test/publisher/index.ts +++ b/test/publisher/index.ts @@ -28,6 +28,7 @@ import {PublishError} from '../../src/publisher/publish-error'; import * as util from '../../src/util'; import {defaultOptions} from '../../src/default-options'; +import * as tracing from '../../src/telemetry-tracing'; import {exporter} from '../tracing'; import {SpanKind} from '@opentelemetry/api'; import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; @@ -184,20 +185,22 @@ describe('Publisher', () => { describe('OpenTelemetry tracing', () => { let tracingPublisher: p.Publisher = {} as p.Publisher; - const enableTracing: p.PublishOptions = { - enableOpenTelemetryTracing: true, - }; const buffer = Buffer.from('Hello, world!'); beforeEach(() => { exporter.reset(); }); - it('export created spans', () => { + it('export created spans', async () => { // Setup trace exporting - tracingPublisher = new Publisher(topic, enableTracing); + tracingPublisher = new Publisher(topic); + const msg = {data: buffer} as p.PubsubMessage; + tracingPublisher.publishMessage(msg); + + // publishMessage is only the first part of the process now, + // so we need to manually end the span. + msg.parentSpan?.end(); - tracingPublisher.publish(buffer); const spans = exporter.getFinishedSpans(); assert.notStrictEqual(spans.length, 0, 'has span'); const createdSpan = spans.concat().pop()!; @@ -380,7 +383,7 @@ describe('Publisher', () => { it('should issue a warning if OpenTelemetry span context key is set', () => { const warnSpy = sinon.spy(console, 'warn'); const attributes = { - googclient_OpenTelemetrySpanContext: 'foobar', + [tracing.legacyAttributeName]: 'foobar', }; const fakeMessageWithOTKey = {data, attributes}; const publisherTracing = new Publisher(topic, { diff --git a/test/subscriber.ts b/test/subscriber.ts index cab00e286..8ef946699 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -28,14 +28,14 @@ import {google} from '../protos/protos'; import * as defer from 'p-defer'; import {HistogramOptions} from '../src/histogram'; -import {FlowControlOptions} from '../src/lease-manager'; +import {FlowControlOptions, LeaseManager} from '../src/lease-manager'; import {BatchOptions} from '../src/message-queues'; import {MessageStreamOptions} from '../src/message-stream'; import * as s from '../src/subscriber'; import {Subscription} from '../src/subscription'; import {SpanKind} from '@opentelemetry/api'; -import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; import {Duration} from '../src'; +import * as tracing from '../src/telemetry-tracing'; type PullResponse = google.pubsub.v1.IStreamingPullResponse; @@ -64,6 +64,10 @@ class FakeSubscription { pubsub = new FakePubSub(); } +interface PublicInventory { + _inventory: LeaseManager; +} + class FakeHistogram { options?: HistogramOptions; constructor(options?: HistogramOptions) { @@ -756,6 +760,8 @@ describe('Subscriber', () => { }); it('should add messages to the inventory', done => { + const message = new Message(subscriber, RECEIVED_MESSAGE); + subscriber.open(); const modAckStub = sandbox.stub(subscriber, 'modAck'); @@ -766,6 +772,15 @@ describe('Subscriber', () => { const inventory: FakeLeaseManager = stubs.get('inventory'); const addStub = sandbox.stub(inventory, 'add').callsFake(() => { const [addMsg] = addStub.lastCall.args; + + // OTel is enabled during tests, so we need to delete the baggage. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const [addMsgAny, msgAny] = [addMsg as any, message as any]; + delete addMsgAny.parentSpan; + delete addMsgAny.subSpans; + delete msgAny.parentSpan; + delete msgAny.subSpans; + assert.deepStrictEqual(addMsg, message); // test for receipt @@ -850,31 +865,31 @@ describe('Subscriber', () => { it('should not instantiate a tracer when tracing is disabled', () => { subscriber = new Subscriber(subscription, {}); - assert.strictEqual(subscriber['_useOpentelemetry'], false); + assert.strictEqual(subscriber['_useLegacyOpenTelemetry'], false); }); it('should instantiate a tracer when tracing is enabled through constructor', () => { subscriber = new Subscriber(subscription, enableTracing); - assert.ok(subscriber['_useOpentelemetry']); + assert.ok(subscriber['_useLegacyOpenTelemetry']); }); it('should instantiate a tracer when tracing is enabled through setOptions', () => { subscriber = new Subscriber(subscription, {}); subscriber.setOptions(enableTracing); - assert.ok(subscriber['_useOpentelemetry']); + assert.ok(subscriber['_useLegacyOpenTelemetry']); }); it('should disable tracing when tracing is disabled through setOptions', () => { subscriber = new Subscriber(subscription, enableTracing); subscriber.setOptions(disableTracing); - assert.strictEqual(subscriber['_useOpentelemetry'], false); + assert.strictEqual(subscriber['_useLegacyOpenTelemetry'], false); }); it('exports a span once it is created', () => { subscription = new FakeSubscription() as {} as Subscription; subscriber = new Subscriber(subscription, enableTracing); message = new Message(subscriber, RECEIVED_MESSAGE); - assert.strictEqual(subscriber['_useOpentelemetry'], true); + assert.strictEqual(subscriber['_useLegacyOpenTelemetry'], true); subscriber.open(); // Construct mock of received message with span context @@ -900,18 +915,25 @@ describe('Subscriber', () => { receivedMessages: [messageWithSpanContext], }; + const openedSub = subscriber as unknown as PublicInventory; + sandbox.stub(openedSub._inventory, 'add').callsFake((m: s.Message) => { + message = m; + }); + // Receive message and assert that it was exported const msgStream = stubs.get('messageStream'); msgStream.emit('data', pullResponse); + message.endParentSpan(); + const spans = exporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1); - const firstSpan = spans.concat().shift(); + assert.strictEqual(spans.length, 2); + const firstSpan = spans.pop(); assert.ok(firstSpan); assert.strictEqual(firstSpan.parentSpanId, parentSpanContext.spanId); assert.strictEqual( firstSpan.name, - `${subscriber.name} process`, + `${subscriber.name} receive`, 'name of span should match' ); assert.strictEqual( @@ -919,42 +941,27 @@ describe('Subscriber', () => { SpanKind.CONSUMER, 'span kind should be CONSUMER' ); - assert.strictEqual( - firstSpan.attributes[SemanticAttributes.MESSAGING_OPERATION], - 'process', - 'span messaging operation should match' - ); - assert.strictEqual( - firstSpan.attributes[SemanticAttributes.MESSAGING_SYSTEM], - 'pubsub' - ); - assert.strictEqual( - firstSpan.attributes[SemanticAttributes.MESSAGING_MESSAGE_ID], - messageWithSpanContext.message.messageId, - 'span messaging id should match' - ); - assert.strictEqual( - firstSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION], - subscriber.name, - 'span messaging destination should match' - ); - assert.strictEqual( - firstSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND], - 'topic' - ); }); - it('does not export a span when a span context is not present on message', () => { + it('exports a span even when a span context is not present on message', () => { subscriber = new Subscriber(subscription, enableTracing); + subscriber.open(); const pullResponse: s.PullResponse = { receivedMessages: [RECEIVED_MESSAGE], }; + const openedSub = subscriber as unknown as PublicInventory; + sandbox.stub(openedSub._inventory, 'add').callsFake((m: s.Message) => { + message = m; + }); + // Receive message and assert that it was exported const stream: FakeMessageStream = stubs.get('messageStream'); stream.emit('data', pullResponse); - assert.strictEqual(exporter.getFinishedSpans().length, 0); + + message.endParentSpan(); + assert.strictEqual(exporter.getFinishedSpans().length, 2); }); }); @@ -1165,4 +1172,108 @@ describe('Subscriber', () => { }); }); }); + + describe('SubscriberSpans', () => { + const message: tracing.MessageWithAttributes = { + attributes: {}, + parentSpan: undefined, + }; + const spans = new s.SubscriberSpans(message); + const fakeSpan = { + end() {}, + } as unknown as opentelemetry.Span; + + it('starts a flow span', () => { + const stub = sandbox + .stub(tracing.PubsubSpans, 'createReceiveFlowSpan') + .returns(fakeSpan); + spans.flowStart(); + assert.strictEqual(stub.calledOnce, true); + assert.strictEqual(stub.args[0][0], message); + spans.flowStart(); + assert.strictEqual(stub.calledOnce, true); + }); + + it('ends a flow span', () => { + sandbox + .stub(tracing.PubsubSpans, 'createReceiveFlowSpan') + .returns(fakeSpan); + spans.flowStart(); + const spy = sandbox.spy(fakeSpan, 'end'); + spans.flowEnd(); + assert.strictEqual(spy.calledOnce, true); + spans.flowEnd(); + assert.strictEqual(spy.calledOnce, true); + }); + + it('starts a modAck span', () => { + const stub = sandbox + .stub(tracing.PubsubSpans, 'createModAckSpan') + .returns(fakeSpan); + spans.modAckStart(Duration.from({seconds: 10}), true); + assert.strictEqual(stub.args[0][0], message); + assert.strictEqual(stub.args[0][1].totalOf('second'), 10); + assert.strictEqual(stub.args[0][2], true); + spans.modAckStart(Duration.from({seconds: 20}), false); + assert.strictEqual(stub.calledOnce, true); + }); + + it('ends a modAck span', () => { + sandbox.stub(tracing.PubsubSpans, 'createModAckSpan').returns(fakeSpan); + spans.modAckStart(Duration.from({seconds: 10}), true); + const spy = sandbox.spy(fakeSpan, 'end'); + spans.modAckStop(); + assert.strictEqual(spy.calledOnce, true); + spans.modAckStop(); + assert.strictEqual(spy.calledOnce, true); + }); + + it('starts a scheduler span', () => { + const stub = sandbox + .stub(tracing.PubsubSpans, 'createReceiveSchedulerSpan') + .returns(fakeSpan); + spans.schedulerStart(); + assert.strictEqual(stub.args[0][0], message); + assert.strictEqual(stub.calledOnce, true); + spans.schedulerStart(); + assert.strictEqual(stub.calledOnce, true); + }); + + it('ends a scheduler span', () => { + sandbox + .stub(tracing.PubsubSpans, 'createReceiveSchedulerSpan') + .returns(fakeSpan); + spans.schedulerStart(); + const spy = sandbox.spy(fakeSpan, 'end'); + spans.schedulerEnd(); + assert.strictEqual(spy.calledOnce, true); + spans.schedulerEnd(); + assert.strictEqual(spy.calledOnce, true); + }); + + it('starts a processing span', () => { + const stub = sandbox + .stub(tracing.PubsubSpans, 'createReceiveProcessSpan') + .returns(fakeSpan); + const subName = 'foozle'; + spans.processingStart(subName); + assert.strictEqual(stub.args[0][0], message); + assert.strictEqual(stub.args[0][1], subName); + assert.strictEqual(stub.calledOnce, true); + spans.processingStart('boo'); + assert.strictEqual(stub.calledOnce, true); + }); + + it('ends a processing span', () => { + sandbox + .stub(tracing.PubsubSpans, 'createReceiveSchedulerSpan') + .returns(fakeSpan); + spans.processingStart('foozle'); + const spy = sandbox.spy(fakeSpan, 'end'); + spans.processingEnd(); + assert.strictEqual(spy.calledOnce, true); + spans.processingEnd(); + assert.strictEqual(spy.calledOnce, true); + }); + }); }); diff --git a/test/subscription.ts b/test/subscription.ts index dcf8c0bba..a26643aa7 100644 --- a/test/subscription.ts +++ b/test/subscription.ts @@ -169,7 +169,7 @@ describe('Subscription', () => { assert(subscription.iam instanceof FakeIAM); const args = (subscription.iam as {} as FakeIAM).calledWith_; assert.strictEqual(args[0], PUBSUB); - assert.strictEqual(args[1], subscription.name); + assert.strictEqual(args[1], subscription); }); it('should create a Subscriber', () => { diff --git a/test/telemetry-tracing.ts b/test/telemetry-tracing.ts new file mode 100644 index 000000000..69153957e --- /dev/null +++ b/test/telemetry-tracing.ts @@ -0,0 +1,144 @@ +/*! + * Copyright 2020-2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {describe, it, beforeEach} from 'mocha'; + +import * as trace from '@opentelemetry/sdk-trace-base'; +import * as otel from '../src/telemetry-tracing'; +import {exporter} from './tracing'; +import {SpanKind} from '@opentelemetry/api'; +import sinon = require('sinon'); +import {PubsubMessage} from '../src/publisher'; + +describe('OpenTelemetryTracer', () => { + beforeEach(() => { + exporter.reset(); + }); + + it('creates a span', () => { + const message: PubsubMessage = {}; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'test topic' + ) as trace.Span; + span.end(); + + const spans = exporter.getFinishedSpans(); + assert.notStrictEqual(spans.length, 0); + const exportedSpan = spans.concat().pop()!; + + assert.strictEqual(exportedSpan.name, 'test topic send'); + assert.strictEqual(exportedSpan.kind, SpanKind.PRODUCER); + }); + + it('injects a trace context', () => { + const message: PubsubMessage = { + attributes: {}, + }; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'test topic' + ) as trace.Span; + + otel.injectSpan(span, message, otel.OpenTelemetryLevel.Modern); + + assert.strictEqual( + Object.getOwnPropertyNames(message.attributes).includes( + otel.modernAttributeName + ), + true + ); + }); + + it('injects a trace context and legacy baggage', () => { + const message: PubsubMessage = { + attributes: {}, + }; + const span = otel.PubsubSpans.createPublisherSpan(message, 'test topic'); + + otel.injectSpan(span, message, otel.OpenTelemetryLevel.Legacy); + + assert.strictEqual( + Object.getOwnPropertyNames(message.attributes).includes( + otel.modernAttributeName + ), + true + ); + assert.strictEqual( + Object.getOwnPropertyNames(message.attributes).includes( + otel.legacyAttributeName + ), + true + ); + }); + + it('should issue a warning if OpenTelemetry span context key is set', () => { + const message: PubsubMessage = { + attributes: { + [otel.legacyAttributeName]: 'foobar', + [otel.modernAttributeName]: 'bazbar', + }, + }; + const span = otel.PubsubSpans.createPublisherSpan(message, 'test topic'); + + const warnSpy = sinon.spy(console, 'warn'); + try { + otel.injectSpan(span, message, otel.OpenTelemetryLevel.Legacy); + assert.strictEqual(warnSpy.callCount, 2); + } finally { + warnSpy.restore(); + } + }); + + it('should be able to determine if attributes are present', () => { + let message: otel.MessageWithAttributes = { + attributes: { + [otel.legacyAttributeName]: 'foobar', + }, + }; + assert.strictEqual(otel.containsSpanContext(message), true); + + message = { + attributes: { + [otel.modernAttributeName]: 'foobar', + }, + }; + assert.strictEqual(otel.containsSpanContext(message), true); + + message = {}; + assert.strictEqual(otel.containsSpanContext(message), false); + }); + + it('extracts a trace context', () => { + const message = { + attributes: { + [otel.modernAttributeName]: + '00-d4cda95b652f4a1592b449d5929fda1b-553964cd9101a314-01', + }, + }; + + const childSpan = otel.extractSpan( + message, + 'test sub', + otel.OpenTelemetryLevel.Modern + ); + assert.strictEqual( + childSpan!.spanContext().traceId, + 'd4cda95b652f4a1592b449d5929fda1b' + ); + }); +}); diff --git a/test/topic.ts b/test/topic.ts index 1179c718b..bdbb7c808 100644 --- a/test/topic.ts +++ b/test/topic.ts @@ -188,7 +188,7 @@ describe('Topic', () => { }); it('should create an iam object', () => { - assert.deepStrictEqual(topic.iam.calledWith_, [PUBSUB, TOPIC_NAME]); + assert.deepStrictEqual(topic.iam.calledWith_, [PUBSUB, topic]); }); }); diff --git a/test/tracing.ts b/test/tracing.ts index 8b1b31146..7689253ad 100644 --- a/test/tracing.ts +++ b/test/tracing.ts @@ -18,7 +18,7 @@ import { BasicTracerProvider, InMemorySpanExporter, SimpleSpanProcessor, -} from '@opentelemetry/tracing'; +} from '@opentelemetry/sdk-trace-base'; /** * This file is used to initialise a global tracing provider and span exporter