Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Opentelemetry integration #1078

Merged
merged 36 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
af0a95c
Add opentelemetry tracing
sethmaxwl Jul 27, 2020
73835fb
build: rename _toc to toc (#1066)
yoshi-automation Jul 23, 2020
ae1ae81
build: move gitattributes files to node templates (#1070)
yoshi-automation Jul 24, 2020
f8b6602
Add opentelemetry instrumentation
sethmaxwl Aug 4, 2020
c64b1ce
Merge branch 'master' of https://github.com/googleapis/nodejs-pubsub …
sethmaxwl Aug 4, 2020
a8ede4b
Add create span test
sethmaxwl Aug 5, 2020
aec8a5c
Refactor tracing
sethmaxwl Aug 6, 2020
dbe8cdf
Add publisher key test
sethmaxwl Aug 7, 2020
d8ebcf7
Fix linting issues
sethmaxwl Aug 7, 2020
16b9aaa
Add docs
sethmaxwl Aug 7, 2020
e93dceb
Add example for opentelemetry
sethmaxwl Aug 7, 2020
fe14163
Add tracing example
sethmaxwl Aug 7, 2020
11478a7
Update headers
sethmaxwl Aug 7, 2020
3cb49e9
Merge branch 'master' into opentelemetry-integration
bcoe Aug 8, 2020
4c4482c
Merge branch 'master' into opentelemetry-integration
sethmaxwl Aug 10, 2020
dd42a6a
Add microsoft api documenter
sethmaxwl Aug 10, 2020
59b6f62
Merge branch 'master' into opentelemetry-integration
feywind Aug 10, 2020
78ce7f6
Fix linting in samples/package.json
sethmaxwl Aug 10, 2020
99a1fe1
Merge branch 'opentelemetry-integration' of https://github.com/sethma…
sethmaxwl Aug 10, 2020
5ccd513
Merge branch 'master' into opentelemetry-integration
sethmaxwl Aug 10, 2020
39ec18a
Merge branch 'master' into opentelemetry-integration
sethmaxwl Aug 11, 2020
0ad9443
Add optional tracing
sethmaxwl Aug 12, 2020
d4996b9
Merge branch 'opentelemetry-integration' of https://github.com/sethma…
sethmaxwl Aug 12, 2020
0606f83
Fix linting issues
sethmaxwl Aug 12, 2020
c9cd570
Re-add api-documenter
sethmaxwl Aug 12, 2020
c974091
Update package.json
sethmaxwl Aug 12, 2020
e343c52
Update package.json
sethmaxwl Aug 12, 2020
00c0980
Update package.json
sethmaxwl Aug 12, 2020
eccd6eb
Fix docs
sethmaxwl Aug 12, 2020
92c3a09
Merge branch 'opentelemetry-integration' of https://github.com/sethma…
sethmaxwl Aug 12, 2020
f7369e8
Merge branch 'master' into opentelemetry-integration
feywind Aug 12, 2020
c7cc67d
Add more unit tests
sethmaxwl Aug 13, 2020
af46e32
Merge branch 'opentelemetry-integration' of https://github.com/sethma…
sethmaxwl Aug 13, 2020
5fa6274
Fix linting
sethmaxwl Aug 13, 2020
ce35b88
Add disable tracing tests
sethmaxwl Aug 13, 2020
d5ac567
Update opentelemetryTracing sample
sethmaxwl Aug 13, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@google-cloud/pubsub",
"description": "Cloud Pub/Sub Client Library for Node.js",
"version": "2.3.0",
"version": "2.2.0",
"license": "Apache-2.0",
"author": "Google Inc.",
"engines": {
Expand Down Expand Up @@ -44,15 +44,15 @@
"predocs-test": "npm run docs",
"benchwrapper": "node bin/benchwrapper.js",
"prelint": "cd samples; npm link ../; npm install",
"precompile": "gts clean",
"api-extractor": "api-extractor run --local",
"api-documenter": "api-documenter yaml --input-folder=temp"
"precompile": "gts clean"
},
"dependencies": {
"@google-cloud/paginator": "^3.0.0",
"@google-cloud/precise-date": "^2.0.0",
"@google-cloud/projectify": "^2.0.0",
"@google-cloud/promisify": "^2.0.0",
"@opentelemetry/api": "^0.9.0",
"@opentelemetry/tracing": "^0.9.0",
"@types/duplexify": "^3.6.0",
"@types/long": "^4.0.0",
"arrify": "^2.0.0",
Expand Down Expand Up @@ -97,8 +97,6 @@
"uuid": "^8.0.0",
"webpack": "^4.42.0",
"webpack-cli": "^3.3.11",
"yargs": "^15.0.0",
"@microsoft/api-documenter": "^7.8.10",
sethmaxwl marked this conversation as resolved.
Show resolved Hide resolved
"@microsoft/api-extractor": "^7.8.10"
"yargs": "^15.0.0"
}
}
97 changes: 97 additions & 0 deletions samples/opentelemetryTracing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*!
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This sample demonstrates how to add OpenTelemetry tracing to the
* Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

'use strict';

// sample-metadata:
// title: OpenTelemetry Tracing
// description: Demonstrates how to enable OpenTelemetry tracing in
// a publisher or subscriber.
// usage: node opentelemetryTracing.js <topic-name> <subscription-name>

const SUBSCRIBER_TIMEOUT = 10;

function main(
topicName = 'YOUR_TOPIC_NAME',
subscriptionName = 'YOUR_SUBSCRIPTION_NAME',
data = {foo: 'bar'}
) {
// [START opentelemetry_tracing]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'my-topic';
// const subscriptionName = 'my-subscription';
// const data = 'Hello, world!";

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {opentelemetry} = require('@opentelemetry/api');

// Imports the OpenTelemetry span handlers and exporter
const {
SimpleSpanProcessor,
BasicTracerProvider,
ConsoleSpanExporter,
} = require('@opentelemetry/tracing');

// Set up span processing and specify the console as the span exporter
const provider = new BasicTracerProvider();
const exporter = new ConsoleSpanExporter();
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));

