Skip to content

Commit

Permalink
Adding autoCreate option to Topic constructor and reuseExisting optio…
Browse files Browse the repository at this point in the history
…n to subscribe.
  • Loading branch information
Jon Wayne Parrott authored and ryanseys committed Mar 27, 2015
1 parent 7077411 commit 3f8f60f
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 5 deletions.
11 changes: 9 additions & 2 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,25 @@ PubSub.prototype.createTopic = function(name, callback) {
* @throws {Error} If a name is not provided.
*
* @param {string} name - The name of the topic.
* @param {object=} options - Configuration object.
* @param {boolean=} options.autoCreate - Automatically create topic if it
* doesn't exist. Note that messages published to a topic with no
* subscribers will not be delivered.
* @return {module:pubsub/topic}
*
* @example
* var topic = pubsub.topic('my-existing-topic');
* var topic = pubsub.topic('topic-that-maybe-exists', { autoCreate: true });
* topic.publish('New message!');
*/
PubSub.prototype.topic = function(name) {
PubSub.prototype.topic = function(name, options) {
if (!name) {
throw new Error('A name must be specified for a new topic.');
}
options = options || {};
return new Topic(this, {
name: name
name: name,
autoCreate: options.autoCreate
});
};

Expand Down
46 changes: 43 additions & 3 deletions lib/pubsub/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ function Topic(pubsub, options) {
this.name = Topic.formatName_(pubsub.projectId, options.name);
this.projectId = pubsub.projectId;
this.pubsub = pubsub;

if (options.autoCreate) {
this.unformattedName = options.name;
this.origMakeReq_ = this.makeReq_;
this.makeReq_ = this.autoCreateWrapper_;
}
}

/**
Expand Down Expand Up @@ -95,6 +101,34 @@ Topic.formatName_ = function(projectId, name) {
return 'projects/' + projectId + '/topics/' + name;
};

/**
* Wrapper for makeReq_ that automatically attempts to create a topic if it
* does not yet exist.
*
* @private
*/
Topic.prototype.autoCreateWrapper_ = function(method, path, q, body, callback) {
var self = this;

function createAndRetry() {
self.pubsub.createTopic(self.unformattedName, function(err) {
if (err) {
callback(err);
return;
}
self.origMakeReq_(method, path, q, body, callback);
});
}

this.origMakeReq_(method, path, q, body, function(err, res) {
if (err && err.code === 404 && method !== 'DELETE') {
createAndRetry();
} else {
callback(err, res);
}
});
};

/**
* Publish the provided message or array of messages. A message can be of any
* type. On success, an array of messageIds is returned in the response.
Expand Down Expand Up @@ -243,6 +277,10 @@ Topic.prototype.getSubscriptions = function(query, callback) {
* once it's pulled. (default: false)
* @param {number=} options.interval - Interval in milliseconds to check for new
* messages. (default: 10)
* @param {boolean=} options.reuseExisting - If the subscription already exists,
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
* (default: false)
* @param {function} callback - The callback function.
*
* @example
Expand Down Expand Up @@ -276,11 +314,13 @@ Topic.prototype.subscribe = function(name, options, callback) {

var path = Subscription.formatName_(this.projectId, name);
this.makeReq_('PUT', path, null, body, function(err) {
if (err) {
if (options.reuseExisting && err && err.code === 409) {
callback(null, self.subscription(name, options));
} else if (err) {
callback(err);
return;
} else {
callback(null, self.subscription(name, options));
}
callback(null, self.subscription(name, options));
});
};

Expand Down
53 changes: 53 additions & 0 deletions test/pubsub/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,43 @@ describe('Topic', function() {
});
});

describe('publish to non-existing topic', function() {
var messageObject = { data: 'howdy' };

it('should generate 404 error without autoCreate', function(done) {
topic.makeReq_ = function(method, path, query, body, callback) {
callback({ code: 404 });
};

topic.publish(messageObject, function(err) {
assert.equal(err.code, 404);
done();
});
});

it('should publish successfully with autoCreate', function(done) {
var acTopic = new Topic(pubsubMock, {
name: TOPIC_NAME, autoCreate: true
});
var created = false;

acTopic.origMakeReq_ = function(method, path, query, body, callback) {
if (!created) {
callback({ code: 404 });
} else {
callback(null);
}
};

pubsubMock.createTopic = function(name, callback) {
created = true;
callback();
};

acTopic.publish(messageObject, done);
});
});

describe('delete', function() {
it('should delete a topic', function(done) {
topic.makeReq_ = function(method, path) {
Expand Down Expand Up @@ -276,6 +313,22 @@ describe('Topic', function() {
};
topic.subscribe(SUB_NAME, CONFIG, assert.ifError);
});

it('should re-use existing subscription if specified', function(done) {
topic.subscription = function() {
done();
};

topic.makeReq_ = function(method, path, qs, body, callback) {
callback({ code: 409 });
};

topic.subscribe(SUB_NAME, function(err) {
assert.equal(err.code, 409);
});

topic.subscribe(SUB_NAME, { reuseExisting: true }, assert.ifError);
});
});

describe('subscription', function() {
Expand Down

0 comments on commit 3f8f60f

Please sign in to comment.