provider.register();
opentelemetry.trace.setGlobalTracerProvider(provider);

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage() {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubSubClient.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);
}

async function subscriptionListen() {
// Message handler for subscriber
const messageHandler = message => {
console.log(`Message ${message.id} received.`);
message.ack();
};

// Listens for new messages from the topic
pubSubClient.subscription(subscriptionName).on('message', messageHandler);
setTimeout(() => {
pubSubClient.subscription(subscriptionName).removeAllListeners();
}, SUBSCRIBER_TIMEOUT * 1000);
}

publishMessage().then(subscriptionListen());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is missing a doc end tag, though it may not matter at this point if nothing in the docsite is expecting it to be here. (We would need to canonicalize the the tag name anyway.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to include a link to this example in the blog post. How should we canonicalize the tag name?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me see if I can find the right person to ask... there were some changes in that area lately.

}

main(...process.argv.slice(2));
4 changes: 3 additions & 1 deletion samples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"test": "mocha system-test --timeout 600000"
},
"dependencies": {
"@google-cloud/pubsub": "^2.3.0"
"@google-cloud/pubsub": "^2.3.0",
"@opentelemetry/api": "^0.10.2",
sethmaxwl marked this conversation as resolved.
Show resolved Hide resolved
"@opentelemetry/tracing": "^0.10.2"
sethmaxwl marked this conversation as resolved.
Show resolved Hide resolved
},
"devDependencies": {
"chai": "^4.2.0",
Expand Down
43 changes: 43 additions & 0 deletions src/opentelemetry-tracing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*!
* Copyright 2020 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {Attributes, SpanContext, Span, trace} from '@opentelemetry/api';
import {Tracer} from '@opentelemetry/tracing';

/**
* Wrapper for creating OpenTelemetry Spans
*
* @class
*/
export class OpenTelemetryTracer {
/**
* Creates a new span with the given properties
*
* @param {string} spanName the name for the span
* @param {Attributes?} attributes an object containing the attributes to be set for the span
* @param {SpanContext?} parent the context of the parent span to link to the span
*/
createSpan(
spanName: string,
attributes?: Attributes,
parent?: SpanContext
): Span {
const tracerProvider: Tracer = trace.getTracer('default') as Tracer;
return tracerProvider.startSpan(spanName, {
parent: parent,
attributes: attributes,
});
}
}
29 changes: 29 additions & 0 deletions src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import {promisify, promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import {CallOptions} from 'google-gax';
import {Span} from '@opentelemetry/api';

import {BatchPublishOptions} from './message-batch';
import {Queue, OrderedQueue} from './message-queues';
import {Topic} from '../topic';
import {RequestCallback, EmptyCallback} from '../pubsub';
import {google} from '../../protos/protos';
import {defaultOptions} from '../default-options';
import {OpenTelemetryTracer} from '../opentelemetry-tracing';

export type PubsubMessage = google.pubsub.v1.IPubsubMessage;

Expand Down Expand Up @@ -60,11 +62,13 @@ export class Publisher {
settings!: PublishOptions;
queue: Queue;
orderedQueues: Map<string, OrderedQueue>;
tracing: OpenTelemetryTracer;
constructor(topic: Topic, options?: PublishOptions) {
this.setOptions(options);
this.topic = topic;
this.queue = new Queue(this);
this.orderedQueues = new Map();
this.tracing = new OpenTelemetryTracer();
sethmaxwl marked this conversation as resolved.
Show resolved Hide resolved
}

flush(): Promise<void>;
Expand Down Expand Up @@ -136,7 +140,29 @@ export class Publisher {
* @param {PublishCallback} [callback] Callback function.
*/
publishMessage(message: PubsubMessage, callback: PublishCallback): void {
// Construct publisher span and set context as message attribute
const {data, attributes = {}} = message;
const spanAttributes = {
sethmaxwl marked this conversation as resolved.
Show resolved Hide resolved
data: message.data,
};
const span: Span = this.tracing.createSpan(
`${this.topic.name} publisher`,
spanAttributes
);
if (
message.attributes &&
message.attributes['googclient_OpenTelemetrySpanContext']
) {
console.warn(
'googclient_OpenTelemetrySpanContext key set as message attribute, but will be overridden.'
);
}
if (!message.attributes) {
message.attributes = {};
}
message.attributes['googclient_OpenTelemetrySpanContext'] = JSON.stringify(
span.context()
);

if (!(data instanceof Buffer)) {
throw new TypeError('Data must be in the form of a Buffer.');
Expand All @@ -152,6 +178,7 @@ export class Publisher {

if (!message.orderingKey) {
this.queue.add(message, callback);
span.end();
return;
}

Expand All @@ -165,6 +192,8 @@ export class Publisher {

const queue = this.orderedQueues.get(key)!;
queue.add(message, callback);

span.end();
}
/**
* Indicates to the publisher that it is safe to continue publishing for the
Expand Down
27 changes: 27 additions & 0 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {DateStruct, PreciseDate} from '@google-cloud/precise-date';
import {replaceProjectIdToken} from '@google-cloud/projectify';
import {promisify} from '@google-cloud/promisify';
import {EventEmitter} from 'events';
import {SpanContext} from '@opentelemetry/api';

import {google} from '../protos/protos';
import {Histogram} from './histogram';
Expand All @@ -27,6 +28,7 @@ import {MessageStream, MessageStreamOptions} from './message-stream';
import {Subscription} from './subscription';
import {defaultOptions} from './default-options';
import {SubscriberClient} from './v1';
import {OpenTelemetryTracer} from './opentelemetry-tracing';

export type PullResponse = google.pubsub.v1.IPullResponse;

Expand Down Expand Up @@ -237,6 +239,7 @@ export class Subscriber extends EventEmitter {
private _options!: SubscriberOptions;
private _stream!: MessageStream;
private _subscription: Subscription;
private _tracing: OpenTelemetryTracer;
constructor(subscription: Subscription, options = {}) {
super();

Expand All @@ -248,6 +251,7 @@ export class Subscriber extends EventEmitter {
this._histogram = new Histogram({min: 10, max: 600});
this._latencies = new Histogram();
this._subscription = subscription;
this._tracing = new OpenTelemetryTracer();

this.setOptions(options);
}
Expand Down Expand Up @@ -445,12 +449,35 @@ export class Subscriber extends EventEmitter {
for (const data of receivedMessages!) {
const message = new Message(this, data);

// Create a new OpenTelemetry span with the publisher span set as the parent
const spanValue = message.attributes
sethmaxwl marked this conversation as resolved.
Show resolved Hide resolved
? message.attributes['googclient_OpenTelemetrySpanContext']
: null;

const parentSpanContext: SpanContext | null = spanValue
? JSON.parse(spanValue)
: null;

const spanAttributes = {
ackId: data.ackId,
deliveryAttempt: data.deliveryAttempt,
};
const span = parentSpanContext
? this._tracing.createSpan(
this._name,
spanAttributes,
parentSpanContext
)
: null;
if (this.isOpen) {
message.modAck(this.ackDeadline);
this._inventory.add(message);
} else {
message.nack();
}
if (span) {
span.end();
}
}
}

Expand Down
18 changes: 8 additions & 10 deletions src/v1/publisher_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,8 @@ export class PublisherClient {
>
): void;
/**
* Creates the given topic with the given name. See the
* <a href="https://cloud.google.com/pubsub/docs/admin#resource_names">
* resource name rules</a>.
* Creates the given topic with the given name. See the [resource name rules](
* https://cloud.google.com/pubsub/docs/admin#resource_names).
*
* @param {Object} request
* The request object that will be sent.
Expand All @@ -404,8 +403,8 @@ export class PublisherClient {
* signs (`%`). It must be between 3 and 255 characters in length, and it
* must not start with `"goog"`.
* @param {number[]} request.labels
* See <a href="https://cloud.google.com/pubsub/docs/labels"> Creating and
* managing labels</a>.
* See [Creating and managing labels]
* (https://cloud.google.com/pubsub/docs/labels).
* @param {google.pubsub.v1.MessageStoragePolicy} request.messageStoragePolicy
* Policy constraining the set of Google Cloud Platform regions where messages
* published to the topic may be stored. If not present, then no constraints
Expand Down Expand Up @@ -1312,11 +1311,10 @@ export class PublisherClient {
): void;
/**
* Lists the names of the snapshots on this topic. Snapshots are used in
* <a href="https://cloud.google.com/pubsub/docs/replay-overview">Seek</a>
* operations, which allow
* you to manage message acknowledgments in bulk. That is, you can set the
* acknowledgment state of messages in an existing subscription to the state
* captured by a snapshot.
* [Seek](https://cloud.google.com/pubsub/docs/replay-overview) operations,
* which allow you to manage message acknowledgments in bulk. That is, you can
* set the acknowledgment state of messages in an existing subscription to the
* state captured by a snapshot.
*
* @param {Object} request
* The request object that will be sent.
Expand Down
Loading